diff --git a/.rubocop.yml b/.rubocop.yml index 4794fbb2c441ecd9b449c3b75c0144ce0d1cd228..d3fbf7fd8ddf93004db9418335e468d85c62843b 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1045,6 +1045,7 @@ Cop/RedisQueueUsage: - 'lib/gitlab/sidekiq_sharding/scheduled_enq.rb' - 'lib/gitlab/sidekiq_sharding/router.rb' - 'app/workers/concerns/application_worker.rb' + - 'lib/gitlab/sidekiq_queue.rb' Cop/SidekiqApiUsage: Enabled: true diff --git a/lib/gitlab/sidekiq_queue.rb b/lib/gitlab/sidekiq_queue.rb index e22f09b4976bc135d971e6fa64c9788e63fbd522..df2dd7b5c0767ffd66ca097422c21f43433b75bf 100644 --- a/lib/gitlab/sidekiq_queue.rb +++ b/lib/gitlab/sidekiq_queue.rb @@ -29,24 +29,31 @@ def drop_jobs!(search_metadata, timeout:) .compact raise NoMetadataError if job_search_metadata.empty? - raise InvalidQueueError unless queue + raise InvalidQueueError if sidekiq_queues.values.compact.empty? - queue.each do |job| - if timeout_exceeded?(start_time, timeout) - completed = false - break - end + Gitlab::Redis::Queues.instances.map do |key, instance| + queue = sidekiq_queues[key] + next if queue.nil? + + Sidekiq::Client.via(instance.sidekiq_redis) do + queue.each do |job| + if timeout_exceeded?(start_time, timeout) + completed = false + break + end - next unless job_matches?(job, job_search_metadata) + next unless job_matches?(job, job_search_metadata) - job.delete - deleted_jobs += 1 + job.delete + deleted_jobs += 1 + end + end end { completed: completed, deleted_jobs: deleted_jobs, - queue_size: queue.size + queue_size: queue_size } end @@ -60,14 +67,24 @@ def transform_key(key) end end - def queue - strong_memoize(:queue) do - # Sidekiq::Queue.new always returns a queue, even if it doesn't - # exist. - Sidekiq::Queue.all.find { |queue| queue.name == queue_name } + def sidekiq_queues + @sidekiq_queues ||= Gitlab::Redis::Queues.instances.to_h do |name, instance| + Sidekiq::Client.via(instance.sidekiq_redis) do + [name, Sidekiq::Queue.all.find { |queue| queue.name == queue_name }] + end end end + def queue_size + sidekiq_queues.filter_map do |k, v| + instance = Gitlab::Redis::Queues.instances[k] + next if instance.nil? + + # .size calls `llen` using Sidekiq.redis, hence we need to wrap it with .via + Sidekiq::Client.via(instance.sidekiq_redis) { v.size } + end.sum + end + def job_matches?(job, job_search_metadata) job_search_metadata.all? { |key, value| job[key] == value } end diff --git a/spec/lib/gitlab/sidekiq_queue_spec.rb b/spec/lib/gitlab/sidekiq_queue_spec.rb index 8ceba7ca4b754799da6cd9079fe00a0cc3ce6d5d..0a64c94ef3b2048b16d4fc0a1f8531f8a464a876 100644 --- a/spec/lib/gitlab/sidekiq_queue_spec.rb +++ b/spec/lib/gitlab/sidekiq_queue_spec.rb @@ -76,6 +76,30 @@ def add_job(args, user:, klass: 'AuthorizedProjectsWorker') end end + context 'when there extra queue shard instances are used' do + let(:search_metadata) { { user: sidekiq_queue_user.username } } + let(:sidekiq_queue) { described_class.new('foobar') } + let_it_be(:sidekiq_queue_user) { create(:user) } + + before do + allow(Gitlab::Redis::Queues) + .to receive(:instances).and_return({ 'main' => Gitlab::Redis::Queues, 'shard' => Gitlab::Redis::Queues }) + + add_job([1], user: create(:user)) + add_job([2], user: sidekiq_queue_user, klass: 'MergeWorker') + add_job([3], user: sidekiq_queue_user) + end + + it 'tracks queues from both instances' do + expect(Sidekiq::Queue).to receive(:all).twice.and_call_original + + expect(sidekiq_queue.drop_jobs!(search_metadata, timeout: 10)) + .to eq(completed: true, + deleted_jobs: 2, + queue_size: 2) # Note: intentional double count + end + end + context 'when there are no valid metadata keys passed' do it 'raises NoMetadataError' do add_job([1], user: create(:user))