# Review summary of this patch:
#
# * config/gitlab.yml.example: documents a click_house_ci_finished_builds_sync_worker
#   cron entry with the schedule "*/3 * * * *".
# * config/initializers/1_settings.rb: the cron job's default args change from [0, 1]
#   to [1]; the cron worker below reads that single value as total_workers.
# * config/sidekiq_queues.yml and ee/app/workers/all_queues.yml: add entries for the new
#   click_house_ci_finished_builds_sync queue / ClickHouse::CiFinishedBuildsSyncWorker and
#   change has_external_dependencies from true to false for the cron worker.
# * ClickHouse::CiFinishedBuildsSyncCronWorker: declares `version 2`; perform(*args)
#   returns early unless job_version == 2, takes total_workers from args.first
#   (falling back to 1), calls CiFinishedBuildsSyncWorker.perform_async(worker_index,
#   total_workers) once per index via total_workers.times, and returns nil.
# * ClickHouse::CiFinishedBuildsSyncWorker (new file): carries the perform body removed
#   from the cron worker - it runs CiFinishedBuildsSyncService#execute and passes the
#   payload (on success) or the message/reason keys to log_extra_metadata_on_done.
# * Specs: the cron worker spec now only asserts which perform_async calls are made;
#   the service-level expectations move to the new sync worker spec.
#
# NOTE(review): the cron worker declares `loggable_arguments 1` although perform only
#   reads args.first, and the sync worker declares `loggable_arguments 1, 2` for
#   (worker_index, total_workers). Presumably loggable_arguments takes zero-based
#   argument indexes - confirm whether 0 and 0, 1 were intended.
# NOTE(review): the removed cron-worker comment says the worker "interacts with a
#   ClickHouse database"; the new sync worker runs the same service but is listed with
#   has_external_dependencies: false and does not call worker_has_external_dependencies!
#   - confirm this is intentional.
diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example index 07be2bbf57b9d8b750a85f25f5c880c58e43fb2b..5002e9e24bf311e537ebce0af767da9582c428b7 100644 --- a/config/gitlab.yml.example +++ b/config/gitlab.yml.example @@ -657,6 +657,10 @@ production: &base ci_runners_stale_group_runners_prune_worker_cron: cron: "30 * * * *" + # Periodically queue syncing of finished builds from p_ci_finished_build_ch_sync_events to ClickHouse + click_house_ci_finished_builds_sync_worker: + cron: "*/3 * * * *" + registry: # enabled: true # host: registry.example.com diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 003712c76c6197884883a600f6681051b81885c7..899ee1d8cd9a508118ad908de4b999ea8141cfad 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -902,7 +902,7 @@ Settings.cron_jobs['click_house_events_sync_worker']['job_class'] = 'ClickHouse::EventsSyncWorker' Settings.cron_jobs['click_house_ci_finished_builds_sync_worker'] ||= {} Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['cron'] ||= '*/3 * * * *' - Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['args'] ||= [0, 1] + Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['args'] ||= [1] Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['job_class'] = 'ClickHouse::CiFinishedBuildsSyncCronWorker' Settings.cron_jobs['vertex_ai_refresh_access_token_worker'] ||= {} Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['cron'] ||= '*/50 * * * *' diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index c2eae462ffdf68be32bbea02e1a57bdf6308ac79..37a6ade60fdbbeb1a2861350596c522013db5933 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -163,6 +163,8 @@ - 1 - - ci_upstream_projects_subscriptions_cleanup - 1 +- - click_house_ci_finished_builds_sync + - 1 - - cluster_agent - 1 - - compliance_management_chain_of_custody_report diff --git
a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index d4b9bb67fdbe4ed56f68936c32877247b48fa87a..b2c29f7731921466a5b214077f5586ac6aef093a 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -114,7 +114,7 @@ - :name: cronjob:click_house_ci_finished_builds_sync_cron :worker_name: ClickHouse::CiFinishedBuildsSyncCronWorker :feature_category: :runner_fleet - :has_external_dependencies: true + :has_external_dependencies: false :urgency: :low :resource_boundary: :unknown :weight: 1 @@ -1209,6 +1209,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: click_house_ci_finished_builds_sync + :worker_name: ClickHouse::CiFinishedBuildsSyncWorker + :feature_category: :runner_fleet + :has_external_dependencies: false + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: compliance_management_chain_of_custody_report :worker_name: ComplianceManagement::ChainOfCustodyReportWorker :feature_category: :compliance_management diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb index 404d8b792faba41c5f4363a1a4dfeec1ba88b25c..868c06e2c56c7698f81afdb0c7e8e91c26da36ea 100644 --- a/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb +++ b/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb @@ -4,19 +4,24 @@ module ClickHouse class CiFinishedBuildsSyncCronWorker include ApplicationWorker + version 2 + idempotent! queue_namespace :cronjob data_consistency :delayed - worker_has_external_dependencies!
# the worker interacts with a ClickHouse database feature_category :runner_fleet + loggable_arguments 1 + + def perform(*args) + return unless job_version == 2 + + total_workers = args.first || 1 - def perform(worker_index = 0, total_workers = 1) - response = ::ClickHouse::DataIngestion::CiFinishedBuildsSyncService.new( - worker_index: worker_index, total_workers: total_workers - ).execute + total_workers.times do |worker_index| + CiFinishedBuildsSyncWorker.perform_async(worker_index, total_workers) + end - result = response.success? ? response.payload : response.deconstruct_keys(%i[message reason]) - log_extra_metadata_on_done(:result, result) + nil end end end diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..2461a63e21a5edbafea3d846699e2f43bbbd9af8 --- /dev/null +++ b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module ClickHouse + class CiFinishedBuildsSyncWorker + include ApplicationWorker + + idempotent! + data_consistency :delayed + urgency :throttled + feature_category :runner_fleet + loggable_arguments 1, 2 + + def perform(worker_index = 0, total_workers = 1) + response = ::ClickHouse::DataIngestion::CiFinishedBuildsSyncService.new( + worker_index: worker_index, total_workers: total_workers + ).execute + + result = response.success? ?
response.payload : response.deconstruct_keys(%i[message reason]) + log_extra_metadata_on_done(:result, result) + end + end +end diff --git a/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb b/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb index 7807479235c1b00c1ca03246aee65531cc323b17..2ee9c841856ff179209e5ea2f047c304c415bf0a 100644 --- a/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb +++ b/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb @@ -5,68 +5,79 @@ RSpec.describe ClickHouse::CiFinishedBuildsSyncCronWorker, :click_house, :freeze_time, feature_category: :runner_fleet do let(:worker) { described_class.new } - let_it_be(:ci_build1) { create(:ci_build, :success) } - let_it_be(:ci_build2) { create(:ci_build, :pending) } + subject(:perform) { worker.perform(*args) } - subject(:perform) { worker.perform } + include_examples 'an idempotent worker' do + context 'when job version is nil' do + before do + allow(worker).to receive(:job_version).and_return(nil) + end - before do - create_sync_events ci_build1 - end + context 'when arguments are not specified' do + let(:args) { [] } - include_examples 'an idempotent worker' do - it 'calls CiFinishedBuildsSyncService and returns its response payload' do - expect(worker).to receive(:log_extra_metadata_on_done) - .with(:result, { reached_end_of_table: true, records_inserted: 1 }) + it 'does nothing' do + expect(ClickHouse::CiFinishedBuildsSyncWorker).not_to receive(:perform_async) - params = { worker_index: 0, total_workers: 1 } - expect_next_instance_of(::ClickHouse::DataIngestion::CiFinishedBuildsSyncService, params) do |service| - expect(service).to receive(:execute).and_call_original + perform + end end - expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + context 'when arguments are specified' do + let(:args) { [worker_index, total_workers] } - expect { perform }.to change {
ci_finished_builds_row_count }.by(::Ci::Build.finished.count) - end + context 'with total_workers set to 3' do + let(:total_workers) { 3 } - context 'when an error is reported from service' do - before do - allow(ClickHouse::Client.configuration).to receive(:databases).and_return({}) - end + context 'with worker_index set to 0' do + let(:worker_index) { 0 } - it 'skips execution' do - expect(worker).to receive(:log_extra_metadata_on_done) - .with(:result, { message: 'ClickHouse database is not configured', reason: :db_not_configured }) + it 'does nothing' do + expect(ClickHouse::CiFinishedBuildsSyncWorker).not_to receive(:perform_async) - perform + perform + end + end + end end end - end - context 'with 2 workers' do - subject(:perform) { worker.perform(0, 2) } + context 'when job version is present' do + context 'when arguments are not specified' do + let(:args) { [] } - it 'calls CiFinishedBuildsSyncService with correct arguments' do - expect(worker).to receive(:log_extra_metadata_on_done).once + it 'invokes 1 worker with specified arguments' do + expect(ClickHouse::CiFinishedBuildsSyncWorker).to receive(:perform_async).with(0, 1) - params = { worker_index: 0, total_workers: 2 } - expect_next_instance_of(::ClickHouse::DataIngestion::CiFinishedBuildsSyncService, params) do |service| - expect(service).to receive(:execute).and_call_original + perform + end end - expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + context 'when arguments are specified' do + let(:args) { [total_workers] } - perform - end - end + context 'with total_workers set to 1' do + let(:total_workers) { 1 } - def create_sync_events(*builds) - builds.each do |build| - Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save!
- end - end + it 'invokes 1 worker' do + expect(ClickHouse::CiFinishedBuildsSyncWorker).to receive(:perform_async).with(0, 1) + + perform + end + end - def ci_finished_builds_row_count - ClickHouse::Client.select('SELECT COUNT(*) AS count FROM ci_finished_builds', :main).first['count'] + context 'with total_workers set to 3', :aggregate_failures do + let(:total_workers) { 3 } + + it 'invokes 3 workers' do + expect(ClickHouse::CiFinishedBuildsSyncWorker).to receive(:perform_async).with(0, 3) + expect(ClickHouse::CiFinishedBuildsSyncWorker).to receive(:perform_async).with(1, 3) + expect(ClickHouse::CiFinishedBuildsSyncWorker).to receive(:perform_async).with(2, 3) + + perform + end + end + end + end end end diff --git a/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb b/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..81fe6f40a51f24dd9ae4ba3887484e09480ad75d --- /dev/null +++ b/ee/spec/workers/click_house/ci_finished_builds_sync_worker_spec.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouse::CiFinishedBuildsSyncWorker, :click_house, :freeze_time, feature_category: :runner_fleet do + let(:worker) { described_class.new } + + let_it_be(:ci_build1) { create(:ci_build, :success) } + let_it_be(:ci_build2) { create(:ci_build, :pending) } + + subject(:perform) { worker.perform } + + before do + create_sync_events ci_build1 + end + + include_examples 'an idempotent worker' do + it 'calls CiFinishedBuildsSyncService and returns its response payload' do + expect(worker).to receive(:log_extra_metadata_on_done) + .with(:result, { reached_end_of_table: true, records_inserted: 1 }) + + params = { worker_index: 0, total_workers: 1 } + expect_next_instance_of(::ClickHouse::DataIngestion::CiFinishedBuildsSyncService, params) do |service| + expect(service).to receive(:execute).and_call_original + end + +
expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + + expect { perform }.to change { ci_finished_builds_row_count }.by(::Ci::Build.finished.count) + end + + context 'when an error is reported from service' do + before do + allow(ClickHouse::Client.configuration).to receive(:databases).and_return({}) + end + + it 'skips execution' do + expect(worker).to receive(:log_extra_metadata_on_done) + .with(:result, { message: 'ClickHouse database is not configured', reason: :db_not_configured }) + + perform + end + end + end + + context 'with 2 workers' do + subject(:perform) { worker.perform(0, 2) } + + it 'calls CiFinishedBuildsSyncService with correct arguments' do + expect(worker).to receive(:log_extra_metadata_on_done).once + + params = { worker_index: 0, total_workers: 2 } + expect_next_instance_of(::ClickHouse::DataIngestion::CiFinishedBuildsSyncService, params) do |service| + expect(service).to receive(:execute).and_call_original + end + + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + + perform + end + end + + def create_sync_events(*builds) + builds.each do |build| + Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save! + end + end + + def ci_finished_builds_row_count + ClickHouse::Client.select('SELECT COUNT(*) AS count FROM ci_finished_builds', :main).first['count'] + end +end