Skip to content
代码片段 群组 项目
未验证 提交 b18c45d5 编辑于 作者: Madelein van Niekerk's avatar Madelein van Niekerk 提交者: GitLab
浏览文件

Add ActiveContext reference class

Changelog: added
EE: true
上级 513c086e
No related branches found
No related tags found
无相关合并请求
显示
432 个添加64 个删除
...@@ -9,3 +9,13 @@ Search/NamespacedClass: ...@@ -9,3 +9,13 @@ Search/NamespacedClass:
RSpec/MultipleMemoizedHelpers: RSpec/MultipleMemoizedHelpers:
Max: 25 Max: 25
RSpec/VerifiedDoubles:
Exclude:
- 'spec/lib/active_context/tracker_spec.rb'
RSpec/VerifiedDoubleReference:
Exclude:
- 'spec/lib/active_context/bulk_process_queue_spec.rb'
- 'spec/lib/active_context/reference_spec.rb'
- 'spec/lib/active_context/tracker_spec.rb'
...@@ -142,6 +142,66 @@ ActiveContext.raw_queues ...@@ -142,6 +142,66 @@ ActiveContext.raw_queues
#<Ai::Context::Queues::MergeRequest:0x0000000177cdf370 @shard=1>] #<Ai::Context::Queues::MergeRequest:0x0000000177cdf370 @shard=1>]
``` ```
### Adding a new reference type
Create a class under `lib/active_context/references/` and inherit from the `Reference` class and define the following methods:
Class methods required:
- `serialize(object, routing)`: defines a string representation of the reference object
- `preload_refs` (optional): preload database records to prevent N+1 issues
Instance methods required:
- `serialize`: defines a string representation of the reference object
- `as_indexed_json`: a hash containing the data representation of the object
- `operation`: determines the operation which can be one of `index`, `upsert` or `delete`
- `partition_name`: name of the table or index
- `identifier`: unique identifier
- `routing` (optional)
Example:
```ruby
# frozen_string_literal: true
module Ai
module Context
module References
class MergeRequest < ::ActiveContext::Reference
def self.serialize(record)
new(record.id).serialize
end
attr_reader :identifier
def initialize(identifier)
@identifier = identifier.to_i
end
def serialize
self.class.join_delimited([identifier].compact)
end
def as_indexed_json
{
id: identifier
}
end
def operation
:index
end
def partition_name
'ai_context_merge_requests'
end
end
end
end
end
```
### Adding a new collection ### Adding a new collection
A collection maps data to references and specifies a queue to track its references. A collection maps data to references and specifies a queue to track its references.
...@@ -151,7 +211,8 @@ To add a new collection: ...@@ -151,7 +211,8 @@ To add a new collection:
1. Create a new file in the appropriate directory 1. Create a new file in the appropriate directory
1. Define a class that `includes ActiveContext::Concerns::Collection` 1. Define a class that `includes ActiveContext::Concerns::Collection`
1. Implement the `self.queue` class method to return the associated queue 1. Implement the `self.queue` class method to return the associated queue
1. Implement the `references` instance method to return the references for an object 1. Implement the `self.reference_klass` or `self.reference_klasses` class method to return the references for an object
1. Implement the `self.routing(object)` class method to determine how an object should be routed
Example: Example:
...@@ -166,8 +227,15 @@ module Ai ...@@ -166,8 +227,15 @@ module Ai
Queues::MergeRequest Queues::MergeRequest
end end
def references def self.reference_klasses
[Search::Elastic::References::Embedding.serialize(object)] [
References::Embedding,
References::MergeRequest
]
end
def self.routing(object)
object.project.root_ancestor.id
end end
end end
end end
...@@ -193,6 +261,10 @@ ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::M ...@@ -193,6 +261,10 @@ ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::M
ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest, queue: Ai::Context::Queues::Default) ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest, queue: Ai::Context::Queues::Default)
``` ```
```ruby
ActiveContext.track!(Ai::Context::References::MergeRequest.new(1), queue: Ai::Context::Queues::MergeRequest)
```
To view all tracked references: To view all tracked references:
```ruby ```ruby
......
...@@ -29,7 +29,7 @@ def self.raw_queues ...@@ -29,7 +29,7 @@ def self.raw_queues
ActiveContext::Queues.raw_queues ActiveContext::Queues.raw_queues
end end
def self.track!(*objects, collection:, queue: nil) def self.track!(*objects, collection: nil, queue: nil)
ActiveContext::Tracker.track!(*objects, collection: collection, queue: queue) ActiveContext::Tracker.track!(*objects, collection: collection, queue: queue)
end end
end end
...@@ -46,8 +46,24 @@ def process(redis) ...@@ -46,8 +46,24 @@ def process(redis)
return [0, 0] if specs_buffer.blank? return [0, 0] if specs_buffer.blank?
# TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/507973 refs = deserialize_all(specs_buffer)
# deserialize refs, preload records, submit docs, flush, etc.
Reference.preload(refs).each do |ref| # rubocop: disable Rails/FindEach -- not ActiveRecord
bulk_processor.process(ref)
end
flushing_duration_s = Benchmark.realtime do
@failures = bulk_processor.flush
end
logger.info(
'class' => self.class.name,
'message' => 'bulk_indexer_flushed',
'meta.indexing.search_flushing_duration_s' => flushing_duration_s
)
# Re-enqueue any failures so they are retried
ActiveContext.track!(@failures, queue: queue)
# Remove all the successes # Remove all the successes
scores.each do |set_key, (first_score, last_score, count)| scores.each do |set_key, (first_score, last_score, count)|
...@@ -70,6 +86,14 @@ def process(redis) ...@@ -70,6 +86,14 @@ def process(redis)
private private
def deserialize_all(specs)
specs.filter_map { |spec, _| Reference.deserialize(spec) }
end
def bulk_processor
@bulk_processor ||= ActiveContext::BulkProcessor.new
end
def logger def logger
@logger ||= ActiveContext::Config.logger @logger ||= ActiveContext::Config.logger
end end
......
...@@ -13,6 +13,20 @@ def track!(*objects) ...@@ -13,6 +13,20 @@ def track!(*objects)
def queue def queue
raise NotImplementedError raise NotImplementedError
end end
def routing(_)
raise NotImplementedError
end
def reference_klasses
Array.wrap(reference_klass).tap do |klasses|
raise NotImplementedError, "#{self} should define reference_klasses or reference_klass" if klasses.empty?
end
end
def reference_klass
nil
end
end end
attr_reader :object attr_reader :object
...@@ -22,7 +36,12 @@ def initialize(object) ...@@ -22,7 +36,12 @@ def initialize(object)
end end
def references def references
raise NotImplementedError reference_klasses = Array.wrap(self.class.reference_klasses)
routing = self.class.routing(object)
reference_klasses.map do |reference_klass|
reference_klass.serialize(object, routing)
end
end end
end end
end end
......
# frozen_string_literal: true
module ActiveContext
module Concerns
module ReferenceUtils
def delimit(string)
string.split(self::DELIMITER)
end
def join_delimited(array)
[self, array].join(self::DELIMITER)
end
def deserialize_string(string)
delimit(string)[1..]
end
def ref_klass(string)
klass = delimit(string).first.safe_constantize
klass if klass && klass < ::ActiveContext::Reference
end
def ref_module
to_s.pluralize
end
end
end
end
...@@ -72,11 +72,11 @@ def build_operation(ref) ...@@ -72,11 +72,11 @@ def build_operation(ref)
case ref.operation.to_sym case ref.operation.to_sym
when :index, :upsert when :index, :upsert
[ [
{ update: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }, { update: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact },
{ doc: ref.as_indexed_json, doc_as_upsert: true } { doc: ref.as_indexed_json, doc_as_upsert: true }
] ]
when :delete when :delete
[{ delete: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }] [{ delete: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }]
else else
raise StandardError, "Operation #{ref.operation} is not supported" raise StandardError, "Operation #{ref.operation} is not supported"
end end
......
# frozen_string_literal: true
module ActiveContext
class Reference
extend Concerns::ReferenceUtils
DELIMITER = '|'
PRELOAD_BATCH_SIZE = 1_000
class << self
def deserialize(string)
ref_klass = ref_klass(string)
if ref_klass
ref_klass.instantiate(string)
else
Search::Elastic::Reference.deserialize(string)
end
end
def instantiate(string)
new(*deserialize_string(string))
end
def preload(refs)
refs.group_by(&:class).each do |klass, class_refs|
class_refs.each_slice(PRELOAD_BATCH_SIZE) do |group_slice|
klass.preload_refs(group_slice)
end
end
refs
end
def serialize
raise NotImplementedError
end
def preload_refs(refs)
refs
end
def klass
name.demodulize
end
end
def klass
self.class.klass
end
def serialize
raise NotImplementedError
end
def as_indexed_json
raise NotImplementedError
end
def operation
raise NotImplementedError
end
def partition_name
raise NotImplementedError
end
def identifier
raise NotImplementedError
end
def routing
nil
end
end
end
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module ActiveContext module ActiveContext
class Tracker class Tracker
class << self class << self
def track!(*objects, collection:, queue: nil) def track!(*objects, collection: nil, queue: nil)
references = collect_references(objects.flatten, collection) references = collect_references(objects.flatten, collection)
return 0 if references.empty? return 0 if references.empty?
...@@ -19,9 +19,22 @@ def track!(*objects, collection:, queue: nil) ...@@ -19,9 +19,22 @@ def track!(*objects, collection:, queue: nil)
def collect_references(objects, collection) def collect_references(objects, collection)
objects.flat_map do |obj| objects.flat_map do |obj|
collection.new(obj).references if obj.is_a?(ActiveContext::Reference)
obj.serialize
elsif obj.is_a?(String)
obj
else
next collection.new(obj).references if collection
logger.warn("ActiveContext unable to track `#{obj}`: Collection must be specified")
[]
end
end end
end end
def logger
ActiveContext::Config.logger
end
end end
end end
end end
# frozen_string_literal: true
RSpec.describe ActiveContext::BulkProcessQueue do
let(:queue) { instance_double('ActiveContext::Queue') }
let(:shard) { 0 }
let(:redis) { instance_double(Redis) }
let(:bulk_processor) { instance_double('ActiveContext::BulkProcessor') }
let(:logger) { instance_double('Logger', info: nil, error: nil) }
subject(:bulk_process_queue) { described_class.new(queue, shard) }
before do
allow(ActiveContext::Redis).to receive(:with_redis).and_yield(redis)
allow(ActiveContext::BulkProcessor).to receive(:new).and_return(bulk_processor)
allow(ActiveContext::Config).to receive(:logger).and_return(logger)
allow(bulk_processor).to receive(:process)
allow(bulk_processor).to receive(:flush).and_return([])
end
describe '#process' do
let(:specs) { [['spec1', 1], ['spec2', 2]] }
let(:reference_class) { class_double("ActiveContext::Reference", preload_refs: nil).as_stubbed_const }
let(:references) { [instance_double('ActiveContext::Reference'), instance_double('ActiveContext::Reference')] }
before do
allow(queue).to receive(:each_queued_items_by_shard).and_yield(shard, specs)
allow(queue).to receive(:redis_set_key).and_return('redis_set_key')
allow(queue).to receive(:push)
allow(bulk_process_queue).to receive(:deserialize_all).and_return(references)
allow(redis).to receive(:zremrangebyscore)
allow(references).to receive(:group_by).and_return({ reference_class => references })
allow(reference_class).to receive(:preload_refs)
allow(ActiveContext::Reference).to receive(:preload).and_return(references)
end
it 'processes specs and flushes the bulk processor' do
expect(bulk_processor).to receive(:process).twice
expect(bulk_processor).to receive(:flush)
bulk_process_queue.process(redis)
end
it 'removes processed items from Redis' do
expect(redis).to receive(:zremrangebyscore).with('redis_set_key', 1, 2)
bulk_process_queue.process(redis)
end
it 'returns the count of processed specs and failures' do
expect(bulk_process_queue.process(redis)).to eq([2, 0])
end
context 'when there are failures' do
let(:failures) { ['failed_spec'] }
before do
allow(bulk_processor).to receive(:flush).and_return(failures)
end
it 're-enqueues failures' do
expect(ActiveContext).to receive(:track!).with(failures, queue: queue)
bulk_process_queue.process(redis)
end
it 'returns the correct count of processed specs and failures' do
expect(bulk_process_queue.process(redis)).to eq([2, 1])
end
end
context 'when specs are empty' do
let(:specs) { [] }
it 'returns [0, 0] without processing' do
expect(bulk_processor).not_to receive(:process)
expect(bulk_process_queue.process(redis)).to eq([0, 0])
end
end
end
end
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
operation: :index, operation: :index,
id: 1, id: 1,
as_indexed_json: { title: 'Test Issue' }, as_indexed_json: { title: 'Test Issue' },
index_name: 'issues', partition_name: 'issues',
identifier: '1', identifier: '1',
routing: 'group_1' routing: 'group_1'
) )
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
operation: :index, operation: :index,
id: 1, id: 1,
as_indexed_json: { title: 'Test Issue' }, as_indexed_json: { title: 'Test Issue' },
index_name: 'issues', partition_name: 'issues',
identifier: '1', identifier: '1',
routing: 'group_1', routing: 'group_1',
serialize: 'issue 1 group_1' serialize: 'issue 1 group_1'
......
# frozen_string_literal: true
RSpec.describe ActiveContext::Reference do
describe '.deserialize' do
context 'when ref_klass exists' do
let(:mock_ref_klass) { class_double("ActiveContext::References::TestReference") }
let(:mock_instance) { instance_double("ActiveContext::References::TestReference") }
before do
allow(described_class).to receive(:ref_klass).and_return(mock_ref_klass)
allow(mock_ref_klass).to receive(:new).and_return(mock_instance)
end
it 'instantiates the ref_klass with the string' do
expect(mock_ref_klass).to receive(:instantiate).with('test|string')
described_class.deserialize('test|string')
end
end
context 'when ref_klass does not exist' do
before do
allow(described_class).to receive(:ref_klass).and_return(nil)
stub_const('Search::Elastic::Reference', Class.new)
end
it 'falls back to Search::Elastic::Reference.deserialize' do
expect(Search::Elastic::Reference).to receive(:deserialize).with('test|string')
described_class.deserialize('test|string')
end
end
end
describe '.ref_klass' do
before do
stub_const('ActiveContext::References::TestReference', Class.new(described_class))
end
it 'returns the correct class when it exists' do
expect(described_class.ref_klass('ActiveContext::References::TestReference|some|data'))
.to eq(ActiveContext::References::TestReference)
end
it 'returns nil when the class does not exist' do
expect(described_class.ref_klass('ActiveContext::References::NonExistantReference|some|data')).to be_nil
end
end
describe '#klass' do
it 'returns the demodulized class name' do
expect(described_class.new.klass).to eq('Reference')
end
end
describe 'ReferenceUtils methods' do
describe '.delimit' do
it 'splits the string by the delimiter' do
expect(described_class.delimit('a|b|c')).to eq(%w[a b c])
end
end
describe '.join_delimited' do
it 'joins the array with the delimiter' do
expect(described_class.join_delimited(%w[a b c])).to eq('ActiveContext::Reference|a|b|c')
end
end
describe '.ref_module' do
it 'returns the pluralized class name' do
expect(described_class.ref_module).to eq('ActiveContext::References')
end
end
end
end
...@@ -18,68 +18,40 @@ def references ...@@ -18,68 +18,40 @@ def references
let(:mock_queue) { [] } let(:mock_queue) { [] }
describe '.track!' do describe '.track!' do
context 'with single object' do let(:mock_collection) { double('Collection') }
it 'tracks references and returns count' do let(:mock_queue) { [] }
result = described_class.track!('test', collection: mock_collection)
expect(result).to eq(1) before do
expect(mock_collection.queue).to contain_exactly(['ref_test']) allow(mock_collection).to receive(:queue).and_return(mock_queue)
end
end
context 'with multiple objects' do
it 'tracks references for all objects and returns total count' do
result = described_class.track!('test1', 'test2', collection: mock_collection)
expect(result).to eq(2)
expect(mock_collection.queue).to contain_exactly(%w[ref_test1 ref_test2])
end
end end
context 'with nested arrays' do it 'tracks a string as-is' do
it 'flattens arrays and tracks all references' do expect(described_class.track!('test_string', collection: mock_collection)).to eq(1)
result = described_class.track!(['test1', %w[test2 test3]], collection: mock_collection) expect(mock_queue).to contain_exactly(['test_string'])
expect(result).to eq(3)
expect(mock_collection.queue).to contain_exactly(%w[ref_test1 ref_test2 ref_test3])
end
end end
context 'with empty input' do it 'serializes ActiveContext::Reference objects' do
it 'returns zero and does not modify queue' do reference_class = Class.new(ActiveContext::Reference) do
result = described_class.track!([], collection: mock_collection) def serialize
'serialized_reference'
expect(result).to eq(0) end
expect(mock_collection.queue).to be_empty
end end
end reference = reference_class.new
context 'with custom queue' do expect(described_class.track!(reference, collection: mock_collection)).to eq(1)
it 'uses provided queue instead of collection queue' do expect(mock_queue).to contain_exactly(['serialized_reference'])
result = described_class.track!('test', collection: mock_collection, queue: mock_queue)
expect(result).to eq(1)
expect(mock_queue).to contain_exactly(['ref_test'])
expect(mock_collection.queue).to be_empty
end
end end
context 'when collection does not implement queue method' do it 'uses collection.references for other objects' do
let(:invalid_collection) do obj = double('SomeObject')
Class.new do collection_instance = instance_double('CollectionInstance')
include ActiveContext::Concerns::Collection references = [instance_double(ActiveContext::Reference), instance_double(ActiveContext::Reference)]
def references allow(mock_collection).to receive(:new).with(obj).and_return(collection_instance)
["ref"] allow(collection_instance).to receive(:references).and_return(references)
end
end
end
it 'raises NotImplementedError' do expect(described_class.track!(obj, collection: mock_collection)).to eq(2)
expect do expect(mock_queue).to contain_exactly(references)
described_class.track!('test', collection: invalid_collection)
end.to raise_error(NotImplementedError)
end
end end
end end
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册