From 1e241e7cb85afed391836ea00b2bc41407679e18 Mon Sep 17 00:00:00 2001
From: Gregorius Marco <gmarco@gitlab.com>
Date: Fri, 28 Apr 2023 17:26:01 +0000
Subject: [PATCH] Merge branch 'marcogreg/routing-rules-initializer-1' into
 'master'

Update default initializer value for Sidekiq routing_rules

See merge request gitlab-org/gitlab!97908
---
 config/settings.rb                            |  5 +-
 lib/gitlab/sidekiq_config/worker_router.rb    |  5 ++
 sidekiq_cluster/cli.rb                        | 25 +++++++
 spec/bin/sidekiq_cluster_spec.rb              |  7 +-
 spec/commands/sidekiq_cluster/cli_spec.rb     | 70 +++++++++++++++----
 spec/config/settings_spec.rb                  |  4 +-
 .../sidekiq_config/worker_router_spec.rb      | 29 +++++---
 .../duplicate_jobs/server_spec.rb             |  8 +--
 spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb  |  8 +--
 9 files changed, 126 insertions(+), 35 deletions(-)

diff --git a/config/settings.rb b/config/settings.rb
index e03d9877e1cb2..c25531a33112e 100644
--- a/config/settings.rb
+++ b/config/settings.rb
@@ -171,11 +171,12 @@ def load_dynamic_cron_schedules!
     cron_jobs['gitlab_service_ping_worker']['cron'] ||= cron_for_service_ping
   end
 
-  # Route jobs to queue based on worker name.
+  # Route all jobs to 'default' queue. This setting is meant for self-managed instances use to keep things simple.
+  # See https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1491
   def build_sidekiq_routing_rules(rules)
     return rules unless rules.nil? || rules&.empty?
 
-    [[Gitlab::SidekiqConfig::WorkerMatcher::WILDCARD_MATCH, nil]]
+    [[Gitlab::SidekiqConfig::WorkerMatcher::WILDCARD_MATCH, 'default']]
   end
 
   private
diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb
index 0670e5521df65..6d5ecb64065de 100644
--- a/lib/gitlab/sidekiq_config/worker_router.rb
+++ b/lib/gitlab/sidekiq_config/worker_router.rb
@@ -77,6 +77,11 @@ def route(worker_klass)
       def parse_routing_rules(routing_rules)
         raise InvalidRoutingRuleError, 'The set of routing rule must be an array' unless routing_rules.is_a?(Array)
 
+        unless routing_rules.last&.first == WorkerMatcher::WILDCARD_MATCH
+          Gitlab::AppLogger.warn "sidekiq.routing_rules config is missing a catch-all `*` entry as the last rule. " \
+                                 "Consider adding `[['*', 'default']]` at the end of routing_rules."
+        end
+
         routing_rules.map do |rule_tuple|
           raise InvalidRoutingRuleError, "Routing rule `#{rule_tuple.inspect}` is invalid" unless valid_routing_rule?(rule_tuple)
 
diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb
index 23b05bf2d16b5..22cddead3e435 100644
--- a/sidekiq_cluster/cli.rb
+++ b/sidekiq_cluster/cli.rb
@@ -28,6 +28,11 @@ class CLI
       # The signals that should simply be forwarded to the workers.
       FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
 
+      # The default queues that each Sidekiq process always listens to if routing rules are not customized:
+      # - `default` queue comes from config initializer's Settings.build_sidekiq_routing_rules
+      # - `mailers` queue comes from Gitlab::Application.config.action_mailer.deliver_later_queue_name
+      DEFAULT_QUEUES = %w[default mailers].freeze
+
       CommandError = Class.new(StandardError)
 
       def initialize(log_output = $stderr)
@@ -93,6 +98,26 @@ def run(argv = ARGV)
             'No queues found, you must select at least one queue'
         end
 
+        begin
+          routing_rules = ::Gitlab.config.sidekiq.routing_rules
+        rescue StandardError
+          routing_rules = []
+        end
+
+        # Routing rules are defaulted to [['*', 'default']] if not specified.
+        # This means all jobs go to 'default' queue and mailer jobs go to 'mailers' queue.
+        # See config/initializers/1_settings.rb and Settings.build_sidekiq_routing_rules.
+        #
+        # Now, in case queue_selector is used, we ensure all Sidekiq processes are still processing jobs
+        # from default and mailers queues.
+        # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1491
+        if routing_rules.empty?
+          queue_groups.each do |queues|
+            queues.concat(DEFAULT_QUEUES)
+            queues.uniq!
+          end
+        end
+
         if @list_queues
           puts queue_groups.map(&:sort) # rubocop:disable Rails/Output
 
