From 8df7708cb599bced24cb2d3cbccf93e91438b3a0 Mon Sep 17 00:00:00 2001
From: Pedro Pombeiro <noreply@pedro.pombei.ro>
Date: Mon, 11 Dec 2023 11:17:53 +0000
Subject: [PATCH] Add pause control strategy for ClickHouse migrations

Allows pausing Sidekiq jobs when a ClickHouse migration is running
---
 app/workers/click_house/events_sync_worker.rb |   1 +
 app/workers/concerns/click_house_worker.rb    |  30 ++++
 ...se_clickhouse_workers_during_migration.yml |   8 +
 ...or_clickhouse_workers_during_migration.yml |   8 +
 .../clickhouse/clickhouse_within_gitlab.md    |  20 +++
 doc/development/sidekiq/worker_attributes.md  |   2 +-
 .../ci_finished_builds_sync_worker.rb         |   1 +
 .../ci_finished_builds_sync_worker_spec.rb    |   6 +
 lib/click_house/migration_support/errors.rb   |  57 +++++++
 .../migration_support/exclusive_lock.rb       |  78 ++++++++++
 .../migration_support/migration_context.rb    |   2 +-
 .../migration_support/migration_error.rb      |  55 -------
 lib/click_house/migration_support/migrator.rb |  25 +---
 .../migration_support/sidekiq_middleware.rb   |  25 ++++
 lib/gitlab/sidekiq_middleware.rb              |   1 +
 .../sidekiq_middleware/pause_control.rb       |   1 +
 .../strategies/click_house_migration.rb       |  18 +++
 .../migration_support/exclusive_lock_spec.rb  | 140 ++++++++++++++++++
 .../migration_context_spec.rb                 |  24 +--
 .../sidekiq_middleware_spec.rb                |  61 ++++++++
 .../strategies/click_house_migration_spec.rb  |  66 +++++++++
 .../sidekiq_middleware/pause_control_spec.rb  |  18 ++-
 spec/lib/gitlab/sidekiq_middleware_spec.rb    |   4 +-
 spec/tooling/quality/test_level_spec.rb       |   4 +-
 .../click_house/events_sync_worker_spec.rb    |   6 +
 .../concerns/click_house_worker_spec.rb       |  88 +++++++++++
 .../concerns/worker_attributes_spec.rb        |   2 +-
 tooling/quality/test_level.rb                 |   1 -
 28 files changed, 651 insertions(+), 101 deletions(-)
 create mode 100644 app/workers/concerns/click_house_worker.rb
 create mode 100644 config/feature_flags/development/pause_clickhouse_workers_during_migration.yml
 create mode 100644 config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml
 create mode 100644 lib/click_house/migration_support/errors.rb
 create mode 100644 lib/click_house/migration_support/exclusive_lock.rb
 delete mode 100644 lib/click_house/migration_support/migration_error.rb
 create mode 100644 lib/click_house/migration_support/sidekiq_middleware.rb
 create mode 100644 lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb
 create mode 100644 spec/lib/click_house/migration_support/exclusive_lock_spec.rb
 rename spec/{ => lib}/click_house/migration_support/migration_context_spec.rb (93%)
 create mode 100644 spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb
 create mode 100644 spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb
 create mode 100644 spec/workers/concerns/click_house_worker_spec.rb

diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index 5936d30b8b2ca..ea9d610bd5204 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -3,6 +3,7 @@
 module ClickHouse
   class EventsSyncWorker
     include ApplicationWorker
+    include ClickHouseWorker
     include Gitlab::ExclusiveLeaseHelpers
     include Gitlab::Utils::StrongMemoize
 
diff --git a/app/workers/concerns/click_house_worker.rb b/app/workers/concerns/click_house_worker.rb
new file mode 100644
index 0000000000000..6399796f6dfb2
--- /dev/null
+++ b/app/workers/concerns/click_house_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module ClickHouseWorker
+  extend ActiveSupport::Concern
+
+  class_methods do
+    def register_click_house_worker?
+      click_house_worker_attrs.present?
+    end
+
+    def click_house_worker_attrs
+      get_class_attribute(:click_house_worker_attrs)
+    end
+
+    def click_house_migration_lock(ttl)
+      raise ArgumentError unless ttl.is_a?(ActiveSupport::Duration)
+
+      set_class_attribute(
+        :click_house_worker_attrs,
+        (click_house_worker_attrs || {}).merge(migration_lock_ttl: ttl)
+      )
+    end
+  end
+
+  included do
+    click_house_migration_lock(ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL)
+
+    pause_control :click_house_migration
+  end
+end
diff --git a/config/feature_flags/development/pause_clickhouse_workers_during_migration.yml b/config/feature_flags/development/pause_clickhouse_workers_during_migration.yml
new file mode 100644
index 0000000000000..f2a02c95632e3
--- /dev/null
+++ b/config/feature_flags/development/pause_clickhouse_workers_during_migration.yml
@@ -0,0 +1,8 @@
+---
+name: pause_clickhouse_workers_during_migration
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/138166
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/433389
+milestone: '16.7'
+type: development
+group: group::runner
+default_enabled: false
diff --git a/config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml b/config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml
new file mode 100644
index 0000000000000..dc67db6c14816
--- /dev/null
+++ b/config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml
@@ -0,0 +1,8 @@
+---
+name: wait_for_clickhouse_workers_during_migration
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/138166
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/433389
+milestone: '16.7'
+type: development
+group: group::runner
+default_enabled: false
diff --git a/doc/development/database/clickhouse/clickhouse_within_gitlab.md b/doc/development/database/clickhouse/clickhouse_within_gitlab.md
index f3305092868b0..a459f89b1855e 100644
--- a/doc/development/database/clickhouse/clickhouse_within_gitlab.md
+++ b/doc/development/database/clickhouse/clickhouse_within_gitlab.md
@@ -205,6 +205,26 @@ end
 NOTE:
 It's important to test and verify efficient batching of database records from PostgreSQL. Consider using the techniques described in the [Iterating tables in batches](../iterating_tables_in_batches.md).
 
