From 4843e5c1fb0f42c6614e9cf59e8f865a7f9da19b Mon Sep 17 00:00:00 2001
From: Gregorius Marco <gmarco@gitlab.com>
Date: Tue, 30 Jan 2024 17:08:13 +0800
Subject: [PATCH] Add concurrency option on sidekiq-cluster CLI

The new concurrency option is a fixed value that doesn't depend on the
number of queues. It is meant to be a simple replacement for the
min-concurrency and max-concurrency options.
https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/2760

Changelog: added
---
 sidekiq_cluster/cli.rb                       |  7 +++++
 sidekiq_cluster/sidekiq_cluster.rb           | 13 +++++---
 spec/commands/sidekiq_cluster/cli_spec.rb    | 14 ++++++++-
 spec/sidekiq_cluster/sidekiq_cluster_spec.rb | 32 ++++++++++++--------
 4 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb
index ddedc63e4586e..4ea29a1333f8f 100644
--- a/sidekiq_cluster/cli.rb
+++ b/sidekiq_cluster/cli.rb
@@ -40,6 +40,8 @@ def initialize(log_output = $stderr)
         # https://ruby.social/@getajobmike/109326475545816363
         @max_concurrency = 20
         @min_concurrency = 0
+        # TODO: to be set to 20 once max_concurrency and min_concurrency are removed https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/2760
+        @concurrency = 0
         @environment = ENV['RAILS_ENV'] || 'development'
         @metrics_dir = ENV["prometheus_multiproc_dir"] || File.absolute_path("tmp/prometheus_multiproc_dir/sidekiq")
         @pid = nil
@@ -143,6 +145,7 @@ def start_and_supervise_workers(queue_groups)
           directory: @rails_path,
           max_concurrency: @max_concurrency,
           min_concurrency: @min_concurrency,
+          concurrency: @concurrency,
           dryrun: @dryrun,
           timeout: @soft_timeout_seconds
         )
@@ -220,6 +223,10 @@ def option_parser
             abort opt.to_s
           end
 
+          opt.on('-c', '--concurrency INT', 'Number of threads to use with Sidekiq (default: 0)') do |int|
+            @concurrency = int.to_i
+          end
+
           opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 20, 0 to disable)') do |int|
             @max_concurrency = int.to_i
           end
diff --git a/sidekiq_cluster/sidekiq_cluster.rb b/sidekiq_cluster/sidekiq_cluster.rb
index 1ed08e7e83915..579bb0264138b 100644
--- a/sidekiq_cluster/sidekiq_cluster.rb
+++ b/sidekiq_cluster/sidekiq_cluster.rb
@@ -36,12 +36,13 @@ module SidekiqCluster
     #
     # Returns an Array containing the waiter threads (from Process.detach) of
     # the started processes.
-    def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
+    def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
       queues.map.with_index do |pair, index|
         start_sidekiq(pair, env: env,
                             directory: directory,
                             max_concurrency: max_concurrency,
                             min_concurrency: min_concurrency,
+                            concurrency: concurrency,
                             worker_id: index,
                             timeout: timeout,
                             dryrun: dryrun)
@@ -51,11 +52,13 @@ def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 2
     # Starts a Sidekiq process that processes _only_ the given queues.
     #
     # Returns the PID of the started process.
-    def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, timeout:, dryrun:)
+    # rubocop: disable Metrics/ParameterLists -- max_concurrency and min_concurrency will be removed in 17.0
+    def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, concurrency:, worker_id:, timeout:, dryrun:)
+      # rubocop: enable Metrics/ParameterLists
       counts = count_by_queue(queues)
 
       cmd = %w[bundle exec sidekiq]
-      cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency)}"
+      cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency, concurrency)}"
       cmd << "-e#{env}"
       cmd << "-t#{timeout}"
       cmd << "-gqueues:#{proc_details(counts)}"
@@ -101,7 +104,9 @@ def self.proc_details(counts)
       end.join(',')
     end
 
-    def self.concurrency(queues, min_concurrency, max_concurrency)
+    def self.concurrency(queues, min_concurrency, max_concurrency, concurrency)
+      return concurrency if concurrency > 0
+
       concurrency_from_queues = queues.length + 1
       max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues
       min = [min_concurrency, max].min