diff --git a/spec/bin/sidekiq_cluster_spec.rb b/spec/bin/sidekiq_cluster_spec.rb
index eb014c511e344..b36fb82c295af 100644
--- a/spec/bin/sidekiq_cluster_spec.rb
+++ b/spec/bin/sidekiq_cluster_spec.rb
@@ -12,7 +12,8 @@
   context 'when selecting some queues and excluding others' do
     where(:args, :included, :excluded) do
       %w[--negate cronjob] | '-qdefault,1' | '-qcronjob,1'
-      %w[--queue-selector resource_boundary=cpu] | '-qupdate_merge_requests,1' | '-qdefault,1'
+      %w[--queue-selector resource_boundary=cpu] | %w[-qupdate_merge_requests,1 -qdefault,1 -qmailers,1] |
+        '-qauthorized_keys_worker,1'
     end
 
     with_them do
@@ -23,8 +24,8 @@
 
         expect(status).to be(0)
         expect(output).to include('bundle exec sidekiq')
-        expect(Shellwords.split(output)).to include(included)
-        expect(Shellwords.split(output)).not_to include(excluded)
+        expect(Shellwords.split(output)).to include(*included)
+        expect(Shellwords.split(output)).not_to include(*excluded)
       end
     end
   end
diff --git a/spec/commands/sidekiq_cluster/cli_spec.rb b/spec/commands/sidekiq_cluster/cli_spec.rb
index 499432c260522..085be1ceac2e4 100644
--- a/spec/commands/sidekiq_cluster/cli_spec.rb
+++ b/spec/commands/sidekiq_cluster/cli_spec.rb
@@ -37,6 +37,8 @@
     allow(supervisor).to receive(:supervise)
 
     allow(Prometheus::CleanupMultiprocDirService).to receive(:new).and_return(metrics_cleanup_service)
+
+    stub_config(sidekiq: { routing_rules: [] })
   end
 
   around do |example|
@@ -58,7 +60,7 @@
     context 'with arguments' do
       it 'starts the Sidekiq workers' do
         expect(Gitlab::SidekiqCluster).to receive(:start)
-                                            .with([['foo']], default_options)
+                                            .with([['foo'] + described_class::DEFAULT_QUEUES], default_options)
                                             .and_return([])
 
         cli.run(%w(foo))
@@ -92,7 +94,7 @@
         it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
           expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(['baz'])
           expect(Gitlab::SidekiqCluster).to receive(:start)
-                                              .with([['baz']], default_options)
+                                              .with([['baz'] + described_class::DEFAULT_QUEUES], default_options)
                                               .and_return([])
 
           cli.run(%w(foo -n))
@@ -101,9 +103,10 @@
 
       context 'with --max-concurrency flag' do
         it 'starts Sidekiq workers for specified queues with a max concurrency' do
+          expected_queues = [%w(foo bar baz), %w(solo)].each { |queues| queues.concat(described_class::DEFAULT_QUEUES) }
           expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w(foo bar baz))
           expect(Gitlab::SidekiqCluster).to receive(:start)
-                                              .with([%w(foo bar baz), %w(solo)], default_options.merge(max_concurrency: 2))
+                                              .with(expected_queues, default_options.merge(max_concurrency: 2))
                                               .and_return([])
 
           cli.run(%w(foo,bar,baz solo -m 2))
@@ -112,9 +115,10 @@
 
       context 'with --min-concurrency flag' do
         it 'starts Sidekiq workers for specified queues with a min concurrency' do
+          expected_queues = [%w(foo bar baz), %w(solo)].each { |queues| queues.concat(described_class::DEFAULT_QUEUES) }
           expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w(foo bar baz))
           expect(Gitlab::SidekiqCluster).to receive(:start)
-                                              .with([%w(foo bar baz), %w(solo)], default_options.merge(min_concurrency: 2))
+                                              .with(expected_queues, default_options.merge(min_concurrency: 2))
                                               .and_return([])
 
           cli.run(%w(foo,bar,baz solo --min-concurrency 2))
@@ -124,7 +128,7 @@
       context 'with --timeout flag' do
         it 'when given', 'starts Sidekiq workers with given timeout' do
           expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['foo']], default_options.merge(timeout: 10))
+            .with([['foo'] + described_class::DEFAULT_QUEUES], default_options.merge(timeout: 10))
             .and_return([])
 
           cli.run(%w(foo --timeout 10))
@@ -132,7 +136,8 @@
 
         it 'when not given', 'starts Sidekiq workers with default timeout' do
           expect(Gitlab::SidekiqCluster).to receive(:start)
