diff --git a/db/post_migrate/20240124043507_migrate_sidekiq_queued_and_future_jobs.rb b/db/post_migrate/20240124043507_migrate_sidekiq_queued_and_future_jobs.rb
new file mode 100644
index 0000000000000000000000000000000000000000..a657bd9689a6afac01fa531e43813bd2b4321f5f
--- /dev/null
+++ b/db/post_migrate/20240124043507_migrate_sidekiq_queued_and_future_jobs.rb
@@ -0,0 +1,147 @@
+# frozen_string_literal: true
+
+class MigrateSidekiqQueuedAndFutureJobs < Gitlab::Database::Migration[2.2]
+  milestone '16.10'
+
+  class SidekiqMigrateJobs
+    LOG_FREQUENCY_QUEUES = 10
+    LOG_FREQUENCY = 1000
+
+    attr_reader :logger, :mappings
+
+    # mappings is a hash of WorkerClassName => target_queue_name
+    def initialize(mappings, logger: nil)
+      @mappings = mappings
+      @logger = logger
+    end
+
+    # Migrates jobs from queues that are outside the mappings
+    # rubocop: disable Cop/SidekiqRedisCall -- for migration
+    def migrate_queues
+      routing_rules_queues = mappings.values.uniq
+      logger&.info("List of queues based on routing rules: #{routing_rules_queues}")
+      Sidekiq.redis do |conn|
+        conn.scan("MATCH", "queue:*") do |key|
+          next unless conn.type(key) == 'list'
+
+          queue_from = key.split(':', 2).last
+          next if routing_rules_queues.include?(queue_from)
+
+          migrate_queue(conn, queue_from)
+        end
+      end
+      logger&.info("Done migrating queued jobs.")
+    end
+
+    # Migrate jobs in SortedSets, i.e. scheduled and retry sets.
+    def migrate_set(sidekiq_set)
+      scanned = 0
+      migrated = 0
+
+      estimated_size = Sidekiq.redis { |c| c.zcard(sidekiq_set) }
+      logger&.info("Processing #{sidekiq_set} set. Estimated size: #{estimated_size}.")
+
+      Sidekiq.redis do |c|
+        c.zscan(sidekiq_set) do |job, score|
+          if scanned > 0 && scanned % LOG_FREQUENCY == 0
+            logger&.info("In progress. Scanned records: #{scanned}. Migrated records: #{migrated}.")
+          end
+
+          scanned += 1
+
+          job_hash = Gitlab::Json.load(job)
+          destination_queue = mappings[job_hash['class']]
+
+          unless mappings.has_key?(job_hash['class'])
+            logger&.info("Skipping job from #{job_hash['class']}. No destination queue found.")
+            next
+          end
+
+          next if job_hash['queue'] == destination_queue
+
+          job_hash['queue'] = destination_queue
+
+          migrated += migrate_job_in_set(c, sidekiq_set, job, score, job_hash)
+        end
+      end
+
+      logger&.info("Done. Scanned records: #{scanned}. Migrated records: #{migrated}.")
+
+      {
+        scanned: scanned,
+        migrated: migrated
+      }
+    end
+    # rubocop: enable Cop/SidekiqRedisCall
+
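+    # Note on the remove-then-re-add pattern below: ZREM returns the number of
+    # members it actually removed, so the rewritten payload is only ZADDed
+    # (keeping the original score, i.e. the original scheduled time) when this
+    # process was the one that removed the job, guarding against concurrent consumers.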
Job: #{job}") + next + end + end + + logger&.info("Finished migrating #{queue_from} queue") + end + + def update_job_hash(job) + job_hash = Sidekiq.load_json(job) + return unless mappings.has_key?(job_hash['class']) + + destination_queue = mappings[job_hash['class']] + job_hash['queue'] = destination_queue + job_hash + end + + def queue_length(conn, queue_name) + conn.llen("queue:#{queue_name}") + end + end + + def up + return if Gitlab.com? + + mappings = Gitlab::SidekiqConfig.worker_queue_mappings + logger = ::Gitlab::BackgroundMigration::Logger.build + migrator = SidekiqMigrateJobs.new(mappings, logger: logger) + migrator.migrate_queues + %w[schedule retry].each { |set| migrator.migrate_set(set) } + end + + def down + # no-op + end +end diff --git a/db/schema_migrations/20240124043507 b/db/schema_migrations/20240124043507 new file mode 100644 index 0000000000000000000000000000000000000000..e3b8e57af82f9752775bad4bbbc83cedca0684e9 --- /dev/null +++ b/db/schema_migrations/20240124043507 @@ -0,0 +1 @@ +b9c539b3b6752562118241be435c16cd0371442bc039bc6b3b3cc3297dc67e77 \ No newline at end of file diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb index 4ea29a1333f8f7d26e7a76e059ee7684e76b8015..ca3fb0d51eaf7240cb9527e59ed2aae81ebbcc93 100644 --- a/sidekiq_cluster/cli.rb +++ b/sidekiq_cluster/cli.rb @@ -110,14 +110,13 @@ def run(argv = ARGV) # This means all jobs go to 'default' queue and mailer jobs go to 'mailers' queue. # See config/initializers/1_settings.rb and Settings.build_sidekiq_routing_rules. # - # Now, in case queue_selector is used, we ensure all Sidekiq processes are still processing jobs - # from default and mailers queues. - # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1491 + # We can override queue_groups to listen to just the default queues, any more additional queues + # incurs CPU overhead in Redis. if routing_rules.empty? - queue_groups.each do |queues| - queues.concat(DEFAULT_QUEUES) - queues.uniq! - end + queue_groups.map! { DEFAULT_QUEUES } + # setting min_concurrency equal to max_concurrency so that the concurrency eventually + # is set to 20 (default value) instead of based on the number of queues, which is only 2+1 in this case. + @min_concurrency = @min_concurrency == 0 ? 
+  def up
+    return if Gitlab.com?
+
+    mappings = Gitlab::SidekiqConfig.worker_queue_mappings
+    logger = ::Gitlab::BackgroundMigration::Logger.build
+    migrator = SidekiqMigrateJobs.new(mappings, logger: logger)
+    migrator.migrate_queues
+    %w[schedule retry].each { |set| migrator.migrate_set(set) }
+  end
+
+  def down
+    # no-op
+  end
+end
diff --git a/db/schema_migrations/20240124043507 b/db/schema_migrations/20240124043507
new file mode 100644
index 0000000000000000000000000000000000000000..e3b8e57af82f9752775bad4bbbc83cedca0684e9
--- /dev/null
+++ b/db/schema_migrations/20240124043507
@@ -0,0 +1 @@
+b9c539b3b6752562118241be435c16cd0371442bc039bc6b3b3cc3297dc67e77
\ No newline at end of file
diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb
index 4ea29a1333f8f7d26e7a76e059ee7684e76b8015..ca3fb0d51eaf7240cb9527e59ed2aae81ebbcc93 100644
--- a/sidekiq_cluster/cli.rb
+++ b/sidekiq_cluster/cli.rb
@@ -110,14 +110,17 @@ def run(argv = ARGV)
       # This means all jobs go to 'default' queue and mailer jobs go to 'mailers' queue.
       # See config/initializers/1_settings.rb and Settings.build_sidekiq_routing_rules.
       #
-      # Now, in case queue_selector is used, we ensure all Sidekiq processes are still processing jobs
-      # from default and mailers queues.
-      # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1491
+      # We override queue_groups so that every process listens to just the
+      # default queues; listening to any additional queue incurs extra CPU
+      # overhead in Redis.
      if routing_rules.empty?
-        queue_groups.each do |queues|
-          queues.concat(DEFAULT_QUEUES)
-          queues.uniq!
-        end
+        queue_groups.map! { DEFAULT_QUEUES }
+        # Set min_concurrency equal to max_concurrency so that concurrency is
+        # eventually set to 20 (the default value) instead of being based on
+        # the number of queues, which is only 2 + 1 in this case.
+        @min_concurrency = @min_concurrency == 0 ? @max_concurrency : @min_concurrency
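+        # E.g. `sidekiq-cluster '*' '*'` without routing rules now starts two
+        # processes, each listening only on DEFAULT_QUEUES (default and mailers).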
      end

      if @list_queues
diff --git a/spec/bin/sidekiq_cluster_spec.rb b/spec/bin/sidekiq_cluster_spec.rb
index b36fb82c295afc7ad45eb3d3af82e262d5c2ae55..3324b9f1f3b636c7ef586f3d931562a3a0c8064b 100644
--- a/spec/bin/sidekiq_cluster_spec.rb
+++ b/spec/bin/sidekiq_cluster_spec.rb
@@ -12,7 +12,7 @@
   context 'when selecting some queues and excluding others' do
     where(:args, :included, :excluded) do
       %w[--negate cronjob] | '-qdefault,1' | '-qcronjob,1'
-      %w[--queue-selector resource_boundary=cpu] | %w[-qupdate_merge_requests,1 -qdefault,1 -qmailers,1] |
+      %w[--queue-selector resource_boundary=cpu] | %w[-qdefault,1 -qmailers,1] |
        '-qauthorized_keys_worker,1'
     end
 
@@ -43,7 +43,7 @@
       expect(status).to be(0)
       expect(output).to include('bundle exec sidekiq')
       expect(Shellwords.split(output)).to include('-qdefault,1')
-      expect(Shellwords.split(output)).to include('-qcronjob:ci_archive_traces_cron,1')
+      expect(Shellwords.split(output)).to include('-qmailers,1')
     end
   end
 end
diff --git a/spec/commands/sidekiq_cluster/cli_spec.rb b/spec/commands/sidekiq_cluster/cli_spec.rb
index 47a428245913b0dff4c9712549b6f5b4baf4e744..f6511007a5111e247110a7ee0cbe2c47d5a4180a 100644
--- a/spec/commands/sidekiq_cluster/cli_spec.rb
+++ b/spec/commands/sidekiq_cluster/cli_spec.rb
@@ -58,285 +58,315 @@
     end
 
     context 'with arguments' do
-      it 'starts the Sidekiq workers' do
-        expect(Gitlab::SidekiqCluster).to receive(:start)
-          .with([['foo'] + described_class::DEFAULT_QUEUES], default_options)
-          .and_return([])
-
-        cli.run(%w[foo])
-      end
+      context 'with routing rules specified' do
+        before do
+          stub_config(sidekiq: { routing_rules: [['resource_boundary=cpu', 'foo']] })
+        end
 
-      it 'allows the special * selector' do
-        worker_queues = %w[foo bar baz]
+        it 'starts the Sidekiq workers' do
+          expect(Gitlab::SidekiqCluster).to receive(:start)
+            .with([['foo']], default_options)
+            .and_return([])
 
-        expect(Gitlab::SidekiqConfig::CliMethods)
-          .to receive(:worker_queues).and_return(worker_queues)
+          cli.run(%w[foo])
+        end
 
-        expect(Gitlab::SidekiqCluster)
-          .to receive(:start).with([worker_queues], default_options).and_return([])
+        it 'allows the special * selector' do
+          worker_queues = %w[foo bar baz]
 
-        cli.run(%w[*])
-      end
+          expect(Gitlab::SidekiqConfig::CliMethods)
+            .to receive(:worker_queues).and_return(worker_queues)
 
-      it 'raises an error when the arguments contain newlines' do
-        invalid_arguments = [
-          ["foo\n"],
-          ["foo\r"],
-          %W[foo b\nar]
-        ]
+          expect(Gitlab::SidekiqCluster)
+            .to receive(:start).with([worker_queues], default_options).and_return([])
 
-        invalid_arguments.each do |arguments|
-          expect { cli.run(arguments) }.to raise_error(described_class::CommandError)
+          cli.run(%w[*])
         end
-      end
 
-      context 'with --negate flag' do
-        it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
-          expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(['baz'])
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['baz'] + described_class::DEFAULT_QUEUES], default_options)
-            .and_return([])
+        it 'raises an error when the arguments contain newlines' do
+          invalid_arguments = [
+            ["foo\n"],
+            ["foo\r"],
+            %W[foo b\nar]
+          ]
 
-          cli.run(%w[foo -n])
+          invalid_arguments.each do |arguments|
+            expect { cli.run(arguments) }.to raise_error(described_class::CommandError)
+          end
         end
-      end
 
-      context 'with --max-concurrency flag' do
-        it 'starts Sidekiq workers for specified queues with a max concurrency' do
-          expected_queues = [%w[foo bar baz], %w[solo]].each { |queues| queues.concat(described_class::DEFAULT_QUEUES) }
-          expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with(expected_queues, default_options.merge(max_concurrency: 2))
-            .and_return([])
+        context 'with --negate flag' do
+          it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
+            expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(['baz'])
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with([['baz']], default_options)
+              .and_return([])
 
-          cli.run(%w[foo,bar,baz solo -m 2])
+            cli.run(%w[foo -n])
+          end
         end
-      end
 
-      context 'with --min-concurrency flag' do
-        it 'starts Sidekiq workers for specified queues with a min concurrency' do
-          expected_queues = [%w[foo bar baz], %w[solo]].each { |queues| queues.concat(described_class::DEFAULT_QUEUES) }
-          expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with(expected_queues, default_options.merge(min_concurrency: 2))
-            .and_return([])
+        context 'with --max-concurrency flag' do
+          it 'starts Sidekiq workers for specified queues with a max concurrency' do
+            expected_queues = [%w[foo bar baz], %w[solo]]
+            expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with(expected_queues, default_options.merge(max_concurrency: 2))
+              .and_return([])
 
-          cli.run(%w[foo,bar,baz solo --min-concurrency 2])
+            cli.run(%w[foo,bar,baz solo -m 2])
+          end
         end
-      end
 
-      context 'with --concurrency flag' do
-        it 'starts Sidekiq workers for specified queues with the fixed concurrency' do
-          expected_queues = [%w[foo bar baz], %w[solo]].each { |queues| queues.concat(described_class::DEFAULT_QUEUES) }
-          expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with(expected_queues, default_options.merge(concurrency: 2))
-            .and_return([])
+        context 'with --min-concurrency flag' do
+          it 'starts Sidekiq workers for specified queues with a min concurrency' do
+            expected_queues = [%w[foo bar baz], %w[solo]]
+            expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with(expected_queues, default_options.merge(min_concurrency: 2))
+              .and_return([])
 
-          cli.run(%w[foo,bar,baz solo -c 2])
+            cli.run(%w[foo,bar,baz solo --min-concurrency 2])
+          end
         end
-      end
 
-      context 'with --timeout flag' do
-        it 'when given', 'starts Sidekiq workers with given timeout' do
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['foo'] + described_class::DEFAULT_QUEUES], default_options.merge(timeout: 10))
-            .and_return([])
+        context 'with --concurrency flag' do
+          it 'starts Sidekiq workers for specified queues with the fixed concurrency' do
+            expected_queues = [%w[foo bar baz], %w[solo]]
+            expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with(expected_queues, default_options.merge(concurrency: 2))
+              .and_return([])
 
-          cli.run(%w[foo --timeout 10])
+            cli.run(%w[foo,bar,baz solo -c 2])
+          end
         end
 
-        it 'when not given', 'starts Sidekiq workers with default timeout' do
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['foo'] + described_class::DEFAULT_QUEUES], default_options.merge(timeout:
-              Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS))
-            .and_return([])
+        context 'with --timeout flag' do
+          it 'when given', 'starts Sidekiq workers with given timeout' do
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with([['foo']], default_options.merge(timeout: 10))
+              .and_return([])
 
-          cli.run(%w[foo])
-        end
-      end
+            cli.run(%w[foo --timeout 10])
+          end
+
+          it 'when not given', 'starts Sidekiq workers with default timeout' do
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with([['foo']], default_options.merge(timeout:
+                Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS))
+              .and_return([])
 
-      context 'with --list-queues flag' do
-        it 'errors when given --list-queues and --dryrun' do
-          expect { cli.run(%w[foo --list-queues --dryrun]) }.to raise_error(described_class::CommandError)
+            cli.run(%w[foo])
+          end
         end
 
-        it 'prints out a list of queues in alphabetical order' do
-          expected_queues = [
-            'default',
-            'epics:epics_update_epics_dates',
-            'epics_new_epic_issue',
-            'mailers',
-            'new_epic',
-            'todos_destroyer:todos_destroyer_confidential_epic'
-          ]
+        context 'with --list-queues flag' do
+          it 'errors when given --list-queues and --dryrun' do
+            expect { cli.run(%w[foo --list-queues --dryrun]) }.to raise_error(described_class::CommandError)
+          end
+
+          it 'prints out a list of queues in alphabetical order' do
+            expected_queues = [
+              'default',
+              'epics:epics_update_epics_dates',
+              'epics_new_epic_issue',
+              'mailers',
+              'new_epic',
+              'todos_destroyer:todos_destroyer_confidential_epic'
+            ]
 
-          allow(Gitlab::SidekiqConfig::CliMethods).to receive(:query_queues).and_return(expected_queues.shuffle)
+            allow(Gitlab::SidekiqConfig::CliMethods).to receive(:query_queues).and_return(expected_queues.shuffle)
 
-          expect(cli).to receive(:puts).with([expected_queues])
+            expect(cli).to receive(:puts).with([expected_queues])
 
-          cli.run(%w[--queue-selector feature_category=epics --list-queues])
+            cli.run(%w[--queue-selector feature_category=epics --list-queues])
+          end
         end
-      end
 
-      context 'queue namespace expansion' do
-        it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
-          expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['cronjob', 'cronjob:foo', 'cronjob:bar'] +
-              described_class::DEFAULT_QUEUES], default_options)
-            .and_return([])
+        context 'queue namespace expansion' do
+          it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
+            expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with([['cronjob', 'cronjob:foo', 'cronjob:bar']], default_options)
+              .and_return([])
 
-          cli.run(%w[cronjob])
+            cli.run(%w[cronjob])
+          end
         end
-      end
 
-      context "with --queue-selector" do
-        where do
-          {
-            'memory-bound queues' => {
-              query: 'resource_boundary=memory',
-              included_queues: %w[project_export],
-              excluded_queues: %w[merge]
-            },
-            'memory- or CPU-bound queues' => {
-              query: 'resource_boundary=memory,cpu',
-              included_queues: %w[auto_merge:auto_merge_process project_export],
-              excluded_queues: %w[merge]
-            },
-            'high urgency CI queues' => {
-              query: 'feature_category=continuous_integration&urgency=high',
-              included_queues: %w[pipeline_default:ci_drop_pipeline],
-              excluded_queues: %w[merge]
-            },
-            'CPU-bound high urgency CI queues' => {
-              query: 'feature_category=continuous_integration&urgency=high&resource_boundary=cpu',
-              included_queues: %w[pipeline_default:ci_create_downstream_pipeline],
-              excluded_queues: %w[pipeline_default:ci_drop_pipeline merge]
-            },
-            'CPU-bound high urgency non-CI queues' => {
-              query: 'feature_category!=continuous_integration&urgency=high&resource_boundary=cpu',
-              included_queues: %w[new_issue],
-              excluded_queues: %w[pipeline_default:ci_create_downstream_pipeline]
-            },
-            'CI and SCM queues' => {
-              query: 'feature_category=continuous_integration|feature_category=source_code_management',
-              included_queues: %w[pipeline_default:ci_drop_pipeline merge],
-              excluded_queues: %w[]
-            }
-          }
-        end
+        context "with --queue-selector" do
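+          # Selector terms match worker attributes declared in all_queues.yml;
+          # `&` intersects, `|` unions, and `!=` negates an attribute match.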
'feature_category=continuous_integration&urgency=high&resource_boundary=cpu', - included_queues: %w[pipeline_default:ci_create_downstream_pipeline], - excluded_queues: %w[pipeline_default:ci_drop_pipeline merge] - }, - 'CPU-bound high urgency non-CI queues' => { - query: 'feature_category!=continuous_integration&urgency=high&resource_boundary=cpu', - included_queues: %w[new_issue], - excluded_queues: %w[pipeline_default:ci_create_downstream_pipeline] - }, - 'CI and SCM queues' => { - query: 'feature_category=continuous_integration|feature_category=source_code_management', - included_queues: %w[pipeline_default:ci_drop_pipeline merge], - excluded_queues: %w[] + context "with --queue-selector" do + where do + { + 'memory-bound queues' => { + query: 'resource_boundary=memory', + included_queues: %w[project_export], + excluded_queues: %w[merge] + }, + 'memory- or CPU-bound queues' => { + query: 'resource_boundary=memory,cpu', + included_queues: %w[auto_merge:auto_merge_process project_export], + excluded_queues: %w[merge] + }, + 'high urgency CI queues' => { + query: 'feature_category=continuous_integration&urgency=high', + included_queues: %w[pipeline_default:ci_drop_pipeline], + excluded_queues: %w[merge] + }, + 'CPU-bound high urgency CI queues' => { + query: 'feature_category=continuous_integration&urgency=high&resource_boundary=cpu', + included_queues: %w[pipeline_default:ci_create_downstream_pipeline], + excluded_queues: %w[pipeline_default:ci_drop_pipeline merge] + }, + 'CPU-bound high urgency non-CI queues' => { + query: 'feature_category!=continuous_integration&urgency=high&resource_boundary=cpu', + included_queues: %w[new_issue], + excluded_queues: %w[pipeline_default:ci_create_downstream_pipeline] + }, + 'CI and SCM queues' => { + query: 'feature_category=continuous_integration|feature_category=source_code_management', + included_queues: %w[pipeline_default:ci_drop_pipeline merge], + excluded_queues: %w[] + } } - } - end + end - with_them do - it 'expands queues by attributes' do - expect(Gitlab::SidekiqCluster).to receive(:start) do |queues, opts| - expect(opts).to eq(default_options) - expect(queues.first).to include(*included_queues) - expect(queues.first).not_to include(*excluded_queues) - expect(queues.first).to include(*described_class::DEFAULT_QUEUES) + with_them do + it 'expands queues by attributes' do + expect(Gitlab::SidekiqCluster).to receive(:start) do |queues, opts| + expect(opts).to eq(default_options) + expect(queues.first).to include(*included_queues) + expect(queues.first).not_to include(*excluded_queues) - [] + [] + end + + cli.run(%W[--queue-selector #{query}]) end - cli.run(%W[--queue-selector #{query}]) - end + it 'works when negated' do + expect(Gitlab::SidekiqCluster).to receive(:start) do |queues, opts| + expect(opts).to eq(default_options) + expect(queues.first).not_to include(*included_queues) + expect(queues.first).to include(*excluded_queues) - it 'works when negated' do - expect(Gitlab::SidekiqCluster).to receive(:start) do |queues, opts| - expect(opts).to eq(default_options) - expect(queues.first).not_to include(*included_queues) - expect(queues.first).to include(*excluded_queues) - expect(queues.first).to include(*described_class::DEFAULT_QUEUES) + [] + end - [] + cli.run(%W[--negate --queue-selector #{query}]) end + end - cli.run(%W[--negate --queue-selector #{query}]) + it 'expands multiple queue groups correctly' do + expected_workers = + if Gitlab.ee? 
+          it 'expands multiple queue groups correctly' do
+            expected_workers =
+              if Gitlab.ee?
+                [
+                  %w[incident_management_close_incident status_page_publish],
+                  %w[bulk_imports_pipeline bulk_imports_pipeline_batch bulk_imports_relation_batch_export bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import project_template_export]
+                ]
+              else
+                [
+                  %w[incident_management_close_incident],
+                  %w[bulk_imports_pipeline bulk_imports_pipeline_batch bulk_imports_relation_batch_export bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import]
+                ]
+              end
+
+            expect(Gitlab::SidekiqCluster)
+              .to receive(:start)
+              .with(expected_workers, default_options)
+              .and_return([])
+
+            cli.run(%w[--queue-selector feature_category=incident_management&has_external_dependencies=true resource_boundary=memory&feature_category=importers])
+          end
 
-        it 'expands multiple queue groups correctly' do
-          expected_workers =
-            if Gitlab.ee?
-              [
-                %w[incident_management_close_incident status_page_publish] + described_class::DEFAULT_QUEUES,
-                %w[bulk_imports_pipeline bulk_imports_pipeline_batch bulk_imports_relation_batch_export bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import project_template_export] +
-                  described_class::DEFAULT_QUEUES
-              ]
-            else
-              [
-                %w[incident_management_close_incident] + described_class::DEFAULT_QUEUES,
-                %w[bulk_imports_pipeline bulk_imports_pipeline_batch bulk_imports_relation_batch_export bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import] +
-                  described_class::DEFAULT_QUEUES
-              ]
-            end
-
-          expect(Gitlab::SidekiqCluster)
-            .to receive(:start)
-            .with(expected_workers, default_options)
-            .and_return([])
-
-          cli.run(%w[--queue-selector feature_category=incident_management&has_external_dependencies=true resource_boundary=memory&feature_category=importers])
-        end
 
-        it 'allows the special * selector' do
-          worker_queues = %w[foo bar baz]
-
-          expect(Gitlab::SidekiqConfig::CliMethods)
-            .to receive(:worker_queues).and_return(worker_queues)
-
-          expect(Gitlab::SidekiqCluster)
-            .to receive(:start).with([worker_queues], default_options).and_return([])
-
-          cli.run(%w[--queue-selector *])
-        end
+          it 'allows the special * selector' do
+            worker_queues = %w[foo bar baz]
+
+            expect(Gitlab::SidekiqConfig::CliMethods)
+              .to receive(:worker_queues).and_return(worker_queues)
+
+            expect(Gitlab::SidekiqCluster)
+              .to receive(:start).with([worker_queues], default_options).and_return([])
+
+            cli.run(%w[--queue-selector *])
+          end
 
-        it 'errors when the selector matches no queues' do
-          expect(Gitlab::SidekiqCluster).not_to receive(:start)
-
-          expect { cli.run(%w[--queue-selector has_external_dependencies=true&has_external_dependencies=false]) }
-            .to raise_error(described_class::CommandError)
-        end
+          it 'errors when the selector matches no queues' do
+            expect(Gitlab::SidekiqCluster).not_to receive(:start)
+
+            expect { cli.run(%w[--queue-selector has_external_dependencies=true&has_external_dependencies=false]) }
+              .to raise_error(described_class::CommandError)
+          end
 
-        it 'errors on an invalid query multiple queue groups correctly' do
-          expect(Gitlab::SidekiqCluster).not_to receive(:start)
-
-          expect { cli.run(%w[--queue-selector unknown_field=chatops]) }
-            .to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::QueryError)
-        end
-      end
+          it 'errors on an invalid query multiple queue groups correctly' do
+            expect(Gitlab::SidekiqCluster).not_to receive(:start)
+
+            expect { cli.run(%w[--queue-selector unknown_field=chatops]) }
+              .to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::QueryError)
+          end
+        end
+      end
 
+      context "without sidekiq setting specified" do
+        before do
+          stub_config(sidekiq: nil)
+        end
+
+        it "does not throw an error" do
+          allow(Gitlab::SidekiqCluster).to receive(:start).and_return([])
+
+          expect { cli.run(%w[foo]) }.not_to raise_error
+        end
+
+        it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency = max_concurrency" do
+          default_options[:min_concurrency] = default_options[:max_concurrency]
+          expect(Gitlab::SidekiqCluster).to receive(:start)
+            .with([described_class::DEFAULT_QUEUES], default_options)
+            .and_return([])
+
+          cli.run(%w[foo])
+        end
+      end
 
-      context "with routing rules specified" do
-        before do
-          stub_config(sidekiq: { routing_rules: [['resource_boundary=cpu', 'foo']] })
-        end
-
-        it "starts Sidekiq workers only for given queues without any additional DEFAULT_QUEUES" do
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['foo']], default_options)
-            .and_return([])
-
-          cli.run(%w[foo])
-        end
-      end
 
-      context "with sidekiq settings not specified" do
-        before do
-          stub_config(sidekiq: nil)
-        end
-
-        it "does not throw an error" do
-          allow(Gitlab::SidekiqCluster).to receive(:start).and_return([])
-
-          expect { cli.run(%w[foo]) }.not_to raise_error
-        end
-
-        it "starts Sidekiq workers with given queues, and additional default and mailers queues (DEFAULT_QUEUES)" do
-          expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['foo'] + described_class::DEFAULT_QUEUES], default_options)
-            .and_return([])
-
-          cli.run(%w[foo])
-        end
-      end
 
+      context "without routing rules" do
+        before do
+          stub_config(sidekiq: { routing_rules: [] })
+        end
+
+        it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency = max_concurrency" do
+          default_options[:min_concurrency] = default_options[:max_concurrency]
+          expect(Gitlab::SidekiqCluster).to receive(:start)
+            .with([described_class::DEFAULT_QUEUES], default_options)
+            .and_return([])
+
+          cli.run(%w[foo])
+        end
+
+        context "with 4 wildcard * as argument" do
+          it "starts 4 Sidekiq workers all with DEFAULT_QUEUES and min_concurrency = max_concurrency" do
+            default_options[:min_concurrency] = default_options[:max_concurrency]
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with([described_class::DEFAULT_QUEUES] * 4, default_options)
+              .and_return([])
+
+            cli.run(%w[* * * *])
+          end
+        end
+
+        context "with min-concurrency flag" do
+          it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency as specified" do
+            options = default_options.dup
+            options[:min_concurrency] = 10
+            expect(Gitlab::SidekiqCluster).to receive(:start)
+              .with([described_class::DEFAULT_QUEUES] * 4, options)
+              .and_return([])
 
+            cli.run(%w[* * * * --min-concurrency 10])
+          end
+        end
+      end
     end
   end
diff --git a/spec/migrations/20240124043507_migrate_sidekiq_queued_and_future_jobs_spec.rb b/spec/migrations/20240124043507_migrate_sidekiq_queued_and_future_jobs_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..558d455b13a663ed9d1bba15ea2cddead30fac30
--- /dev/null
+++ b/spec/migrations/20240124043507_migrate_sidekiq_queued_and_future_jobs_spec.rb
@@ -0,0 +1,253 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+require_migration!
+
+RSpec.describe MigrateSidekiqQueuedAndFutureJobs, :clean_gitlab_redis_queues, feature_category: :scalability do
+  let(:email_receiver_queue) { 'email_receiver' }
+  let(:mappings_mocked) { true }
+  let(:mappings) { { "EmailReceiverWorker" => "default" } }
+
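+  # Route EmailReceiverWorker to the example's queue for the duration of each
+  # example, then restore the queue derived from the routing rules.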
+  around do |example|
+    EmailReceiverWorker.sidekiq_options queue: email_receiver_queue
+    Sidekiq::Testing.disable!(&example)
+    EmailReceiverWorker.set_queue
+  end
+
+  describe '#up', :aggregate_failures, :silence_stdout do
+    context 'when migrating queued jobs' do
+      let(:email_receiver_jobs_count_pre) { 2 }
+      let(:default_jobs_count_pre) { 0 }
+
+      let(:email_receiver_jobs_count_post) { 0 }
+      let(:default_jobs_count_post) { 2 }
+
+      before do
+        EmailReceiverWorker.perform_async('foo')
+        EmailReceiverWorker.perform_async('bar')
+      end
+
+      shared_examples 'migrates queued jobs' do
+        it 'migrates the jobs to the correct destination queue' do
+          allow(Gitlab::SidekiqConfig).to receive(:worker_queue_mappings).and_return(mappings) if mappings_mocked
+
+          expect(queue_length('email_receiver')).to eq(email_receiver_jobs_count_pre)
+          expect(queue_length('default')).to eq(default_jobs_count_pre)
+          migrate!
+          expect(queue_length('email_receiver')).to eq(email_receiver_jobs_count_post)
+          expect(queue_length('default')).to eq(default_jobs_count_post)
+
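+          # Jobs are RPOPed oldest-first from the source queue and LPUSHed onto
+          # the destination, so FIFO order is preserved: 'bar' ends up at the
+          # head of the list and 'foo' (the next job to be popped) at the tail.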
+          jobs = list_jobs('default')
+          expect(jobs[0]).to include("class" => "EmailReceiverWorker", "queue" => "default", "args" => ["bar"])
+          expect(jobs[1]).to include("class" => "EmailReceiverWorker", "queue" => "default", "args" => ["foo"])
+        end
+      end
+
+      context 'with worker_queue_mappings mocked' do
+        let(:mappings_mocked) { true }
+
+        it_behaves_like 'migrates queued jobs'
+
+        context 'when jobs are already in the correct queue' do
+          let(:email_receiver_queue) { 'default' }
+          let(:email_receiver_jobs_count_pre) { 0 }
+          let(:default_jobs_count_pre) { 2 }
+
+          let(:email_receiver_jobs_count_post) { 0 }
+          let(:default_jobs_count_post) { 2 }
+
+          it_behaves_like 'migrates queued jobs'
+        end
+      end
+
+      context 'without worker_queue_mappings mocked' do
+        # Assuming Settings.sidekiq.routing_rules is [['*', 'default']].
+        # If routing_rules or Gitlab::SidekiqConfig.worker_queue_mappings change,
+        # this spec might fail and the migration or this spec will need adjusting.
+        let(:mappings_mocked) { false }
+
+        it_behaves_like 'migrates queued jobs'
+      end
+
+      context 'with illegal JSON payload' do
+        let(:job) { '{foo: 1}' }
+
+        before do
+          Sidekiq.redis do |conn|
+            conn.lpush("queue:email_receiver", job)
+          end
+        end
+
+        it 'logs an error' do
+          allow(::Gitlab::BackgroundMigration::Logger).to receive(:build).and_return(Logger.new($stdout))
+          migrate!
+          expect($stdout.string).to include("Unmarshal JSON payload from SidekiqMigrateJobs failed. Job: #{job}")
+        end
+      end
+
+      context 'when run in GitLab.com' do
+        it 'skips the migration' do
+          allow(Gitlab).to receive(:com?).and_return(true)
+          expect(described_class::SidekiqMigrateJobs).not_to receive(:new)
+          migrate!
+        end
+      end
+
+      def queue_length(queue_name)
+        Sidekiq.redis do |conn|
+          conn.llen("queue:#{queue_name}")
+        end
+      end
+
+      def list_jobs(queue_name)
+        Sidekiq.redis { |conn| conn.lrange("queue:#{queue_name}", 0, -1) }
+               .map { |item| Sidekiq.load_json item }
+      end
+    end
+
+    context 'when migrating future jobs' do
+      include_context 'when handling retried jobs'
+      let(:schedule_jobs_count_in_email_receiver_pre) { 3 }
+      let(:retry_jobs_count_in_email_receiver_pre) { 2 }
+      let(:schedule_jobs_count_in_default_pre) { 0 }
+      let(:retry_jobs_count_in_default_pre) { 0 }
+
+      let(:schedule_jobs_count_in_email_receiver_post) { 0 }
+      let(:retry_jobs_count_in_email_receiver_post) { 0 }
+      let(:schedule_jobs_count_in_default_post) { 3 }
+      let(:retry_jobs_count_in_default_post) { 2 }
+
+      before do
+        allow(Gitlab::SidekiqConfig).to receive(:worker_queue_mappings).and_return(mappings) if mappings_mocked
+        EmailReceiverWorker.perform_in(1.hour, 'foo')
+        EmailReceiverWorker.perform_in(2.hours, 'bar')
+        EmailReceiverWorker.perform_in(3.hours, 'baz')
+        retry_in(EmailReceiverWorker, 1.hour, 0)
+        retry_in(EmailReceiverWorker, 2.hours, 0)
+      end
+
+      shared_examples 'migrates scheduled and retried jobs' do
+        it 'migrates to correct destination queue' do
+          queues = %w[email_receiver default]
+          job_types = %w[schedule retry]
+          worker = EmailReceiverWorker.to_s
+          queues.each do |queue|
+            job_types.each do |job_type|
+              jobs_pre = scan_jobs(job_type, queue, worker)
+              expect(jobs_pre.length).to eq(send("#{job_type}_jobs_count_in_#{queue}_pre"))
+            end
+          end
+
+          migrate!
+
+          queues.each do |queue|
+            job_types.each do |job_type|
+              jobs_post = scan_jobs(job_type, queue, worker)
+              expect(jobs_post.length).to eq(send("#{job_type}_jobs_count_in_#{queue}_post"))
+            end
+          end
+        end
+
+        it 'logs output at the start, finish, and in between' do
+          stub_const("#{described_class}::SidekiqMigrateJobs::LOG_FREQUENCY", 1)
+          allow(::Gitlab::BackgroundMigration::Logger).to receive(:build).and_return(Logger.new($stdout))
+
+          migrate!
+
+          expect($stdout.string).to include('Processing schedule set')
+          expect($stdout.string).to include('Processing retry set')
+          expect($stdout.string).to include('In progress')
+          expect($stdout.string).to include('Done')
+        end
+      end
+
+      context 'with worker_queue_mappings mocked' do
+        let(:mappings_mocked) { true }
+
+        it_behaves_like 'migrates scheduled and retried jobs'
+
+        context 'when jobs are already in the correct queue' do
+          let(:email_receiver_queue) { 'default' }
+          let(:schedule_jobs_count_in_email_receiver_pre) { 0 }
+          let(:retry_jobs_count_in_email_receiver_pre) { 0 }
+          let(:schedule_jobs_count_in_default_pre) { 3 }
+          let(:retry_jobs_count_in_default_pre) { 2 }
+
+          let(:schedule_jobs_count_in_email_receiver_post) { 0 }
+          let(:retry_jobs_count_in_email_receiver_post) { 0 }
+          let(:schedule_jobs_count_in_default_post) { 3 }
+          let(:retry_jobs_count_in_default_post) { 2 }
+
+          it_behaves_like 'migrates scheduled and retried jobs'
+        end
+
+        context "when job doesn't match mappings" do
+          let(:mappings) { { "AuthorizedProjectsWorker" => "default" } }
+
+          it 'logs skipping the job' do
+            allow(::Gitlab::BackgroundMigration::Logger).to receive(:build).and_return(Logger.new($stdout))
+
+            migrate!
+
+            expect($stdout.string).to include('Skipping job from EmailReceiverWorker. No destination queue found.')
+          end
+        end
+      end
+
+      context 'without worker_queue_mappings mocked' do
+        let(:mappings_mocked) { false }
+
+        it_behaves_like 'migrates scheduled and retried jobs'
+      end
+
+      context 'when there are matching jobs that got removed during migration' do
+        it 'does not try to migrate jobs' do
+          allow(::Gitlab::BackgroundMigration::Logger).to receive(:build).and_return(Logger.new($stdout))
+
+          freeze_time do
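+            # Simulate a concurrent consumer: the job is deleted from both sets
+            # after being scanned but before migrate_job_in_set runs, so zrem
+            # returns 0 and the job counts as scanned but not migrated.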
+            allow_next_instance_of(described_class::SidekiqMigrateJobs) do |migrator|
+              allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args|
+                Sidekiq.redis { |c| c.zrem('schedule', args.third) }
+                Sidekiq.redis { |c| c.zrem('retry', args.third) }
+
+                meth.call(*args)
+              end
+            end
+
+            migrate!
+            # schedule jobs
+            expect($stdout.string).to include("Done. Scanned records: 3. Migrated records: 0.")
+            # retry jobs
+            expect($stdout.string).to include("Done. Scanned records: 2. Migrated records: 0.")
+          end
+        end
+      end
+
+      context 'when run in GitLab.com' do
+        it 'skips the migration' do
+          allow(Gitlab).to receive(:com?).and_return(true)
+          expect(described_class::SidekiqMigrateJobs).not_to receive(:new)
+          migrate!
+        end
+      end
+
+      def set_length(set)
+        Sidekiq.redis { |c| c.zcard(set) }
+      end
+
+      def scan_jobs(set_name, queue_name, class_name)
+        Sidekiq.redis { |c| c.zrange(set_name, 0, -1) }
+               .map { |item| Gitlab::Json.load(item) }
+               .select { |job| job['queue'] == queue_name && job['class'] == class_name }
+      end
+    end
+  end
+end