diff --git a/spec/commands/sidekiq_cluster/cli_spec.rb b/spec/commands/sidekiq_cluster/cli_spec.rb
index ceee61fb30218..47a428245913b 100644
--- a/spec/commands/sidekiq_cluster/cli_spec.rb
+++ b/spec/commands/sidekiq_cluster/cli_spec.rb
@@ -12,7 +12,7 @@
   let(:cli) { described_class.new('/dev/null') }
   let(:timeout) { Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS }
   let(:default_options) do
-    { env: 'test', directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, dryrun: false, timeout: timeout }
+    { env: 'test', directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, dryrun: false, timeout: timeout, concurrency: 0 }
   end
 
   let(:sidekiq_exporter_enabled) { false }
@@ -125,6 +125,18 @@
         end
       end
 
+      context 'with --concurrency flag' do
+        it 'starts Sidekiq workers for specified queues with the fixed concurrency' do
+          expected_queues = [%w[foo bar baz], %w[solo]].each { |queues| queues.concat(described_class::DEFAULT_QUEUES) }
+          expect(Gitlab::SidekiqConfig::CliMethods).to receive(:worker_queues).and_return(%w[foo bar baz])
+          expect(Gitlab::SidekiqCluster).to receive(:start)
+                                              .with(expected_queues, default_options.merge(concurrency: 2))
+                                              .and_return([])
+
+          cli.run(%w[foo,bar,baz solo -c 2])
+        end
+      end
+
       context 'with --timeout flag' do
         it 'when given', 'starts Sidekiq workers with given timeout' do
           expect(Gitlab::SidekiqCluster).to receive(:start)
diff --git a/spec/sidekiq_cluster/sidekiq_cluster_spec.rb b/spec/sidekiq_cluster/sidekiq_cluster_spec.rb
index ec5e5d85eebf4..d79636366641a 100644
--- a/spec/sidekiq_cluster/sidekiq_cluster_spec.rb
+++ b/spec/sidekiq_cluster/sidekiq_cluster_spec.rb
@@ -42,7 +42,8 @@
         min_concurrency: 0,
         worker_id: an_instance_of(Integer),
         timeout: 25,
-        dryrun: false
+        dryrun: false,
+        concurrency: 0
       }
 
       expect(described_class).to receive(:start_sidekiq).ordered.with(%w[foo bar baz], expected_options)
@@ -55,7 +56,7 @@
   describe '.start_sidekiq' do
     let(:first_worker_id) { 0 }
     let(:options) do
-      { env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 0, worker_id: first_worker_id, timeout: 10, dryrun: false }
+      { env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 0, worker_id: first_worker_id, timeout: 10, dryrun: false, concurrency: 0 }
     end
 
     let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } }
@@ -102,21 +103,28 @@
   describe '.concurrency' do
     using RSpec::Parameterized::TableSyntax
 
-    where(:queue_count, :min, :max, :expected) do
-      2 | 0 | 0 | 3 # No min or max specified
-      2 | 0 | 9 | 3 # No min specified, value < max
-      2 | 1 | 4 | 3 # Value between min and max
-      2 | 4 | 5 | 4 # Value below range
-      5 | 2 | 3 | 3 # Value above range
-      2 | 1 | 1 | 1 # Value above explicit setting (min == max)
-      0 | 3 | 3 | 3 # Value below explicit setting (min == max)
-      1 | 4 | 3 | 3 # Min greater than max
+    where(:queue_count, :min, :max, :fixed_concurrency, :expected) do
+      # without fixed concurrency
+      2 | 0 | 0 | 0 | 3 # No min or max specified
+      2 | 0 | 9 | 0 | 3 # No min specified, value < max
+      2 | 1 | 4 | 0 | 3 # Value between min and max
+      2 | 4 | 5 | 0 | 4 # Value below range
+      5 | 2 | 3 | 0 | 3 # Value above range
+      2 | 1 | 1 | 0 | 1 # Value above explicit setting (min == max)
+      0 | 3 | 3 | 0 | 3 # Value below explicit setting (min == max)
+      1 | 4 | 3 | 0 | 3 # Min greater than max
+
+      # with fixed concurrency, expected always equal to fixed_concurrency
+      1 | 0  | 20 | 20 | 20
+      1 | 0  | 20 | 10 | 10
+      1 | 20 | 20 | 10 | 10
+      5 | 0  | 0  | 10 | 10
     end
 
     with_them do
       let(:queues) { Array.new(queue_count) }
 
-      it { expect(described_class.concurrency(queues, min, max)).to eq(expected) }
+      it { expect(described_class.concurrency(queues, min, max, fixed_concurrency)).to eq(expected) }
     end
   end
 end
-- 
GitLab