Skip to content
代码片段 群组 项目
提交 dffb2b53 编辑于 作者: Felipe Artur's avatar Felipe Artur
浏览文件

Merge branch '431547-attempt-to-fix-reached-end-of-table-flag' into 'master'

Attempt to fix reached_end_of_table flag

See merge request https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136717



Merged-by: Felipe Artur <fcardozo@gitlab.com>
Approved-by: Felipe Artur <fcardozo@gitlab.com>
Reviewed-by: charlie ablett <cablett@gitlab.com>
Co-authored-by: Adam Hegyi <ahegyi@gitlab.com>
No related branches found
No related tags found
加载中
......@@ -4,6 +4,7 @@ module ClickHouse
class EventsSyncWorker
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
include Gitlab::Utils::StrongMemoize
idempotent!
queue_namespace :cronjob
......@@ -91,6 +92,11 @@ def context
)
end
# Returns the largest Event id at the time of the call (Event.maximum(:id)).
# batching_scope treats a falsy result as "nothing to sync", so callers should
# expect this to be nil when there are no events.
def last_event_id_in_postgresql
Event.maximum(:id)
end
# Memoized through Gitlab::Utils::StrongMemoize; presumably the maximum is read
# once per worker instance and reused as a fixed upper bound -- confirm against
# the StrongMemoize implementation.
strong_memoize_attr :last_event_id_in_postgresql
# Tells whether the sync may run: the :main ClickHouse database must be
# configured and the :event_sync_worker_for_click_house feature flag must be on.
def enabled?
  main_database = ClickHouse::Client.configuration.databases[:main]
  return false unless main_database.present?

  Feature.enabled?(:event_sync_worker_for_click_house)
end
......@@ -110,24 +116,34 @@ def next_batch
# Builds an Enumerator that streams event rows from batching_scope.
#
# Rows are loaded in batches of BATCH_SIZE, projecting EVENT_PROJECTIONS. Every
# yielded row's id is written to context.last_processed_id so the caller can
# persist a cursor. Iteration stops early when the context reports that the
# record limit or the time budget has been reached.
#
# context.no_more_records! is called only when the last batch that was loaded
# came back smaller than BATCH_SIZE (or no batch was loaded at all), i.e. when
# the scope was actually exhausted rather than interrupted by a limit.
#
# @param context [#last_processed_id=, #record_limit_reached?, #over_time?, #no_more_records!]
# @return [Enumerator] yields one row per event
def process_batch(context)
  Enumerator.new do |yielder|
    has_more_data = false

    batching_scope.each_batch(of: BATCH_SIZE) do |relation|
      records = relation.select(*EVENT_PROJECTIONS).to_a
      # A full batch suggests more rows may follow; a short batch means the scope is exhausted.
      has_more_data = records.size == BATCH_SIZE

      records.each do |row|
        yielder << row
        context.last_processed_id = row.id

        break if context.record_limit_reached?
      end

      # Stop on limits, and also after a short batch so no extra query is issued.
      break if context.over_time? || context.record_limit_reached? || !has_more_data
    end

    context.no_more_records! unless has_more_data
  end
end
# rubocop: disable CodeReuse/ActiveRecord
# Relation of events to sync: ids strictly greater than context.last_record_id
# and no greater than last_event_id_in_postgresql. Returns Event.none when
# last_event_id_in_postgresql is falsy.
def batching_scope
  upper_bound = last_event_id_in_postgresql
  return Event.none unless upper_bound

  events = Event.arel_table
  after_cursor = events[:id].gt(context.last_record_id)
  within_upper_bound = events[:id].lteq(upper_bound)

  Event.where(after_cursor).where(within_upper_bound)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
......@@ -63,11 +63,32 @@
end
# Runs the worker once and checks both the logged result and the ClickHouse contents.
it 'inserts all records' do
# The expectation is registered before perform so the call made during the run is matched.
expect(worker).to receive(:log_extra_metadata_on_done).with(:result,
{ status: :processed, records_inserted: 4, reached_end_of_table: true })
worker.perform
# Read back what was written to the ClickHouse events table through the :main database.
events = ClickHouse::Client.select('SELECT * FROM events', :main)
expect(events.size).to eq(4)
end
# Checks that an event created in the middle of a run is not counted in that run.
context 'when new records are inserted while processing' do
it 'does not process new records created during the iteration' do
# records_inserted stays at 4 even though an extra event is created below.
expect(worker).to receive(:log_extra_metadata_on_done).with(:result,
{ status: :processed, records_inserted: 4,
reached_end_of_table: true })
# Simulating the case when there is an insert during the iteration
call_count = 0
# Wrap next_batch: the original method still runs, but on the third call an
# event is created first.
allow(worker).to receive(:next_batch).and_wrap_original do |method|
call_count += 1
create(:event) if call_count == 3
method.call
end
worker.perform
end
end
end
context 'when time limit is reached' do
......@@ -96,6 +117,9 @@
end
it 'syncs records after the cursor' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:result,
{ status: :processed, records_inserted: 3, reached_end_of_table: true })
worker.perform
events = ClickHouse::Client.select('SELECT id FROM events ORDER BY id', :main)
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册