Skip to content
代码片段 群组 项目
未验证 提交 c74bfba7 编辑于 作者: Terri Chu's avatar Terri Chu 提交者: GitLab
浏览文件

Apply 1 suggestion(s) to 1 file(s)


Co-authored-by: default avatarStan Hu <stanhu@gmail.com>
上级 7cbd12e3
No related branches found
No related tags found
无相关合并请求
...@@ -18,11 +18,13 @@ def perform ...@@ -18,11 +18,13 @@ def perform
reschedule_job = false reschedule_job = false
workers.each do |worker| workers.each do |worker|
next unless jobs_in_the_queue?(worker) limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)&.call
queue_size = queue_size(worker)
report_prometheus_metrics(worker, queue_size, limit)
reschedule_job = true next unless queue_size > 0
limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)&.call reschedule_job = true
processing_limit = if limit processing_limit = if limit
current = current_concurrency(worker: worker) current = current_concurrency(worker: worker)
...@@ -49,8 +51,8 @@ def current_concurrency(worker:) ...@@ -49,8 +51,8 @@ def current_concurrency(worker:)
@current_concurrency[worker.name].to_i @current_concurrency[worker.name].to_i
end end
def jobs_in_the_queue?(worker) def queue_size(worker)
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.has_jobs_in_queue?(worker.name) Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.queue_size(worker.name)
end end
def resume_processing!(worker, limit:) def resume_processing!(worker, limit:)
...@@ -60,5 +62,18 @@ def resume_processing!(worker, limit:) ...@@ -60,5 +62,18 @@ def resume_processing!(worker, limit:)
def workers def workers
Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers
end end
# Publishes two per-worker gauges for the Sidekiq concurrency limit
# middleware: the number of deferred jobs currently waiting in the
# limit queue, and the concurrency ceiling in effect for the worker.
def report_prometheus_metrics(worker, queue_size, limit)
  worker_labels = { worker: worker.name }

  # NOTE(review): the :max multiprocess aggregation is presumably chosen so
  # exporters report the largest queue size seen across processes — confirm
  # against the Gitlab::Metrics gauge helper.
  queued_jobs_gauge = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_queue_jobs,
    'Number of jobs queued by the concurrency limit middleware.',
    {},
    :max)
  queued_jobs_gauge.set(worker_labels, queue_size)

  max_jobs_gauge = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_max_concurrent_jobs,
    'Max number of concurrent running jobs.',
    {})
  # A nil limit means no explicit cap was configured for this worker, so the
  # middleware's DEFAULT_LIMIT is what actually applies — report that instead.
  max_jobs_gauge.set(worker_labels, limit || DEFAULT_LIMIT)