+## Implementing Sidekiq workers
+
+Sidekiq workers leveraging ClickHouse databases should include the `ClickHouseWorker` module.
+This ensures that the worker is paused while database migrations are running,
+and that migrations do not run while the worker is active.
+
+```ruby
+# events_sync_worker.rb
+# frozen_string_literal: true
+
+module ClickHouse
+  class EventsSyncWorker
+    include ApplicationWorker
+    include ClickHouseWorker
+
+    ...
+  end
+end
+```
+
 ## Testing
 
 ClickHouse is enabled on CI/CD but to avoid significantly affecting the pipeline runtime we've decided to run the ClickHouse server for test cases tagged with `:click_house` only.
diff --git a/doc/development/sidekiq/worker_attributes.md b/doc/development/sidekiq/worker_attributes.md
index 3b74d5469cde6..016bf0b663412 100644
--- a/doc/development/sidekiq/worker_attributes.md
+++ b/doc/development/sidekiq/worker_attributes.md
@@ -336,7 +336,7 @@ worker that checks if any paused jobs must be restarted.
 To use `pause_control`, you can:
 
 - Use one of the strategies defined in `lib/gitlab/sidekiq_middleware/pause_control/strategies/`.
-- Define a custom strategy in `lib/gitlab/sidekiq_middleware/pause_control/strategies/` and add the strategy to `lib/gitlab/sidekiq_middleware/pause_control/strategies.rb`.
+- Define a custom strategy in `lib/gitlab/sidekiq_middleware/pause_control/strategies/` and add the strategy to `lib/gitlab/sidekiq_middleware/pause_control.rb`.
 
 For example:
 
diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb
index a0c8f9e76a514..d1988c010c0a5 100644
--- a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb
+++ b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb
@@ -3,6 +3,7 @@
 module ClickHouse
   class CiFinishedBuildsSyncWorker
     include ApplicationWorker
+    include ClickHouseWorker
 
     idempotent!
     data_consistency :delayed
diff --git a/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb b/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb
index 9843b284297ae..fcf6f638ecf5e 100644
--- a/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb
+++ b/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb
@@ -14,6 +14,12 @@
     create_sync_events build1
   end
 
+  specify do
+    expect(worker.class.click_house_worker_attrs).to match(
+      a_hash_including(migration_lock_ttl: ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL)
+    )
+  end
+
   include_examples 'an idempotent worker' do
     it 'calls CiFinishedBuildsSyncService and returns its response payload' do
       expect(worker).to receive(:log_extra_metadata_on_done)