-            .with([['foo']], default_options.merge(timeout: Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS))
+            .with([['foo'] + described_class::DEFAULT_QUEUES], default_options.merge(timeout:
+                                                            Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS))
             .and_return([])
 
           cli.run(%w(foo))
@@ -146,8 +151,10 @@
 
         it 'prints out a list of queues in alphabetical order' do
           expected_queues = [
+            'default',
             'epics:epics_update_epics_dates',
             'epics_new_epic_issue',
+            'mailers',
             'new_epic',
             'todos_destroyer:todos_destroyer_confidential_epic'
           ]
@@ -164,7 +171,8 @@
         it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
           expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
           expect(Gitlab::SidekiqCluster).to receive(:start)
-                                              .with([['cronjob', 'cronjob:foo', 'cronjob:bar']], default_options)
+                                              .with([['cronjob', 'cronjob:foo', 'cronjob:bar'] +
+                                                       described_class::DEFAULT_QUEUES], default_options)
                                               .and_return([])
 
           cli.run(%w(cronjob))
@@ -202,7 +210,7 @@
             'CI and SCM queues' => {
               query: 'feature_category=continuous_integration|feature_category=source_code_management',
               included_queues: %w(pipeline_default:ci_drop_pipeline merge),
-              excluded_queues: %w(mailers)
+              excluded_queues: %w()
             }
           }
         end
@@ -213,6 +221,7 @@
               expect(opts).to eq(default_options)
               expect(queues.first).to include(*included_queues)
               expect(queues.first).not_to include(*excluded_queues)
+              expect(queues.first).to include(*described_class::DEFAULT_QUEUES)
 
               []
             end
@@ -225,6 +234,7 @@
               expect(opts).to eq(default_options)
               expect(queues.first).not_to include(*included_queues)
               expect(queues.first).to include(*excluded_queues)
+              expect(queues.first).to include(*described_class::DEFAULT_QUEUES)
 
               []
             end
@@ -237,13 +247,15 @@
           expected_workers =
             if Gitlab.ee?
               [
-                %w[cronjob:clusters_integrations_check_prometheus_health incident_management_close_incident status_page_publish],
-                %w[bulk_imports_pipeline bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import project_template_export]
+                %w[cronjob:clusters_integrations_check_prometheus_health incident_management_close_incident status_page_publish] + described_class::DEFAULT_QUEUES,
+                %w[bulk_imports_pipeline bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import project_template_export] +
+                  described_class::DEFAULT_QUEUES
               ]
             else
               [
-                %w[cronjob:clusters_integrations_check_prometheus_health incident_management_close_incident],
-                %w[bulk_imports_pipeline bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import]
+                %w[cronjob:clusters_integrations_check_prometheus_health incident_management_close_incident] + described_class::DEFAULT_QUEUES,
+                %w[bulk_imports_pipeline bulk_imports_relation_export project_export projects_import_export_parallel_project_export projects_import_export_relation_export repository_import] +
+                  described_class::DEFAULT_QUEUES
               ]
             end
 
@@ -281,6 +293,40 @@
             .to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::QueryError)
         end
       end
+
+      context "with routing rules specified" do
+        before do
+          stub_config(sidekiq: { routing_rules: [['resource_boundary=cpu', 'foo']] })
+        end
+
+        it "starts Sidekiq workers only for given queues without any additional DEFAULT_QUEUES" do
+          expect(Gitlab::SidekiqCluster).to receive(:start)
+                                              .with([['foo']], default_options)
+                                              .and_return([])
+
+          cli.run(%w(foo))
+        end
+      end
+
+      context "with sidekiq settings not specified" do
+        before do
+          stub_config(sidekiq: nil)
+        end
+
+        it "does not throw an error" do
+          allow(Gitlab::SidekiqCluster).to receive(:start).and_return([])
+
+          expect { cli.run(%w(foo)) }.not_to raise_error
+        end
+
+        it "starts Sidekiq workers with given queues, and additional default and mailers queues (DEFAULT_QUEUES)" do
+          expect(Gitlab::SidekiqCluster).to receive(:start)
+                                              .with([['foo'] + described_class::DEFAULT_QUEUES], default_options)
+                                              .and_return([])
+
+          cli.run(%w(foo))
+        end
+      end
     end
 
     context 'metrics server' do
diff --git a/spec/config/settings_spec.rb b/spec/config/settings_spec.rb
index d6cddc215f50f..55e675d5107fa 100644
--- a/spec/config/settings_spec.rb
+++ b/spec/config/settings_spec.rb
@@ -203,8 +203,8 @@
     using RSpec::Parameterized::TableSyntax
 
     where(:input_rules, :result) do
