diff --git a/app/workers/concurrency_limit/resume_worker.rb b/app/workers/concurrency_limit/resume_worker.rb index 3ae646320152defeb6058acd8f5053219da13c5c..a08e228a7ca62b04f7b7cbf1d36e3236511b138a 100644 --- a/app/workers/concurrency_limit/resume_worker.rb +++ b/app/workers/concurrency_limit/resume_worker.rb @@ -25,7 +25,7 @@ def perform next unless queue_size > 0 next if limit < 0 # do not re-queue jobs if circuit-broken - current = current_concurrency(worker: worker) + current = concurrent_worker_count(worker) Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.worker_stats_log( worker.name, limit, queue_size, current ) @@ -49,18 +49,6 @@ def perform private - def current_concurrency(worker:) - if ::Feature.enabled?(:sidekiq_concurrency_limit_optimized_count, Feature.current_request) - return concurrent_worker_count(worker) - end - - @current_concurrency ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency.workers( - skip_cache: true - ) - - @current_concurrency[worker.name].to_i - end - def concurrent_worker_count(worker) Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.concurrent_worker_count(worker.name) end diff --git a/config/feature_flags/gitlab_com_derisk/sidekiq_concurrency_limit_optimized_count.yml b/config/feature_flags/gitlab_com_derisk/sidekiq_concurrency_limit_optimized_count.yml deleted file mode 100644 index 1d2d291c3c0968f0f40a4942c52e32fb4f6fecbd..0000000000000000000000000000000000000000 --- a/config/feature_flags/gitlab_com_derisk/sidekiq_concurrency_limit_optimized_count.yml +++ /dev/null @@ -1,9 +0,0 @@ ---- -name: sidekiq_concurrency_limit_optimized_count -feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/490936 -introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/169561 -rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/496335 -milestone: '17.6' -group: group::scalability -type: gitlab_com_derisk -default_enabled: false diff --git a/ee/spec/workers/concurrency_limit/resume_worker_spec.rb b/ee/spec/workers/concurrency_limit/resume_worker_spec.rb index 80b4a214c4075c210eeb223b0909e7e518035f38..5eda81a3e29650302bfc5dc020d8368c6616902a 100644 --- a/ee/spec/workers/concurrency_limit/resume_worker_spec.rb +++ b/ee/spec/workers/concurrency_limit/resume_worker_spec.rb @@ -6,10 +6,13 @@ subject(:worker) { described_class.new } let(:worker_with_concurrency_limit) { ElasticCommitIndexerWorker } + let(:concurrent_workers) { 5 } describe '#perform' do before do allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:resume_processing!) + allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) + .to receive(:concurrent_worker_count).and_return(concurrent_workers) end shared_examples 'report prometheus metrics' do |limit = described_class::BATCH_SIZE, queue_size = 100| @@ -36,7 +39,7 @@ end end - shared_examples 'no jobs in the queue' do + context 'when there are no jobs in the queue' do before do allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for) .and_return(10) @@ -60,7 +63,7 @@ it_behaves_like 'report prometheus metrics', 10, 0 end - shared_examples 'jobs in the queue' do + context 'when there are jobs in the queue' do before do allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size) .and_return(0) @@ -135,30 +138,5 @@ it_behaves_like 'report prometheus metrics', 0 end end - - context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do - let(:concurrent_workers) { 6 } - - before do - stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false) - allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers) - .and_return(worker_with_concurrency_limit.name => concurrent_workers) - end - - it_behaves_like 'no jobs in the queue' - it_behaves_like 'jobs in the queue' - end - - context 'when sidekiq_concurrency_limit_optimized_count feature flag is enabled' do - let(:concurrent_workers) { 5 } - - before do - allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) - .to receive(:concurrent_worker_count).and_return(concurrent_workers) - end - - it_behaves_like 'no jobs in the queue' - it_behaves_like 'jobs in the queue' - end end end diff --git a/lib/gitlab/metrics/samplers/concurrency_limit_sampler.rb b/lib/gitlab/metrics/samplers/concurrency_limit_sampler.rb index 6e89095a523f596f0876c7755e9134326228a87c..ab2913b968790fdb73c8a68fd4995f8c08092959 100644 --- a/lib/gitlab/metrics/samplers/concurrency_limit_sampler.rb +++ b/lib/gitlab/metrics/samplers/concurrency_limit_sampler.rb @@ -11,17 +11,13 @@ def sample queue_size = concurrent_limit_service.queue_size(w.name) report_queue_size(w.name, queue_size) if queue_size > 0 - concurrent_worker_count = workers_concurrency.current_for(worker: w) + concurrent_worker_count = concurrent_limit_service.concurrent_worker_count(w.name) report_concurrent_workers(w.name, concurrent_worker_count) if concurrent_worker_count > 0 end end private - def workers_concurrency - ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency - end - def worker_maps Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap end diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb deleted file mode 100644 index f133751520197774a901b7dd535043276e6ac3ca..0000000000000000000000000000000000000000 --- a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb +++ /dev/null @@ -1,79 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - module SidekiqMiddleware - module ConcurrencyLimit - class WorkersConcurrency - CACHE_EXPIRES_IN = 15.seconds - LEASE_EXPIRES_IN = 5.seconds - - CACHE_KEY = [:concurrency_limit, :workers].join(':') - LEASE_KEY = [:concurrency_limit, :lease].join(':') - - class << self - def current_for(worker:, skip_cache: false) - worker_class = worker.is_a?(Class) ? worker : worker.class - worker_name = worker_class.name - - if ::Feature.enabled?(:sidekiq_concurrency_limit_optimized_count, Feature.current_request, - type: :gitlab_com_derisk) - return ConcurrencyLimitService.concurrent_worker_count(worker_name) - end - - workers(skip_cache: skip_cache)[worker_name].to_i - end - - def workers(skip_cache: false) - return workers_uncached if skip_cache - - with_cache { workers_uncached } - end - - private - - # To calculate the `tally` takes hundreds of Redis calls so we - # really want to minimize the risk of concurrently recalculating the value. - # This caches the data in Redis for 15s but the 5s lease ensures that some - # process will recalculate the cache every 5s. The reason we don't just use a - # 5s cache is because the high concurrency of execution of this code path - # means that we're very likely to have many concurrent cache misses which means - # many processes concurrently recalculating the same cached value. - def with_cache - Gitlab::Redis::Cache.with do |redis| - key_set = redis.set(LEASE_KEY, 1, ex: LEASE_EXPIRES_IN, nx: true) - - break update_workers_cache(redis) { yield } if key_set - - tally = redis.get(CACHE_KEY) - break Gitlab::Json.parse(tally) if tally - - update_workers_cache(redis) { yield } - end - end - - def update_workers_cache(redis) - tally = yield - redis.set(CACHE_KEY, tally.to_json, ex: CACHE_EXPIRES_IN) - - tally - end - - def workers_uncached - hash = [] - Gitlab::SidekiqSharding::Router.with_routed_client do - workers = sidekiq_workers.map do |_process_id, _thread_id, work| - ::Gitlab::Json.parse(work.payload)['class'] - end - hash.concat(workers) - end - hash.tally - end - - def sidekiq_workers - Sidekiq::Workers.new.each - end - end - end - end - end -end diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb index 3524d1ce8a6e35c2bb174aa94c99b4fa3147f938..09cc8ba3d9c7912320a808e5999af7b2f4d78c2b 100644 --- a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb @@ -29,7 +29,7 @@ def over_the_limit?(worker:) return false if limit == 0 return true if limit < 0 - current = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency.current_for(worker: worker) + current = current_count(worker) current >= limit end @@ -42,6 +42,12 @@ def workers private + def current_count(worker) + worker_class = worker.is_a?(Class) ? worker : worker.class + worker_name = worker_class.name + ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.concurrent_worker_count(worker_name) + end + attr_reader :data end end diff --git a/spec/lib/gitlab/metrics/samplers/concurrency_limit_sampler_spec.rb b/spec/lib/gitlab/metrics/samplers/concurrency_limit_sampler_spec.rb index d8552b7f36e3e9625743e656e6ccc39d3f9f4a2c..cb505f3a99c7fbb1aaeab100334c5a3948372aef 100644 --- a/spec/lib/gitlab/metrics/samplers/concurrency_limit_sampler_spec.rb +++ b/spec/lib/gitlab/metrics/samplers/concurrency_limit_sampler_spec.rb @@ -21,8 +21,8 @@ expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) .to receive(:queue_size).exactly(workers_with_limits.size).times.and_return(1) - expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency) - .to receive(:current_for).exactly(workers_with_limits.size).times.and_return(1) + expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) + .to receive(:concurrent_worker_count).exactly(workers_with_limits.size).times.and_return(1) queue_size_gauge_double = instance_double(Prometheus::Client::Counter) expect(Gitlab::Metrics).to receive(:counter) @@ -51,8 +51,8 @@ expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) .to receive(:queue_size).exactly(workers_with_limits.size).times.and_return(0) - expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency) - .to receive(:current_for).exactly(workers_with_limits.size).times.and_return(0) + expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) + .to receive(:concurrent_worker_count).exactly(workers_with_limits.size).times.and_return(0) expect(Gitlab::Metrics).not_to receive(:counter) diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb deleted file mode 100644 index 8740e6d506af92c05c8ac972461110a2a456c166..0000000000000000000000000000000000000000 --- a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb +++ /dev/null @@ -1,177 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency, feature_category: :global_search do - let(:worker_class) do - Class.new do - def self.name - 'TestConcurrencyLimitWorker' - end - - include ApplicationWorker - - concurrency_limit -> { 60 } - - def perform(*); end - end - end - - let(:current_concurrency) { 10 } - let(:work) do - instance_double(Sidekiq::Work, payload: { 'class' => 'TestConcurrencyLimitWorker' }.to_json, queue: 'default') - end - - let(:sidekiq_worker) do - [ - 'process_id', - 'thread_id', - work - ] - end - - let(:concurrency_tracking_hash_content) do - (1..current_concurrency).flat_map { |i| [i, i] } - end - - before do - stub_const('TestConcurrencyLimitWorker', worker_class) - allow(described_class).to receive(:sidekiq_workers).and_return([sidekiq_worker] * current_concurrency) - Gitlab::Redis::QueuesMetadata.with do |c| - prefix = Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService::REDIS_KEY_PREFIX - c.hset("#{prefix}:{#{worker_class.name.underscore}}:executing", *concurrency_tracking_hash_content) - end - end - - describe '.current_for' do - subject(:current_for) { described_class.current_for(worker: TestConcurrencyLimitWorker, skip_cache: skip_cache) } - - context 'without cache' do - let(:skip_cache) { true } - - it 'looks up current concurrency from hash' do - expect(described_class).not_to receive(:workers_uncached) - expect(current_for).to eq(current_concurrency) - end - - context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do - before do - stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false) - end - - it 'returns the current concurrency', quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/451677' do - expect(described_class).to receive(:workers_uncached).and_call_original - expect(current_for).to eq(current_concurrency) - end - end - end - - context 'with cache', :clean_gitlab_redis_cache do - let(:skip_cache) { false } - let(:cached_value) { { "TestConcurrencyLimitWorker" => 20 } } - - before do - cache_setup!(tally: cached_value, lease: true) - end - - it 'looks up current concurrency from hash' do - expect(described_class).not_to receive(:workers) - expect(current_for).to eq(current_concurrency) - end - - context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do - before do - stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false) - end - - it 'returns cached current_for' do - expect(described_class).not_to receive(:workers_uncached) - - expect(current_for).to eq(20) - end - end - end - end - - describe '.workers' do - subject(:workers) { described_class.workers(skip_cache: skip_cache) } - - context 'without cache' do - let(:skip_cache) { true } - - it 'returns current_workers', quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/463861' do - expect(workers).to eq('TestConcurrencyLimitWorker' => 10) - end - - context 'with multiple shard instances' do - before do - allow(Gitlab::Redis::Queues) - .to receive(:instances).and_return({ 'main' => Gitlab::Redis::Queues, 'shard' => Gitlab::Redis::Queues }) - end - - it 'returns count for all instances' do - expect(workers).to eq({ - 'TestConcurrencyLimitWorker' => current_concurrency * Gitlab::Redis::Queues.instances.size - }) - end - end - end - - context 'with cache', :clean_gitlab_redis_cache do - let(:skip_cache) { false } - let(:cached_value) { { "TestConcurrencyLimitWorker" => 20 } } - let(:actual_tally) { { "TestConcurrencyLimitWorker" => 15 } } - - before do - cache_setup!(tally: cached_value, lease: lease) - end - - context 'when lease is not held by another process' do - let(:lease) { false } - - it 'returns the current concurrency' do - expect(described_class).to receive(:workers_uncached).and_return(actual_tally) - - expect(workers).to eq(actual_tally) - end - end - - context 'when lease is held by another process' do - let(:lease) { true } - - it 'returns cached workers' do - expect(described_class).not_to receive(:workers_uncached) - - expect(workers).to eq(cached_value) - end - end - - context 'when lease is held by another process but the cache is empty' do - let(:lease) { true } - let(:cached_value) { nil } - - it 'returns the current concurrency' do - expect(described_class).to receive(:workers_uncached).and_return(actual_tally) - - expect(workers).to eq(actual_tally) - end - end - end - end - - def cache_setup!(tally:, lease:) - Gitlab::Redis::Cache.with do |redis| - if tally - redis.set(described_class::CACHE_KEY, tally.to_json) - else - redis.del(described_class::CACHE_KEY) - end - - if lease - redis.set(described_class::LEASE_KEY, 1) - else - redis.del(described_class::LEASE_KEY) - end - end - end -end diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb index 6b73d29bca0420c821deff6c95685a5231835352..b860331688fca9e3d52c39260a69237a53d20e28 100644 --- a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb @@ -62,7 +62,8 @@ def perform(*); end with_them do before do allow(described_class).to receive(:limit_for).and_return(limit) - allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:current_for) + allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) + .to receive(:concurrent_worker_count) .and_return(current) end