diff --git a/lib/click_house/migration_support/errors.rb b/lib/click_house/migration_support/errors.rb
new file mode 100644
index 0000000000000..f8c6e5a94e06e
--- /dev/null
+++ b/lib/click_house/migration_support/errors.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+
+module ClickHouse
+  module MigrationSupport
+    module Errors
+      class Base < StandardError
+        def initialize(message = nil)
+          message = "\n\n#{message}\n\n" if message
+          super
+        end
+      end
+
+      class IllegalMigrationNameError < Base
+        def initialize(name = nil)
+          if name
+            super("Illegal name for migration file: #{name}\n\t(only lower case letters, numbers, and '_' allowed).")
+          else
+            super('Illegal name for migration.')
+          end
+        end
+      end
+
+      IrreversibleMigration = Class.new(Base)
+      LockError = Class.new(Base)
+
+      class DuplicateMigrationVersionError < Base
+        def initialize(version = nil)
+          if version
+            super("Multiple migrations have the version number #{version}.")
+          else
+            super('Duplicate migration version error.')
+          end
+        end
+      end
+
+      class DuplicateMigrationNameError < Base
+        def initialize(name = nil)
+          if name
+            super("Multiple migrations have the name #{name}.")
+          else
+            super('Duplicate migration name.')
+          end
+        end
+      end
+
+      class UnknownMigrationVersionError < Base
+        def initialize(version = nil)
+          if version
+            super("No migration with version number #{version}.")
+          else
+            super('Unknown migration version.')
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/lib/click_house/migration_support/exclusive_lock.rb b/lib/click_house/migration_support/exclusive_lock.rb
new file mode 100644
index 0000000000000..e06ff4c0306fc
--- /dev/null
+++ b/lib/click_house/migration_support/exclusive_lock.rb
@@ -0,0 +1,78 @@
+# frozen_string_literal: true
+
+module ClickHouse
+  module MigrationSupport
+    class ExclusiveLock
+      MIGRATION_LEASE_KEY = 'click_house:migrations'
+      MIGRATION_RETRY_DELAY = ->(num) { 0.2.seconds * (num**2) }
+      MIGRATION_LOCK_DURATION = 1.hour
+
+      ACTIVE_WORKERS_REDIS_KEY = 'click_house:workers:active_workers'
+      DEFAULT_CLICKHOUSE_WORKER_TTL = 30.minutes
+      WORKERS_WAIT_SLEEP = 5.seconds
+
+      class << self
+        include ::Gitlab::ExclusiveLeaseHelpers
+
+        def register_running_worker(worker_class, worker_id)
+          ttl = worker_class.click_house_worker_attrs[:migration_lock_ttl].from_now.utc
+
+          Gitlab::Redis::ClusterSharedState.with do |redis|
+            redis.zadd(ACTIVE_WORKERS_REDIS_KEY, ttl.to_i, worker_id, gt: true)
+
+            yield
+          ensure
+            redis.zrem(ACTIVE_WORKERS_REDIS_KEY, worker_id)
+          end
+        end
+
+        def execute_migration
+          in_lock(MIGRATION_LEASE_KEY, ttl: MIGRATION_LOCK_DURATION, retries: 5, sleep_sec: MIGRATION_RETRY_DELAY) do
+            wait_until_workers_inactive(DEFAULT_CLICKHOUSE_WORKER_TTL.from_now)
+
+            yield
+          end
+        rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e
+          raise ClickHouse::MigrationSupport::Errors::LockError, e.message
+        end
+
+        def pause_workers?
+          Gitlab::ExclusiveLease.new(MIGRATION_LEASE_KEY, timeout: 0).exists?
+        end
+
+        def active_sidekiq_workers?
+          Gitlab::Redis::ClusterSharedState.with do |redis|
+            min = Time.now.utc.to_i
+
+            # expire keys in the past
+            redis.zremrangebyscore(ACTIVE_WORKERS_REDIS_KEY, 0, "(#{min}")
+            # Return if any workers are registered with a future expiry date
+            redis.zrange(ACTIVE_WORKERS_REDIS_KEY, min, '+inf', by_score: true, limit: [0, 1]).any?
+          end
+        end
+
+        def wait_until_workers_inactive(worker_wait_ttl)
+          # Wait until the collection in ClickHouseWorker::CLICKHOUSE_ACTIVE_WORKERS_KEY is empty,
+          # before continuing migration.
+          workers_active = true
+
+          loop do
+            return if Feature.disabled?(:wait_for_clickhouse_workers_during_migration)
+
+            workers_active = active_sidekiq_workers?
+            break unless workers_active
+            break if Time.current >= worker_wait_ttl
+
+            sleep WORKERS_WAIT_SLEEP.to_i
+          end
+
+          return unless workers_active
+
+          raise ClickHouse::MigrationSupport::Errors::LockError, 'Timed out waiting for active workers'
+        end
+      end
+
+      private_class_method :wait_until_workers_inactive
+    end
+  end
+end
diff --git a/lib/click_house/migration_support/migration_context.rb b/lib/click_house/migration_support/migration_context.rb
index 82cb287ba66f8..2f2c44d4ec723 100644
--- a/lib/click_house/migration_support/migration_context.rb
+++ b/lib/click_house/migration_support/migration_context.rb
@@ -45,7 +45,7 @@ def migrations
         migrations = migration_files.map do |file|
           version, name, scope = parse_migration_filename(file)
 
-          raise ClickHouse::MigrationSupport::IllegalMigrationNameError, file unless version
+          raise ClickHouse::MigrationSupport::Errors::IllegalMigrationNameError, file unless version
 
           version = version.to_i
           name = name.camelize
diff --git a/lib/click_house/migration_support/migration_error.rb b/lib/click_house/migration_support/migration_error.rb
deleted file mode 100644
index c8046c0d5a876..0000000000000
--- a/lib/click_house/migration_support/migration_error.rb
+++ /dev/null
@@ -1,55 +0,0 @@
-# frozen_string_literal: true
-
-module ClickHouse
-  module MigrationSupport
-    class MigrationError < StandardError
-      def initialize(message = nil)
-        message = "\n\n#{message}\n\n" if message
-        super
-      end
-    end
-
-    class IllegalMigrationNameError < MigrationError
-      def initialize(name = nil)
-        if name
-          super("Illegal name for migration file: #{name}\n\t(only lower case letters, numbers, and '_' allowed).")
-        else
-          super('Illegal name for migration.')
-        end
-      end
-    end
-
-    IrreversibleMigration = Class.new(MigrationError)
-    LockError = Class.new(MigrationError)
-
-    class DuplicateMigrationVersionError < MigrationError
-      def initialize(version = nil)
-        if version
-          super("Multiple migrations have the version number #{version}.")
-        else
-          super('Duplicate migration version error.')
-        end
-      end
-    end
-
-    class DuplicateMigrationNameError < MigrationError
-      def initialize(name = nil)
-        if name
-          super("Multiple migrations have the name #{name}.")
-        else
-          super('Duplicate migration name.')
-        end
-      end
-    end
-
-    class UnknownMigrationVersionError < MigrationError
-      def initialize(version = nil)
-        if version
-          super("No migration with version number #{version}.")
-        else
-          super('Unknown migration version.')
-        end
-      end
-    end
-  end
-end
diff --git a/lib/click_house/migration_support/migrator.rb b/lib/click_house/migration_support/migrator.rb
index 8ad7a189e73a0..00c42cb8b05b0 100644
--- a/lib/click_house/migration_support/migrator.rb
+++ b/lib/click_house/migration_support/migrator.rb
@@ -3,18 +3,12 @@
 module ClickHouse
   module MigrationSupport
     class Migrator
-      include ::Gitlab::ExclusiveLeaseHelpers
-
       class << self
         attr_accessor :migrations_paths
       end
 
       attr_accessor :logger
 
