diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index e884a43b1e38a0d44d6cb612b165f0248fea70d4..5936d30b8b2cafae59d3609ecc2a0ff57b8381d8 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -4,6 +4,7 @@ module ClickHouse
   class EventsSyncWorker
     include ApplicationWorker
     include Gitlab::ExclusiveLeaseHelpers
+    include Gitlab::Utils::StrongMemoize
 
     idempotent!
     queue_namespace :cronjob
@@ -91,6 +92,11 @@ def context
       )
     end
 
+    def last_event_id_in_postgresql
+      Event.maximum(:id)
+    end
+    strong_memoize_attr :last_event_id_in_postgresql
+
     def enabled?
       ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house)
     end
@@ -110,24 +116,34 @@ def next_batch
 
     def process_batch(context)
       Enumerator.new do |yielder|
-        has_data = false
-        # rubocop: disable CodeReuse/ActiveRecord
-        Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation|
-          has_data = true
-
-          relation.select(*EVENT_PROJECTIONS).each do |row|
+        has_more_data = false
+        batching_scope.each_batch(of: BATCH_SIZE) do |relation|
+          records = relation.select(*EVENT_PROJECTIONS).to_a
+          has_more_data = records.size == BATCH_SIZE
+          records.each do |row|
             yielder << row
             context.last_processed_id = row.id
 
             break if context.record_limit_reached?
           end
 
-          break if context.over_time? || context.record_limit_reached?
+          break if context.over_time? || context.record_limit_reached? || !has_more_data
         end
 
-        context.no_more_records! if has_data == false
-        # rubocop: enable CodeReuse/ActiveRecord
+        context.no_more_records! unless has_more_data
       end
     end
+
+    # rubocop: disable CodeReuse/ActiveRecord
+    def batching_scope
+      return Event.none unless last_event_id_in_postgresql
+
+      table = Event.arel_table
+
+      Event
+        .where(table[:id].gt(context.last_record_id))
+        .where(table[:id].lteq(last_event_id_in_postgresql))
+    end
+    # rubocop: enable CodeReuse/ActiveRecord
   end
 end
diff --git a/spec/workers/click_house/events_sync_worker_spec.rb b/spec/workers/click_house/events_sync_worker_spec.rb
index 01267db36a7916beb949490362491032a6947137..28a885629a4f789eadc48c0ce8189129f89004d4 100644
--- a/spec/workers/click_house/events_sync_worker_spec.rb
+++ b/spec/workers/click_house/events_sync_worker_spec.rb
@@ -63,11 +63,32 @@
       end
 
       it 'inserts all records' do
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:result,
+          { status: :processed, records_inserted: 4, reached_end_of_table: true })
+
         worker.perform
 
         events = ClickHouse::Client.select('SELECT * FROM events', :main)
         expect(events.size).to eq(4)
       end
+
+      context 'when new records are inserted while processing' do
+        it 'does not process new records created during the iteration' do
+          expect(worker).to receive(:log_extra_metadata_on_done).with(:result,
+            { status: :processed, records_inserted: 4,
+              reached_end_of_table: true })
+
+          # Simulating the case when there is an insert during the iteration
+          call_count = 0
+          allow(worker).to receive(:next_batch).and_wrap_original do |method|
+            call_count += 1
+            create(:event) if call_count == 3
+            method.call
+          end
+
+          worker.perform
+        end
+      end
     end
 
     context 'when time limit is reached' do
@@ -96,6 +117,9 @@
       end
 
       it 'syncs records after the cursor' do
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:result,
+          { status: :processed, records_inserted: 3, reached_end_of_table: true })
+
         worker.perform
 
         events = ClickHouse::Client.select('SELECT id FROM events ORDER BY id', :main)