Skip to content
代码片段 群组 项目
未验证 提交 b32e6059 编辑于 作者: Gregorius Marco's avatar Gregorius Marco 提交者: GitLab
浏览文件

Revert "Merge branch 'mg-remove-sidekiq-min-max-concurrency' into 'master'"

This reverts merge request !149690
上级 402a6c1a
No related branches found
No related tags found
无相关合并请求
...@@ -38,7 +38,10 @@ class CLI ...@@ -38,7 +38,10 @@ class CLI
def initialize(log_output = $stderr) def initialize(log_output = $stderr)
# https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency # https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency
# https://ruby.social/@getajobmike/109326475545816363 # https://ruby.social/@getajobmike/109326475545816363
@concurrency = 20 @max_concurrency = 20
@min_concurrency = 0
# TODO: to be set to 20 once max_concurrency and min_concurrency is removed https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/2760
@concurrency = 0
@environment = ENV['RAILS_ENV'] || 'development' @environment = ENV['RAILS_ENV'] || 'development'
@metrics_dir = ENV["prometheus_multiproc_dir"] || File.absolute_path("tmp/prometheus_multiproc_dir/sidekiq") @metrics_dir = ENV["prometheus_multiproc_dir"] || File.absolute_path("tmp/prometheus_multiproc_dir/sidekiq")
@pid = nil @pid = nil
...@@ -88,6 +91,12 @@ def run(argv = ARGV) ...@@ -88,6 +91,12 @@ def run(argv = ARGV)
'No queues found, you must select at least one queue' 'No queues found, you must select at least one queue'
end end
if routing_rules.empty?
# setting min_concurrency equal to max_concurrency so that the concurrency eventually
# is set to 20 (default value) instead of based on the number of queues, which is only 2+1 in this case.
@min_concurrency = @min_concurrency == 0 ? @max_concurrency : @min_concurrency
end
if @list_queues if @list_queues
puts queue_groups.map(&:sort) # rubocop:disable Rails/Output puts queue_groups.map(&:sort) # rubocop:disable Rails/Output
...@@ -111,6 +120,8 @@ def start_and_supervise_workers(queue_groups) ...@@ -111,6 +120,8 @@ def start_and_supervise_workers(queue_groups)
queue_groups, queue_groups,
env: @environment, env: @environment,
directory: @rails_path, directory: @rails_path,
max_concurrency: @max_concurrency,
min_concurrency: @min_concurrency,
concurrency: @concurrency, concurrency: @concurrency,
dryrun: @dryrun, dryrun: @dryrun,
timeout: @soft_timeout_seconds timeout: @soft_timeout_seconds
...@@ -203,6 +214,14 @@ def option_parser ...@@ -203,6 +214,14 @@ def option_parser
@concurrency = int.to_i @concurrency = int.to_i
end end
opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 20, 0 to disable)') do |int|
@max_concurrency = int.to_i
end
opt.on('--min-concurrency INT', 'Minimum threads to use with Sidekiq (default: 0)') do |int|
@min_concurrency = int.to_i
end
opt.on('-e', '--environment ENV', 'The application environment') do |env| opt.on('-e', '--environment ENV', 'The application environment') do |env|
@environment = env @environment = env
end end
......
...@@ -36,10 +36,12 @@ module SidekiqCluster ...@@ -36,10 +36,12 @@ module SidekiqCluster
# #
# Returns an Array containing the waiter threads (from Process.detach) of # Returns an Array containing the waiter threads (from Process.detach) of
# the started processes. # the started processes.
def self.start(queues, env: :development, directory: Dir.pwd, concurrency: 20, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false) def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
queues.map.with_index do |pair, index| queues.map.with_index do |pair, index|
start_sidekiq(pair, env: env, start_sidekiq(pair, env: env,
directory: directory, directory: directory,
max_concurrency: max_concurrency,
min_concurrency: min_concurrency,
concurrency: concurrency, concurrency: concurrency,
worker_id: index, worker_id: index,
timeout: timeout, timeout: timeout,
...@@ -50,11 +52,13 @@ def self.start(queues, env: :development, directory: Dir.pwd, concurrency: 20, t ...@@ -50,11 +52,13 @@ def self.start(queues, env: :development, directory: Dir.pwd, concurrency: 20, t
# Starts a Sidekiq process that processes _only_ the given queues. # Starts a Sidekiq process that processes _only_ the given queues.
# #
# Returns the PID of the started process. # Returns the PID of the started process.
def self.start_sidekiq(queues, env:, directory:, concurrency:, worker_id:, timeout:, dryrun:) # rubocop: disable Metrics/ParameterLists -- max_concurrency and min_concurrency will be removed in 17.0
def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, concurrency:, worker_id:, timeout:, dryrun:)
# rubocop: enable Metrics/ParameterLists
counts = count_by_queue(queues) counts = count_by_queue(queues)
cmd = %w[bundle exec sidekiq] cmd = %w[bundle exec sidekiq]
cmd << "-c#{concurrency}" cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency, concurrency)}"
cmd << "-e#{env}" cmd << "-e#{env}"
cmd << "-t#{timeout}" cmd << "-t#{timeout}"
cmd << "-gqueues:#{proc_details(counts)}" cmd << "-gqueues:#{proc_details(counts)}"
...@@ -99,5 +103,15 @@ def self.proc_details(counts) ...@@ -99,5 +103,15 @@ def self.proc_details(counts)
end end
end.join(',') end.join(',')
end end
def self.concurrency(queues, min_concurrency, max_concurrency, concurrency)
return concurrency if concurrency > 0
concurrency_from_queues = queues.length + 1
max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues
min = [min_concurrency, max].min
concurrency_from_queues.clamp(min, max)
end
end end
end end
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
let(:cli) { described_class.new('/dev/null') } let(:cli) { described_class.new('/dev/null') }
let(:timeout) { Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS } let(:timeout) { Gitlab::SidekiqCluster::DEFAULT_SOFT_TIMEOUT_SECONDS }
let(:default_options) do let(:default_options) do
{ env: 'test', directory: Dir.pwd, dryrun: false, timeout: timeout, concurrency: 20 } { env: 'test', directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, dryrun: false, timeout: timeout, concurrency: 0 }
end end
let(:sidekiq_exporter_enabled) { false } let(:sidekiq_exporter_enabled) { false }
...@@ -120,6 +120,28 @@ ...@@ -120,6 +120,28 @@
end end
end end
context 'with --max-concurrency flag' do
it 'starts Sidekiq workers for specified queues with a max concurrency' do
expected_queues = [%w[foo bar baz], %w[solo]]
expect(Gitlab::SidekiqCluster).to receive(:start)
.with(expected_queues, default_options.merge(max_concurrency: 2))
.and_return([])
cli.run(%w[foo,bar,baz solo -m 2])
end
end
context 'with --min-concurrency flag' do
it 'starts Sidekiq workers for specified queues with a min concurrency' do
expected_queues = [%w[foo bar baz], %w[solo]]
expect(Gitlab::SidekiqCluster).to receive(:start)
.with(expected_queues, default_options.merge(min_concurrency: 2))
.and_return([])
cli.run(%w[foo,bar,baz solo --min-concurrency 2])
end
end
context 'with --concurrency flag' do context 'with --concurrency flag' do
it 'starts Sidekiq workers for specified queues with the fixed concurrency' do it 'starts Sidekiq workers for specified queues with the fixed concurrency' do
expected_queues = [%w[foo bar baz], %w[solo]] expected_queues = [%w[foo bar baz], %w[solo]]
...@@ -181,7 +203,8 @@ ...@@ -181,7 +203,8 @@
expect { cli.run(%w[foo]) }.not_to raise_error expect { cli.run(%w[foo]) }.not_to raise_error
end end
it "starts Sidekiq workers with DEFAULT_QUEUES" do it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency = max_concurrency" do
default_options[:min_concurrency] = default_options[:max_concurrency]
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([described_class::DEFAULT_QUEUES], default_options) .with([described_class::DEFAULT_QUEUES], default_options)
.and_return([]) .and_return([])
...@@ -191,6 +214,7 @@ ...@@ -191,6 +214,7 @@
context 'with multi argument queues' do context 'with multi argument queues' do
it 'starts with multiple DEFAULT_QUEUES' do it 'starts with multiple DEFAULT_QUEUES' do
default_options[:min_concurrency] = default_options[:max_concurrency]
expected_queues = [%w[default mailers], %w[default mailers]] expected_queues = [%w[default mailers], %w[default mailers]]
expect(Gitlab::SidekiqCluster) expect(Gitlab::SidekiqCluster)
...@@ -206,7 +230,8 @@ ...@@ -206,7 +230,8 @@
stub_config(sidekiq: { routing_rules: [] }) stub_config(sidekiq: { routing_rules: [] })
end end
it "starts Sidekiq workers with DEFAULT_QUEUES" do it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency = max_concurrency" do
default_options[:min_concurrency] = default_options[:max_concurrency]
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([described_class::DEFAULT_QUEUES], default_options) .with([described_class::DEFAULT_QUEUES], default_options)
.and_return([]) .and_return([])
...@@ -215,7 +240,8 @@ ...@@ -215,7 +240,8 @@
end end
context "with 4 wildcard * as argument" do context "with 4 wildcard * as argument" do
it "starts 4 Sidekiq workers all with DEFAULT_QUEUES" do it "starts 4 Sidekiq workers all with DEFAULT_QUEUES and min_concurrency = max_concurrency" do
default_options[:min_concurrency] = default_options[:max_concurrency]
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([described_class::DEFAULT_QUEUES] * 4, default_options) .with([described_class::DEFAULT_QUEUES] * 4, default_options)
.and_return([]) .and_return([])
...@@ -223,6 +249,18 @@ ...@@ -223,6 +249,18 @@
cli.run(%w[* * * *]) cli.run(%w[* * * *])
end end
end end
context "with min-concurrency flag" do
it "starts Sidekiq workers with DEFAULT_QUEUES and min_concurrency as specified" do
options = default_options.dup
options[:min_concurrency] = 10
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([described_class::DEFAULT_QUEUES] * 4, options)
.and_return([])
cli.run(%w[* * * * --min-concurrency 10])
end
end
end end
end end
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
"ENABLE_SIDEKIQ_CLUSTER" => "1", "ENABLE_SIDEKIQ_CLUSTER" => "1",
"SIDEKIQ_WORKER_ID" => "0" "SIDEKIQ_WORKER_ID" => "0"
}, },
"bundle", "exec", "sidekiq", "-c20", "-eproduction", "-t25", "-gqueues:foo", "-rfoo/bar", "-qfoo,1", process_options "bundle", "exec", "sidekiq", "-c10", "-eproduction", "-t25", "-gqueues:foo", "-rfoo/bar", "-qfoo,1", process_options
).and_return(1) ).and_return(1)
expect(Process).to receive(:detach).ordered.with(1) expect(Process).to receive(:detach).ordered.with(1)
...@@ -27,21 +27,23 @@ ...@@ -27,21 +27,23 @@
"ENABLE_SIDEKIQ_CLUSTER" => "1", "ENABLE_SIDEKIQ_CLUSTER" => "1",
"SIDEKIQ_WORKER_ID" => "1" "SIDEKIQ_WORKER_ID" => "1"
}, },
"bundle", "exec", "sidekiq", "-c20", "-eproduction", "-t25", "-gqueues:bar,baz", "-rfoo/bar", "-qbar,1", "-qbaz,1", process_options "bundle", "exec", "sidekiq", "-c10", "-eproduction", "-t25", "-gqueues:bar,baz", "-rfoo/bar", "-qbar,1", "-qbaz,1", process_options
).and_return(2) ).and_return(2)
expect(Process).to receive(:detach).ordered.with(2) expect(Process).to receive(:detach).ordered.with(2)
described_class.start([%w[foo], %w[bar baz]], env: :production, directory: 'foo/bar', concurrency: 20) described_class.start([%w[foo], %w[bar baz]], env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 10)
end end
it 'starts Sidekiq with the given queues and sensible default options' do it 'starts Sidekiq with the given queues and sensible default options' do
expected_options = { expected_options = {
env: :development, env: :development,
directory: an_instance_of(String), directory: an_instance_of(String),
max_concurrency: 20,
min_concurrency: 0,
worker_id: an_instance_of(Integer), worker_id: an_instance_of(Integer),
timeout: 25, timeout: 25,
dryrun: false, dryrun: false,
concurrency: 20 concurrency: 0
} }
expect(described_class).to receive(:start_sidekiq).ordered.with(%w[foo bar baz], expected_options) expect(described_class).to receive(:start_sidekiq).ordered.with(%w[foo bar baz], expected_options)
...@@ -54,7 +56,7 @@ ...@@ -54,7 +56,7 @@
describe '.start_sidekiq' do describe '.start_sidekiq' do
let(:first_worker_id) { 0 } let(:first_worker_id) { 0 }
let(:options) do let(:options) do
{ env: :production, directory: 'foo/bar', worker_id: first_worker_id, timeout: 10, dryrun: false, concurrency: 20 } { env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 0, worker_id: first_worker_id, timeout: 10, dryrun: false, concurrency: 0 }
end end
let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } } let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } }
...@@ -97,4 +99,32 @@ ...@@ -97,4 +99,32 @@
expect(described_class.count_by_queue(queues)).to eq(%w[foo] => 2, %w[bar baz] => 1) expect(described_class.count_by_queue(queues)).to eq(%w[foo] => 2, %w[bar baz] => 1)
end end
end end
describe '.concurrency' do
using RSpec::Parameterized::TableSyntax
where(:queue_count, :min, :max, :fixed_concurrency, :expected) do
# without fixed concurrency
2 | 0 | 0 | 0 | 3 # No min or max specified
2 | 0 | 9 | 0 | 3 # No min specified, value < max
2 | 1 | 4 | 0 | 3 # Value between min and max
2 | 4 | 5 | 0 | 4 # Value below range
5 | 2 | 3 | 0 | 3 # Value above range
2 | 1 | 1 | 0 | 1 # Value above explicit setting (min == max)
0 | 3 | 3 | 0 | 3 # Value below explicit setting (min == max)
1 | 4 | 3 | 0 | 3 # Min greater than max
# with fixed concurrency, expected always equal to fixed_concurrency
1 | 0 | 20 | 20 | 20
1 | 0 | 20 | 10 | 10
1 | 20 | 20 | 10 | 10
5 | 0 | 0 | 10 | 10
end
with_them do
let(:queues) { Array.new(queue_count) }
it { expect(described_class.concurrency(queues, min, max, fixed_concurrency)).to eq(expected) }
end
end
end end
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册