diff --git a/ee/lib/gitlab/sidekiq_cluster.rb b/ee/lib/gitlab/sidekiq_cluster.rb index 4ffa9b4d5b3e2c89b1170b35f89cc38fed6a0f42..7b2d8ce7a554f6a433c1fd0a615314d64c809f4a 100644 --- a/ee/lib/gitlab/sidekiq_cluster.rb +++ b/ee/lib/gitlab/sidekiq_cluster.rb @@ -64,14 +64,16 @@ def self.parse_queues(array) # directory - The directory of the Rails application. # # Returns an Array containing the PIDs of the started processes. - def self.start(queues, env, directory = Dir.pwd, max_concurrency = 50, dryrun: false) - queues.map { |pair| start_sidekiq(pair, env, directory, max_concurrency, dryrun: dryrun) } + def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, dryrun: false) + queues.map.with_index do |pair, index| + start_sidekiq(pair, env: env, directory: directory, max_concurrency: max_concurrency, worker_id: index, dryrun: dryrun) + end end # Starts a Sidekiq process that processes _only_ the given queues. # # Returns the PID of the started process. - def self.start_sidekiq(queues, env, directory = Dir.pwd, max_concurrency = 50, dryrun: false) + def self.start_sidekiq(queues, env:, directory:, max_concurrency:, worker_id:, dryrun:) counts = count_by_queue(queues) cmd = %w[bundle exec sidekiq] @@ -90,7 +92,8 @@ def self.start_sidekiq(queues, env, directory = Dir.pwd, max_concurrency = 50, d end pid = Process.spawn( - { 'ENABLE_SIDEKIQ_CLUSTER' => '1' }, + { 'ENABLE_SIDEKIQ_CLUSTER' => '1', + 'SIDEKIQ_WORKER_ID' => worker_id.to_s }, *cmd, pgroup: true, err: $stderr, diff --git a/ee/lib/gitlab/sidekiq_cluster/cli.rb b/ee/lib/gitlab/sidekiq_cluster/cli.rb index d39bf0d6dcf357246477c16420fafe91d58b6bf2..a44b8dc801bab3c5dc624abe38e806977078cbd8 100644 --- a/ee/lib/gitlab/sidekiq_cluster/cli.rb +++ b/ee/lib/gitlab/sidekiq_cluster/cli.rb @@ -49,7 +49,8 @@ def run(argv = ARGV) @logger.info("Starting cluster with #{queue_groups.length} processes") - @processes = SidekiqCluster.start(queue_groups, @environment, @rails_path, @max_concurrency, dryrun: @dryrun) + @processes = SidekiqCluster.start(queue_groups, env: @environment, directory: @rails_path, + max_concurrency: @max_concurrency, dryrun: @dryrun) return if @dryrun diff --git a/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb b/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb index d450b5872be8be812e447b38ed584cac8b776ad3..899ad964498c13378275664b1b693cefaa3a0348 100644 --- a/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb +++ b/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb @@ -4,6 +4,9 @@ describe Gitlab::SidekiqCluster::CLI do let(:cli) { described_class.new('/dev/null') } + let(:default_options) do + { env: 'test', directory: Dir.pwd, max_concurrency: 50, dryrun: false } + end describe '#run' do context 'without any arguments' do @@ -21,7 +24,7 @@ it 'starts the Sidekiq workers' do expect(Gitlab::SidekiqCluster).to receive(:start) - .with([['foo']], 'test', Dir.pwd, 50, dryrun: false) + .with([['foo']], default_options) .and_return([]) cli.run(%w(foo)) @@ -31,7 +34,7 @@ it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['baz']) expect(Gitlab::SidekiqCluster).to receive(:start) - .with([['baz']], 'test', Dir.pwd, 50, dryrun: false) + .with([['baz']], default_options) .and_return([]) cli.run(%w(foo -n)) @@ -42,7 +45,7 @@ it 'starts Sidekiq workers for specified queues with a max concurrency' do expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(%w(foo bar baz)) expect(Gitlab::SidekiqCluster).to receive(:start) - .with([%w(foo bar baz), %w(solo)], 'test', Dir.pwd, 2, dryrun: false) + .with([%w(foo bar baz), %w(solo)], default_options.merge(max_concurrency: 2)) .and_return([]) cli.run(%w(foo,bar,baz solo -m 2)) @@ -53,7 +56,7 @@ it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar']) expect(Gitlab::SidekiqCluster).to receive(:start) - .with([['cronjob', 'cronjob:foo', 'cronjob:bar']], 'test', Dir.pwd, 50, dryrun: false) + .with([['cronjob', 'cronjob:foo', 'cronjob:bar']], default_options) .and_return([]) cli.run(%w(cronjob)) diff --git a/ee/spec/lib/gitlab/sidekiq_cluster_spec.rb b/ee/spec/lib/gitlab/sidekiq_cluster_spec.rb index 5f35c79fbc6e9aecf5e5594a2267d97881c99211..8447e8ea80d52d7a6829bf60948bcedee6f016fb 100644 --- a/ee/spec/lib/gitlab/sidekiq_cluster_spec.rb +++ b/ee/spec/lib/gitlab/sidekiq_cluster_spec.rb @@ -58,36 +58,44 @@ end describe '.start' do - it 'starts Sidekiq with the given queues and environment' do - expect(described_class).to receive(:start_sidekiq) - .ordered.with(%w(foo), :production, 'foo/bar', 50, dryrun: false) + it 'starts Sidekiq with the given queues, environment and options' do + expected_options = { env: :production, directory: 'foo/bar', max_concurrency: 20, dryrun: true } - expect(described_class).to receive(:start_sidekiq) - .ordered.with(%w(bar baz), :production, 'foo/bar', 50, dryrun: false) + expect(described_class).to receive(:start_sidekiq).ordered.with(%w(foo), expected_options.merge(worker_id: 0)) + expect(described_class).to receive(:start_sidekiq).ordered.with(%w(bar baz), expected_options.merge(worker_id: 1)) - described_class.start([%w(foo), %w(bar baz)], :production, 'foo/bar', 50) + described_class.start([%w(foo), %w(bar baz)], env: :production, directory: 'foo/bar', max_concurrency: 20, dryrun: true) end - it 'starts Sidekiq with capped concurrency limits for each queue' do - expect(described_class).to receive(:start_sidekiq) - .ordered.with(%w(foo bar baz), :production, 'foo/bar', 2, dryrun: false) + it 'starts Sidekiq with the given queues and sensible default options' do + expected_options = { + env: :development, + directory: an_instance_of(String), + max_concurrency: 50, + worker_id: an_instance_of(Integer), + dryrun: false + } - expect(described_class).to receive(:start_sidekiq) - .ordered.with(%w(solo), :production, 'foo/bar', 2, dryrun: false) + expect(described_class).to receive(:start_sidekiq).ordered.with(%w(foo bar baz), expected_options) + expect(described_class).to receive(:start_sidekiq).ordered.with(%w(solo), expected_options) - described_class.start([%w(foo bar baz), %w(solo)], :production, 'foo/bar', 2) + described_class.start([%w(foo bar baz), %w(solo)]) end end describe '.start_sidekiq' do - let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1" } } + let(:first_worker_id) { 0 } + let(:options) do + { env: :production, directory: 'foo/bar', max_concurrency: 20, worker_id: first_worker_id, dryrun: false } + end + let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } } let(:args) { ['bundle', 'exec', 'sidekiq', anything, '-eproduction', *([anything] * 5)] } it 'starts a Sidekiq process' do allow(Process).to receive(:spawn).and_return(1) expect(described_class).to receive(:wait_async).with(1) - expect(described_class.start_sidekiq(%w(foo), :production)).to eq(1) + expect(described_class.start_sidekiq(%w(foo), options)).to eq(1) end it 'handles duplicate queue names' do @@ -97,7 +105,7 @@ .and_return(1) expect(described_class).to receive(:wait_async).with(1) - expect(described_class.start_sidekiq(%w(foo foo bar baz), :production)).to eq(1) + expect(described_class.start_sidekiq(%w(foo foo bar baz), options)).to eq(1) end it 'runs the sidekiq process in a new process group' do @@ -107,7 +115,7 @@ .and_return(1) allow(described_class).to receive(:wait_async) - expect(described_class.start_sidekiq(%w(foo bar baz), :production)).to eq(1) + expect(described_class.start_sidekiq(%w(foo bar baz), options)).to eq(1) end end diff --git a/lib/gitlab/sidekiq_middleware/metrics.rb b/lib/gitlab/sidekiq_middleware/metrics.rb index 8af353d8674fa601c625e20e9240462d131fe343..86762a2d02b3ef877973456579cc861abce0ec59 100644 --- a/lib/gitlab/sidekiq_middleware/metrics.rb +++ b/lib/gitlab/sidekiq_middleware/metrics.rb @@ -9,6 +9,8 @@ class Metrics def initialize @metrics = init_metrics + + @metrics[:sidekiq_concurrency].set({}, Sidekiq.options[:concurrency].to_i) end def call(_worker, job, queue) @@ -45,7 +47,8 @@ def init_metrics sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'), sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'), - sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :livesum) + sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all), + sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all) } end diff --git a/lib/prometheus/pid_provider.rb b/lib/prometheus/pid_provider.rb index e0f7e7e0a9e0a5fbc153f1c06565d09e38681105..228639357ac09dc09d968bf4e66cbc4e9bb71265 100644 --- a/lib/prometheus/pid_provider.rb +++ b/lib/prometheus/pid_provider.rb @@ -6,7 +6,7 @@ module PidProvider def worker_id if Sidekiq.server? - 'sidekiq' + sidekiq_worker_id elsif defined?(Unicorn::Worker) unicorn_worker_id elsif defined?(::Puma) @@ -18,6 +18,14 @@ def worker_id private + def sidekiq_worker_id + if worker = ENV['SIDEKIQ_WORKER_ID'] + "sidekiq_#{worker}" + else + 'sidekiq' + end + end + def unicorn_worker_id if matches = process_name.match(/unicorn.*worker\[([0-9]+)\]/) "unicorn_#{matches[1]}" diff --git a/spec/lib/gitlab/sidekiq_middleware/metrics_spec.rb b/spec/lib/gitlab/sidekiq_middleware/metrics_spec.rb index 806112fcb16645b074da1794661f1379bead8b18..ec4c8560f2282c9f4a393478693bdcc3ac19a0bf 100644 --- a/spec/lib/gitlab/sidekiq_middleware/metrics_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/metrics_spec.rb @@ -3,25 +3,37 @@ require 'spec_helper' describe Gitlab::SidekiqMiddleware::Metrics do - describe '#call' do - let(:middleware) { described_class.new } - let(:worker) { double(:worker) } - - let(:completion_seconds_metric) { double('completion seconds metric') } - let(:user_execution_seconds_metric) { double('user execution seconds metric') } - let(:failed_total_metric) { double('failed total metric') } - let(:retried_total_metric) { double('retried total metric') } - let(:running_jobs_metric) { double('running jobs metric') } + let(:middleware) { described_class.new } + + let(:concurrency_metric) { double('concurrency metric') } + let(:completion_seconds_metric) { double('completion seconds metric') } + let(:user_execution_seconds_metric) { double('user execution seconds metric') } + let(:failed_total_metric) { double('failed total metric') } + let(:retried_total_metric) { double('retried total metric') } + let(:running_jobs_metric) { double('running jobs metric') } + + before do + allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric) + allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric) + allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric) + allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric) + allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :all).and_return(running_jobs_metric) + allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_concurrency, anything, {}, :all).and_return(concurrency_metric) + + allow(running_jobs_metric).to receive(:increment) + allow(concurrency_metric).to receive(:set) + end - before do - allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric) - allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric) - allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric) - allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric) - allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :livesum).and_return(running_jobs_metric) + describe '#initialize' do + it 'sets general metrics' do + expect(concurrency_metric).to receive(:set).with({}, Sidekiq.options[:concurrency].to_i) - allow(running_jobs_metric).to receive(:increment) + middleware end + end + + describe '#call' do + let(:worker) { double(:worker) } it 'yields block' do allow(completion_seconds_metric).to receive(:observe) @@ -30,7 +42,7 @@ expect { |b| middleware.call(worker, {}, :test, &b) }.to yield_control.once end - it 'sets metrics' do + it 'sets queue specific metrics' do labels = { queue: :test } allow(middleware).to receive(:get_thread_cputime).and_return(1, 3) diff --git a/spec/lib/prometheus/pid_provider_spec.rb b/spec/lib/prometheus/pid_provider_spec.rb index ba843b272548b19d107e98b77fd25f032a150f9b..6fdc11b14c40be76290ae7e862fa3fbd90e7e361 100644 --- a/spec/lib/prometheus/pid_provider_spec.rb +++ b/spec/lib/prometheus/pid_provider_spec.rb @@ -18,7 +18,17 @@ expect(Sidekiq).to receive(:server?).and_return(true) end - it { is_expected.to eq 'sidekiq' } + context 'in a clustered setup' do + before do + stub_env('SIDEKIQ_WORKER_ID', '123') + end + + it { is_expected.to eq 'sidekiq_123' } + end + + context 'in a single process setup' do + it { is_expected.to eq 'sidekiq' } + end end context 'when running in Unicorn mode' do