diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 41d4fbb9e7c777ea7dbaead18aacc1268e0aa0d6..f5c045ea96e26356c5b43bdd7ef60e1c352ccae0 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -978,6 +978,9 @@ Settings.cron_jobs['ai_conversation_cleanup_cron_worker'] ||= {} Settings.cron_jobs['ai_conversation_cleanup_cron_worker']['cron'] ||= '30 2 * * *' Settings.cron_jobs['ai_conversation_cleanup_cron_worker']['job_class'] = 'Ai::Conversation::CleanupCronWorker' + Settings.cron_jobs['ai_active_context_bulk_process_worker'] ||= {} + Settings.cron_jobs['ai_active_context_bulk_process_worker']['cron'] ||= '*/1 * * * *' + Settings.cron_jobs['ai_active_context_bulk_process_worker']['job_class'] ||= 'Ai::ActiveContext::BulkProcessWorker' Settings.cron_jobs['namespaces_enable_descendants_cache_cron_worker'] ||= {} Settings.cron_jobs['namespaces_enable_descendants_cache_cron_worker']['cron'] ||= '*/11 * * * *' Settings.cron_jobs['namespaces_enable_descendants_cache_cron_worker']['job_class'] = 'Namespaces::EnableDescendantsCacheCronWorker' diff --git a/ee/app/workers/ai/active_context/bulk_process_worker.rb b/ee/app/workers/ai/active_context/bulk_process_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..52bc8ed0667b105342587c7f53bc4692041d068c --- /dev/null +++ b/ee/app/workers/ai/active_context/bulk_process_worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +# This cron worker runs every minute +# It enqueues a job for each `ActiveContext.raw_queues` if `ActiveContext::Config.indexing_enabled?` is true +# For each job it fetches references from the queue, processes them and removes them from the queue +# The job will re-enqueue itself until the queue is empty +# Please see ActiveContext::Concerns::BulkAsyncProcess for the details + +module Ai + module ActiveContext + class BulkProcessWorker + include ::ActiveContext::Concerns::BulkAsyncProcess + include ::ApplicationWorker + include ::CronjobQueue + include Search::Worker + include Gitlab::ExclusiveLeaseHelpers + prepend ::Geo::SkipSecondary + + idempotent! + worker_resource_boundary :cpu + urgency :low + data_consistency :sticky + loggable_arguments 0, 1 + end + end +end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index 38270efcaf011bddc0c648c095130fa316f95a06..57851b88f75bc31f748300bdb4380072f6243db5 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -48,6 +48,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:ai_active_context_bulk_process + :worker_name: Ai::ActiveContext::BulkProcessWorker + :feature_category: :global_search + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :cpu + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:analytics_cycle_analytics_consistency :worker_name: Analytics::CycleAnalytics::ConsistencyWorker :feature_category: :value_stream_management diff --git a/ee/spec/workers/ai/active_context/bulk_process_worker_spec.rb b/ee/spec/workers/ai/active_context/bulk_process_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..ff1d9e0bc79a758866ca5a2dde1cf9f3bdc71339 --- /dev/null +++ b/ee/spec/workers/ai/active_context/bulk_process_worker_spec.rb @@ -0,0 +1,125 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ai::ActiveContext::BulkProcessWorker, type: :worker, feature_category: :global_search do + let(:worker) { described_class.new } + + it_behaves_like 'an idempotent worker' do + let(:job_args) { ['Ai::Context::TestQueue', 1] } + end + + it { is_expected.to be_a(ApplicationWorker) } + it { is_expected.to be_a(CronjobQueue) } + it { is_expected.to be_a(Search::Worker) } + + describe '#perform' do + let(:queue_class_name) { 'Ai::Context::TestQueue' } + let(:shard) { 1 } + + before do + stub_const(queue_class_name, Ai::Context::TestQueue) + allow(ActiveContext::Config).to receive(:indexing_enabled?).and_return(true) + allow(worker).to receive(:in_lock).and_yield + end + + context 'when indexing is disabled' do + before do + allow(ActiveContext::Config).to receive(:indexing_enabled?).and_return(false) + end + + it 'logs a message and returns false' do + expect(worker).to receive(:log) + .with("Ai::ActiveContext::BulkProcessWorker indexing disabled. Execution is skipped.") + expect(worker.perform).to be false + end + end + + context 'when no arguments are provided' do + it 'enqueues all shards' do + expect(described_class).to receive(:bulk_perform_async_with_contexts) + worker.perform + end + end + + context 'when arguments are provided' do + it 'processes the shard' do + expect(worker).to receive(:process_shard).with(Ai::Context::TestQueue, shard) + worker.perform(queue_class_name, shard) + end + + it 'handles FailedToObtainLockError' do + allow(worker).to receive(:process_shard).and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) + expect { worker.perform(queue_class_name, shard) }.not_to raise_error + end + end + end + + describe '#process_shard' do + let(:queue) { instance_double(Ai::Context::TestQueue, shard: 1, redis_key: 'test_queue:1') } + let(:shard) { 1 } + + before do + allow(worker).to receive(:in_lock).and_yield + allow(ActiveContext::BulkProcessQueue).to receive(:process!).and_return([10, 0]) + end + + it 'processes the queue and logs metadata' do + expect(worker).to receive(:log_extra_metadata_on_done).with(:records_count, 10) + expect(worker).to receive(:log_extra_metadata_on_done).with(:shard_number, shard) + worker.process_shard(queue, shard) + end + + context 'when re-enqueue conditions are met' do + before do + allow(ActiveContext::Config).to receive(:re_enqueue_indexing_workers?).and_return(true) + end + + it 're-enqueues the shard' do + expect(described_class).to receive(:perform_in) + worker.process_shard(queue, shard) + end + end + + context 'when re-enqueue conditions are not met' do + before do + allow(ActiveContext::Config).to receive(:re_enqueue_indexing_workers?).and_return(false) + end + + it 'does not re-enqueue the shard' do + expect(described_class).not_to receive(:perform_in) + worker.process_shard(queue, shard) + end + end + end + + describe '#should_re_enqueue?' do + it 'returns true when conditions are met' do + allow(ActiveContext::Config).to receive(:re_enqueue_indexing_workers?).and_return(true) + expect(worker.should_re_enqueue?(10, 0)).to be true + end + + it 'returns false when records_count is zero' do + expect(worker.should_re_enqueue?(0, 0)).to be false + end + + it 'returns false when failures_count is positive' do + expect(worker.should_re_enqueue?(10, 1)).to be false + end + + it 'returns false when re-enqueue is disabled' do + allow(ActiveContext::Config).to receive(:re_enqueue_indexing_workers?).and_return(false) + expect(worker.should_re_enqueue?(10, 0)).to be false + end + end +end + +module Ai + module Context + TestQueue = Struct.new(:shard) do + def redis_key + "test_queue:#{shard}" + end + end + end +end diff --git a/gems/gitlab-active-context/.gitignore b/gems/gitlab-active-context/.gitignore index b04a8c840df1a534cfd67449e31919721b410986..535f98460a98d87a455206cd4e3428436e06fc7c 100644 --- a/gems/gitlab-active-context/.gitignore +++ b/gems/gitlab-active-context/.gitignore @@ -9,3 +9,4 @@ # rspec failure tracking .rspec_status +.byebug_history diff --git a/gems/gitlab-active-context/README.md b/gems/gitlab-active-context/README.md index 39b12e4842d1772c3bbab351e4a5282aa30acf5f..c3b8cc6ffc6f5ae127e132fb3c27468cedf4f0a2 100644 --- a/gems/gitlab-active-context/README.md +++ b/gems/gitlab-active-context/README.md @@ -10,6 +10,28 @@ After checking out the repo, run `bin/setup` to install dependencies. Then, run TODO +## How it works + +### Async processing + +A cron worker triggers a Sidekiq job for every queue in `ActiveContext.raw_queues` every minute. For each of the jobs, it fetches a set amount of references from the queue, processes them and removes them from the queue. The job will re-enqueue itself every second until there are no more references to process in the queue. + +Async processing depends on the following configuration values: + + 1. `indexing_enabled`: processing exits early if this is false. Recommended to set to: + + ```ruby + config.indexing_enabled = Gitlab::CurrentSettings.elasticsearch_indexing? && + Search::ClusterHealthCheck::Elastic.healthy? && + !Elastic::IndexingControl.non_cached_pause_indexing? + ``` + + 1. `re_enqueue_indexing_workers`: whether or not to re-enqueue workers until there are no more references to process. Increases indexing throughput when set to `true`. Recommended to set to: + + ```ruby + config.re_enqueue_indexing_workers = Gitlab::CurrentSettings.elasticsearch_requeue_workers? + ``` + ## Usage ### Configuration @@ -18,6 +40,8 @@ Add an initializer with the following options: 1. `enabled`: `true|false`. Defaults to `false` 1. `databases`: Hash containing database configuration options +1. `indexing_enabled`: `true|false`. Defaults to `false` +1. `re_enqueue_indexing_workers`: `true|false`. Defaults to `false` 1. `logger`: Logger. Defaults to `Logger.new($stdout)` For example: @@ -84,7 +108,8 @@ To view sharded queues: ```ruby ActiveContext.raw_queues -=> ["ai_context_queues:{merge_request}:0", "ai_context_queues:{merge_request}:1"] +=> [#<Ai::Context::Queues::MergeRequest:0x0000000177cdf460 @shard=0>, + #<Ai::Context::Queues::MergeRequest:0x0000000177cdf370 @shard=1>] ``` ### Adding a new collection diff --git a/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb b/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb new file mode 100644 index 0000000000000000000000000000000000000000..0911df7327f0be432084d1118a00c4b00ec0a652 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +module ActiveContext + class BulkProcessQueue + def self.process!(queue, shard) + new(queue, shard).process! + end + + attr_reader :queue, :shard + + def initialize(queue, shard) + @queue = queue + @shard = shard + end + + def process! + ActiveContext::Redis.with_redis { |redis| process(redis) } + end + + def process(redis) + start_time = current_time + specs_buffer = [] + scores = {} + @failures = [] + + queue.each_queued_items_by_shard(redis, shards: [shard]) do |shard_number, specs| + next if specs.empty? + + set_key = queue.redis_set_key(shard_number) + first_score = specs.first.last + last_score = specs.last.last + + logger.info( + 'queue' => queue, + 'message' => 'bulk_indexing_start', + 'meta.indexing.redis_set' => set_key, + 'meta.indexing.records_count' => specs.count, + 'meta.indexing.first_score' => first_score, + 'meta.indexing.last_score' => last_score + ) + + specs_buffer += specs + + scores[set_key] = [first_score, last_score, specs.count] + end + + return [0, 0] if specs_buffer.blank? + + # TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/507973 + # deserialize refs, preload records, submit docs, flush, etc. + + # Remove all the successes + scores.each do |set_key, (first_score, last_score, count)| + redis.zremrangebyscore(set_key, first_score, last_score) + + logger.info( + 'class' => self.class.name, + 'message' => 'bulk_indexing_end', + 'meta.indexing.redis_set' => set_key, + 'meta.indexing.records_count' => count, + 'meta.indexing.first_score' => first_score, + 'meta.indexing.last_score' => last_score, + 'meta.indexing.failures_count' => @failures.count, + 'meta.indexing.bulk_execution_duration_s' => current_time - start_time + ) + end + + [specs_buffer.count, @failures.count] + end + + private + + def logger + @logger ||= ActiveContext::Config.logger + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/concerns/bulk_async_process.rb b/gems/gitlab-active-context/lib/active_context/concerns/bulk_async_process.rb new file mode 100644 index 0000000000000000000000000000000000000000..01825970c843499e81fad89baa332e6be1dc3fa1 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/concerns/bulk_async_process.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +module ActiveContext + module Concerns + module BulkAsyncProcess + RESCHEDULE_INTERVAL = 1.second + LOCK_TTL = 10.minutes + LOCK_RETRIES = 10 + LOCK_SLEEP_SECONDS = 1 + + extend ActiveSupport::Concern + + def perform(*args) + unless ActiveContext::Config.indexing_enabled? + log "#{self.class} indexing disabled. Execution is skipped." + return false + end + + if args.empty? + enqueue_all_shards + else + queue_class, shard = args + queue = queue_class.safe_constantize + + process_shard(queue, shard) if queue + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError + # no-op, scheduled on a cronjob + end + + def enqueue_all_shards + self.class.bulk_perform_async_with_contexts( + ActiveContext.raw_queues, + arguments_proc: ->(raw_queue) { [raw_queue.class.to_s, raw_queue.shard] }, + context_proc: ->(_) { {} } + ) + end + + def process_shard(queue, shard) + in_lock(lock_key(queue, shard), ttl: LOCK_TTL, retries: LOCK_RETRIES, sleep_sec: LOCK_SLEEP_SECONDS) do + BulkProcessQueue.process!(queue, shard).tap do |records_count, failures_count| + log_extra_metadata_on_done(:records_count, records_count) + log_extra_metadata_on_done(:shard_number, shard) + + re_enqueue_shard(queue, shard) if should_re_enqueue?(records_count, failures_count) + end + end + end + + def re_enqueue_shard(queue, shard) + self.class.perform_in(RESCHEDULE_INTERVAL, queue.to_s, shard) + end + + def should_re_enqueue?(records_count, failures_count) + return false if failures_count&.positive? + return false unless records_count&.positive? + + ActiveContext::Config.re_enqueue_indexing_workers? + end + + def log(message) + logger.info(structured_payload(message: message)) + end + + def logger + ActiveContext::Config.logger + end + + def lock_key(queue, shard) + "#{self.class.name.underscore}/queue/#{queue.redis_key}:#{shard}" + end + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/concerns/queue.rb b/gems/gitlab-active-context/lib/active_context/concerns/queue.rb index d7ee7f7cbf26e34f6fc599c61988bd46bb1e7404..685362dec7d8a250842b860fb0580df4052b9093 100644 --- a/gems/gitlab-active-context/lib/active_context/concerns/queue.rb +++ b/gems/gitlab-active-context/lib/active_context/concerns/queue.rb @@ -8,6 +8,16 @@ def self.included(base) base.register! end + def initialize(shard) + @shard = shard + end + + def redis_key + "#{self.class.redis_key}:#{shard}" + end + + attr_reader :shard + module ClassMethods SLICE_SIZE = 1_000 SHARD_LIMIT = 1_000 @@ -17,7 +27,7 @@ def number_of_shards end def register! - ActiveContext::Queues.register!(redis_key, shards: number_of_shards) + ActiveContext::Queues.register!(self) end def push(references) diff --git a/gems/gitlab-active-context/lib/active_context/config.rb b/gems/gitlab-active-context/lib/active_context/config.rb index 06ca412babbbe30180a44fce2d9d46c18e82f662..604007563944760a94ce804d69608cf1d765d420 100644 --- a/gems/gitlab-active-context/lib/active_context/config.rb +++ b/gems/gitlab-active-context/lib/active_context/config.rb @@ -2,7 +2,7 @@ module ActiveContext class Config - Cfg = Struct.new(:enabled, :databases, :logger) + Cfg = Struct.new(:enabled, :databases, :logger, :indexing_enabled, :re_enqueue_indexing_workers) class << self def configure(&block) @@ -24,6 +24,16 @@ def databases def logger current.logger || Logger.new($stdout) end + + def indexing_enabled? + return false unless enabled? + + current.indexing_enabled || false + end + + def re_enqueue_indexing_workers? + current.re_enqueue_indexing_workers || false + end end def initialize(config_block) diff --git a/gems/gitlab-active-context/lib/active_context/queues.rb b/gems/gitlab-active-context/lib/active_context/queues.rb index d2c04e45b99be746b79b8d097a476c53e5f9a88e..8184e5aafb16551cad19f3711470128f99bf1855 100644 --- a/gems/gitlab-active-context/lib/active_context/queues.rb +++ b/gems/gitlab-active-context/lib/active_context/queues.rb @@ -10,24 +10,28 @@ def self.raw_queues @raw_queues ||= [] end - def self.register!(key, shards:) - raise ArgumentError, "ActiveContext Queue '#{key}' is already registered" if @queues&.include?(key) + def self.register!(queue_class) + key = queue_class.redis_key @raw_queues ||= [] @queues = Set.new(@queues || []) + return if @queues.include?(key) + @queues.add(key) - shards.times do |shard| - @raw_queues << "#{key}:#{shard}" + queue_class.number_of_shards.times do |shard| + unless @raw_queues.any? { |q| q.instance_of?(queue_class) && q.shard == shard } + @raw_queues << queue_class.new(shard) + end end end def self.all_queued_items {}.tap do |hash| - @raw_queues&.each do |queue_key| + @raw_queues&.each do |raw_queue| + queue_key = "#{raw_queue.redis_key}:zset" references = ActiveContext::Redis.with_redis do |redis| - queue_key = "#{queue_key}:zset" redis.zrangebyscore(queue_key, '-inf', '+inf') end hash[queue_key] = references if references.present? diff --git a/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb index 4bc54b611f366a0764d7ab43f51349bb0d2550b6..9ee1824cddb8a12b45434434486d0cf857cd575f 100644 --- a/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb +++ b/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb @@ -24,8 +24,11 @@ def self.number_of_shards describe '.register!' do it 'registers the queue with ActiveContext::Queues' do - expect(ActiveContext::Queues).to receive(:register!).with('mockmodule:{test_queue}', shards: 2) mock_queue_class + + expect(ActiveContext::Queues.queues).to include(mock_queue_class.redis_key) + expect(ActiveContext::Queues.raw_queues.size).to eq(2) + expect(ActiveContext::Queues.raw_queues.all?(mock_queue_class)).to be true end end diff --git a/gems/gitlab-active-context/spec/lib/active_context/queues_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/queues_spec.rb index 65c2f5ef4c6712b93f321f1a56485f69010ecfe6..af4b5a76a0d36a1be19e00aa95be59bcce9018ff 100644 --- a/gems/gitlab-active-context/spec/lib/active_context/queues_spec.rb +++ b/gems/gitlab-active-context/spec/lib/active_context/queues_spec.rb @@ -1,45 +1,58 @@ # frozen_string_literal: true RSpec.describe ActiveContext::Queues do + let(:test_queue_class) do + Class.new do + def self.name + "TestModule::TestQueue" + end + + def self.number_of_shards + 3 + end + + include ActiveContext::Concerns::Queue + end + end + + let(:redis) { instance_double(Redis) } + before do + stub_const('TestModule::TestQueue', test_queue_class) + allow(ActiveContext::Redis).to receive(:with_redis).and_yield(redis) described_class.instance_variable_set(:@queues, nil) described_class.instance_variable_set(:@raw_queues, nil) end describe '.register!' do - it 'adds the queue key to queues set' do - described_class.register!('test_queue', shards: 1) - expect(described_class.queues).to include('test_queue') - end - - it 'creates sharded queue names in raw_queues' do - described_class.register!('test_queue', shards: 3) - expected_raw_queues = ['test_queue:0', 'test_queue:1', 'test_queue:2'] - expect(described_class.raw_queues).to eq(expected_raw_queues) - end + it 'registers the queue class' do + expect(described_class.queues).to be_empty + expect(described_class.raw_queues).to be_empty - it 'handles multiple queue registrations' do - described_class.register!('queue1', shards: 2) - described_class.register!('queue2', shards: 1) + described_class.register!(test_queue_class) - expect(described_class.queues).to eq(Set.new(%w[queue1 queue2])) - expect(described_class.raw_queues).to eq(['queue1:0', 'queue1:1', 'queue2:0']) + expect(described_class.queues.size).to eq(1) + expect(described_class.queues.first).to eq('testmodule:{test_queue}') end - it 'raises an error when register is called for the same key multiple times' do - described_class.register!('test_queue', shards: 2) + it 'creates instances for each shard' do + expect { described_class.register!(test_queue_class) }.to change { described_class.raw_queues.size }.by(3) - expect(described_class.queues).to eq(Set.new(['test_queue'])) - expect(described_class.raw_queues).to eq(['test_queue:0', 'test_queue:1']) - - expect { described_class.register!('test_queue', shards: 1) }.to raise_error(ArgumentError) + raw_queues = described_class.raw_queues + expect(raw_queues.size).to eq(3) + expect(raw_queues.all?(test_queue_class)).to be true + expect(raw_queues.map(&:shard)).to eq([0, 1, 2]) end - it 'appends new sharded queues to existing raw_queues' do - described_class.register!('queue1', shards: 1) - described_class.register!('queue2', shards: 2) + it 'does not register the same queue class twice' do + described_class.register!(test_queue_class) + expect { described_class.register!(test_queue_class) }.not_to change { described_class.queues.size } + expect { described_class.register!(test_queue_class) }.not_to change { described_class.raw_queues.size } + end - expect(described_class.raw_queues).to eq(['queue1:0', 'queue2:0', 'queue2:1']) + it 'adds the correct key to the queues set' do + described_class.register!(test_queue_class) + expect(described_class.queues.first).to eq('testmodule:{test_queue}') end end end diff --git a/gems/gitlab-active-context/spec/spec_helper.rb b/gems/gitlab-active-context/spec/spec_helper.rb index eada08bc778b5c03a2961bf113eab06ae4ca8671..a9f134d9dfcc0a77ac6d594a865efc6e404f0f4f 100644 --- a/gems/gitlab-active-context/spec/spec_helper.rb +++ b/gems/gitlab-active-context/spec/spec_helper.rb @@ -7,6 +7,10 @@ require 'aws-sdk-core' require 'active_support/concern' require 'redis' +require 'byebug' +require 'active_support' +require 'active_support/core_ext/numeric/time' +require 'active_context/concerns/bulk_async_process' Dir[File.join(__dir__, 'support/**/*.rb')].each { |f| require f }