-      LEASE_KEY = 'click_house:migrations'
-      RETRY_DELAY = ->(num) { 0.2.seconds * (num**2) }
-      LOCK_DURATION = 1.hour
-
       self.migrations_paths = ["db/click_house/migrate"]
 
       def initialize(
@@ -42,11 +36,9 @@ def current_migration
       alias_method :current, :current_migration
 
       def migrate
-        in_lock(LEASE_KEY, ttl: LOCK_DURATION, retries: 5, sleep_sec: RETRY_DELAY) do
+        ClickHouse::MigrationSupport::ExclusiveLock.execute_migration do
           migrate_without_lock
         end
-      rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e
-        raise ClickHouse::MigrationSupport::LockError, e.message
       end
 
       def runnable
@@ -92,18 +84,9 @@ def ensure_schema_migration_table(database)
         @schema_migration.create_table(database)
       end
 
-      # Used for running a specific migration.
-      def run_without_lock
-        migration = migrations.detect { |m| m.version == @target_version }
-
-        raise ClickHouse::MigrationSupport::UnknownMigrationVersionError, @target_version if migration.nil?
-
-        execute_migration(migration)
-      end
-
       # Used for running multiple migrations up to or down to a certain value.
       def migrate_without_lock
-        raise ClickHouse::MigrationSupport::UnknownMigrationVersionError, @target_version if invalid_target?
+        raise ClickHouse::MigrationSupport::Errors::UnknownMigrationVersionError, @target_version if invalid_target?
 
         runnable.each(&method(:execute_migration)) # rubocop: disable Performance/MethodObjectAsBlock -- Execute through proxy
       end
@@ -149,10 +132,10 @@ def start
 
       def validate(migrations)
         name, = migrations.group_by(&:name).find { |_, v| v.length > 1 }
-        raise ClickHouse::MigrationSupport::DuplicateMigrationNameError, name if name
+        raise ClickHouse::MigrationSupport::Errors::DuplicateMigrationNameError, name if name
 
         version, = migrations.group_by(&:version).find { |_, v| v.length > 1 }
-        raise ClickHouse::MigrationSupport::DuplicateMigrationVersionError, version if version
+        raise ClickHouse::MigrationSupport::Errors::DuplicateMigrationVersionError, version if version
       end
 
       def record_version_state_after_migrating(database, version)
diff --git a/lib/click_house/migration_support/sidekiq_middleware.rb b/lib/click_house/migration_support/sidekiq_middleware.rb
new file mode 100644
index 0000000000000..e4e6c453e8dd7
--- /dev/null
+++ b/lib/click_house/migration_support/sidekiq_middleware.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module ClickHouse
+  module MigrationSupport
+    class SidekiqMiddleware
+      def call(worker, job, queue)
+        return yield unless register_worker?(worker.class)
+
+        ::ClickHouse::MigrationSupport::ExclusiveLock.register_running_worker(worker.class, worker_id(job, queue)) do
+          yield
+        end
+      end
+
+      private
+
+      def worker_id(job, queue)
+        [queue, job['jid']].join(':')
+      end
+
+      def register_worker?(worker_class)
+        worker_class.respond_to?(:click_house_migration_lock) && worker_class.register_click_house_worker?
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/sidekiq_middleware.rb b/lib/gitlab/sidekiq_middleware.rb
index e1c155a484813..96bda86ab0864 100644
--- a/lib/gitlab/sidekiq_middleware.rb
+++ b/lib/gitlab/sidekiq_middleware.rb
@@ -37,6 +37,7 @@ def self.server_configurator(metrics: true, arguments_logger: true, skip_jobs: t
         chain.add ::Gitlab::SidekiqStatus::ServerMiddleware
         chain.add ::Gitlab::SidekiqMiddleware::WorkerContext::Server
         chain.add ::Gitlab::SidekiqMiddleware::PauseControl::Server
+        chain.add ::ClickHouse::MigrationSupport::SidekiqMiddleware
         # DuplicateJobs::Server should be placed at the bottom, but before the SidekiqServerMiddleware,
         # so we can compare the latest WAL location against replica
         chain.add ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server
diff --git a/lib/gitlab/sidekiq_middleware/pause_control.rb b/lib/gitlab/sidekiq_middleware/pause_control.rb
index 2f0fd0cc7997a..8f4da7267d70e 100644
--- a/lib/gitlab/sidekiq_middleware/pause_control.rb
+++ b/lib/gitlab/sidekiq_middleware/pause_control.rb
@@ -8,6 +8,7 @@ module PauseControl
       UnknownStrategyError = Class.new(StandardError)
 
       STRATEGIES = {
+        click_house_migration: ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::ClickHouseMigration,
         zoekt: ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::Zoekt,
         none: ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None
       }.freeze
diff --git a/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb b/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb
new file mode 100644
index 0000000000000..adeb05245673e
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqMiddleware
+    module PauseControl
+      module Strategies
+        class ClickHouseMigration < Base
+          override :should_pause?
+          def should_pause?
+            return false unless Feature.enabled?(:pause_clickhouse_workers_during_migration)
+
+            ClickHouse::MigrationSupport::ExclusiveLock.pause_workers?
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/spec/lib/click_house/migration_support/exclusive_lock_spec.rb b/spec/lib/click_house/migration_support/exclusive_lock_spec.rb
new file mode 100644
index 0000000000000..5176cc752666a
--- /dev/null
+++ b/spec/lib/click_house/migration_support/exclusive_lock_spec.rb
@@ -0,0 +1,140 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ClickHouse::MigrationSupport::ExclusiveLock, feature_category: :database do
+  include ExclusiveLeaseHelpers
+
+  let(:worker_class) do
+    # This worker will be active longer than the ClickHouse worker TTL
+    Class.new do
+      def self.name
+        'TestWorker'
+      end
+
+      include ::ApplicationWorker
+      include ::ClickHouseWorker
+
+      def perform(*); end
+    end
+  end
+
+  before do
+    stub_const('TestWorker', worker_class)
+  end
+
+  describe '.register_running_worker' do
+    before do
+      TestWorker.click_house_migration_lock(10.seconds)
+    end
+
+    it 'yields without arguments' do
+      expect { |b| described_class.register_running_worker(worker_class, 'test', &b) }.to yield_with_no_args
+    end
+
+    it 'registers worker for a limited period of time', :freeze_time, :aggregate_failures do
+      expect(described_class.active_sidekiq_workers?).to eq false
+
+      described_class.register_running_worker(worker_class, 'test') do
+        expect(described_class.active_sidekiq_workers?).to eq true
+        travel 9.seconds
+        expect(described_class.active_sidekiq_workers?).to eq true
+        travel 2.seconds
+        expect(described_class.active_sidekiq_workers?).to eq false
+      end
+    end
+  end
+
+  describe '.pause_workers?' do
+    subject(:pause_workers?) { described_class.pause_workers? }
+
+    it { is_expected.to eq false }
+
+    context 'with lock taken' do
+      let!(:lease) { stub_exclusive_lease_taken(described_class::MIGRATION_LEASE_KEY) }
+
+      it { is_expected.to eq true }
+    end
+  end
+
+  describe '.execute_migration' do
+    it 'yields without raising error' do
+      expect { |b| described_class.execute_migration(&b) }.to yield_with_no_args
+    end
+
+    context 'when migration lock is taken' do
+      let!(:lease) { stub_exclusive_lease_taken(described_class::MIGRATION_LEASE_KEY) }
+
+      it 'raises LockError' do
+        expect do
+          expect { |b| described_class.execute_migration(&b) }.not_to yield_control
+        end.to raise_error ::ClickHouse::MigrationSupport::Errors::LockError
+      end
+    end
+
+    context 'when ClickHouse workers are still active', :freeze_time do
+      let(:sleep_time) { described_class::WORKERS_WAIT_SLEEP }
+      let!(:started_at) { Time.current }
+
+      def migration
+        expect { |b| described_class.execute_migration(&b) }.to yield_with_no_args
+      end
+
+      around do |example|
+        described_class.register_running_worker(worker_class, anything) do
+          example.run
+        end
+      end
+
+      it 'waits for workers and raises ClickHouse::MigrationSupport::LockError if workers do not stop in time' do
+        expect(described_class).to receive(:sleep).at_least(1).with(sleep_time) { travel(sleep_time) }
+
+        expect { migration }.to raise_error(ClickHouse::MigrationSupport::Errors::LockError,
+          /Timed out waiting for active workers/)
+        expect(Time.current - started_at).to eq(described_class::DEFAULT_CLICKHOUSE_WORKER_TTL)
+      end
+
+      context 'when wait_for_clickhouse_workers_during_migration FF is disabled' do
+        before do
+          stub_feature_flags(wait_for_clickhouse_workers_during_migration: false)
+        end
+
+        it 'runs migration without waiting for workers' do
+          expect { migration }.not_to raise_error
+          expect(Time.current - started_at).to eq(0.0)
+        end
+      end
+
+      it 'ignores expired workers' do
+        travel(described_class::DEFAULT_CLICKHOUSE_WORKER_TTL + 1.second)
+
+        migration
+      end
+
+      context 'when worker registration is almost expiring' do
+        let(:worker_class) do
+          # This worker will be active for less than the ClickHouse worker TTL
+          Class.new do
+            def self.name
+              'TestWorker'
+            end
+
+            include ::ApplicationWorker
+            include ::ClickHouseWorker
+
+            click_house_migration_lock(
+              ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL - 1.second)
+
+            def perform(*); end
+          end
+        end
+
+        it 'completes migration' do
+          expect(described_class).to receive(:sleep).at_least(1).with(sleep_time) { travel(sleep_time) }
+
+          expect { migration }.not_to raise_error
+        end
+      end
+    end
+  end
+end
diff --git a/spec/click_house/migration_support/migration_context_spec.rb b/spec/lib/click_house/migration_support/migration_context_spec.rb
similarity index 93%
rename from spec/click_house/migration_support/migration_context_spec.rb
rename to spec/lib/click_house/migration_support/migration_context_spec.rb
index 98ed0a69c3a6b..65aa6b56aa7d7 100644
--- a/spec/click_house/migration_support/migration_context_spec.rb
+++ b/spec/lib/click_house/migration_support/migration_context_spec.rb
@@ -2,8 +2,6 @@
 
 require 'spec_helper'
 
-require_relative '../../../lib/click_house/migration_support/migration_error'
-
 RSpec.describe ClickHouse::MigrationSupport::MigrationContext,
   click_house: :without_migrations, feature_category: :database do
   include ClickHouseTestHelpers
@@ -32,7 +30,15 @@
       let(:lease_key) { 'click_house:migrations' }
       let(:lease_timeout) { 1.hour }
 
+      it 'executes migration through ClickHouse::MigrationSupport::ExclusiveLock.execute_migration' do
+        expect(ClickHouse::MigrationSupport::ExclusiveLock).to receive(:execute_migration)
+
+        # Test that not running execute_migration will not execute migrations
+        expect { migration }.not_to change { active_schema_migrations_count }
+      end
+
       it 'creates a table' do
+        expect(ClickHouse::MigrationSupport::ExclusiveLock).to receive(:execute_migration).and_call_original
         expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
 
         expect { migration }.to change { active_schema_migrations_count }.from(0).to(1)
@@ -47,17 +53,13 @@
 
       context 'when a migration is already running' do
         let(:migration_name) { 'create_some_table' }
-        let(:migration_klass) do
-          require(File.expand_path("#{migrations_dir}/1_#{migration_name}"))
-          migration_name.camelize.constantize
-        end
 
         before do
           stub_exclusive_lease_taken(lease_key)
         end
 
         it 'raises error after timeout when migration is executing concurrently' do
-          expect { migration }.to raise_error(ClickHouse::MigrationSupport::LockError)
+          expect { migration }.to raise_error(ClickHouse::MigrationSupport::Errors::LockError)
             .and not_change { active_schema_migrations_count }
         end
       end
@@ -182,7 +184,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar
       let(:migrations_dirname) { 'plain_table_creation' }
 
       it 'raises UnknownMigrationVersionError' do
-        expect { migration }.to raise_error ClickHouse::MigrationSupport::UnknownMigrationVersionError
+        expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::UnknownMigrationVersionError
 
         expect(active_schema_migrations_count).to eq 0
       end
@@ -192,7 +194,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar
       let(:migrations_dirname) { 'duplicate_name' }
 
       it 'raises DuplicateMigrationNameError' do
-        expect { migration }.to raise_error ClickHouse::MigrationSupport::DuplicateMigrationNameError
+        expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::DuplicateMigrationNameError
 
         expect(active_schema_migrations_count).to eq 0
       end
@@ -202,7 +204,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar
       let(:migrations_dirname) { 'duplicate_version' }
 
       it 'raises DuplicateMigrationVersionError' do
-        expect { migration }.to raise_error ClickHouse::MigrationSupport::DuplicateMigrationVersionError
+        expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::DuplicateMigrationVersionError
 
         expect(active_schema_migrations_count).to eq 0
       end
@@ -272,7 +274,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar
       let(:migrations_dirname) { 'plain_table_creation' }
 
       it 'raises UnknownMigrationVersionError' do
-        expect { migration }.to raise_error ClickHouse::MigrationSupport::UnknownMigrationVersionError
+        expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::UnknownMigrationVersionError
 
         expect(active_schema_migrations_count).to eq 1
       end
diff --git a/spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb b/spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb
new file mode 100644
index 0000000000000..03c9edfabaa0f
--- /dev/null
+++ b/spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb
@@ -0,0 +1,61 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ClickHouse::MigrationSupport::SidekiqMiddleware, feature_category: :database do
+  let(:worker_with_click_house_worker) do
+    Class.new do
+      def self.name
+        'TestWorker'
+      end
+      include ApplicationWorker
+      include ClickHouseWorker
+    end
+  end
+
+  let(:worker_without_click_house_worker) do
+    Class.new do
+      def self.name
+        'TestWorkerWithoutClickHouseWorker'
+      end
+      include ApplicationWorker
+    end
+  end
+
+  subject(:middleware) { described_class.new }
+
+  before do
+    stub_const('TestWorker', worker_with_click_house_worker)
+    stub_const('TestWorkerWithoutClickHouseWorker', worker_without_click_house_worker)
+  end
+
+  describe '#call' do
+    let(:worker) { worker_class.new }
+    let(:job) { { 'jid' => 123, 'class' => worker_class.name } }
+    let(:queue) { 'test_queue' }
+
+    context 'when worker does not include ClickHouseWorker' do
+      let(:worker_class) { worker_without_click_house_worker }
+
+      it 'yields control without registering running worker' do
+        expect(ClickHouse::MigrationSupport::ExclusiveLock).not_to receive(:register_running_worker)
+        expect { |b| middleware.call(worker, job, queue, &b) }.to yield_with_no_args
+      end
+    end
+
+    context 'when worker includes ClickHouseWorker' do
+      let(:worker_class) { worker_with_click_house_worker }
+
+      it 'registers running worker and yields control' do
+        expect(ClickHouse::MigrationSupport::ExclusiveLock)
+          .to receive(:register_running_worker)
+          .with(worker_class, 'test_queue:123')
+          .and_wrap_original do |method, worker_class, worker_id|
+          expect { |b| method.call(worker_class, worker_id, &b) }.to yield_with_no_args
+        end
+
+        middleware.call(worker, job, queue)
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb b/spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb
new file mode 100644
index 0000000000000..470c860fb605a
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb
@@ -0,0 +1,66 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::SidekiqMiddleware::PauseControl::Strategies::ClickHouseMigration, feature_category: :database do
+  let(:worker_class) do
+    Class.new do
+      def self.name
+        'TestPauseWorker'
+      end
+
+      include ::ApplicationWorker
+      include ::ClickHouseWorker
+
+      def perform(*); end
+    end
+  end
+
+  before do
+    stub_const('TestPauseWorker', worker_class)
+  end
+
+  describe '#call' do
+    include Gitlab::ExclusiveLeaseHelpers
+
+    shared_examples 'a worker being executed' do
+      it 'schedules the job' do
+        expect(Gitlab::SidekiqMiddleware::PauseControl::PauseControlService).not_to receive(:add_to_waiting_queue!)
+
+        worker_class.perform_async('args1')
+
+        expect(worker_class.jobs.count).to eq(1)
+      end
+    end
+
+    context 'when lock is not taken' do
+      it_behaves_like 'a worker being executed'
+    end
+
+    context 'when lock is taken' do
+      include ExclusiveLeaseHelpers
+
+      around do |example|
+        ClickHouse::MigrationSupport::ExclusiveLock.execute_migration do
+          example.run
+        end
+      end
+
+      it 'does not schedule the job' do
+        expect(Gitlab::SidekiqMiddleware::PauseControl::PauseControlService).to receive(:add_to_waiting_queue!).once
+
+        worker_class.perform_async('args1')
+
+        expect(worker_class.jobs.count).to eq(0)
+      end
+
+      context 'when pause_clickhouse_workers_during_migration FF is disabled' do
+        before do
+          stub_feature_flags(pause_clickhouse_workers_during_migration: false)
+        end
+
+        it_behaves_like 'a worker being executed'
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb b/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb
index a0cce0f61a0c7..2cb98b43051cb 100644
--- a/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb
@@ -1,19 +1,23 @@
 # frozen_string_literal: true
 
 require 'fast_spec_helper'
+require 'rspec-parameterized'
 
 RSpec.describe Gitlab::SidekiqMiddleware::PauseControl, feature_category: :global_search do
   describe '.for' do
-    it 'returns the right class for `zoekt`' do
-      expect(described_class.for(:zoekt)).to eq(::Gitlab::SidekiqMiddleware::PauseControl::Strategies::Zoekt)
-    end
+    using RSpec::Parameterized::TableSyntax
 
-    it 'returns the right class for `none`' do
-      expect(described_class.for(:none)).to eq(::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None)
+    where(:strategy_name, :expected_class) do
+      :none                  | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None
+      :unknown               | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None
+      :click_house_migration | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::ClickHouseMigration
+      :zoekt                 | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::Zoekt
     end
 
-    it 'returns nil when passing an unknown key' do
-      expect(described_class.for(:unknown)).to eq(::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None)
+    with_them do
+      it 'returns the right class' do
+        expect(described_class.for(strategy_name)).to eq(expected_class)
+      end
     end
   end
 end
diff --git a/spec/lib/gitlab/sidekiq_middleware_spec.rb b/spec/lib/gitlab/sidekiq_middleware_spec.rb
index 5a38d1b775057..a5c6df5e9d5ad 100644
--- a/spec/lib/gitlab/sidekiq_middleware_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware_spec.rb
@@ -3,7 +3,7 @@
 require 'spec_helper'
 require 'sidekiq/testing'
 
-RSpec.describe Gitlab::SidekiqMiddleware do
+RSpec.describe Gitlab::SidekiqMiddleware, feature_category: :shared do
   let(:job_args) { [0.01] }
   let(:disabled_sidekiq_middlewares) { [] }
   let(:chain) { Sidekiq::Middleware::Chain.new(Sidekiq) }
@@ -33,6 +33,7 @@ def perform(*args)
       configurator.call(chain)
       stub_feature_flags("drop_sidekiq_jobs_#{worker_class.name}": false) # not dropping the job
     end
+
     it "passes through the right middlewares", :aggregate_failures do
       enabled_sidekiq_middlewares.each do |middleware|
         expect_next_instances_of(middleware, 1, true) do |middleware_instance|
@@ -68,6 +69,7 @@ def perform(*args)
         ::Gitlab::SidekiqVersioning::Middleware,
         ::Gitlab::SidekiqStatus::ServerMiddleware,
         ::Gitlab::SidekiqMiddleware::WorkerContext::Server,
+        ::ClickHouse::MigrationSupport::SidekiqMiddleware,
         ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server,
         ::Gitlab::Database::LoadBalancing::SidekiqServerMiddleware,
         ::Gitlab::SidekiqMiddleware::SkipJobs
diff --git a/spec/tooling/quality/test_level_spec.rb b/spec/tooling/quality/test_level_spec.rb
index d7d04015b4806..6ccd2e46f7bb3 100644
--- a/spec/tooling/quality/test_level_spec.rb
+++ b/spec/tooling/quality/test_level_spec.rb
@@ -46,7 +46,7 @@
     context 'when level is unit' do
       it 'returns a pattern' do
         expect(subject.pattern(:unit))
-          .to eq("spec/{bin,channels,click_house,components,config,contracts,db,dependencies,elastic,elastic_integration,experiments,factories,finders,frontend,graphql,haml_lint,helpers,initializers,lib,metrics_server,models,policies,presenters,rack_servers,replicators,routing,rubocop,scripts,serializers,services,sidekiq,sidekiq_cluster,spam,support_specs,tasks,uploaders,validators,views,workers,tooling}{,/**/}*_spec.rb")
+          .to eq("spec/{bin,channels,components,config,contracts,db,dependencies,elastic,elastic_integration,experiments,factories,finders,frontend,graphql,haml_lint,helpers,initializers,lib,metrics_server,models,policies,presenters,rack_servers,replicators,routing,rubocop,scripts,serializers,services,sidekiq,sidekiq_cluster,spam,support_specs,tasks,uploaders,validators,views,workers,tooling}{,/**/}*_spec.rb")
       end
     end
 
@@ -121,7 +121,7 @@
     context 'when level is unit' do
       it 'returns a regexp' do
         expect(subject.regexp(:unit))
-          .to eq(%r{spec/(bin|channels|click_house|components|config|contracts|db|dependencies|elastic|elastic_integration|experiments|factories|finders|frontend|graphql|haml_lint|helpers|initializers|lib|metrics_server|models|policies|presenters|rack_servers|replicators|routing|rubocop|scripts|serializers|services|sidekiq|sidekiq_cluster|spam|support_specs|tasks|uploaders|validators|views|workers|tooling)/})
+          .to eq(%r{spec/(bin|channels|components|config|contracts|db|dependencies|elastic|elastic_integration|experiments|factories|finders|frontend|graphql|haml_lint|helpers|initializers|lib|metrics_server|models|policies|presenters|rack_servers|replicators|routing|rubocop|scripts|serializers|services|sidekiq|sidekiq_cluster|spam|support_specs|tasks|uploaders|validators|views|workers|tooling)/})
       end
     end
 
diff --git a/spec/workers/click_house/events_sync_worker_spec.rb b/spec/workers/click_house/events_sync_worker_spec.rb
index 28a885629a4f7..da74e5e376dcb 100644
--- a/spec/workers/click_house/events_sync_worker_spec.rb
+++ b/spec/workers/click_house/events_sync_worker_spec.rb
@@ -5,6 +5,12 @@
 RSpec.describe ClickHouse::EventsSyncWorker, feature_category: :value_stream_management do
   let(:worker) { described_class.new }
 
+  specify do
+    expect(worker.class.click_house_worker_attrs).to match(
+      a_hash_including(migration_lock_ttl: ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL)
+    )
+  end
+
   it_behaves_like 'an idempotent worker' do
     context 'when the event_sync_worker_for_click_house feature flag is on', :click_house do
       before do
diff --git a/spec/workers/concerns/click_house_worker_spec.rb b/spec/workers/concerns/click_house_worker_spec.rb
new file mode 100644
index 0000000000000..cb8bf9c75789d
--- /dev/null
+++ b/spec/workers/concerns/click_house_worker_spec.rb
@@ -0,0 +1,88 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ClickHouseWorker, feature_category: :database do
+  let(:worker) do
+    Class.new do
+      def self.name
+        'DummyWorker'
+      end
+
+      include ApplicationWorker
+      include ClickHouseWorker
+
+      def perform
+        AnotherWorker.perform_async('identifier')
+      end
+    end
+  end
+
+  let(:another_worker) do
+    Class.new do
+      def self.name
+        'AnotherWorker'
+      end
+
+      include ApplicationWorker
+    end
+  end
+
+  before do
+    stub_const('DummyWorker', worker)
+    stub_const('AnotherWorker', another_worker)
+  end
+
+  describe '.register_click_house_worker?' do
+    subject(:register_click_house_worker?) { worker.register_click_house_worker? }
+
+    context 'when click_house_migration_lock is set' do
+      before do
+        worker.click_house_migration_lock(1.minute)
+      end
+
+      it { is_expected.to be(true) }
+    end
+
+    context 'when click_house_migration_lock is not set' do
+      it { is_expected.to be(true) }
+    end
+
+    context 'when worker does not include module' do
+      it { expect(another_worker).not_to respond_to(:register_click_house_worker?) }
+    end
+  end
+
+  describe '.click_house_worker_attrs' do
+    subject(:click_house_worker_attrs) { worker.click_house_migration_lock(ttl) }
+
+    let(:ttl) { 1.minute }
+
+    it { expect { click_house_worker_attrs }.not_to raise_error }
+    it { is_expected.to match(a_hash_including(migration_lock_ttl: 60.seconds)) }
+
+    context 'with invalid ttl' do
+      let(:ttl) { {} }
+
+      it 'raises exception' do
+        expect { click_house_worker_attrs }.to raise_error(ArgumentError)
+      end
+    end
+  end
+
+  it 'registers ClickHouse worker' do
+    expect(worker.register_click_house_worker?).to be_truthy
+    expect(another_worker).not_to respond_to(:register_click_house_worker?)
+  end
+
+  it 'sets default TTL for worker registration' do
+    expect(worker.click_house_worker_attrs).to match(
+      a_hash_including(migration_lock_ttl: ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL)
+    )
+  end
+
+  it 'registers worker to pause on ClickHouse migrations' do
+    expect(worker.get_pause_control).to eq(:click_house_migration)
+    expect(another_worker.get_pause_control).to be_nil
+  end
+end
diff --git a/spec/workers/concerns/worker_attributes_spec.rb b/spec/workers/concerns/worker_attributes_spec.rb
index 767a55162fbf3..1c9d9a5a1ad4f 100644
--- a/spec/workers/concerns/worker_attributes_spec.rb
+++ b/spec/workers/concerns/worker_attributes_spec.rb
@@ -75,7 +75,7 @@ def self.name
 
   describe '.data_consistency' do
     context 'with invalid data_consistency' do
-      it 'raise exception' do
+      it 'raises exception' do
         expect { worker.data_consistency(:invalid) }
           .to raise_error('Invalid data consistency: invalid')
       end
diff --git a/tooling/quality/test_level.rb b/tooling/quality/test_level.rb
index 050eb4f4dafd6..20e00763f6543 100644
--- a/tooling/quality/test_level.rb
+++ b/tooling/quality/test_level.rb
@@ -18,7 +18,6 @@ class TestLevel
       unit: %w[
         bin
         channels
-        click_house
         components
         config
         contracts
-- 
GitLab