From b32e6059e90b84bc4c1cc2a315a835451e350cb7 Mon Sep 17 00:00:00 2001 From: Gregorius Marco <gmarco@gitlab.com> Date: Wed, 17 Apr 2024 16:34:39 +0000 Subject: [PATCH] Revert "Merge branch 'mg-remove-sidekiq-min-max-concurrency' into 'master'" This reverts merge request !149690 --- sidekiq_cluster/cli.rb | 21 ++++++++- sidekiq_cluster/sidekiq_cluster.rb | 20 +++++++-- spec/commands/sidekiq_cluster/cli_spec.rb | 46 ++++++++++++++++++-- spec/sidekiq_cluster/sidekiq_cluster_spec.rb | 40 ++++++++++++++--- 4 files changed, 114 insertions(+), 13 deletions(-) diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb index e3045c1776833..1f02d9b39c0a1 100644 --- a/sidekiq_cluster/cli.rb +++ b/sidekiq_cluster/cli.rb @@ -38,7 +38,10 @@ class CLI def initialize(log_output = $stderr) # https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency # https://ruby.social/@getajobmike/109326475545816363 - @concurrency = 20 + @max_concurrency = 20 + @min_concurrency = 0 + # TODO: to be set to 20 once max_concurrency and min_concurrency is removed https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/2760 + @concurrency = 0 @environment = ENV['RAILS_ENV'] || 'development' @metrics_dir = ENV["prometheus_multiproc_dir"] || File.absolute_path("tmp/prometheus_multiproc_dir/sidekiq") @pid = nil @@ -88,6 +91,12 @@ def run(argv = ARGV) 'No queues found, you must select at least one queue' end + if routing_rules.empty? + # setting min_concurrency equal to max_concurrency so that the concurrency eventually + # is set to 20 (default value) instead of based on the number of queues, which is only 2+1 in this case. + @min_concurrency = @min_concurrency == 0 ? @max_concurrency : @min_concurrency + end + if @list_queues puts queue_groups.map(&:sort) # rubocop:disable Rails/Output @@ -111,6 +120,8 @@ def start_and_supervise_workers(queue_groups) queue_groups, env: @environment, directory: @rails_path, + max_concurrency: @max_concurrency, + min_concurrency: @min_concurrency, concurrency: @concurrency, dryrun: @dryrun, timeout: @soft_timeout_seconds @@ -203,6 +214,14 @@ def option_parser @concurrency = int.to_i end + opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 20, 0 to disable)') do |int| + @max_concurrency = int.to_i + end + + opt.on('--min-concurrency INT', 'Minimum threads to use with Sidekiq (default: 0)') do |int| + @min_concurrency = int.to_i + end + opt.on('-e', '--environment ENV', 'The application environment') do |env| @environment = env end diff --git a/sidekiq_cluster/sidekiq_cluster.rb b/sidekiq_cluster/sidekiq_cluster.rb index 2ad6efbc3afa3..579bb0264138b 100644 --- a/sidekiq_cluster/sidekiq_cluster.rb +++ b/sidekiq_cluster/sidekiq_cluster.rb @@ -36,10 +36,12 @@ module SidekiqCluster # # Returns an Array containing the waiter threads (from Process.detach) of # the started processes. - def self.start(queues, env: :development, directory: Dir.pwd, concurrency: 20, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false) + def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false) queues.map.with_index do |pair, index| start_sidekiq(pair, env: env, directory: directory, + max_concurrency: max_concurrency, + min_concurrency: min_concurrency, concurrency: concurrency, worker_id: index, timeout: timeout, @@ -50,11 +52,13 @@ def self.start(queues, env: :development, directory: Dir.pwd, concurrency: 20, t # Starts a Sidekiq process that processes _only_ the given queues. # # Returns the PID of the started process. - def self.start_sidekiq(queues, env:, directory:, concurrency:, worker_id:, timeout:, dryrun:) + # rubocop: disable Metrics/ParameterLists -- max_concurrency and min_concurrency will be removed in 17.0 + def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, concurrency:, worker_id:, timeout:, dryrun:) + # rubocop: enable Metrics/ParameterLists counts = count_by_queue(queues) cmd = %w[bundle exec sidekiq] - cmd << "-c#{concurrency}" + cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency, concurrency)}" cmd << "-e#{env}" cmd << "-t#{timeout}" cmd << "-gqueues:#{proc_details(counts)}" @@ -99,5 +103,15 @@ def self.proc_details(counts) end end.join(',') end + + def self.concurrency(queues, min_concurrency, max_concurrency, concurrency) + return concurrency if concurrency > 0 + + concurrency_from_queues = queues.length + 1 + max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues + min = [min_concurrency, max].min + + concurrency_from_queues.clamp(min, max) + end end end diff --git a/spec/commands/sidekiq_cluster/cli_spec.rb b/spec/commands/sidekiq_cluster/cli_spec.rb index 8be89d7a0acfc..43eba23dd5828 100644 --- a/spec/commands/sidekiq_cluster/cli_spec.rb +++ b/spec/commands/sidekiq_cluster/cli_spec.rb @@ -12,7 +12,7 @@ let(:cli) { described_class.new('/dev/null') } let(:timeout) { Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS } let(:default_options) do - { env: 'test', directory: Dir.pwd, dryrun: false, timeout: timeout, concurrency: 20 } + { env: 'test', directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, dryrun: false, timeout: timeout, concurrency: 0 } end let(:sidekiq_exporter_enabled) { false } @@ -120,6 +120,28 @@ end end + context 'with --max-concurrency flag' do + it 'starts Sidekiq workers for specified queues with a max concurrency' do + expected_queues = [%w[foo bar baz], %w[solo]] + expect(Gitlab::SidekiqCluster).to receive(:start) + .with(expected_queues, default_options.merge(max_concurrency: 2)) + .and_return([]) + + cli.run(%w[foo,bar,baz solo -m 2]) + end + end + + context 'with --min-concurrency flag' do + it 'starts Sidekiq workers for specified queues with a min concurrency' do + expected_queues = [%w[foo bar baz], %w[solo]] + expect(Gitlab::SidekiqCluster).to receive(:start) + .with(expected_queues, default_options.merge(min_concurrency: 2)) + .and_return([]) + + cli.run(%w[foo,bar,baz solo --min-concurrency 2]) + end + end + context 'with --concurrency flag' do it 'starts Sidekiq workers for specified queues with the fixed concurrency' do expected_queues = [%w[foo bar baz], %w[solo]] @@ -181,7 +203,8 @@ expect { cli.run(%w[foo]) }.not_to raise_error end - it "starts Sidekiq workers with DEFAULT_QUEUES" do + it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency = max_concurrency" do + default_options[:min_concurrency] = default_options[:max_concurrency] expect(Gitlab::SidekiqCluster).to receive(:start) .with([described_class::DEFAULT_QUEUES], default_options) .and_return([]) @@ -191,6 +214,7 @@ context 'with multi argument queues' do it 'starts with multiple DEFAULT_QUEUES' do + default_options[:min_concurrency] = default_options[:max_concurrency] expected_queues = [%w[default mailers], %w[default mailers]] expect(Gitlab::SidekiqCluster) @@ -206,7 +230,8 @@ stub_config(sidekiq: { routing_rules: [] }) end - it "starts Sidekiq workers with DEFAULT_QUEUES" do + it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency = max_concurrency" do + default_options[:min_concurrency] = default_options[:max_concurrency] expect(Gitlab::SidekiqCluster).to receive(:start) .with([described_class::DEFAULT_QUEUES], default_options) .and_return([]) @@ -215,7 +240,8 @@ end context "with 4 wildcard * as argument" do - it "starts 4 Sidekiq workers all with DEFAULT_QUEUES" do + it "starts 4 Sidekiq workers all with DEFAULT_QUEUES and min_concurrency = max_concurrency" do + default_options[:min_concurrency] = default_options[:max_concurrency] expect(Gitlab::SidekiqCluster).to receive(:start) .with([described_class::DEFAULT_QUEUES] * 4, default_options) .and_return([]) @@ -223,6 +249,18 @@ cli.run(%w[* * * *]) end end + + context "with min-concurrency flag" do + it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency as specified" do + options = default_options.dup + options[:min_concurrency] = 10 + expect(Gitlab::SidekiqCluster).to receive(:start) + .with([described_class::DEFAULT_QUEUES] * 4, options) + .and_return([]) + + cli.run(%w[* * * * --min-concurrency 10]) + end + end end end diff --git a/spec/sidekiq_cluster/sidekiq_cluster_spec.rb b/spec/sidekiq_cluster/sidekiq_cluster_spec.rb index 3ce1117a34e91..d79636366641a 100644 --- a/spec/sidekiq_cluster/sidekiq_cluster_spec.rb +++ b/spec/sidekiq_cluster/sidekiq_cluster_spec.rb @@ -19,7 +19,7 @@ "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => "0" }, - "bundle", "exec", "sidekiq", "-c20", "-eproduction", "-t25", "-gqueues:foo", "-rfoo/bar", "-qfoo,1", process_options + "bundle", "exec", "sidekiq", "-c10", "-eproduction", "-t25", "-gqueues:foo", "-rfoo/bar", "-qfoo,1", process_options ).and_return(1) expect(Process).to receive(:detach).ordered.with(1) @@ -27,21 +27,23 @@ "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => "1" }, - "bundle", "exec", "sidekiq", "-c20", "-eproduction", "-t25", "-gqueues:bar,baz", "-rfoo/bar", "-qbar,1", "-qbaz,1", process_options + "bundle", "exec", "sidekiq", "-c10", "-eproduction", "-t25", "-gqueues:bar,baz", "-rfoo/bar", "-qbar,1", "-qbaz,1", process_options ).and_return(2) expect(Process).to receive(:detach).ordered.with(2) - described_class.start([%w[foo], %w[bar baz]], env: :production, directory: 'foo/bar', concurrency: 20) + described_class.start([%w[foo], %w[bar baz]], env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 10) end it 'starts Sidekiq with the given queues and sensible default options' do expected_options = { env: :development, directory: an_instance_of(String), + max_concurrency: 20, + min_concurrency: 0, worker_id: an_instance_of(Integer), timeout: 25, dryrun: false, - concurrency: 20 + concurrency: 0 } expect(described_class).to receive(:start_sidekiq).ordered.with(%w[foo bar baz], expected_options) @@ -54,7 +56,7 @@ describe '.start_sidekiq' do let(:first_worker_id) { 0 } let(:options) do - { env: :production, directory: 'foo/bar', worker_id: first_worker_id, timeout: 10, dryrun: false, concurrency: 20 } + { env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 0, worker_id: first_worker_id, timeout: 10, dryrun: false, concurrency: 0 } end let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } } @@ -97,4 +99,32 @@ expect(described_class.count_by_queue(queues)).to eq(%w[foo] => 2, %w[bar baz] => 1) end end + + describe '.concurrency' do + using RSpec::Parameterized::TableSyntax + + where(:queue_count, :min, :max, :fixed_concurrency, :expected) do + # without fixed concurrency + 2 | 0 | 0 | 0 | 3 # No min or max specified + 2 | 0 | 9 | 0 | 3 # No min specified, value < max + 2 | 1 | 4 | 0 | 3 # Value between min and max + 2 | 4 | 5 | 0 | 4 # Value below range + 5 | 2 | 3 | 0 | 3 # Value above range + 2 | 1 | 1 | 0 | 1 # Value above explicit setting (min == max) + 0 | 3 | 3 | 0 | 3 # Value below explicit setting (min == max) + 1 | 4 | 3 | 0 | 3 # Min greater than max + + # with fixed concurrency, expected always equal to fixed_concurrency + 1 | 0 | 20 | 20 | 20 + 1 | 0 | 20 | 10 | 10 + 1 | 20 | 20 | 10 | 10 + 5 | 0 | 0 | 10 | 10 + end + + with_them do + let(:queues) { Array.new(queue_count) } + + it { expect(described_class.concurrency(queues, min, max, fixed_concurrency)).to eq(expected) } + end + end end -- GitLab