diff --git a/gems/gitlab-active-context/.rubocop.yml b/gems/gitlab-active-context/.rubocop.yml index 919eb7a6796a14828a60bd4dd45fa7360f4a7eac..29f1c5a30209a2d49e1cf89e3107fb7eb6fd887e 100644 --- a/gems/gitlab-active-context/.rubocop.yml +++ b/gems/gitlab-active-context/.rubocop.yml @@ -19,3 +19,7 @@ RSpec/VerifiedDoubleReference: - 'spec/lib/active_context/bulk_process_queue_spec.rb' - 'spec/lib/active_context/reference_spec.rb' - 'spec/lib/active_context/tracker_spec.rb' + +Gitlab/ModuleWithInstanceVariables: + Exclude: + - 'lib/active_context/databases/concerns/elastic_indexer.rb' diff --git a/gems/gitlab-active-context/lib/active_context/databases/concerns/elastic_indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/concerns/elastic_indexer.rb new file mode 100644 index 0000000000000000000000000000000000000000..fac1654e95bc666b893fb90929e18082ccabe27e --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/databases/concerns/elastic_indexer.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +# This concern contains shared functionality for bulk indexing documents in Elasticsearch and OpenSearch databases. + +module ActiveContext + module Databases + module Concerns + module ElasticIndexer + include ActiveContext::Databases::Concerns::Indexer + + DEFAULT_MAX_BULK_SIZE = 10.megabytes + + attr_reader :operations, :bulk_size + + def initialize(...) + super + @operations = [] + @bulk_size = 0 + end + + def add_ref(ref) + operation = build_operation(ref) + @refs << ref + @operations << operation + @bulk_size += calculate_operation_size(operation) + + bulk_size >= bulk_threshold + end + + def empty? + operations.empty? + end + + def bulk + client.bulk(body: operations.flatten) + end + + def process_bulk_errors(result) + return [] unless result['errors'] + + failed_refs = [] + + result['items'].each_with_index do |item, index| + op = item['index'] || item['update'] || item['delete'] + + next unless op.nil? || op['error'] + + ref = refs[index] + + logger.warn( + 'message' => 'indexing_failed', + 'meta.indexing.error' => op&.dig('error') || 'Operation was nil', + 'meta.indexing.status' => op&.dig('status'), + 'meta.indexing.operation_type' => item.each_key.first, + 'meta.indexing.ref' => ref.serialize, + 'meta.indexing.identifier' => ref.identifier + ) + + failed_refs << ref + end + + failed_refs + end + + def reset + super + @operations = [] + @bulk_size = 0 + end + + private + + def build_operation(ref) + case ref.operation.to_sym + when :index, :upsert + [ + { update: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }, + { doc: ref.as_indexed_json, doc_as_upsert: true } + ] + when :delete + [{ delete: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }] + else + raise StandardError, "Operation #{ref.operation} is not supported" + end + end + + def calculate_operation_size(operation) + operation.to_json.bytesize + 2 # Account for newlines + end + + def bulk_threshold + @bulk_threshold ||= options[:max_bulk_size_bytes] || DEFAULT_MAX_BULK_SIZE + end + + def logger + @logger ||= ActiveContext::Config.logger + end + end + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb index 77e5a3faa643fc19d523750841c6439715a11990..cea9e04e5778dd9e5936a828896fc3fd81c9b73a 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb @@ -4,95 +4,7 @@ module ActiveContext module Databases module Elasticsearch class Indexer - include ActiveContext::Databases::Concerns::Indexer - - DEFAULT_MAX_BULK_SIZE = 10.megabytes - - attr_reader :operations, :bulk_size - - def initialize(...) - super - @operations = [] - @bulk_size = 0 - end - - def add_ref(ref) - operation = build_operation(ref) - @refs << ref - @operations << operation - @bulk_size += calculate_operation_size(operation) - - bulk_size >= bulk_threshold - end - - def empty? - operations.empty? - end - - def bulk - client.bulk(body: operations.flatten) - end - - def process_bulk_errors(result) - return [] unless result['errors'] - - failed_refs = [] - - result['items'].each_with_index do |item, index| - op = item['index'] || item['update'] || item['delete'] - - next unless op.nil? || op['error'] - - ref = refs[index] - - logger.warn( - 'message' => 'indexing_failed', - 'meta.indexing.error' => op&.dig('error') || 'Operation was nil', - 'meta.indexing.status' => op&.dig('status'), - 'meta.indexing.operation_type' => item.each_key.first, - 'meta.indexing.ref' => ref.serialize, - 'meta.indexing.identifier' => ref.identifier - ) - - failed_refs << ref - end - - failed_refs - end - - def reset - super - @operations = [] - @bulk_size = 0 - end - - private - - def build_operation(ref) - case ref.operation.to_sym - when :index, :upsert - [ - { update: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }, - { doc: ref.as_indexed_json, doc_as_upsert: true } - ] - when :delete - [{ delete: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }] - else - raise StandardError, "Operation #{ref.operation} is not supported" - end - end - - def calculate_operation_size(operation) - operation.to_json.bytesize + 2 # Account for newlines - end - - def bulk_threshold - @bulk_threshold ||= options[:max_bulk_size_bytes] || DEFAULT_MAX_BULK_SIZE - end - - def logger - @logger ||= ActiveContext::Config.logger - end + include ActiveContext::Databases::Concerns::ElasticIndexer end end end diff --git a/gems/gitlab-active-context/lib/active_context/databases/opensearch/client.rb b/gems/gitlab-active-context/lib/active_context/databases/opensearch/client.rb index 310a724499d28db35f1694f7b96126236d185153..aaedc14dbaa4320ea9d16e3b14cc87da82c13b42 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/opensearch/client.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/opensearch/client.rb @@ -9,6 +9,8 @@ module Opensearch class Client include ActiveContext::Databases::Concerns::Client + delegate :bulk, to: :client + OPEN_TIMEOUT = 5 NO_RETRY = 0 diff --git a/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb index c40e3cce82493cd23181066c47399d15369e3c99..81b3c6b480b3c644269390ce03129393933701e7 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/opensearch/indexer.rb @@ -4,7 +4,7 @@ module ActiveContext module Databases module Opensearch class Indexer - include ActiveContext::Databases::Concerns::Indexer + include ActiveContext::Databases::Concerns::ElasticIndexer end end end diff --git a/gems/gitlab-active-context/spec/lib/active_context/databases/opensearch/indexer_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/databases/opensearch/indexer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..aecf80d105289c52c567f37f8a68749bf1c84ae4 --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/databases/opensearch/indexer_spec.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::Databases::Opensearch::Indexer do + let(:opensearch_client) { instance_double(OpenSearch::Client) } + let(:logger) { instance_double(Logger, warn: nil) } + let(:options) { {} } + let(:indexer) { described_class.new(options, opensearch_client) } + let(:ref) { double } + + before do + allow(ActiveContext::Config).to receive(:logger).and_return(logger) + allow(ref).to receive_messages( + operation: :index, + id: 1, + as_indexed_json: { title: 'Test Issue' }, + partition_name: 'issues', + identifier: '1', + routing: 'group_1', + serialize: 'issue 1 group_1' + ) + end + + describe '#initialize' do + it 'initializes with empty operations and zero bulk size' do + expect(indexer.operations).to be_empty + expect(indexer.bulk_size).to eq(0) + end + end + + describe '#add_ref' do + it 'adds the ref and returns true when bulk threshold is reached' do + allow(indexer).to receive(:bulk_threshold).and_return(1) + expect(indexer.add_ref(ref)).to be true + expect(indexer.operations).not_to be_empty + end + + it 'adds the ref and returns false when bulk threshold is not reached' do + allow(indexer).to receive(:bulk_threshold).and_return(1000000) + expect(indexer.add_ref(ref)).to be false + expect(indexer.operations).not_to be_empty + end + + it 'raises an error for unsupported operations' do + allow(ref).to receive(:operation).and_return(:unsupported) + expect { indexer.add_ref(ref) }.to raise_error(StandardError, /Operation unsupported is not supported/) + end + end + + describe '#empty?' do + it 'returns true when there are no operations' do + expect(indexer).to be_empty + end + + it 'returns false when there are operations' do + indexer.instance_variable_set(:@operations, [{}]) + expect(indexer).not_to be_empty + end + end + + describe '#bulk' do + before do + indexer.instance_variable_set(:@operations, [{ index: {} }]) + end + + it 'calls bulk on the client with flattened operations' do + expect(opensearch_client).to receive(:bulk).with(body: [{ index: {} }]) + indexer.bulk + end + end + + describe '#process_bulk_errors' do + before do + indexer.instance_variable_set(:@refs, [ref]) + end + + context 'when there are no errors' do + it 'returns an empty array' do + result = { 'errors' => false } + expect(indexer.process_bulk_errors(result)).to be_empty + end + end + + context 'when there are errors' do + let(:result) do + { + 'errors' => true, + 'items' => [ + { 'index' => { 'error' => 'Error message', 'status' => 400 } } + ] + } + end + + it 'logs warnings and returns failed refs' do + expect(logger).to receive(:warn).with( + 'message' => 'indexing_failed', + 'meta.indexing.error' => 'Error message', + 'meta.indexing.status' => 400, + 'meta.indexing.operation_type' => 'index', + 'meta.indexing.ref' => 'issue 1 group_1', + 'meta.indexing.identifier' => '1' + ) + + failed_refs = indexer.process_bulk_errors(result) + expect(failed_refs).to eq([ref]) + end + end + end + + describe '#reset' do + before do + indexer.instance_variable_set(:@operations, [{}]) + indexer.instance_variable_set(:@bulk_size, 100) + end + + it 'resets operations and bulk size' do + indexer.reset + expect(indexer.operations).to be_empty + expect(indexer.bulk_size).to eq(0) + end + end +end