Skip to content
代码片段 群组 项目
提交 73b14405 编辑于 作者: Matthias Käppler's avatar Matthias Käppler 提交者: Kamil Trzciński
浏览文件

More fine-grained metrics for sidekiq workers

We currently only measure the global (node-level) business in terms
of jobs being processed. We would instead like to be able to know
for each sidekiq process how saturated it is in terms of actual
concurrency vs requested/max concurrency.

This patch does the following:

- add a worker/process dimension to all existing metrics
- introduce a new metric to observe concurrency per process

by relating "actual job count per process" to "max job count per
process" we can then obtain some sort of saturation metric.
上级 a64354db
No related branches found
No related tags found
无相关合并请求
......@@ -64,14 +64,16 @@ def self.parse_queues(array)
# directory - The directory of the Rails application.
#
# Returns an Array containing the PIDs of the started processes.
def self.start(queues, env, directory = Dir.pwd, max_concurrency = 50, dryrun: false)
queues.map { |pair| start_sidekiq(pair, env, directory, max_concurrency, dryrun: dryrun) }
def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, dryrun: false)
queues.map.with_index do |pair, index|
start_sidekiq(pair, env: env, directory: directory, max_concurrency: max_concurrency, worker_id: index, dryrun: dryrun)
end
end
# Starts a Sidekiq process that processes _only_ the given queues.
#
# Returns the PID of the started process.
def self.start_sidekiq(queues, env, directory = Dir.pwd, max_concurrency = 50, dryrun: false)
def self.start_sidekiq(queues, env:, directory:, max_concurrency:, worker_id:, dryrun:)
counts = count_by_queue(queues)
cmd = %w[bundle exec sidekiq]
......@@ -90,7 +92,8 @@ def self.start_sidekiq(queues, env, directory = Dir.pwd, max_concurrency = 50, d
end
pid = Process.spawn(
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1' },
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1',
'SIDEKIQ_WORKER_ID' => worker_id.to_s },
*cmd,
pgroup: true,
err: $stderr,
......
......@@ -49,7 +49,8 @@ def run(argv = ARGV)
@logger.info("Starting cluster with #{queue_groups.length} processes")
@processes = SidekiqCluster.start(queue_groups, @environment, @rails_path, @max_concurrency, dryrun: @dryrun)
@processes = SidekiqCluster.start(queue_groups, env: @environment, directory: @rails_path,
max_concurrency: @max_concurrency, dryrun: @dryrun)
return if @dryrun
......
......@@ -4,6 +4,9 @@
describe Gitlab::SidekiqCluster::CLI do
let(:cli) { described_class.new('/dev/null') }
let(:default_options) do
{ env: 'test', directory: Dir.pwd, max_concurrency: 50, dryrun: false }
end
describe '#run' do
context 'without any arguments' do
......@@ -21,7 +24,7 @@
it 'starts the Sidekiq workers' do
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['foo']], 'test', Dir.pwd, 50, dryrun: false)
.with([['foo']], default_options)
.and_return([])
cli.run(%w(foo))
......@@ -31,7 +34,7 @@
it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['baz'])
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['baz']], 'test', Dir.pwd, 50, dryrun: false)
.with([['baz']], default_options)
.and_return([])
cli.run(%w(foo -n))
......@@ -42,7 +45,7 @@
it 'starts Sidekiq workers for specified queues with a max concurrency' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(%w(foo bar baz))
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([%w(foo bar baz), %w(solo)], 'test', Dir.pwd, 2, dryrun: false)
.with([%w(foo bar baz), %w(solo)], default_options.merge(max_concurrency: 2))
.and_return([])
cli.run(%w(foo,bar,baz solo -m 2))
......@@ -53,7 +56,7 @@
it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['cronjob', 'cronjob:foo', 'cronjob:bar']], 'test', Dir.pwd, 50, dryrun: false)
.with([['cronjob', 'cronjob:foo', 'cronjob:bar']], default_options)
.and_return([])
cli.run(%w(cronjob))
......
......@@ -58,36 +58,44 @@
end
describe '.start' do
it 'starts Sidekiq with the given queues and environment' do
expect(described_class).to receive(:start_sidekiq)
.ordered.with(%w(foo), :production, 'foo/bar', 50, dryrun: false)
it 'starts Sidekiq with the given queues, environment and options' do
expected_options = { env: :production, directory: 'foo/bar', max_concurrency: 20, dryrun: true }
expect(described_class).to receive(:start_sidekiq)
.ordered.with(%w(bar baz), :production, 'foo/bar', 50, dryrun: false)
expect(described_class).to receive(:start_sidekiq).ordered.with(%w(foo), expected_options.merge(worker_id: 0))
expect(described_class).to receive(:start_sidekiq).ordered.with(%w(bar baz), expected_options.merge(worker_id: 1))
described_class.start([%w(foo), %w(bar baz)], :production, 'foo/bar', 50)
described_class.start([%w(foo), %w(bar baz)], env: :production, directory: 'foo/bar', max_concurrency: 20, dryrun: true)
end
it 'starts Sidekiq with capped concurrency limits for each queue' do
expect(described_class).to receive(:start_sidekiq)
.ordered.with(%w(foo bar baz), :production, 'foo/bar', 2, dryrun: false)
it 'starts Sidekiq with the given queues and sensible default options' do
expected_options = {
env: :development,
directory: an_instance_of(String),
max_concurrency: 50,
worker_id: an_instance_of(Integer),
dryrun: false
}
expect(described_class).to receive(:start_sidekiq)
.ordered.with(%w(solo), :production, 'foo/bar', 2, dryrun: false)
expect(described_class).to receive(:start_sidekiq).ordered.with(%w(foo bar baz), expected_options)
expect(described_class).to receive(:start_sidekiq).ordered.with(%w(solo), expected_options)
described_class.start([%w(foo bar baz), %w(solo)], :production, 'foo/bar', 2)
described_class.start([%w(foo bar baz), %w(solo)])
end
end
describe '.start_sidekiq' do
let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1" } }
let(:first_worker_id) { 0 }
let(:options) do
{ env: :production, directory: 'foo/bar', max_concurrency: 20, worker_id: first_worker_id, dryrun: false }
end
let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } }
let(:args) { ['bundle', 'exec', 'sidekiq', anything, '-eproduction', *([anything] * 5)] }
it 'starts a Sidekiq process' do
allow(Process).to receive(:spawn).and_return(1)
expect(described_class).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(1)
expect(described_class.start_sidekiq(%w(foo), options)).to eq(1)
end
it 'handles duplicate queue names' do
......@@ -97,7 +105,7 @@
.and_return(1)
expect(described_class).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), :production)).to eq(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), options)).to eq(1)
end
it 'runs the sidekiq process in a new process group' do
......@@ -107,7 +115,7 @@
.and_return(1)
allow(described_class).to receive(:wait_async)
expect(described_class.start_sidekiq(%w(foo bar baz), :production)).to eq(1)
expect(described_class.start_sidekiq(%w(foo bar baz), options)).to eq(1)
end
end
......
......@@ -9,6 +9,8 @@ class Metrics
def initialize
@metrics = init_metrics
@metrics[:sidekiq_concurrency].set({}, Sidekiq.options[:concurrency].to_i)
end
def call(_worker, job, queue)
......@@ -45,7 +47,8 @@ def init_metrics
sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'),
sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'),
sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :livesum)
sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all),
sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all)
}
end
......
......@@ -6,7 +6,7 @@ module PidProvider
def worker_id
if Sidekiq.server?
'sidekiq'
sidekiq_worker_id
elsif defined?(Unicorn::Worker)
unicorn_worker_id
elsif defined?(::Puma)
......@@ -18,6 +18,14 @@ def worker_id
private
def sidekiq_worker_id
if worker = ENV['SIDEKIQ_WORKER_ID']
"sidekiq_#{worker}"
else
'sidekiq'
end
end
def unicorn_worker_id
if matches = process_name.match(/unicorn.*worker\[([0-9]+)\]/)
"unicorn_#{matches[1]}"
......
......@@ -3,25 +3,37 @@
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::Metrics do
describe '#call' do
let(:middleware) { described_class.new }
let(:worker) { double(:worker) }
let(:completion_seconds_metric) { double('completion seconds metric') }
let(:user_execution_seconds_metric) { double('user execution seconds metric') }
let(:failed_total_metric) { double('failed total metric') }
let(:retried_total_metric) { double('retried total metric') }
let(:running_jobs_metric) { double('running jobs metric') }
let(:middleware) { described_class.new }
let(:concurrency_metric) { double('concurrency metric') }
let(:completion_seconds_metric) { double('completion seconds metric') }
let(:user_execution_seconds_metric) { double('user execution seconds metric') }
let(:failed_total_metric) { double('failed total metric') }
let(:retried_total_metric) { double('retried total metric') }
let(:running_jobs_metric) { double('running jobs metric') }
before do
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :all).and_return(running_jobs_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_concurrency, anything, {}, :all).and_return(concurrency_metric)
allow(running_jobs_metric).to receive(:increment)
allow(concurrency_metric).to receive(:set)
end
before do
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :livesum).and_return(running_jobs_metric)
describe '#initialize' do
it 'sets general metrics' do
expect(concurrency_metric).to receive(:set).with({}, Sidekiq.options[:concurrency].to_i)
allow(running_jobs_metric).to receive(:increment)
middleware
end
end
describe '#call' do
let(:worker) { double(:worker) }
it 'yields block' do
allow(completion_seconds_metric).to receive(:observe)
......@@ -30,7 +42,7 @@
expect { |b| middleware.call(worker, {}, :test, &b) }.to yield_control.once
end
it 'sets metrics' do
it 'sets queue specific metrics' do
labels = { queue: :test }
allow(middleware).to receive(:get_thread_cputime).and_return(1, 3)
......
......@@ -18,7 +18,17 @@
expect(Sidekiq).to receive(:server?).and_return(true)
end
it { is_expected.to eq 'sidekiq' }
context 'in a clustered setup' do
before do
stub_env('SIDEKIQ_WORKER_ID', '123')
end
it { is_expected.to eq 'sidekiq_123' }
end
context 'in a single process setup' do
it { is_expected.to eq 'sidekiq' }
end
end
context 'when running in Unicorn mode' do
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册