diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb index 5936d30b8b2cafae59d3609ecc2a0ff57b8381d8..ea9d610bd5204cdd9515fb84c1c257f89abf3901 100644 --- a/app/workers/click_house/events_sync_worker.rb +++ b/app/workers/click_house/events_sync_worker.rb @@ -3,6 +3,7 @@ module ClickHouse class EventsSyncWorker include ApplicationWorker + include ClickHouseWorker include Gitlab::ExclusiveLeaseHelpers include Gitlab::Utils::StrongMemoize diff --git a/app/workers/concerns/click_house_worker.rb b/app/workers/concerns/click_house_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..6399796f6dfb2acacdb5bf48ddfbda55c20db9a0 --- /dev/null +++ b/app/workers/concerns/click_house_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module ClickHouseWorker + extend ActiveSupport::Concern + + class_methods do + def register_click_house_worker? + click_house_worker_attrs.present? + end + + def click_house_worker_attrs + get_class_attribute(:click_house_worker_attrs) + end + + def click_house_migration_lock(ttl) + raise ArgumentError unless ttl.is_a?(ActiveSupport::Duration) + + set_class_attribute( + :click_house_worker_attrs, + (click_house_worker_attrs || {}).merge(migration_lock_ttl: ttl) + ) + end + end + + included do + click_house_migration_lock(ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL) + + pause_control :click_house_migration + end +end diff --git a/config/feature_flags/development/pause_clickhouse_workers_during_migration.yml b/config/feature_flags/development/pause_clickhouse_workers_during_migration.yml new file mode 100644 index 0000000000000000000000000000000000000000..f2a02c95632e3b97516e974b89511a8756263120 --- /dev/null +++ b/config/feature_flags/development/pause_clickhouse_workers_during_migration.yml @@ -0,0 +1,8 @@ +--- +name: pause_clickhouse_workers_during_migration +introduced_by_url: 
https://gitlab.com/gitlab-org/gitlab/-/merge_requests/138166 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/433389 +milestone: '16.7' +type: development +group: group::runner +default_enabled: false diff --git a/config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml b/config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml new file mode 100644 index 0000000000000000000000000000000000000000..dc67db6c148165401be2ea38f235453a742199ff --- /dev/null +++ b/config/feature_flags/development/wait_for_clickhouse_workers_during_migration.yml @@ -0,0 +1,8 @@ +--- +name: wait_for_clickhouse_workers_during_migration +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/138166 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/433389 +milestone: '16.7' +type: development +group: group::runner +default_enabled: false diff --git a/doc/development/database/clickhouse/clickhouse_within_gitlab.md b/doc/development/database/clickhouse/clickhouse_within_gitlab.md index f3305092868b07f885c9217a380962b3de7499cc..a459f89b1855e57d0e7cad8c030ab33f8e6f23e8 100644 --- a/doc/development/database/clickhouse/clickhouse_within_gitlab.md +++ b/doc/development/database/clickhouse/clickhouse_within_gitlab.md @@ -205,6 +205,26 @@ end NOTE: It's important to test and verify efficient batching of database records from PostgreSQL. Consider using the techniques described in the [Iterating tables in batches](../iterating_tables_in_batches.md). +## Implementing Sidekiq workers + +Sidekiq workers leveraging ClickHouse databases should include the `ClickHouseWorker` module. +This ensures that the worker is paused while database migrations are running, +and that migrations do not run while the worker is active. + +```ruby +# events_sync_worker.rb +# frozen_string_literal: true + +module ClickHouse + class EventsSyncWorker + include ApplicationWorker + include ClickHouseWorker + + ... 
+ end +end +``` + ## Testing ClickHouse is enabled on CI/CD but to avoid significantly affecting the pipeline runtime we've decided to run the ClickHouse server for test cases tagged with `:click_house` only. diff --git a/doc/development/sidekiq/worker_attributes.md b/doc/development/sidekiq/worker_attributes.md index 3b74d5469cde6b6edffbe9f9a44c7f11fc9da842..016bf0b663412db1a46d888a327ba131f8f3d9ea 100644 --- a/doc/development/sidekiq/worker_attributes.md +++ b/doc/development/sidekiq/worker_attributes.md @@ -336,7 +336,7 @@ worker that checks if any paused jobs must be restarted. To use `pause_control`, you can: - Use one of the strategies defined in `lib/gitlab/sidekiq_middleware/pause_control/strategies/`. -- Define a custom strategy in `lib/gitlab/sidekiq_middleware/pause_control/strategies/` and add the strategy to `lib/gitlab/sidekiq_middleware/pause_control/strategies.rb`. +- Define a custom strategy in `lib/gitlab/sidekiq_middleware/pause_control/strategies/` and add the strategy to `lib/gitlab/sidekiq_middleware/pause_control.rb`. For example: diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb index a0c8f9e76a5147dcb56ecb8817430b1285c20107..d1988c010c0a56ee8c4b17f089a94fdedb14f6ab 100644 --- a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb +++ b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb @@ -3,6 +3,7 @@ module ClickHouse class CiFinishedBuildsSyncWorker include ApplicationWorker + include ClickHouseWorker idempotent! 
data_consistency :delayed diff --git a/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb b/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb index 9843b284297aea924d488332d34b0c6dc2190376..fcf6f638ecf5e189114b75cce1b63aa821021e77 100644 --- a/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb +++ b/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb @@ -14,6 +14,12 @@ create_sync_events build1 end + specify do + expect(worker.class.click_house_worker_attrs).to match( + a_hash_including(migration_lock_ttl: ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL) + ) + end + include_examples 'an idempotent worker' do it 'calls CiFinishedBuildsSyncService and returns its response payload' do expect(worker).to receive(:log_extra_metadata_on_done) diff --git a/lib/click_house/migration_support/errors.rb b/lib/click_house/migration_support/errors.rb new file mode 100644 index 0000000000000000000000000000000000000000..f8c6e5a94e06ef8e51a66967bf4e6ebd9916210c --- /dev/null +++ b/lib/click_house/migration_support/errors.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +module ClickHouse + module MigrationSupport + module Errors + class Base < StandardError + def initialize(message = nil) + message = "\n\n#{message}\n\n" if message + super + end + end + + class IllegalMigrationNameError < Base + def initialize(name = nil) + if name + super("Illegal name for migration file: #{name}\n\t(only lower case letters, numbers, and '_' allowed).") + else + super('Illegal name for migration.') + end + end + end + + IrreversibleMigration = Class.new(Base) + LockError = Class.new(Base) + + class DuplicateMigrationVersionError < Base + def initialize(version = nil) + if version + super("Multiple migrations have the version number #{version}.") + else + super('Duplicate migration version error.') + end + end + end + + class DuplicateMigrationNameError < Base + def initialize(name = nil) + if name + 
super("Multiple migrations have the name #{name}.") + else + super('Duplicate migration name.') + end + end + end + + class UnknownMigrationVersionError < Base + def initialize(version = nil) + if version + super("No migration with version number #{version}.") + else + super('Unknown migration version.') + end + end + end + end + end +end diff --git a/lib/click_house/migration_support/exclusive_lock.rb b/lib/click_house/migration_support/exclusive_lock.rb new file mode 100644 index 0000000000000000000000000000000000000000..e06ff4c0306fced5b18356f446846508f09778df --- /dev/null +++ b/lib/click_house/migration_support/exclusive_lock.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +module ClickHouse + module MigrationSupport + class ExclusiveLock + MIGRATION_LEASE_KEY = 'click_house:migrations' + MIGRATION_RETRY_DELAY = ->(num) { 0.2.seconds * (num**2) } + MIGRATION_LOCK_DURATION = 1.hour + + ACTIVE_WORKERS_REDIS_KEY = 'click_house:workers:active_workers' + DEFAULT_CLICKHOUSE_WORKER_TTL = 30.minutes + WORKERS_WAIT_SLEEP = 5.seconds + + class << self + include ::Gitlab::ExclusiveLeaseHelpers + + def register_running_worker(worker_class, worker_id) + ttl = worker_class.click_house_worker_attrs[:migration_lock_ttl].from_now.utc + + Gitlab::Redis::ClusterSharedState.with do |redis| + redis.zadd(ACTIVE_WORKERS_REDIS_KEY, ttl.to_i, worker_id, gt: true) + + yield + ensure + redis.zrem(ACTIVE_WORKERS_REDIS_KEY, worker_id) + end + end + + def execute_migration + in_lock(MIGRATION_LEASE_KEY, ttl: MIGRATION_LOCK_DURATION, retries: 5, sleep_sec: MIGRATION_RETRY_DELAY) do + wait_until_workers_inactive(DEFAULT_CLICKHOUSE_WORKER_TTL.from_now) + + yield + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e + raise ClickHouse::MigrationSupport::Errors::LockError, e.message + end + + def pause_workers? + Gitlab::ExclusiveLease.new(MIGRATION_LEASE_KEY, timeout: 0).exists? + end + + def active_sidekiq_workers? 
+ Gitlab::Redis::ClusterSharedState.with do |redis| + min = Time.now.utc.to_i + + # expire keys in the past + redis.zremrangebyscore(ACTIVE_WORKERS_REDIS_KEY, 0, "(#{min}") + # Return if any workers are registered with a future expiry date + redis.zrange(ACTIVE_WORKERS_REDIS_KEY, min, '+inf', by_score: true, limit: [0, 1]).any? + end + end + + def wait_until_workers_inactive(worker_wait_ttl) + # Wait until no unexpired workers remain in the ACTIVE_WORKERS_REDIS_KEY sorted set (or worker_wait_ttl passes), + # before continuing migration. + workers_active = true + + loop do + return if Feature.disabled?(:wait_for_clickhouse_workers_during_migration) + + workers_active = active_sidekiq_workers? + break unless workers_active + break if Time.current >= worker_wait_ttl + + sleep WORKERS_WAIT_SLEEP.to_i + end + + return unless workers_active + + raise ClickHouse::MigrationSupport::Errors::LockError, 'Timed out waiting for active workers' + end + end + + private_class_method :wait_until_workers_inactive + end + end +end diff --git a/lib/click_house/migration_support/migration_context.rb b/lib/click_house/migration_support/migration_context.rb index 82cb287ba66f86c0bfe98f3c76a2df12d776c4ee..2f2c44d4ec723c2090d4f11876fa8e5fcdfe0ce2 100644 --- a/lib/click_house/migration_support/migration_context.rb +++ b/lib/click_house/migration_support/migration_context.rb @@ -45,7 +45,7 @@ def migrations migrations = migration_files.map do |file| version, name, scope = parse_migration_filename(file) - raise ClickHouse::MigrationSupport::IllegalMigrationNameError, file unless version + raise ClickHouse::MigrationSupport::Errors::IllegalMigrationNameError, file unless version version = version.to_i name = name.camelize diff --git a/lib/click_house/migration_support/migration_error.rb b/lib/click_house/migration_support/migration_error.rb deleted file mode 100644 index c8046c0d5a87685a7e086b2c2bcb83d8380a909a..0000000000000000000000000000000000000000 --- a/lib/click_house/migration_support/migration_error.rb +++ 
/dev/null @@ -1,55 +0,0 @@ -# frozen_string_literal: true - -module ClickHouse - module MigrationSupport - class MigrationError < StandardError - def initialize(message = nil) - message = "\n\n#{message}\n\n" if message - super - end - end - - class IllegalMigrationNameError < MigrationError - def initialize(name = nil) - if name - super("Illegal name for migration file: #{name}\n\t(only lower case letters, numbers, and '_' allowed).") - else - super('Illegal name for migration.') - end - end - end - - IrreversibleMigration = Class.new(MigrationError) - LockError = Class.new(MigrationError) - - class DuplicateMigrationVersionError < MigrationError - def initialize(version = nil) - if version - super("Multiple migrations have the version number #{version}.") - else - super('Duplicate migration version error.') - end - end - end - - class DuplicateMigrationNameError < MigrationError - def initialize(name = nil) - if name - super("Multiple migrations have the name #{name}.") - else - super('Duplicate migration name.') - end - end - end - - class UnknownMigrationVersionError < MigrationError - def initialize(version = nil) - if version - super("No migration with version number #{version}.") - else - super('Unknown migration version.') - end - end - end - end -end diff --git a/lib/click_house/migration_support/migrator.rb b/lib/click_house/migration_support/migrator.rb index 8ad7a189e73a0e3dad30ace1f28e98f1d96dc4fa..00c42cb8b05b04afe76ce3164b37782af324bb87 100644 --- a/lib/click_house/migration_support/migrator.rb +++ b/lib/click_house/migration_support/migrator.rb @@ -3,18 +3,12 @@ module ClickHouse module MigrationSupport class Migrator - include ::Gitlab::ExclusiveLeaseHelpers - class << self attr_accessor :migrations_paths end attr_accessor :logger - LEASE_KEY = 'click_house:migrations' - RETRY_DELAY = ->(num) { 0.2.seconds * (num**2) } - LOCK_DURATION = 1.hour - self.migrations_paths = ["db/click_house/migrate"] def initialize( @@ -42,11 +36,9 @@ def 
current_migration alias_method :current, :current_migration def migrate - in_lock(LEASE_KEY, ttl: LOCK_DURATION, retries: 5, sleep_sec: RETRY_DELAY) do + ClickHouse::MigrationSupport::ExclusiveLock.execute_migration do migrate_without_lock end - rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e - raise ClickHouse::MigrationSupport::LockError, e.message end def runnable @@ -92,18 +84,9 @@ def ensure_schema_migration_table(database) @schema_migration.create_table(database) end - # Used for running a specific migration. - def run_without_lock - migration = migrations.detect { |m| m.version == @target_version } - - raise ClickHouse::MigrationSupport::UnknownMigrationVersionError, @target_version if migration.nil? - - execute_migration(migration) - end - # Used for running multiple migrations up to or down to a certain value. def migrate_without_lock - raise ClickHouse::MigrationSupport::UnknownMigrationVersionError, @target_version if invalid_target? + raise ClickHouse::MigrationSupport::Errors::UnknownMigrationVersionError, @target_version if invalid_target? 
runnable.each(&method(:execute_migration)) # rubocop: disable Performance/MethodObjectAsBlock -- Execute through proxy end @@ -149,10 +132,10 @@ def start def validate(migrations) name, = migrations.group_by(&:name).find { |_, v| v.length > 1 } - raise ClickHouse::MigrationSupport::DuplicateMigrationNameError, name if name + raise ClickHouse::MigrationSupport::Errors::DuplicateMigrationNameError, name if name version, = migrations.group_by(&:version).find { |_, v| v.length > 1 } - raise ClickHouse::MigrationSupport::DuplicateMigrationVersionError, version if version + raise ClickHouse::MigrationSupport::Errors::DuplicateMigrationVersionError, version if version end def record_version_state_after_migrating(database, version) diff --git a/lib/click_house/migration_support/sidekiq_middleware.rb b/lib/click_house/migration_support/sidekiq_middleware.rb new file mode 100644 index 0000000000000000000000000000000000000000..e4e6c453e8dd7c5e0c03fc6178901019dee33e39 --- /dev/null +++ b/lib/click_house/migration_support/sidekiq_middleware.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module ClickHouse + module MigrationSupport + class SidekiqMiddleware + def call(worker, job, queue) + return yield unless register_worker?(worker.class) + + ::ClickHouse::MigrationSupport::ExclusiveLock.register_running_worker(worker.class, worker_id(job, queue)) do + yield + end + end + + private + + def worker_id(job, queue) + [queue, job['jid']].join(':') + end + + def register_worker?(worker_class) + worker_class.respond_to?(:click_house_migration_lock) && worker_class.register_click_house_worker? 
+ end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware.rb b/lib/gitlab/sidekiq_middleware.rb index e1c155a48481387c66db7967e94567d31ccf3777..96bda86ab08649ccb208f1df608480d827325dd9 100644 --- a/lib/gitlab/sidekiq_middleware.rb +++ b/lib/gitlab/sidekiq_middleware.rb @@ -37,6 +37,7 @@ def self.server_configurator(metrics: true, arguments_logger: true, skip_jobs: t chain.add ::Gitlab::SidekiqStatus::ServerMiddleware chain.add ::Gitlab::SidekiqMiddleware::WorkerContext::Server chain.add ::Gitlab::SidekiqMiddleware::PauseControl::Server + chain.add ::ClickHouse::MigrationSupport::SidekiqMiddleware # DuplicateJobs::Server should be placed at the bottom, but before the SidekiqServerMiddleware, # so we can compare the latest WAL location against replica chain.add ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server diff --git a/lib/gitlab/sidekiq_middleware/pause_control.rb b/lib/gitlab/sidekiq_middleware/pause_control.rb index 2f0fd0cc7997a436429d7f0a2c32d4ad135b22aa..8f4da7267d70e299501237070891e1a8eef3da9b 100644 --- a/lib/gitlab/sidekiq_middleware/pause_control.rb +++ b/lib/gitlab/sidekiq_middleware/pause_control.rb @@ -8,6 +8,7 @@ module PauseControl UnknownStrategyError = Class.new(StandardError) STRATEGIES = { + click_house_migration: ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::ClickHouseMigration, zoekt: ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::Zoekt, none: ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None }.freeze diff --git a/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb b/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb new file mode 100644 index 0000000000000000000000000000000000000000..adeb05245673e6a21c4015f86bc2a4bf107e1109 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module PauseControl + module 
Strategies + class ClickHouseMigration < Base + override :should_pause? + def should_pause? + return false unless Feature.enabled?(:pause_clickhouse_workers_during_migration) + + ClickHouse::MigrationSupport::ExclusiveLock.pause_workers? + end + end + end + end + end +end diff --git a/spec/lib/click_house/migration_support/exclusive_lock_spec.rb b/spec/lib/click_house/migration_support/exclusive_lock_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..5176cc752666addf7eaef212f137ee18d22b784b --- /dev/null +++ b/spec/lib/click_house/migration_support/exclusive_lock_spec.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouse::MigrationSupport::ExclusiveLock, feature_category: :database do + include ExclusiveLeaseHelpers + + let(:worker_class) do + # This worker will be active longer than the ClickHouse worker TTL + Class.new do + def self.name + 'TestWorker' + end + + include ::ApplicationWorker + include ::ClickHouseWorker + + def perform(*); end + end + end + + before do + stub_const('TestWorker', worker_class) + end + + describe '.register_running_worker' do + before do + TestWorker.click_house_migration_lock(10.seconds) + end + + it 'yields without arguments' do + expect { |b| described_class.register_running_worker(worker_class, 'test', &b) }.to yield_with_no_args + end + + it 'registers worker for a limited period of time', :freeze_time, :aggregate_failures do + expect(described_class.active_sidekiq_workers?).to eq false + + described_class.register_running_worker(worker_class, 'test') do + expect(described_class.active_sidekiq_workers?).to eq true + travel 9.seconds + expect(described_class.active_sidekiq_workers?).to eq true + travel 2.seconds + expect(described_class.active_sidekiq_workers?).to eq false + end + end + end + + describe '.pause_workers?' do + subject(:pause_workers?) { described_class.pause_workers? 
} + + it { is_expected.to eq false } + + context 'with lock taken' do + let!(:lease) { stub_exclusive_lease_taken(described_class::MIGRATION_LEASE_KEY) } + + it { is_expected.to eq true } + end + end + + describe '.execute_migration' do + it 'yields without raising error' do + expect { |b| described_class.execute_migration(&b) }.to yield_with_no_args + end + + context 'when migration lock is taken' do + let!(:lease) { stub_exclusive_lease_taken(described_class::MIGRATION_LEASE_KEY) } + + it 'raises LockError' do + expect do + expect { |b| described_class.execute_migration(&b) }.not_to yield_control + end.to raise_error ::ClickHouse::MigrationSupport::Errors::LockError + end + end + + context 'when ClickHouse workers are still active', :freeze_time do + let(:sleep_time) { described_class::WORKERS_WAIT_SLEEP } + let!(:started_at) { Time.current } + + def migration + expect { |b| described_class.execute_migration(&b) }.to yield_with_no_args + end + + around do |example| + described_class.register_running_worker(worker_class, anything) do + example.run + end + end + + it 'waits for workers and raises ClickHouse::MigrationSupport::LockError if workers do not stop in time' do + expect(described_class).to receive(:sleep).at_least(1).with(sleep_time) { travel(sleep_time) } + + expect { migration }.to raise_error(ClickHouse::MigrationSupport::Errors::LockError, + /Timed out waiting for active workers/) + expect(Time.current - started_at).to eq(described_class::DEFAULT_CLICKHOUSE_WORKER_TTL) + end + + context 'when wait_for_clickhouse_workers_during_migration FF is disabled' do + before do + stub_feature_flags(wait_for_clickhouse_workers_during_migration: false) + end + + it 'runs migration without waiting for workers' do + expect { migration }.not_to raise_error + expect(Time.current - started_at).to eq(0.0) + end + end + + it 'ignores expired workers' do + travel(described_class::DEFAULT_CLICKHOUSE_WORKER_TTL + 1.second) + + migration + end + + context 'when worker 
registration is almost expiring' do + let(:worker_class) do + # This worker will be active for less than the ClickHouse worker TTL + Class.new do + def self.name + 'TestWorker' + end + + include ::ApplicationWorker + include ::ClickHouseWorker + + click_house_migration_lock( + ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL - 1.second) + + def perform(*); end + end + end + + it 'completes migration' do + expect(described_class).to receive(:sleep).at_least(1).with(sleep_time) { travel(sleep_time) } + + expect { migration }.not_to raise_error + end + end + end + end +end diff --git a/spec/click_house/migration_support/migration_context_spec.rb b/spec/lib/click_house/migration_support/migration_context_spec.rb similarity index 93% rename from spec/click_house/migration_support/migration_context_spec.rb rename to spec/lib/click_house/migration_support/migration_context_spec.rb index 98ed0a69c3a6b07eb621905d0674f332b37cc112..65aa6b56aa7d7ce30f181d1581382c61db4efd59 100644 --- a/spec/click_house/migration_support/migration_context_spec.rb +++ b/spec/lib/click_house/migration_support/migration_context_spec.rb @@ -2,8 +2,6 @@ require 'spec_helper' -require_relative '../../../lib/click_house/migration_support/migration_error' - RSpec.describe ClickHouse::MigrationSupport::MigrationContext, click_house: :without_migrations, feature_category: :database do include ClickHouseTestHelpers @@ -32,7 +30,15 @@ let(:lease_key) { 'click_house:migrations' } let(:lease_timeout) { 1.hour } + it 'executes migration through ClickHouse::MigrationSupport::ExclusiveLock.execute_migration' do + expect(ClickHouse::MigrationSupport::ExclusiveLock).to receive(:execute_migration) + + # Test that not running execute_migration will not execute migrations + expect { migration }.not_to change { active_schema_migrations_count } + end + it 'creates a table' do + expect(ClickHouse::MigrationSupport::ExclusiveLock).to receive(:execute_migration).and_call_original 
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout) expect { migration }.to change { active_schema_migrations_count }.from(0).to(1) @@ -47,17 +53,13 @@ context 'when a migration is already running' do let(:migration_name) { 'create_some_table' } - let(:migration_klass) do - require(File.expand_path("#{migrations_dir}/1_#{migration_name}")) - migration_name.camelize.constantize - end before do stub_exclusive_lease_taken(lease_key) end it 'raises error after timeout when migration is executing concurrently' do - expect { migration }.to raise_error(ClickHouse::MigrationSupport::LockError) + expect { migration }.to raise_error(ClickHouse::MigrationSupport::Errors::LockError) .and not_change { active_schema_migrations_count } end end @@ -182,7 +184,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar let(:migrations_dirname) { 'plain_table_creation' } it 'raises UnknownMigrationVersionError' do - expect { migration }.to raise_error ClickHouse::MigrationSupport::UnknownMigrationVersionError + expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::UnknownMigrationVersionError expect(active_schema_migrations_count).to eq 0 end @@ -192,7 +194,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar let(:migrations_dirname) { 'duplicate_name' } it 'raises DuplicateMigrationNameError' do - expect { migration }.to raise_error ClickHouse::MigrationSupport::DuplicateMigrationNameError + expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::DuplicateMigrationNameError expect(active_schema_migrations_count).to eq 0 end @@ -202,7 +204,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar let(:migrations_dirname) { 'duplicate_version' } it 'raises DuplicateMigrationVersionError' do - expect { migration }.to raise_error ClickHouse::MigrationSupport::DuplicateMigrationVersionError + expect { migration }.to raise_error 
ClickHouse::MigrationSupport::Errors::DuplicateMigrationVersionError expect(active_schema_migrations_count).to eq 0 end @@ -272,7 +274,7 @@ def clone_database_configuration(source_db_identifier, target_db_identifier, tar let(:migrations_dirname) { 'plain_table_creation' } it 'raises UnknownMigrationVersionError' do - expect { migration }.to raise_error ClickHouse::MigrationSupport::UnknownMigrationVersionError + expect { migration }.to raise_error ClickHouse::MigrationSupport::Errors::UnknownMigrationVersionError expect(active_schema_migrations_count).to eq 1 end diff --git a/spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb b/spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..03c9edfabaa0f13a5ca40341ae7fb0fa62cfa7a0 --- /dev/null +++ b/spec/lib/click_house/migration_support/sidekiq_middleware_spec.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouse::MigrationSupport::SidekiqMiddleware, feature_category: :database do + let(:worker_with_click_house_worker) do + Class.new do + def self.name + 'TestWorker' + end + include ApplicationWorker + include ClickHouseWorker + end + end + + let(:worker_without_click_house_worker) do + Class.new do + def self.name + 'TestWorkerWithoutClickHouseWorker' + end + include ApplicationWorker + end + end + + subject(:middleware) { described_class.new } + + before do + stub_const('TestWorker', worker_with_click_house_worker) + stub_const('TestWorkerWithoutClickHouseWorker', worker_without_click_house_worker) + end + + describe '#call' do + let(:worker) { worker_class.new } + let(:job) { { 'jid' => 123, 'class' => worker_class.name } } + let(:queue) { 'test_queue' } + + context 'when worker does not include ClickHouseWorker' do + let(:worker_class) { worker_without_click_house_worker } + + it 'yields control without registering running worker' do + 
expect(ClickHouse::MigrationSupport::ExclusiveLock).not_to receive(:register_running_worker) + expect { |b| middleware.call(worker, job, queue, &b) }.to yield_with_no_args + end + end + + context 'when worker includes ClickHouseWorker' do + let(:worker_class) { worker_with_click_house_worker } + + it 'registers running worker and yields control' do + expect(ClickHouse::MigrationSupport::ExclusiveLock) + .to receive(:register_running_worker) + .with(worker_class, 'test_queue:123') + .and_wrap_original do |method, worker_class, worker_id| + expect { |b| method.call(worker_class, worker_id, &b) }.to yield_with_no_args + end + + middleware.call(worker, job, queue) + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb b/spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..470c860fb605acc63445abaa76357a1f0193863d --- /dev/null +++ b/spec/lib/gitlab/sidekiq_middleware/pause_control/strategies/click_house_migration_spec.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::SidekiqMiddleware::PauseControl::Strategies::ClickHouseMigration, feature_category: :database do + let(:worker_class) do + Class.new do + def self.name + 'TestPauseWorker' + end + + include ::ApplicationWorker + include ::ClickHouseWorker + + def perform(*); end + end + end + + before do + stub_const('TestPauseWorker', worker_class) + end + + describe '#call' do + include Gitlab::ExclusiveLeaseHelpers + + shared_examples 'a worker being executed' do + it 'schedules the job' do + expect(Gitlab::SidekiqMiddleware::PauseControl::PauseControlService).not_to receive(:add_to_waiting_queue!) 
+ + worker_class.perform_async('args1') + + expect(worker_class.jobs.count).to eq(1) + end + end + + context 'when lock is not taken' do + it_behaves_like 'a worker being executed' + end + + context 'when lock is taken' do + include ExclusiveLeaseHelpers + + around do |example| + ClickHouse::MigrationSupport::ExclusiveLock.execute_migration do + example.run + end + end + + it 'does not schedule the job' do + expect(Gitlab::SidekiqMiddleware::PauseControl::PauseControlService).to receive(:add_to_waiting_queue!).once + + worker_class.perform_async('args1') + + expect(worker_class.jobs.count).to eq(0) + end + + context 'when pause_clickhouse_workers_during_migration FF is disabled' do + before do + stub_feature_flags(pause_clickhouse_workers_during_migration: false) + end + + it_behaves_like 'a worker being executed' + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb b/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb index a0cce0f61a0c7a898f4b541f4fe7be7d2a0b5217..2cb98b43051cb8a307f5bde34aa2126b1f07fd8a 100644 --- a/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/pause_control_spec.rb @@ -1,19 +1,23 @@ # frozen_string_literal: true require 'fast_spec_helper' +require 'rspec-parameterized' RSpec.describe Gitlab::SidekiqMiddleware::PauseControl, feature_category: :global_search do describe '.for' do - it 'returns the right class for `zoekt`' do - expect(described_class.for(:zoekt)).to eq(::Gitlab::SidekiqMiddleware::PauseControl::Strategies::Zoekt) - end + using RSpec::Parameterized::TableSyntax - it 'returns the right class for `none`' do - expect(described_class.for(:none)).to eq(::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None) + where(:strategy_name, :expected_class) do + :none | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None + :unknown | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None + :click_house_migration | 
::Gitlab::SidekiqMiddleware::PauseControl::Strategies::ClickHouseMigration + :zoekt | ::Gitlab::SidekiqMiddleware::PauseControl::Strategies::Zoekt end - it 'returns nil when passing an unknown key' do - expect(described_class.for(:unknown)).to eq(::Gitlab::SidekiqMiddleware::PauseControl::Strategies::None) + with_them do + it 'returns the right class' do + expect(described_class.for(strategy_name)).to eq(expected_class) + end end end end diff --git a/spec/lib/gitlab/sidekiq_middleware_spec.rb b/spec/lib/gitlab/sidekiq_middleware_spec.rb index 5a38d1b77505752afeb9a515b8ffec5854efbd32..a5c6df5e9d5ada65a00ee3a25409ef682920a178 100644 --- a/spec/lib/gitlab/sidekiq_middleware_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware_spec.rb @@ -3,7 +3,7 @@ require 'spec_helper' require 'sidekiq/testing' -RSpec.describe Gitlab::SidekiqMiddleware do +RSpec.describe Gitlab::SidekiqMiddleware, feature_category: :shared do let(:job_args) { [0.01] } let(:disabled_sidekiq_middlewares) { [] } let(:chain) { Sidekiq::Middleware::Chain.new(Sidekiq) } @@ -33,6 +33,7 @@ def perform(*args) configurator.call(chain) stub_feature_flags("drop_sidekiq_jobs_#{worker_class.name}": false) # not dropping the job end + it "passes through the right middlewares", :aggregate_failures do enabled_sidekiq_middlewares.each do |middleware| expect_next_instances_of(middleware, 1, true) do |middleware_instance| @@ -68,6 +69,7 @@ def perform(*args) ::Gitlab::SidekiqVersioning::Middleware, ::Gitlab::SidekiqStatus::ServerMiddleware, ::Gitlab::SidekiqMiddleware::WorkerContext::Server, + ::ClickHouse::MigrationSupport::SidekiqMiddleware, ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server, ::Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, ::Gitlab::SidekiqMiddleware::SkipJobs diff --git a/spec/tooling/quality/test_level_spec.rb b/spec/tooling/quality/test_level_spec.rb index d7d04015b48060630c800e7a3809a66591a8b275..6ccd2e46f7bb327ddcd4305d41d3fdc0dac9e885 100644 --- a/spec/tooling/quality/test_level_spec.rb 
+++ b/spec/tooling/quality/test_level_spec.rb @@ -46,7 +46,7 @@ context 'when level is unit' do it 'returns a pattern' do expect(subject.pattern(:unit)) - .to eq("spec/{bin,channels,click_house,components,config,contracts,db,dependencies,elastic,elastic_integration,experiments,factories,finders,frontend,graphql,haml_lint,helpers,initializers,lib,metrics_server,models,policies,presenters,rack_servers,replicators,routing,rubocop,scripts,serializers,services,sidekiq,sidekiq_cluster,spam,support_specs,tasks,uploaders,validators,views,workers,tooling}{,/**/}*_spec.rb") + .to eq("spec/{bin,channels,components,config,contracts,db,dependencies,elastic,elastic_integration,experiments,factories,finders,frontend,graphql,haml_lint,helpers,initializers,lib,metrics_server,models,policies,presenters,rack_servers,replicators,routing,rubocop,scripts,serializers,services,sidekiq,sidekiq_cluster,spam,support_specs,tasks,uploaders,validators,views,workers,tooling}{,/**/}*_spec.rb") end end @@ -121,7 +121,7 @@ context 'when level is unit' do it 'returns a regexp' do expect(subject.regexp(:unit)) - .to eq(%r{spec/(bin|channels|click_house|components|config|contracts|db|dependencies|elastic|elastic_integration|experiments|factories|finders|frontend|graphql|haml_lint|helpers|initializers|lib|metrics_server|models|policies|presenters|rack_servers|replicators|routing|rubocop|scripts|serializers|services|sidekiq|sidekiq_cluster|spam|support_specs|tasks|uploaders|validators|views|workers|tooling)/}) + .to eq(%r{spec/(bin|channels|components|config|contracts|db|dependencies|elastic|elastic_integration|experiments|factories|finders|frontend|graphql|haml_lint|helpers|initializers|lib|metrics_server|models|policies|presenters|rack_servers|replicators|routing|rubocop|scripts|serializers|services|sidekiq|sidekiq_cluster|spam|support_specs|tasks|uploaders|validators|views|workers|tooling)/}) end end diff --git a/spec/workers/click_house/events_sync_worker_spec.rb 
b/spec/workers/click_house/events_sync_worker_spec.rb index 28a885629a4f789eadc48c0ce8189129f89004d4..da74e5e376dcb58c2755cc36d60020c9eb1ea8a5 100644 --- a/spec/workers/click_house/events_sync_worker_spec.rb +++ b/spec/workers/click_house/events_sync_worker_spec.rb @@ -5,6 +5,12 @@ RSpec.describe ClickHouse::EventsSyncWorker, feature_category: :value_stream_management do let(:worker) { described_class.new } + specify do + expect(worker.class.click_house_worker_attrs).to match( + a_hash_including(migration_lock_ttl: ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL) + ) + end + it_behaves_like 'an idempotent worker' do context 'when the event_sync_worker_for_click_house feature flag is on', :click_house do before do diff --git a/spec/workers/concerns/click_house_worker_spec.rb b/spec/workers/concerns/click_house_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..cb8bf9c75789d11afcc8549a292ac0446385b582 --- /dev/null +++ b/spec/workers/concerns/click_house_worker_spec.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouseWorker, feature_category: :database do + let(:worker) do + Class.new do + def self.name + 'DummyWorker' + end + + include ApplicationWorker + include ClickHouseWorker + + def perform + AnotherWorker.perform_async('identifier') + end + end + end + + let(:another_worker) do + Class.new do + def self.name + 'AnotherWorker' + end + + include ApplicationWorker + end + end + + before do + stub_const('DummyWorker', worker) + stub_const('AnotherWorker', another_worker) + end + + describe '.register_click_house_worker?' do + subject(:register_click_house_worker?) { worker.register_click_house_worker? 
} + + context 'when click_house_migration_lock is set' do + before do + worker.click_house_migration_lock(1.minute) + end + + it { is_expected.to be(true) } + end + + context 'when click_house_migration_lock is not set' do + it { is_expected.to be(true) } + end + + context 'when worker does not include module' do + it { expect(another_worker).not_to respond_to(:register_click_house_worker?) } + end + end + + describe '.click_house_worker_attrs' do + subject(:click_house_worker_attrs) { worker.click_house_migration_lock(ttl) } + + let(:ttl) { 1.minute } + + it { expect { click_house_worker_attrs }.not_to raise_error } + it { is_expected.to match(a_hash_including(migration_lock_ttl: 60.seconds)) } + + context 'with invalid ttl' do + let(:ttl) { {} } + + it 'raises exception' do + expect { click_house_worker_attrs }.to raise_error(ArgumentError) + end + end + end + + it 'registers ClickHouse worker' do + expect(worker.register_click_house_worker?).to be_truthy + expect(another_worker).not_to respond_to(:register_click_house_worker?) 
+ end + + it 'sets default TTL for worker registration' do + expect(worker.click_house_worker_attrs).to match( + a_hash_including(migration_lock_ttl: ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL) + ) + end + + it 'registers worker to pause on ClickHouse migrations' do + expect(worker.get_pause_control).to eq(:click_house_migration) + expect(another_worker.get_pause_control).to be_nil + end +end diff --git a/spec/workers/concerns/worker_attributes_spec.rb b/spec/workers/concerns/worker_attributes_spec.rb index 767a55162fbf3a1757090b1ab4b907e1a3d0a565..1c9d9a5a1ad4fe903dc99e2dcd84d336cf62d055 100644 --- a/spec/workers/concerns/worker_attributes_spec.rb +++ b/spec/workers/concerns/worker_attributes_spec.rb @@ -75,7 +75,7 @@ def self.name describe '.data_consistency' do context 'with invalid data_consistency' do - it 'raise exception' do + it 'raises exception' do expect { worker.data_consistency(:invalid) } .to raise_error('Invalid data consistency: invalid') end diff --git a/tooling/quality/test_level.rb b/tooling/quality/test_level.rb index 050eb4f4dafd646b0b299565281fddb5be40ad0c..20e00763f65438fb3a167c5abe2618ead8baa5fb 100644 --- a/tooling/quality/test_level.rb +++ b/tooling/quality/test_level.rb @@ -18,7 +18,6 @@ class TestLevel unit: %w[ bin channels - click_house components config contracts