diff --git a/lib/gitlab/background_migration/job_coordinator.rb b/lib/gitlab/background_migration/job_coordinator.rb index 09e2b2a3219750867e10df5c258823ec7ee0df37..efffd635dd268e7b56f2d049971ca6e79ad4caf2 100644 --- a/lib/gitlab/background_migration/job_coordinator.rb +++ b/lib/gitlab/background_migration/job_coordinator.rb @@ -106,33 +106,41 @@ def perform(class_name, arguments) end def remaining - enqueued = Sidekiq::Queue.new(self.queue) - scheduled = Sidekiq::ScheduledSet.new + Sidekiq::Client.via(sidekiq_redis_pool) do + enqueued = Sidekiq::Queue.new(self.queue) + scheduled = Sidekiq::ScheduledSet.new - [enqueued, scheduled].sum do |set| - set.count do |job| - job.klass == worker_class.name + [enqueued, scheduled].sum do |set| + set.count do |job| + job.klass == worker_class.name + end end end end def exists?(migration_class, additional_queues = []) - enqueued = Sidekiq::Queue.new(self.queue) - scheduled = Sidekiq::ScheduledSet.new + Sidekiq::Client.via(sidekiq_redis_pool) do + enqueued = Sidekiq::Queue.new(self.queue) + scheduled = Sidekiq::ScheduledSet.new - enqueued_job?([enqueued, scheduled], migration_class) + enqueued_job?([enqueued, scheduled], migration_class) + end end def dead_jobs?(migration_class) - dead_set = Sidekiq::DeadSet.new + Sidekiq::Client.via(sidekiq_redis_pool) do + dead_set = Sidekiq::DeadSet.new - enqueued_job?([dead_set], migration_class) + enqueued_job?([dead_set], migration_class) + end end def retrying_jobs?(migration_class) - retry_set = Sidekiq::RetrySet.new + Sidekiq::Client.via(sidekiq_redis_pool) do + retry_set = Sidekiq::RetrySet.new - enqueued_job?([retry_set], migration_class) + enqueued_job?([retry_set], migration_class) + end end def migration_instance_for(class_name) diff --git a/spec/lib/gitlab/background_migration/job_coordinator_spec.rb b/spec/lib/gitlab/background_migration/job_coordinator_spec.rb index c2b3af1bd6fb548899042bc3fe5bbd439357936a..82027adf27a53f9405b6e6f5f08b21820a0af072 100644 --- a/spec/lib/gitlab/background_migration/job_coordinator_spec.rb +++ b/spec/lib/gitlab/background_migration/job_coordinator_spec.rb @@ -342,6 +342,12 @@ end describe '.remaining', :redis do + it 'is shard aware' do + expect(Sidekiq::Client).to receive(:via).with(coordinator.sidekiq_redis_pool).once + + coordinator.remaining + end + context 'when there are jobs remaining' do before do Sidekiq::Testing.disable! do @@ -370,6 +376,12 @@ end describe '.exists?', :redis do + it 'is shard aware' do + expect(Sidekiq::Client).to receive(:via).with(coordinator.sidekiq_redis_pool).once + + coordinator.exists?('Foo') + end + context 'when there are enqueued jobs present' do before do Sidekiq::Testing.disable! do @@ -436,6 +448,12 @@ ] end + it 'is shard aware' do + expect(Sidekiq::Client).to receive(:via).with(coordinator.sidekiq_redis_pool).once + + coordinator.retrying_jobs?('Foo') + end + context 'when there are dead jobs present' do before do allow(Sidekiq::RetrySet).to receive(:new).and_return(queue)