Skip to content
代码片段 群组 项目
提交 27b6724e 编辑于 作者: Dmitry Gruzd's avatar Dmitry Gruzd
浏览文件

Remove the parallel_bulk_cron_worker feature flag

Changelog: changed
EE: true
上级 112ed929
No related branches found
No related tags found
无相关合并请求
......@@ -20,12 +20,12 @@ def perform(shard_number = nil)
return false
end
return if legacy_lock_exists? # skip execution if legacy lease is still obtained
if shard_number
process_shard(shard_number)
elsif Feature.enabled?(:parallel_bulk_cron_worker)
schedule_shards
else
process_all_shards
schedule_shards
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
......@@ -34,8 +34,6 @@ def perform(shard_number = nil)
private
def process_shard(shard_number)
return if legacy_lock_exists? # skip execution if legacy lease is still obtained
in_lock("#{self.class.name.underscore}/shard/#{shard_number}", ttl: 10.minutes, retries: 10, sleep_sec: 1) do
service.execute(shards: [shard_number]).tap do |records_count|
log_extra_metadata_on_done(:records_count, records_count)
......@@ -48,24 +46,11 @@ def process_shard(shard_number)
end
def schedule_shards
return if legacy_lock_exists? # skip execution if legacy lease is still obtained
Elastic::ProcessBookkeepingService::SHARDS.each do |shard_number|
self.class.perform_async(shard_number)
end
end
def process_all_shards
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
service.execute.tap do |records_count|
log_extra_metadata_on_done(:records_count, records_count)
# Requeue current worker if the queue isn't empty
self.class.perform_in(RESCHEDULE_INTERVAL) if should_requeue?(records_count)
end
end
end
def legacy_lock_exists?
!!Gitlab::ExclusiveLease.get_uuid(self.class.name.underscore)
end
......
---
name: parallel_bulk_cron_worker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/110936
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/390361
milestone: '15.9'
type: development
group: group::global search
default_enabled: true
......@@ -8,167 +8,102 @@
let(:worker) { described_class.new }
let(:lease_key) { 'elastic_index_bulk_cron_worker' }
context 'parallel feature flag is enabled' do
let(:shards) { [0, 1] }
let(:shard_number) { shards.first }
let(:shards) { [0, 1] }
let(:shard_number) { shards.first }
before do
stub_feature_flags(parallel_bulk_cron_worker: true)
stub_const("Elastic::ProcessBookkeepingService::SHARDS", shards)
end
context 'indexing is not paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
it 'queues all shards for execution' do
shards.each do |shard_number|
expect(described_class).to receive(:perform_async).with(shard_number)
end
worker.perform
end
context 'legacy lease is detected' do
before do
allow(Gitlab::ExclusiveLease).to receive(:get_uuid).with(lease_key).and_return('lease_uuid')
end
it 'skips scheduling' do
expect(described_class).not_to receive(:perform_async)
worker.perform
end
it 'skips shard execution' do
expect(described_class).not_to receive(:perform_async)
worker.perform(shard_number)
end
end
it 'executes the service under an exclusive lease' do
expect_to_obtain_exclusive_lease("#{lease_key}/shard/#{shard_number}")
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).with(shards: [shard_number])
end
before do
stub_const("Elastic::ProcessBookkeepingService::SHARDS", shards)
end
worker.perform(shard_number)
end
context 'indexing is not paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
context 'indexing is paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
it 'queues all shards for execution' do
shards.each do |shard_number|
expect(described_class).to receive(:perform_async).with(shard_number)
end
it 'does nothing if indexing is paused' do
expect(::Elastic::ProcessBookkeepingService).not_to receive(:new)
expect(worker.perform).to eq(false)
end
worker.perform
end
context 'when service returns non-zero counter' do
context 'legacy lease is detected' do
before do
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).and_return(15)
end
allow(Gitlab::ExclusiveLease).to receive(:get_uuid).with(lease_key).and_return('lease_uuid')
end
it 'requeues the worker' do
expect(described_class).to receive(:perform_in).with(described_class::RESCHEDULE_INTERVAL, shard_number)
it 'skips scheduling' do
expect(described_class).not_to receive(:perform_async)
worker.perform(shard_number)
worker.perform
end
it 'does not requeue the worker if FF is disabled' do
stub_feature_flags(bulk_cron_worker_auto_requeue: false)
expect(described_class).not_to receive(:perform_in)
it 'skips shard execution' do
expect(described_class).not_to receive(:perform_async)
worker.perform(shard_number)
end
end
context 'when there are no records in the queue' do
it 'does not requeue the worker' do
expect(described_class).not_to receive(:perform_in)
it 'executes the service under an exclusive lease' do
expect_to_obtain_exclusive_lease("#{lease_key}/shard/#{shard_number}")
worker.perform(shard_number)
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).with(shards: [shard_number])
end
worker.perform(shard_number)
end
end
context 'parallel feature flag is disabled' do
context 'indexing is paused' do
before do
stub_feature_flags(parallel_bulk_cron_worker: false)
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
context 'indexing is not paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
it 'executes the service under an exclusive lease' do
expect_to_obtain_exclusive_lease(lease_key)
it 'does nothing if indexing is paused' do
expect(::Elastic::ProcessBookkeepingService).not_to receive(:new)
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute)
end
worker.perform
end
expect(worker.perform).to eq(false)
end
end
context 'indexing is paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
it 'does nothing if indexing is paused' do
expect(::Elastic::ProcessBookkeepingService).not_to receive(:new)
expect(worker.perform).to eq(false)
context 'when service returns non-zero counter' do
before do
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).and_return(15)
end
end
context 'when service returns non-zero counter' do
before do
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).and_return(15)
end
end
it 'adds the elastic_bulk_count to the done log' do
worker.perform
it 'adds logging_extras to the done log' do
worker.perform(shard_number)
expect(worker.logging_extras).to eq(
"#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.records_count" => 15
)
end
expect(worker.logging_extras).to eq(
"#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.records_count" => 15,
"#{ApplicationWorker::LOGGING_EXTRA_KEY}.elastic_index_bulk_cron_worker.shard_number" => shard_number
)
end
it 'requeues the worker' do
expect(described_class).to receive(:perform_in)
it 'requeues the worker' do
expect(described_class).to receive(:perform_in).with(described_class::RESCHEDULE_INTERVAL, shard_number)
worker.perform
end
worker.perform(shard_number)
end
it 'does not requeue the worker if FF is disabled' do
stub_feature_flags(bulk_cron_worker_auto_requeue: false)
expect(described_class).not_to receive(:perform_in)
it 'does not requeue the worker if FF is disabled' do
stub_feature_flags(bulk_cron_worker_auto_requeue: false)
expect(described_class).not_to receive(:perform_in)
worker.perform
end
worker.perform(shard_number)
end
end
context 'when there are no records in the queue' do
it 'does not requeue the worker' do
expect(described_class).not_to receive(:perform_in)
context 'when there are no records in the queue' do
it 'does not requeue the worker' do
expect(described_class).not_to receive(:perform_in)
worker.perform
end
worker.perform(shard_number)
end
end
end
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册