diff --git a/ee/app/workers/concerns/elastic/bulk_cron_worker.rb b/ee/app/workers/concerns/elastic/bulk_cron_worker.rb index 203dfbd3dc15952c974e92065985aac1eb7cc9a8..6e62d34996c1fe1316d4700636daf55b822196a7 100644 --- a/ee/app/workers/concerns/elastic/bulk_cron_worker.rb +++ b/ee/app/workers/concerns/elastic/bulk_cron_worker.rb @@ -20,12 +20,12 @@ def perform(shard_number = nil) return false end + return if legacy_lock_exists? # skip execution if legacy lease is still obtained + if shard_number process_shard(shard_number) - elsif Feature.enabled?(:parallel_bulk_cron_worker) - schedule_shards else - process_all_shards + schedule_shards end rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError # We're scheduled on a cronjob, so nothing to do here @@ -34,8 +34,6 @@ def perform(shard_number = nil) private def process_shard(shard_number) - return if legacy_lock_exists? # skip execution if legacy lease is still obtained - in_lock("#{self.class.name.underscore}/shard/#{shard_number}", ttl: 10.minutes, retries: 10, sleep_sec: 1) do service.execute(shards: [shard_number]).tap do |records_count| log_extra_metadata_on_done(:records_count, records_count) @@ -48,24 +46,11 @@ def process_shard(shard_number) end def schedule_shards - return if legacy_lock_exists? # skip execution if legacy lease is still obtained - Elastic::ProcessBookkeepingService::SHARDS.each do |shard_number| self.class.perform_async(shard_number) end end - def process_all_shards - in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do - service.execute.tap do |records_count| - log_extra_metadata_on_done(:records_count, records_count) - - # Requeue current worker if the queue isn't empty - self.class.perform_in(RESCHEDULE_INTERVAL) if should_requeue?(records_count) - end - end - end - def legacy_lock_exists? !!Gitlab::ExclusiveLease.get_uuid(self.class.name.underscore) end diff --git a/ee/config/feature_flags/development/parallel_bulk_cron_worker.yml b/ee/config/feature_flags/development/parallel_bulk_cron_worker.yml deleted file mode 100644 index b18a1215c677571e4d2d716b38147a4383027c8d..0000000000000000000000000000000000000000 --- a/ee/config/feature_flags/development/parallel_bulk_cron_worker.yml +++ /dev/null @@ -1,8 +0,0 @@ ---- -name: parallel_bulk_cron_worker -introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/110936 -rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/390361 -milestone: '15.9' -type: development -group: group::global search -default_enabled: true diff --git a/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb b/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb index 36ad344891b9364891e8b9181a873d8117628893..187653e446113e62db36dcde73c4cdb6e0fc491c 100644 --- a/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb +++ b/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb @@ -8,167 +8,102 @@ let(:worker) { described_class.new } let(:lease_key) { 'elastic_index_bulk_cron_worker' } - context 'parallel feature flag is enabled' do - let(:shards) { [0, 1] } - let(:shard_number) { shards.first } + let(:shards) { [0, 1] } + let(:shard_number) { shards.first } - before do - stub_feature_flags(parallel_bulk_cron_worker: true) - stub_const("Elastic::ProcessBookkeepingService::SHARDS", shards) - end - - context 'indexing is not paused' do - before do - expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false) - end - - it 'queues all shards for execution' do - shards.each do |shard_number| - expect(described_class).to receive(:perform_async).with(shard_number) - end - - worker.perform - end - - context 'legacy lease is detected' do - before do - allow(Gitlab::ExclusiveLease).to receive(:get_uuid).with(lease_key).and_return('lease_uuid') - end - - it 'skips scheduling' do - expect(described_class).not_to receive(:perform_async) - - worker.perform - end - - it 'skips shard execution' do - expect(described_class).not_to receive(:perform_async) - - worker.perform(shard_number) - end - end - - it 'executes the service under an exclusive lease' do - expect_to_obtain_exclusive_lease("#{lease_key}/shard/#{shard_number}") - - expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| - expect(service).to receive(:execute).with(shards: [shard_number]) - end + before do + stub_const("Elastic::ProcessBookkeepingService::SHARDS", shards) + end - worker.perform(shard_number) - end + context 'indexing is not paused' do + before do + expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false) end - context 'indexing is paused' do - before do - expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true) + it 'queues all shards for execution' do + shards.each do |shard_number| + expect(described_class).to receive(:perform_async).with(shard_number) end - it 'does nothing if indexing is paused' do - expect(::Elastic::ProcessBookkeepingService).not_to receive(:new) - - expect(worker.perform).to eq(false) - end + worker.perform end - context 'when service returns non-zero counter' do + context 'legacy lease is detected' do before do - expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| - expect(service).to receive(:execute).and_return(15) - end + allow(Gitlab::ExclusiveLease).to receive(:get_uuid).with(lease_key).and_return('lease_uuid') end - it 'requeues the worker' do - expect(described_class).to receive(:perform_in).with(described_class::RESCHEDULE_INTERVAL, shard_number) + it 'skips scheduling' do + expect(described_class).not_to receive(:perform_async) - worker.perform(shard_number) + worker.perform end - it 'does not requeue the worker if FF is disabled' do - stub_feature_flags(bulk_cron_worker_auto_requeue: false) - expect(described_class).not_to receive(:perform_in) + it 'skips shard execution' do + expect(described_class).not_to receive(:perform_async) worker.perform(shard_number) end end - context 'when there are no records in the queue' do - it 'does not requeue the worker' do - expect(described_class).not_to receive(:perform_in) + it 'executes the service under an exclusive lease' do + expect_to_obtain_exclusive_lease("#{lease_key}/shard/#{shard_number}") - worker.perform(shard_number) + expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| + expect(service).to receive(:execute).with(shards: [shard_number]) end + + worker.perform(shard_number) end end - context 'parallel feature flag is disabled' do + context 'indexing is paused' do before do - stub_feature_flags(parallel_bulk_cron_worker: false) + expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true) end - context 'indexing is not paused' do - before do - expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false) - end - - it 'executes the service under an exclusive lease' do - expect_to_obtain_exclusive_lease(lease_key) + it 'does nothing if indexing is paused' do + expect(::Elastic::ProcessBookkeepingService).not_to receive(:new) - expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| - expect(service).to receive(:execute) - end - - worker.perform - end + expect(worker.perform).to eq(false) end + end - context 'indexing is paused' do - before do - expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true) - end - - it 'does nothing if indexing is paused' do - expect(::Elastic::ProcessBookkeepingService).not_to receive(:new) - - expect(worker.perform).to eq(false) + context 'when service returns non-zero counter' do + before do + expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| + expect(service).to receive(:execute).and_return(15) end end - context 'when service returns non-zero counter' do - before do - expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| - expect(service).to receive(:execute).and_return(15) - end - end - - it 'adds the elastic_bulk_count to the done log' do - worker.perform + it 'adds logging_extras to the done log' do + worker.perform(shard_number) - expect(worker.logging_extras).to eq( - "#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.records_count" => 15 - ) - end + expect(worker.logging_extras).to eq( + "#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.records_count" => 15, + "#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.shard_number" => shard_number + ) + end - it 'requeues the worker' do - expect(described_class).to receive(:perform_in) + it 'requeues the worker' do + expect(described_class).to receive(:perform_in).with(described_class::RESCHEDULE_INTERVAL, shard_number) - worker.perform - end + worker.perform(shard_number) + end - it 'does not requeue the worker if FF is disabled' do - stub_feature_flags(bulk_cron_worker_auto_requeue: false) - expect(described_class).not_to receive(:perform_in) + it 'does not requeue the worker if FF is disabled' do + stub_feature_flags(bulk_cron_worker_auto_requeue: false) + expect(described_class).not_to receive(:perform_in) - worker.perform - end + worker.perform(shard_number) end + end - context 'when there are no records in the queue' do - it 'does not requeue the worker' do - expect(described_class).not_to receive(:perform_in) + context 'when there are no records in the queue' do + it 'does not requeue the worker' do + expect(described_class).not_to receive(:perform_in) - worker.perform - end + worker.perform(shard_number) end end end