-      nil                         | [['*', nil]]
-      []                          | [['*', nil]]
+      nil                         | [['*', 'default']]
+      []                          | [['*', 'default']]
       [['name=foobar', 'foobar']] | [['name=foobar', 'foobar']]
     end
 
diff --git a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb
index ef54cab52758d..ea9d77bcfa4eb 100644
--- a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb
+++ b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb
@@ -126,6 +126,7 @@ def self.name
   describe '.global' do
     before do
       described_class.remove_instance_variable(:@global_worker_router) if described_class.instance_variable_defined?(:@global_worker_router)
+      stub_config(sidekiq: { routing_rules: routing_rules })
     end
 
     after do
@@ -136,10 +137,6 @@ def self.name
       include_context 'router examples setup'
 
       with_them do
-        before do
-          stub_config(sidekiq: { routing_rules: routing_rules })
-        end
-
         it 'routes the worker to the correct queue' do
           expect(described_class.global.route(worker)).to eql(expected_queue)
         end
@@ -157,10 +154,6 @@ def self.name
         end
       end
 
-      before do
-        stub_config(sidekiq: { routing_rules: routing_rules })
-      end
-
       context 'invalid routing rules format' do
         let(:routing_rules) { ['feature_category=a'] }
 
@@ -183,6 +176,26 @@ def self.name
         end
       end
     end
+
+    context 'when routing rules is missing `*` as the last rule' do
+      let(:routing_rules) { [['resource_boundary=cpu', 'cpu']] }
+
+      it 'logs a warning' do
+        expect(Gitlab::AppLogger).to receive(:warn).with(a_string_matching('sidekiq.routing_rules config is missing'))
+
+        described_class.global
+      end
+    end
+
+    context 'when routing rules has a `*` rule as the last rule' do
+      let(:routing_rules) { [['resource_boundary=cpu', 'cpu'], ['*', 'default']] }
+
+      it 'does not log any warning' do
+        expect(Gitlab::AppLogger).not_to receive(:warn)
+
+        described_class.global
+      end
+    end
   end
 
   describe '#route' do
diff --git a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/server_spec.rb b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/server_spec.rb
index 1b01793d80dd9..f65f7a645ea62 100644
--- a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/server_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/server_spec.rb
@@ -40,10 +40,10 @@ def self.work; end
     describe '#call' do
       it 'removes the stored job from redis before execution' do
         bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
-        job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
+        job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'default')
 
         expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
-          .to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
+          .to receive(:new).with(a_hash_including(bare_job), 'default')
                 .and_return(job_definition).twice # once in client middleware
 
         expect(job_definition).to receive(:delete!).ordered.and_call_original
@@ -59,10 +59,10 @@ def self.work; end
 
     it 'removes the stored job from redis after execution' do
       bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
-      job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
+      job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'default')
 
       expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
-        .to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
+        .to receive(:new).with(a_hash_including(bare_job), 'default')
               .and_return(job_definition).twice # once in client middleware
 
       expect(TestDeduplicationWorker).to receive(:work).ordered.and_call_original
diff --git a/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb b/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb
index 9ed2a0642fccc..c66e36c5621bd 100644
--- a/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb
+++ b/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb
@@ -54,7 +54,7 @@ def clear_queues
           expect(migrator.migrate_set(set_name)).to eq(scanned: 3, migrated: 0)
 
           expect(set_after.length).to eq(3)
-          expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects',
+          expect(set_after.map(&:first)).to all(include('queue' => 'default',
                                                         'class' => 'AuthorizedProjectsWorker'))
         end
       end
@@ -73,7 +73,7 @@ def clear_queues
               if item['class'] == 'AuthorizedProjectsWorker'
                 expect(item).to include('queue' => 'new_queue', 'args' => [i])
               else
-                expect(item).to include('queue' => 'post_receive', 'args' => [i])
+                expect(item).to include('queue' => 'default', 'args' => [i])
               end
 
               expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
@@ -134,7 +134,7 @@ def clear_queues
             expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0)
 
             expect(set_after.length).to eq(3)
-            expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects'))
+            expect(set_after.map(&:first)).to all(include('queue' => 'default'))
           end
         end
 
@@ -157,7 +157,7 @@ def clear_queues
           expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 1)
 
           expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
-            .to eq('authorized_projects' => 6, 'new_queue' => 1)
+            .to eq('default' => 6, 'new_queue' => 1)
         end
 
         it 'iterates through the entire set of jobs' do
-- 
GitLab