end
end end
end end
...@@ -232,6 +232,8 @@ configuration option in `gitlab.yml`. These metrics are served from the ...@@ -232,6 +232,8 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `sidekiq_running_jobs` | Gauge | 12.2 | Number of Sidekiq jobs running | `queue`, `boundary`, `external_dependencies`, `feature_category`, `urgency` | | `sidekiq_running_jobs` | Gauge | 12.2 | Number of Sidekiq jobs running | `queue`, `boundary`, `external_dependencies`, `feature_category`, `urgency` |
| `sidekiq_concurrency` | Gauge | 12.5 | Maximum number of Sidekiq jobs | | | `sidekiq_concurrency` | Gauge | 12.5 | Maximum number of Sidekiq jobs | |
| `sidekiq_mem_total_bytes` | Gauge | 15.3 | Number of bytes allocated for both objects consuming an object slot and objects that required a malloc'| | | `sidekiq_mem_total_bytes` | Gauge | 15.3 | Number of bytes allocated for both objects consuming an object slot and objects that required a malloc'| |
| `sidekiq_concurrency_limit_queue_jobs` | Gauge | 17.3 | Number of Sidekiq jobs waiting in the concurrency limit queue | `worker` |
| `sidekiq_concurrency_limit_max_concurrent_jobs` | Gauge | 17.3 | Max number of concurrent running Sidekiq jobs | `worker` |
| `geo_db_replication_lag_seconds` | Gauge | 10.2 | Database replication lag (seconds) | `url` | | `geo_db_replication_lag_seconds` | Gauge | 10.2 | Database replication lag (seconds) | `url` |
| `geo_repositories` | Gauge | 10.2 | Total number of repositories available on primary | `url` | | `geo_repositories` | Gauge | 10.2 | Total number of repositories available on primary | `url` |
| `geo_lfs_objects` | Gauge | 10.2 | Number of LFS objects on primary | `url` | | `geo_lfs_objects` | Gauge | 10.2 | Number of LFS objects on primary | `url` |
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
context 'when there are no jobs in the queue' do context 'when there are no jobs in the queue' do
before do before do
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:has_jobs_in_queue?) allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:has_jobs_in_queue?)
.and_return(false) .and_return(0)
end end
it 'does nothing' do it 'does nothing' do
...@@ -24,12 +24,33 @@ ...@@ -24,12 +24,33 @@
worker.perform worker.perform
end end
it 'reports prometheus metrics' do
# Configure an explicit concurrency limit of 30 so the limit gauge has a
# known expected value below.
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 30)
# Stub the queue-size gauge and verify it is set to 0 for the limited
# worker (this context has no jobs in the queue).
queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
.and_return(queue_size_gauge_double)
# Other workers may also report; only pin the expectation for the one
# worker under test.
allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, 0)
expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 0)
# Stub the max-concurrency gauge and verify it reports the configured
# limit (30) for the limited worker.
limit_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {})
.and_return(limit_gauge_double)
allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(limit_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 30)
worker.perform
end
end end
context 'when there are jobs in the queue' do context 'when there are jobs in the queue' do
before do before do
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:has_jobs_in_queue?) allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
.and_return(true) .and_return(100)
end end
it 'resumes processing' do it 'resumes processing' do
...@@ -52,19 +73,72 @@ ...@@ -52,19 +73,72 @@
worker.perform worker.perform
end end
it 'resumes processing if limit is not set' do it 'reports prometheus metrics' do
nil_proc = -> { nil } stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for) allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
expect(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for) .and_return(worker_with_concurrency_limit.name => 15)
.with(worker: worker_with_concurrency_limit)
.and_return(nil_proc) queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.to receive(:resume_processing!) .with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
.with(worker_with_concurrency_limit.name, limit: described_class::DEFAULT_LIMIT) .and_return(queue_size_gauge_double)
expect(described_class).to receive(:perform_in)
allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 100)
limit_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {})
.and_return(limit_gauge_double)
allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(limit_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 60)
worker.perform worker.perform
end end
# Covers the fallback path where WorkersMap.limit_for yields a proc that
# returns nil: the worker should fall back to DEFAULT_LIMIT both for
# resuming processing and for the reported metric.
context 'when limit is not set' do
before do
# Reset any default stub, then make limit_for return a proc evaluating
# to nil for the worker under test (i.e. "no limit configured").
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
nil_proc = -> { nil }
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
.with(worker: worker_with_concurrency_limit)
.and_return(nil_proc)
end
it 'resumes processing using the DEFAULT_LIMIT' do
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.to receive(:resume_processing!)
.with(worker_with_concurrency_limit.name, limit: described_class::DEFAULT_LIMIT)
expect(described_class).to receive(:perform_in)
worker.perform
end
it 'reports limit as DEFAULT_LIMIT' do
# Simulate 15 currently-running jobs for the limited worker.
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
.and_return(worker_with_concurrency_limit.name => 15)
# Queue-size gauge: the outer context stubs queue_size to 100.
queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
.and_return(queue_size_gauge_double)
allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 100)
# Limit gauge: with no configured limit, DEFAULT_LIMIT must be reported.
limit_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {})
.and_return(limit_gauge_double)
allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(limit_gauge_double).to receive(:set)
.with({ worker: worker_with_concurrency_limit.name }, described_class::DEFAULT_LIMIT)
worker.perform
end
end
end end
end end
end end
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册