diff --git a/gems/gitlab-active-context/.rubocop.yml b/gems/gitlab-active-context/.rubocop.yml index c941a42c98ddcbad33f28eefcc08282db4e90fda..0621d40c0e2b779db0b15462e400bd9b995d73d9 100644 --- a/gems/gitlab-active-context/.rubocop.yml +++ b/gems/gitlab-active-context/.rubocop.yml @@ -6,3 +6,6 @@ Gemfile/MissingFeatureCategory: Search/NamespacedClass: Enabled: false + +RSpec/MultipleMemoizedHelpers: + Max: 25 diff --git a/gems/gitlab-active-context/README.md b/gems/gitlab-active-context/README.md index c3b8cc6ffc6f5ae127e132fb3c27468cedf4f0a2..3ca7d3734328fef44ba936142aefcbb4f9cb1f5e 100644 --- a/gems/gitlab-active-context/README.md +++ b/gems/gitlab-active-context/README.md @@ -53,7 +53,7 @@ ActiveContext.configure do |config| config.databases = { es1: { - adapter: 'elasticsearch', + adapter: 'ActiveContext::Databases::Elasticsearch::Adapter', prefix: 'gitlab_active_context', options: ::Gitlab::CurrentSettings.elasticsearch_config } @@ -70,6 +70,36 @@ end | `client_request_timeout` | The timeout for client requests in seconds | No | N/A | `60` | | `retry_on_failure` | The number of times to retry a failed request | No | `0` (no retries) | `3` | | `debug` | Enable or disable debug logging | No | `false` | `true` | +| `max_bulk_size_bytes` | Maximum size before forcing a bulk operation in megabytes | No | `10.megabytes` | `5242880` | + +### Scheduling a cron worker for async processing + +Create a file which includes the `BulkAsyncProcess` concern and other worker-specific concerns: + +```ruby +# frozen_string_literal: true + +module Ai + module Context + class BulkProcessWorker + include ActiveContext::Concerns::BulkAsyncProcess + include ::ApplicationWorker + include ::CronjobQueue + include Search::Worker + include Gitlab::ExclusiveLeaseHelpers + prepend ::Geo::SkipSecondary + + idempotent! + worker_resource_boundary :cpu + urgency :low + data_consistency :sticky + loggable_arguments 0, 1 + end + end +end +``` + +Schedule the worker on a cron schedule in `config/initializers/1_settings.rb`. ### Registering a queue diff --git a/gems/gitlab-active-context/lib/active_context/bulk_processor.rb b/gems/gitlab-active-context/lib/active_context/bulk_processor.rb new file mode 100644 index 0000000000000000000000000000000000000000..82ddf47ec7c906b5aa2ba966c10fba584f7c5c52 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/bulk_processor.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module ActiveContext + class BulkProcessor + attr_reader :failures, :adapter + + def initialize + @failures = [] + @adapter = ActiveContext.adapter + end + + def process(ref) + send_bulk if @adapter.add_ref(ref) + end + + def flush + send_bulk.failures + end + + private + + def send_bulk + return self if adapter.empty? + + failed_refs = try_send_bulk + + logger.info( + 'message' => 'bulk_submitted', + 'meta.indexing.bulk_count' => adapter.all_refs.size, + 'meta.indexing.errors_count' => failed_refs.count + ) + + failures.push(*failed_refs) + + adapter.reset + + self + end + + def try_send_bulk + result = adapter.bulk + adapter.process_bulk_errors(result) + rescue StandardError => e + logger.error(message: 'bulk_exception', error_class: e.class.to_s, error_message: e.message) + adapter.all_refs + end + + def logger + @logger ||= ActiveContext::Config.logger + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/databases/concerns/adapter.rb b/gems/gitlab-active-context/lib/active_context/databases/concerns/adapter.rb index bf8854efbfa9bd84655810ee3b05a5d46cb823c9..51ef809e8c138b64cc7ea082115c46f4fbabda62 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/concerns/adapter.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/concerns/adapter.rb @@ -4,17 +4,24 @@ module ActiveContext module Databases module Concerns module Adapter - attr_reader :client + attr_reader :options, :client, :indexer delegate :search, to: :client + delegate :all_refs, :add_ref, :empty?, :bulk, :process_bulk_errors, :reset, to: :indexer def initialize(options) + @options = options @client = client_klass.new(options) + @indexer = indexer_klass.new(options, client) end def client_klass raise NotImplementedError end + + def indexer_klass + raise NotImplementedError + end end end end diff --git a/gems/gitlab-active-context/lib/active_context/databases/concerns/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/concerns/indexer.rb new file mode 100644 index 0000000000000000000000000000000000000000..35859584a7cbf702bfaa5813aa9fd3075d909557 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/databases/concerns/indexer.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +# rubocop: disable Gitlab/ModuleWithInstanceVariables -- this is a concern + +module ActiveContext + module Databases + module Concerns + module Indexer + attr_reader :options, :client, :refs + + def initialize(options, client) + @options = options + @client = client + @refs = [] + end + + def all_refs + refs + end + + # Adds a reference to the refs array + # + # @param ref [Object] The reference to add + # @return [Boolean] True if bulk processing should be forced, e.g., when a size threshold is reached + def add_ref(ref) + raise NotImplementedError + end + + # Checks if nothing should be processed + # + # @return [Boolean] True if bulk processing should be skipped + def empty? + raise NotImplementedError + end + + # Performs bulk processing on the refs array + # + # @return [Object] The result of bulk processing + def bulk + raise NotImplementedError + end + + # Processes errors from bulk operation + # + # @param result [Object] The result from the bulk operation + # @return [Array] Any failures that occurred during bulk processing + def process_bulk_errors(_result) + raise NotImplementedError + end + + # Resets the adapter to a clean state + def reset + @refs = [] + # also reset anything that builds up from the refs array + end + end + end + end +end + +# rubocop: enable Gitlab/ModuleWithInstanceVariables diff --git a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/adapter.rb b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/adapter.rb index bf6035c0993cde5bf14628b7b7e3db295575fe59..a75894a347976dac9e4e7b0c0b6e2c1990acb7ee 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/adapter.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/adapter.rb @@ -9,6 +9,10 @@ class Adapter def client_klass ActiveContext::Databases::Elasticsearch::Client end + + def indexer_klass + ActiveContext::Databases::Elasticsearch::Indexer + end end end end diff --git a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/client.rb b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/client.rb index 42c0a8b094e4bd6f56e81b7dfb41623365c02b6a..d885a6751f96ea574ed2f14a9d8a65ddb7f1172a 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/client.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/client.rb @@ -6,6 +6,8 @@ module Elasticsearch class Client include ActiveContext::Databases::Concerns::Client + delegate :bulk, to: :client + OPEN_TIMEOUT = 5 NO_RETRY = 0 diff --git a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb new file mode 100644 index 0000000000000000000000000000000000000000..47f5d80cf6f9e3da66282e27b1ccc8d6601506e8 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module ActiveContext + module Databases + module Elasticsearch + class Indexer + include ActiveContext::Databases::Concerns::Indexer + + DEFAULT_MAX_BULK_SIZE = 10.megabytes + + attr_reader :operations, :bulk_size + + def initialize(...) + super + @operations = [] + @bulk_size = 0 + end + + def add_ref(ref) + operation = build_operation(ref) + @refs << ref + @operations << operation + @bulk_size += calculate_operation_size(operation) + + bulk_size >= bulk_threshold + end + + def empty? + operations.empty? + end + + def bulk + client.bulk(body: operations.flatten) + end + + def process_bulk_errors(result) + return [] unless result['errors'] + + failed_refs = [] + + result['items'].each_with_index do |item, index| + op = item['index'] || item['update'] || item['delete'] + + next unless op.nil? || op['error'] + + ref = refs[index] + + logger.warn( + 'message' => 'indexing_failed', + 'meta.indexing.error' => op&.dig('error') || 'Operation was nil', + 'meta.indexing.status' => op&.dig('status'), + 'meta.indexing.operation_type' => item.each_key.first, + 'meta.indexing.ref' => ref.serialize, + 'meta.indexing.identifier' => ref.identifier + ) + + failed_refs << ref + end + + failed_refs + end + + def reset + super + @operations = [] + @bulk_size = 0 + end + + private + + def build_operation(ref) + case ref.operation.to_sym + when :index, :upsert + [ + { update: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }, + { doc: ref.as_indexed_json, doc_as_upsert: true } + ] + when :delete + [{ delete: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }] + else + raise StandardError, "Operation #{ref.operation} is not supported" + end + end + + def calculate_operation_size(operation) + operation.to_json.bytesize + 2 # Account for newlines + end + + def bulk_threshold + @bulk_threshold ||= options[:max_bulk_size_bytes] || DEFAULT_MAX_BULK_SIZE + end + + def logger + @logger ||= ActiveContext::Config.logger + end + end + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/databases/opensearch/adapter.rb b/gems/gitlab-active-context/lib/active_context/databases/opensearch/adapter.rb index 7c685b7bba0d796f8a6d352f2270c1c80951aefb..a72fac16d38a825a5df640cd5163cdb701ffb106 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/opensearch/adapter.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/opensearch/adapter.rb @@ -9,6 +9,10 @@ class Adapter def client_klass ActiveContext::Databases::Opensearch::Client end + + def indexer_klass + ActiveContext::Databases::Opensearch::Indexer + end end end end diff --git a/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb new file mode 100644 index 0000000000000000000000000000000000000000..c40e3cce82493cd23181066c47399d15369e3c99 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module ActiveContext + module Databases + module Opensearch + class Indexer + include ActiveContext::Databases::Concerns::Indexer + end + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/databases/postgresql/adapter.rb b/gems/gitlab-active-context/lib/active_context/databases/postgresql/adapter.rb index 33f2b2b199b83e93fbe66cfdc2fda16f19184d2a..3d71ee59d50ed45bc14b8d3138fee74142b52e85 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/postgresql/adapter.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/postgresql/adapter.rb @@ -9,6 +9,10 @@ class Adapter def client_klass ActiveContext::Databases::Postgresql::Client end + + def indexer_klass + ActiveContext::Databases::Postgresql::Indexer + end end end end diff --git a/gems/gitlab-active-context/lib/active_context/databases/postgresql/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/postgresql/indexer.rb new file mode 100644 index 0000000000000000000000000000000000000000..dc659cff5eb4ecf31eece139b8344065a4a9c559 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/databases/postgresql/indexer.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module ActiveContext + module Databases + module Postgresql + class Indexer + include ActiveContext::Databases::Concerns::Indexer + end + end + end +end diff --git a/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..2baeab1ec9173170b0269b576785953df6371b6b --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::BulkProcessor do + let(:adapter) { ActiveContext::Databases::Elasticsearch::Adapter.new(url: 'http://localhost:9200') } + let(:logger) { instance_double(Logger) } + let(:ref) { double } + + before do + allow(ActiveContext).to receive(:adapter).and_return(adapter) + allow(ActiveContext::Config).to receive(:logger).and_return(logger) + allow(logger).to receive(:info) + allow(logger).to receive(:error) + allow(ref).to receive_messages( + operation: :index, + id: 1, + as_indexed_json: { title: 'Test Issue' }, + index_name: 'issues', + identifier: '1', + routing: 'group_1' + ) + end + + describe '#initialize' do + it 'initializes with empty failures and the correct adapter' do + processor = described_class.new + + expect(processor.failures).to be_empty + expect(processor.adapter).to be_a(ActiveContext::Databases::Elasticsearch::Adapter) + end + end + + describe '#process' do + let(:processor) { described_class.new } + + it 'adds ref to adapter and calls send_bulk if it returns true' do + allow(adapter).to receive(:add_ref).and_return(true) + expect(processor).to receive(:send_bulk).once + + processor.process(ref) + end + + it 'adds ref to adapter and does not call send_bulk if it returns false' do + allow(adapter).to receive(:add_ref).and_return(false) + expect(processor).not_to receive(:send_bulk) + + processor.process(ref) + end + end + + describe '#flush' do + let(:processor) { described_class.new } + + it 'calls send_bulk and returns failures' do + allow(processor).to receive(:send_bulk).and_return(processor) + expect(processor.flush).to eq([]) + end + end + + describe '#send_bulk' do + let(:processor) { described_class.new } + + before do + processor.process(ref) + end + + it 'processes bulk and logs info' do + allow(adapter).to receive(:bulk).and_return({ 'items' => [] }) + + expect(logger).to receive(:info).with( + 'message' => 'bulk_submitted', + 'meta.indexing.bulk_count' => 1, + 'meta.indexing.errors_count' => 0 + ) + + processor.send(:send_bulk) + end + + it 'resets the adapter after processing' do + allow(adapter).to receive(:bulk).and_return({ 'items' => [] }) + expect(adapter).to receive(:reset) + + processor.send(:send_bulk) + end + end + + describe '#try_send_bulk' do + let(:processor) { described_class.new } + + before do + processor.process(ref) + end + + context 'when bulk processing succeeds' do + it 'returns empty array' do + allow(adapter).to receive(:bulk).and_return({ 'items' => [] }) + expect(processor.send(:try_send_bulk)).to eq([]) + end + end + + context 'when bulk processing fails' do + it 'logs error and returns all refs' do + allow(adapter).to receive(:bulk).and_raise(StandardError.new('Bulk processing failed')) + + expect(logger).to receive(:error).with( + message: 'bulk_exception', + error_class: 'StandardError', + error_message: 'Bulk processing failed' + ) + + expect(processor.send(:try_send_bulk)).to eq([ref]) + end + end + end +end diff --git a/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..6f06f10f9c84a8c3f6ccd8c5568223f9682d7c73 --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::Databases::Elasticsearch::Indexer do + let(:es_client) { instance_double(Elasticsearch::Client) } + let(:logger) { instance_double(Logger, warn: nil) } + let(:options) { {} } + let(:indexer) { described_class.new(options, es_client) } + let(:ref) { double } + + before do + allow(ActiveContext::Config).to receive(:logger).and_return(logger) + allow(ref).to receive_messages( + operation: :index, + id: 1, + as_indexed_json: { title: 'Test Issue' }, + index_name: 'issues', + identifier: '1', + routing: 'group_1', + serialize: 'issue 1 group_1' + ) + end + + describe '#initialize' do + it 'initializes with empty operations and zero bulk size' do + expect(indexer.operations).to be_empty + expect(indexer.bulk_size).to eq(0) + end + end + + describe '#add_ref' do + it 'adds the ref and returns true when bulk threshold is reached' do + allow(indexer).to receive(:bulk_threshold).and_return(1) + expect(indexer.add_ref(ref)).to be true + expect(indexer.operations).not_to be_empty + end + + it 'adds the ref and returns false when bulk threshold is not reached' do + allow(indexer).to receive(:bulk_threshold).and_return(1000000) + expect(indexer.add_ref(ref)).to be false + expect(indexer.operations).not_to be_empty + end + + it 'raises an error for unsupported operations' do + allow(ref).to receive(:operation).and_return(:unsupported) + expect { indexer.add_ref(ref) }.to raise_error(StandardError, /Operation unsupported is not supported/) + end + end + + describe '#empty?' do + it 'returns true when there are no operations' do + expect(indexer).to be_empty + end + + it 'returns false when there are operations' do + indexer.instance_variable_set(:@operations, [{}]) + expect(indexer).not_to be_empty + end + end + + describe '#bulk' do + before do + indexer.instance_variable_set(:@operations, [{ index: {} }]) + end + + it 'calls bulk on the client with flattened operations' do + expect(es_client).to receive(:bulk).with(body: [{ index: {} }]) + indexer.bulk + end + end + + describe '#process_bulk_errors' do + before do + indexer.instance_variable_set(:@refs, [ref]) + end + + context 'when there are no errors' do + it 'returns an empty array' do + result = { 'errors' => false } + expect(indexer.process_bulk_errors(result)).to be_empty + end + end + + context 'when there are errors' do + let(:result) do + { + 'errors' => true, + 'items' => [ + { 'index' => { 'error' => 'Error message', 'status' => 400 } } + ] + } + end + + it 'logs warnings and returns failed refs' do + expect(logger).to receive(:warn).with( + 'message' => 'indexing_failed', + 'meta.indexing.error' => 'Error message', + 'meta.indexing.status' => 400, + 'meta.indexing.operation_type' => 'index', + 'meta.indexing.ref' => 'issue 1 group_1', + 'meta.indexing.identifier' => '1' + ) + + failed_refs = indexer.process_bulk_errors(result) + expect(failed_refs).to eq([ref]) + end + end + end + + describe '#reset' do + before do + indexer.instance_variable_set(:@operations, [{}]) + indexer.instance_variable_set(:@bulk_size, 100) + end + + it 'resets operations and bulk size' do + indexer.reset + expect(indexer.operations).to be_empty + expect(indexer.bulk_size).to eq(0) + end + end +end diff --git a/gems/gitlab-active-context/spec/spec_helper.rb b/gems/gitlab-active-context/spec/spec_helper.rb index a9f134d9dfcc0a77ac6d594a865efc6e404f0f4f..98e275e9548e786808dfb0945b1567e1d35d6042 100644 --- a/gems/gitlab-active-context/spec/spec_helper.rb +++ b/gems/gitlab-active-context/spec/spec_helper.rb @@ -1,16 +1,13 @@ # frozen_string_literal: true -require "active_context" +require 'active_context' +require 'active_support/all' require 'logger' require 'elasticsearch' require 'opensearch' require 'aws-sdk-core' -require 'active_support/concern' require 'redis' require 'byebug' -require 'active_support' -require 'active_support/core_ext/numeric/time' -require 'active_context/concerns/bulk_async_process' Dir[File.join(__dir__, 'support/**/*.rb')].each { |f| require f }