Skip to content
代码片段 群组 项目
未验证 提交 68782866 编辑于 作者: Madelein van Niekerk's avatar Madelein van Niekerk 提交者: GitLab
浏览文件

ActiveContext indexer for elasticsearch

Changelog: added
EE: true
上级 2b07c6bd
No related branches found
No related tags found
无相关合并请求
显示
527 个添加7 个删除
...@@ -6,3 +6,6 @@ Gemfile/MissingFeatureCategory: ...@@ -6,3 +6,6 @@ Gemfile/MissingFeatureCategory:
Search/NamespacedClass: Search/NamespacedClass:
Enabled: false Enabled: false
RSpec/MultipleMemoizedHelpers:
Max: 25
...@@ -53,7 +53,7 @@ ActiveContext.configure do |config| ...@@ -53,7 +53,7 @@ ActiveContext.configure do |config|
config.databases = { config.databases = {
es1: { es1: {
adapter: 'elasticsearch', adapter: 'ActiveContext::Databases::Elasticsearch::Adapter',
prefix: 'gitlab_active_context', prefix: 'gitlab_active_context',
options: ::Gitlab::CurrentSettings.elasticsearch_config options: ::Gitlab::CurrentSettings.elasticsearch_config
} }
...@@ -70,6 +70,36 @@ end ...@@ -70,6 +70,36 @@ end
| `client_request_timeout` | The timeout for client requests in seconds | No | N/A | `60` | | `client_request_timeout` | The timeout for client requests in seconds | No | N/A | `60` |
| `retry_on_failure` | The number of times to retry a failed request | No | `0` (no retries) | `3` | | `retry_on_failure` | The number of times to retry a failed request | No | `0` (no retries) | `3` |
| `debug` | Enable or disable debug logging | No | `false` | `true` | | `debug` | Enable or disable debug logging | No | `false` | `true` |
| `max_bulk_size_bytes` | Maximum size before forcing a bulk operation in megabytes | No | `10.megabytes` | `5242880` |
### Scheduling a cron worker for async processing
Create a file which includes the `BulkAsyncProcess` concern and other worker-specific concerns:
```ruby
# frozen_string_literal: true
module Ai
module Context
class BulkProcessWorker
include ActiveContext::Concerns::BulkAsyncProcess
include ::ApplicationWorker
include ::CronjobQueue
include Search::Worker
include Gitlab::ExclusiveLeaseHelpers
prepend ::Geo::SkipSecondary
idempotent!
worker_resource_boundary :cpu
urgency :low
data_consistency :sticky
loggable_arguments 0, 1
end
end
end
```
Schedule the worker on a cron schedule in `config/initializers/1_settings.rb`.
### Registering a queue ### Registering a queue
......
# frozen_string_literal: true
module ActiveContext
class BulkProcessor
attr_reader :failures, :adapter
def initialize
@failures = []
@adapter = ActiveContext.adapter
end
def process(ref)
send_bulk if @adapter.add_ref(ref)
end
def flush
send_bulk.failures
end
private
def send_bulk
return self if adapter.empty?
failed_refs = try_send_bulk
logger.info(
'message' => 'bulk_submitted',
'meta.indexing.bulk_count' => adapter.all_refs.size,
'meta.indexing.errors_count' => failed_refs.count
)
failures.push(*failed_refs)
adapter.reset
self
end
def try_send_bulk
result = adapter.bulk
adapter.process_bulk_errors(result)
rescue StandardError => e
logger.error(message: 'bulk_exception', error_class: e.class.to_s, error_message: e.message)
adapter.all_refs
end
def logger
@logger ||= ActiveContext::Config.logger
end
end
end
...@@ -4,17 +4,24 @@ module ActiveContext ...@@ -4,17 +4,24 @@ module ActiveContext
module Databases module Databases
module Concerns module Concerns
module Adapter module Adapter
attr_reader :client attr_reader :options, :client, :indexer
delegate :search, to: :client delegate :search, to: :client
delegate :all_refs, :add_ref, :empty?, :bulk, :process_bulk_errors, :reset, to: :indexer
def initialize(options) def initialize(options)
@options = options
@client = client_klass.new(options) @client = client_klass.new(options)
@indexer = indexer_klass.new(options, client)
end end
def client_klass def client_klass
raise NotImplementedError raise NotImplementedError
end end
def indexer_klass
raise NotImplementedError
end
end end
end end
end end
......
# frozen_string_literal: true
# rubocop: disable Gitlab/ModuleWithInstanceVariables -- this is a concern
module ActiveContext
module Databases
module Concerns
module Indexer
attr_reader :options, :client, :refs
def initialize(options, client)
@options = options
@client = client
@refs = []
end
def all_refs
refs
end
# Adds a reference to the refs array
#
# @param ref [Object] The reference to add
# @return [Boolean] True if bulk processing should be forced, e.g., when a size threshold is reached
def add_ref(ref)
raise NotImplementedError
end
# Checks if nothing should be processed
#
# @return [Boolean] True if bulk processing should be skipped
def empty?
raise NotImplementedError
end
# Performs bulk processing on the refs array
#
# @return [Object] The result of bulk processing
def bulk
raise NotImplementedError
end
# Processes errors from bulk operation
#
# @param result [Object] The result from the bulk operation
# @return [Array] Any failures that occurred during bulk processing
def process_bulk_errors(_result)
raise NotImplementedError
end
# Resets the adapter to a clean state
def reset
@refs = []
# also reset anything that builds up from the refs array
end
end
end
end
end
# rubocop: enable Gitlab/ModuleWithInstanceVariables
...@@ -9,6 +9,10 @@ class Adapter ...@@ -9,6 +9,10 @@ class Adapter
def client_klass def client_klass
ActiveContext::Databases::Elasticsearch::Client ActiveContext::Databases::Elasticsearch::Client
end end
def indexer_klass
ActiveContext::Databases::Elasticsearch::Indexer
end
end end
end end
end end
......
...@@ -6,6 +6,8 @@ module Elasticsearch ...@@ -6,6 +6,8 @@ module Elasticsearch
class Client class Client
include ActiveContext::Databases::Concerns::Client include ActiveContext::Databases::Concerns::Client
delegate :bulk, to: :client
OPEN_TIMEOUT = 5 OPEN_TIMEOUT = 5
NO_RETRY = 0 NO_RETRY = 0
......
# frozen_string_literal: true
module ActiveContext
module Databases
module Elasticsearch
class Indexer
include ActiveContext::Databases::Concerns::Indexer
DEFAULT_MAX_BULK_SIZE = 10.megabytes
attr_reader :operations, :bulk_size
def initialize(...)
super
@operations = []
@bulk_size = 0
end
def add_ref(ref)
operation = build_operation(ref)
@refs << ref
@operations << operation
@bulk_size += calculate_operation_size(operation)
bulk_size >= bulk_threshold
end
def empty?
operations.empty?
end
def bulk
client.bulk(body: operations.flatten)
end
def process_bulk_errors(result)
return [] unless result['errors']
failed_refs = []
result['items'].each_with_index do |item, index|
op = item['index'] || item['update'] || item['delete']
next unless op.nil? || op['error']
ref = refs[index]
logger.warn(
'message' => 'indexing_failed',
'meta.indexing.error' => op&.dig('error') || 'Operation was nil',
'meta.indexing.status' => op&.dig('status'),
'meta.indexing.operation_type' => item.each_key.first,
'meta.indexing.ref' => ref.serialize,
'meta.indexing.identifier' => ref.identifier
)
failed_refs << ref
end
failed_refs
end
def reset
super
@operations = []
@bulk_size = 0
end
private
def build_operation(ref)
case ref.operation.to_sym
when :index, :upsert
[
{ update: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact },
{ doc: ref.as_indexed_json, doc_as_upsert: true }
]
when :delete
[{ delete: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }]
else
raise StandardError, "Operation #{ref.operation} is not supported"
end
end
def calculate_operation_size(operation)
operation.to_json.bytesize + 2 # Account for newlines
end
def bulk_threshold
@bulk_threshold ||= options[:max_bulk_size_bytes] || DEFAULT_MAX_BULK_SIZE
end
def logger
@logger ||= ActiveContext::Config.logger
end
end
end
end
end
...@@ -9,6 +9,10 @@ class Adapter ...@@ -9,6 +9,10 @@ class Adapter
def client_klass def client_klass
ActiveContext::Databases::Opensearch::Client ActiveContext::Databases::Opensearch::Client
end end
def indexer_klass
ActiveContext::Databases::Opensearch::Indexer
end
end end
end end
end end
......
# frozen_string_literal: true
module ActiveContext
module Databases
module Opensearch
class Indexer
include ActiveContext::Databases::Concerns::Indexer
end
end
end
end
...@@ -9,6 +9,10 @@ class Adapter ...@@ -9,6 +9,10 @@ class Adapter
def client_klass def client_klass
ActiveContext::Databases::Postgresql::Client ActiveContext::Databases::Postgresql::Client
end end
def indexer_klass
ActiveContext::Databases::Postgresql::Indexer
end
end end
end end
end end
......
# frozen_string_literal: true
module ActiveContext
module Databases
module Postgresql
class Indexer
include ActiveContext::Databases::Concerns::Indexer
end
end
end
end
# frozen_string_literal: true
RSpec.describe ActiveContext::BulkProcessor do
let(:adapter) { ActiveContext::Databases::Elasticsearch::Adapter.new(url: 'http://localhost:9200') }
let(:logger) { instance_double(Logger) }
let(:ref) { double }
before do
allow(ActiveContext).to receive(:adapter).and_return(adapter)
allow(ActiveContext::Config).to receive(:logger).and_return(logger)
allow(logger).to receive(:info)
allow(logger).to receive(:error)
allow(ref).to receive_messages(
operation: :index,
id: 1,
as_indexed_json: { title: 'Test Issue' },
index_name: 'issues',
identifier: '1',
routing: 'group_1'
)
end
describe '#initialize' do
it 'initializes with empty failures and the correct adapter' do
processor = described_class.new
expect(processor.failures).to be_empty
expect(processor.adapter).to be_a(ActiveContext::Databases::Elasticsearch::Adapter)
end
end
describe '#process' do
let(:processor) { described_class.new }
it 'adds ref to adapter and calls send_bulk if it returns true' do
allow(adapter).to receive(:add_ref).and_return(true)
expect(processor).to receive(:send_bulk).once
processor.process(ref)
end
it 'adds ref to adapter and does not call send_bulk if it returns false' do
allow(adapter).to receive(:add_ref).and_return(false)
expect(processor).not_to receive(:send_bulk)
processor.process(ref)
end
end
describe '#flush' do
let(:processor) { described_class.new }
it 'calls send_bulk and returns failures' do
allow(processor).to receive(:send_bulk).and_return(processor)
expect(processor.flush).to eq([])
end
end
describe '#send_bulk' do
let(:processor) { described_class.new }
before do
processor.process(ref)
end
it 'processes bulk and logs info' do
allow(adapter).to receive(:bulk).and_return({ 'items' => [] })
expect(logger).to receive(:info).with(
'message' => 'bulk_submitted',
'meta.indexing.bulk_count' => 1,
'meta.indexing.errors_count' => 0
)
processor.send(:send_bulk)
end
it 'resets the adapter after processing' do
allow(adapter).to receive(:bulk).and_return({ 'items' => [] })
expect(adapter).to receive(:reset)
processor.send(:send_bulk)
end
end
describe '#try_send_bulk' do
let(:processor) { described_class.new }
before do
processor.process(ref)
end
context 'when bulk processing succeeds' do
it 'returns empty array' do
allow(adapter).to receive(:bulk).and_return({ 'items' => [] })
expect(processor.send(:try_send_bulk)).to eq([])
end
end
context 'when bulk processing fails' do
it 'logs error and returns all refs' do
allow(adapter).to receive(:bulk).and_raise(StandardError.new('Bulk processing failed'))
expect(logger).to receive(:error).with(
message: 'bulk_exception',
error_class: 'StandardError',
error_message: 'Bulk processing failed'
)
expect(processor.send(:try_send_bulk)).to eq([ref])
end
end
end
end
# frozen_string_literal: true
RSpec.describe ActiveContext::Databases::Elasticsearch::Indexer do
let(:es_client) { instance_double(Elasticsearch::Client) }
let(:logger) { instance_double(Logger, warn: nil) }
let(:options) { {} }
let(:indexer) { described_class.new(options, es_client) }
let(:ref) { double }
before do
allow(ActiveContext::Config).to receive(:logger).and_return(logger)
allow(ref).to receive_messages(
operation: :index,
id: 1,
as_indexed_json: { title: 'Test Issue' },
index_name: 'issues',
identifier: '1',
routing: 'group_1',
serialize: 'issue 1 group_1'
)
end
describe '#initialize' do
it 'initializes with empty operations and zero bulk size' do
expect(indexer.operations).to be_empty
expect(indexer.bulk_size).to eq(0)
end
end
describe '#add_ref' do
it 'adds the ref and returns true when bulk threshold is reached' do
allow(indexer).to receive(:bulk_threshold).and_return(1)
expect(indexer.add_ref(ref)).to be true
expect(indexer.operations).not_to be_empty
end
it 'adds the ref and returns false when bulk threshold is not reached' do
allow(indexer).to receive(:bulk_threshold).and_return(1000000)
expect(indexer.add_ref(ref)).to be false
expect(indexer.operations).not_to be_empty
end
it 'raises an error for unsupported operations' do
allow(ref).to receive(:operation).and_return(:unsupported)
expect { indexer.add_ref(ref) }.to raise_error(StandardError, /Operation unsupported is not supported/)
end
end
describe '#empty?' do
it 'returns true when there are no operations' do
expect(indexer).to be_empty
end
it 'returns false when there are operations' do
indexer.instance_variable_set(:@operations, [{}])
expect(indexer).not_to be_empty
end
end
describe '#bulk' do
before do
indexer.instance_variable_set(:@operations, [{ index: {} }])
end
it 'calls bulk on the client with flattened operations' do
expect(es_client).to receive(:bulk).with(body: [{ index: {} }])
indexer.bulk
end
end
describe '#process_bulk_errors' do
before do
indexer.instance_variable_set(:@refs, [ref])
end
context 'when there are no errors' do
it 'returns an empty array' do
result = { 'errors' => false }
expect(indexer.process_bulk_errors(result)).to be_empty
end
end
context 'when there are errors' do
let(:result) do
{
'errors' => true,
'items' => [
{ 'index' => { 'error' => 'Error message', 'status' => 400 } }
]
}
end
it 'logs warnings and returns failed refs' do
expect(logger).to receive(:warn).with(
'message' => 'indexing_failed',
'meta.indexing.error' => 'Error message',
'meta.indexing.status' => 400,
'meta.indexing.operation_type' => 'index',
'meta.indexing.ref' => 'issue 1 group_1',
'meta.indexing.identifier' => '1'
)
failed_refs = indexer.process_bulk_errors(result)
expect(failed_refs).to eq([ref])
end
end
end
describe '#reset' do
before do
indexer.instance_variable_set(:@operations, [{}])
indexer.instance_variable_set(:@bulk_size, 100)
end
it 'resets operations and bulk size' do
indexer.reset
expect(indexer.operations).to be_empty
expect(indexer.bulk_size).to eq(0)
end
end
end
# frozen_string_literal: true # frozen_string_literal: true
require "active_context" require 'active_context'
require 'active_support/all'
require 'logger' require 'logger'
require 'elasticsearch' require 'elasticsearch'
require 'opensearch' require 'opensearch'
require 'aws-sdk-core' require 'aws-sdk-core'
require 'active_support/concern'
require 'redis' require 'redis'
require 'byebug' require 'byebug'
require 'active_support'
require 'active_support/core_ext/numeric/time'
require 'active_context/concerns/bulk_async_process'
Dir[File.join(__dir__, 'support/**/*.rb')].each { |f| require f } Dir[File.join(__dir__, 'support/**/*.rb')].each { |f| require f }
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册