diff --git a/gems/gitlab-active-context/.rubocop.yml b/gems/gitlab-active-context/.rubocop.yml index 0621d40c0e2b779db0b15462e400bd9b995d73d9..919eb7a6796a14828a60bd4dd45fa7360f4a7eac 100644 --- a/gems/gitlab-active-context/.rubocop.yml +++ b/gems/gitlab-active-context/.rubocop.yml @@ -9,3 +9,13 @@ Search/NamespacedClass: RSpec/MultipleMemoizedHelpers: Max: 25 + +RSpec/VerifiedDoubles: + Exclude: + - 'spec/lib/active_context/tracker_spec.rb' + +RSpec/VerifiedDoubleReference: + Exclude: + - 'spec/lib/active_context/bulk_process_queue_spec.rb' + - 'spec/lib/active_context/reference_spec.rb' + - 'spec/lib/active_context/tracker_spec.rb' diff --git a/gems/gitlab-active-context/README.md b/gems/gitlab-active-context/README.md index 3ca7d3734328fef44ba936142aefcbb4f9cb1f5e..28929cf46e8a2ca291c9b6e967cacd693136f4d3 100644 --- a/gems/gitlab-active-context/README.md +++ b/gems/gitlab-active-context/README.md @@ -142,6 +142,66 @@ ActiveContext.raw_queues #<Ai::Context::Queues::MergeRequest:0x0000000177cdf370 @shard=1>] ``` +### Adding a new reference type + +Create a class under `lib/active_context/references/` that inherits from the `Reference` class and defines the following methods: + +Class methods (required unless marked optional): + +- `serialize(object, routing)`: returns the serialized string for `object`; collections call it with the object and its routing value +- `preload_refs` (optional): preload database records to prevent N+1 issues + +Instance methods (required unless marked optional): + +- `serialize`: defines a string representation of the reference object +- `as_indexed_json`: a hash containing the data representation of the object +- `operation`: the operation to perform, one of `index`, `upsert` or `delete` +- `partition_name`: name of the table or index +- `identifier`: unique identifier +- `routing` (optional) + +Example: + +```ruby +# frozen_string_literal: true + +module Ai + module Context + module References + class MergeRequest < ::ActiveContext::Reference + def self.serialize(record, _routing = nil) + new(record.id).serialize + end + + attr_reader
:identifier + + def initialize(identifier) + @identifier = identifier.to_i + end + + def serialize + self.class.join_delimited([identifier].compact) + end + + def as_indexed_json + { + id: identifier + } + end + + def operation + :index + end + + def partition_name + 'ai_context_merge_requests' + end + end + end + end +end +``` + ### Adding a new collection A collection maps data to references and specifies a queue to track its references. @@ -151,7 +211,8 @@ To add a new collection: 1. Create a new file in the appropriate directory 1. Define a class that `includes ActiveContext::Concerns::Collection` 1. Implement the `self.queue` class method to return the associated queue -1. Implement the `references` instance method to return the references for an object +1. Implement the `self.reference_klass` or `self.reference_klasses` class method to return the reference class (or classes) used to build the references for an object +1. Implement the `self.routing(object)` class method to determine how an object should be routed Example: @@ -166,8 +227,15 @@ module Ai Queues::MergeRequest end - def references - [Search::Elastic::References::Embedding.serialize(object)] + def self.reference_klasses + [ + References::Embedding, + References::MergeRequest + ] + end + + def self.routing(object) + object.project.root_ancestor.id end end end @@ -193,6 +261,10 @@ ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::M ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest, queue: Ai::Context::Queues::Default) ``` +```ruby +ActiveContext.track!(Ai::Context::References::MergeRequest.new(1), queue: Ai::Context::Queues::MergeRequest) +``` + To view all tracked references: ```ruby diff --git a/gems/gitlab-active-context/lib/active_context.rb b/gems/gitlab-active-context/lib/active_context.rb index 7b7c649ef2a7d159f53d47dfb1a55c9b3cc14005..a7c8cbe860144dc1a536483c10396a0c2a034f2c 100644 --- a/gems/gitlab-active-context/lib/active_context.rb +++
b/gems/gitlab-active-context/lib/active_context.rb @@ -29,7 +29,7 @@ def self.raw_queues ActiveContext::Queues.raw_queues end - def self.track!(*objects, collection:, queue: nil) + def self.track!(*objects, collection: nil, queue: nil) ActiveContext::Tracker.track!(*objects, collection: collection, queue: queue) end end diff --git a/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb b/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb index 0911df7327f0be432084d1118a00c4b00ec0a652..c5ad1bc5eee8708cd866841c50124c8e91cfee67 100644 --- a/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb +++ b/gems/gitlab-active-context/lib/active_context/bulk_process_queue.rb @@ -46,8 +46,24 @@ def process(redis) return [0, 0] if specs_buffer.blank? - # TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/507973 - # deserialize refs, preload records, submit docs, flush, etc. + refs = deserialize_all(specs_buffer) + + Reference.preload(refs).each do |ref| # rubocop: disable Rails/FindEach -- not ActiveRecord + bulk_processor.process(ref) + end + + flushing_duration_s = Benchmark.realtime do + @failures = bulk_processor.flush + end + + logger.info( + 'class' => self.class.name, + 'message' => 'bulk_indexer_flushed', + 'meta.indexing.search_flushing_duration_s' => flushing_duration_s + ) + + # Re-enqueue any failures so they are retried + ActiveContext.track!(@failures, queue: queue) # Remove all the successes scores.each do |set_key, (first_score, last_score, count)| @@ -70,6 +86,14 @@ def process(redis) private + def deserialize_all(specs) + specs.filter_map { |spec, _| Reference.deserialize(spec) } + end + + def bulk_processor + @bulk_processor ||= ActiveContext::BulkProcessor.new + end + def logger @logger ||= ActiveContext::Config.logger end diff --git a/gems/gitlab-active-context/lib/active_context/concerns/collection.rb b/gems/gitlab-active-context/lib/active_context/concerns/collection.rb index 
cb9f9d23913060f99aba9f8d0912405e6ebb723d..049ae007bb83b9fa2f1a99782ce4fee979375f34 100644 --- a/gems/gitlab-active-context/lib/active_context/concerns/collection.rb +++ b/gems/gitlab-active-context/lib/active_context/concerns/collection.rb @@ -13,6 +13,20 @@ def track!(*objects) def queue raise NotImplementedError end + + def routing(_) + raise NotImplementedError + end + + def reference_klasses + Array.wrap(reference_klass).tap do |klasses| + raise NotImplementedError, "#{self} should define reference_klasses or reference_klass" if klasses.empty? + end + end + + def reference_klass + nil + end end attr_reader :object @@ -22,7 +36,12 @@ def initialize(object) end def references - raise NotImplementedError + reference_klasses = Array.wrap(self.class.reference_klasses) + routing = self.class.routing(object) + + reference_klasses.map do |reference_klass| + reference_klass.serialize(object, routing) + end end end end diff --git a/gems/gitlab-active-context/lib/active_context/concerns/reference_utils.rb b/gems/gitlab-active-context/lib/active_context/concerns/reference_utils.rb new file mode 100644 index 0000000000000000000000000000000000000000..c202d37b3458c3860c0bdb83f6e46343e9e3ce28 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/concerns/reference_utils.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module ActiveContext + module Concerns + module ReferenceUtils + def delimit(string) + string.split(self::DELIMITER) + end + + def join_delimited(array) + [self, array].join(self::DELIMITER) + end + + def deserialize_string(string) + delimit(string)[1..] 
+ end + + def ref_klass(string) + klass = delimit(string).first&.safe_constantize # nil-safe: an empty spec has no class segment + + klass if klass.is_a?(Class) && klass < ::ActiveContext::Reference # only Reference subclasses; ignore non-class constants + end + + def ref_module + to_s.pluralize + end + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb index 47f5d80cf6f9e3da66282e27b1ccc8d6601506e8..77e5a3faa643fc19d523750841c6439715a11990 100644 --- a/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb +++ b/gems/gitlab-active-context/lib/active_context/databases/elasticsearch/indexer.rb @@ -72,11 +72,11 @@ def build_operation(ref) case ref.operation.to_sym when :index, :upsert [ - { update: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }, + { update: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }, { doc: ref.as_indexed_json, doc_as_upsert: true } ] when :delete - [{ delete: { _index: ref.index_name, _id: ref.identifier, routing: ref.routing }.compact }] + [{ delete: { _index: ref.partition_name, _id: ref.identifier, routing: ref.routing }.compact }] else raise StandardError, "Operation #{ref.operation} is not supported" end diff --git a/gems/gitlab-active-context/lib/active_context/reference.rb b/gems/gitlab-active-context/lib/active_context/reference.rb new file mode 100644 index 0000000000000000000000000000000000000000..56a7d933d3bb10785ccbfc63ee14771543c28bf8 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/reference.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +module ActiveContext + class Reference + extend Concerns::ReferenceUtils + + DELIMITER = '|' + PRELOAD_BATCH_SIZE = 1_000 + + class << self + def deserialize(string) + ref_klass = ref_klass(string) + + if ref_klass + ref_klass.instantiate(string) + else + Search::Elastic::Reference.deserialize(string) + end + end + + def instantiate(string) +
new(*deserialize_string(string)) + end + + def preload(refs) + refs.group_by(&:class).each do |klass, class_refs| + class_refs.each_slice(PRELOAD_BATCH_SIZE) do |group_slice| + klass.preload_refs(group_slice) + end + end + + refs + end + + def serialize(*) # accepts any arguments so a missing override raises NotImplementedError, not ArgumentError + raise NotImplementedError + end + + def preload_refs(refs) + refs + end + + def klass + name.demodulize + end + end + + def klass + self.class.klass + end + + def serialize + raise NotImplementedError + end + + def as_indexed_json + raise NotImplementedError + end + + def operation + raise NotImplementedError + end + + def partition_name + raise NotImplementedError + end + + def identifier + raise NotImplementedError + end + + def routing + nil + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/tracker.rb b/gems/gitlab-active-context/lib/active_context/tracker.rb index 85acf90a5fe66c14d6f3d7cf4ed70abacc8d5509..e01d00326f1ed7f14d0daa02cca92378d569089f 100644 --- a/gems/gitlab-active-context/lib/active_context/tracker.rb +++ b/gems/gitlab-active-context/lib/active_context/tracker.rb @@ -3,7 +3,7 @@ module ActiveContext class Tracker class << self - def track!(*objects, collection:, queue: nil) + def track!(*objects, collection: nil, queue: nil) references = collect_references(objects.flatten, collection) return 0 if references.empty?
@@ -19,9 +19,22 @@ def track!(*objects, collection:, queue: nil) def collect_references(objects, collection) objects.flat_map do |obj| - collection.new(obj).references + if obj.is_a?(ActiveContext::Reference) + obj.serialize + elsif obj.is_a?(String) + obj + else + next collection.new(obj).references if collection + + logger.warn("ActiveContext unable to track `#{obj}`: Collection must be specified") + [] + end end end + + def logger + ActiveContext::Config.logger + end end end end diff --git a/gems/gitlab-active-context/spec/lib/active_context/bulk_process_queue_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/bulk_process_queue_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..c8ca03336ff58b7b06487aa834fd2cb500595f81 --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/bulk_process_queue_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::BulkProcessQueue do + let(:queue) { instance_double('ActiveContext::Queue') } + let(:shard) { 0 } + let(:redis) { instance_double(Redis) } + let(:bulk_processor) { instance_double('ActiveContext::BulkProcessor') } + let(:logger) { instance_double('Logger', info: nil, error: nil) } + + subject(:bulk_process_queue) { described_class.new(queue, shard) } + + before do + allow(ActiveContext::Redis).to receive(:with_redis).and_yield(redis) + allow(ActiveContext::BulkProcessor).to receive(:new).and_return(bulk_processor) + allow(ActiveContext::Config).to receive(:logger).and_return(logger) + allow(bulk_processor).to receive(:process) + allow(bulk_processor).to receive(:flush).and_return([]) + end + + describe '#process' do + let(:specs) { [['spec1', 1], ['spec2', 2]] } + let(:reference_class) { class_double("ActiveContext::Reference", preload_refs: nil).as_stubbed_const } + let(:references) { [instance_double('ActiveContext::Reference'), instance_double('ActiveContext::Reference')] } + + before do + allow(queue).to 
receive(:each_queued_items_by_shard).and_yield(shard, specs) + allow(queue).to receive(:redis_set_key).and_return('redis_set_key') + allow(queue).to receive(:push) + allow(bulk_process_queue).to receive(:deserialize_all).and_return(references) + allow(redis).to receive(:zremrangebyscore) + allow(references).to receive(:group_by).and_return({ reference_class => references }) + allow(reference_class).to receive(:preload_refs) + allow(ActiveContext::Reference).to receive(:preload).and_return(references) + end + + it 'processes specs and flushes the bulk processor' do + expect(bulk_processor).to receive(:process).twice + expect(bulk_processor).to receive(:flush) + + bulk_process_queue.process(redis) + end + + it 'removes processed items from Redis' do + expect(redis).to receive(:zremrangebyscore).with('redis_set_key', 1, 2) + + bulk_process_queue.process(redis) + end + + it 'returns the count of processed specs and failures' do + expect(bulk_process_queue.process(redis)).to eq([2, 0]) + end + + context 'when there are failures' do + let(:failures) { ['failed_spec'] } + + before do + allow(bulk_processor).to receive(:flush).and_return(failures) + end + + it 're-enqueues failures' do + expect(ActiveContext).to receive(:track!).with(failures, queue: queue) + + bulk_process_queue.process(redis) + end + + it 'returns the correct count of processed specs and failures' do + expect(bulk_process_queue.process(redis)).to eq([2, 1]) + end + end + + context 'when specs are empty' do + let(:specs) { [] } + + it 'returns [0, 0] without processing' do + expect(bulk_processor).not_to receive(:process) + expect(bulk_process_queue.process(redis)).to eq([0, 0]) + end + end + end +end diff --git a/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb index 2baeab1ec9173170b0269b576785953df6371b6b..8e8f9f8fc5bfa4c09623659c93b64dc39295b44f 100644 --- 
a/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb +++ b/gems/gitlab-active-context/spec/lib/active_context/bulk_processor_spec.rb @@ -14,7 +14,7 @@ operation: :index, id: 1, as_indexed_json: { title: 'Test Issue' }, - index_name: 'issues', + partition_name: 'issues', identifier: '1', routing: 'group_1' ) diff --git a/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb index 6f06f10f9c84a8c3f6ccd8c5568223f9682d7c73..e037b2888ad5f5fe70d4c4c4ddfafbf9d63aeb98 100644 --- a/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb +++ b/gems/gitlab-active-context/spec/lib/active_context/databases/elasticsearch/indexer_spec.rb @@ -13,7 +13,7 @@ operation: :index, id: 1, as_indexed_json: { title: 'Test Issue' }, - index_name: 'issues', + partition_name: 'issues', identifier: '1', routing: 'group_1', serialize: 'issue 1 group_1' diff --git a/gems/gitlab-active-context/spec/lib/active_context/reference_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/reference_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..fd83d9de69cec39ffa8dadc22cb776b277962941 --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/reference_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::Reference do + describe '.deserialize' do + context 'when ref_klass exists' do + let(:mock_ref_klass) { class_double("ActiveContext::References::TestReference") } + let(:mock_instance) { instance_double("ActiveContext::References::TestReference") } + + before do + allow(described_class).to receive(:ref_klass).and_return(mock_ref_klass) + allow(mock_ref_klass).to receive(:new).and_return(mock_instance) + end + + it 'instantiates the ref_klass with the string' do + expect(mock_ref_klass).to receive(:instantiate).with('test|string') + 
described_class.deserialize('test|string') + end + end + + context 'when ref_klass does not exist' do + before do + allow(described_class).to receive(:ref_klass).and_return(nil) + stub_const('Search::Elastic::Reference', Class.new) + end + + it 'falls back to Search::Elastic::Reference.deserialize' do + expect(Search::Elastic::Reference).to receive(:deserialize).with('test|string') + described_class.deserialize('test|string') + end + end + end + + describe '.ref_klass' do + before do + stub_const('ActiveContext::References::TestReference', Class.new(described_class)) + end + + it 'returns the correct class when it exists' do + expect(described_class.ref_klass('ActiveContext::References::TestReference|some|data')) + .to eq(ActiveContext::References::TestReference) + end + + it 'returns nil when the class does not exist' do + expect(described_class.ref_klass('ActiveContext::References::NonExistantReference|some|data')).to be_nil + end + end + + describe '#klass' do + it 'returns the demodulized class name' do + expect(described_class.new.klass).to eq('Reference') + end + end + + describe 'ReferenceUtils methods' do + describe '.delimit' do + it 'splits the string by the delimiter' do + expect(described_class.delimit('a|b|c')).to eq(%w[a b c]) + end + end + + describe '.join_delimited' do + it 'joins the array with the delimiter' do + expect(described_class.join_delimited(%w[a b c])).to eq('ActiveContext::Reference|a|b|c') + end + end + + describe '.ref_module' do + it 'returns the pluralized class name' do + expect(described_class.ref_module).to eq('ActiveContext::References') + end + end + end +end diff --git a/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb index 5e940030037274a4d2380ed6b011b737cdb69233..7ac7bbb0fc7ec106f7c7cf5cb3fc7f7d69fd5b5e 100644 --- a/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb +++ 
b/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb @@ -18,68 +18,40 @@ def references let(:mock_queue) { [] } describe '.track!' do - context 'with single object' do - it 'tracks references and returns count' do - result = described_class.track!('test', collection: mock_collection) + let(:mock_collection) { double('Collection') } + let(:mock_queue) { [] } - expect(result).to eq(1) - expect(mock_collection.queue).to contain_exactly(['ref_test']) - end - end - - context 'with multiple objects' do - it 'tracks references for all objects and returns total count' do - result = described_class.track!('test1', 'test2', collection: mock_collection) - - expect(result).to eq(2) - expect(mock_collection.queue).to contain_exactly(%w[ref_test1 ref_test2]) - end + before do + allow(mock_collection).to receive(:queue).and_return(mock_queue) end - context 'with nested arrays' do - it 'flattens arrays and tracks all references' do - result = described_class.track!(['test1', %w[test2 test3]], collection: mock_collection) - - expect(result).to eq(3) - expect(mock_collection.queue).to contain_exactly(%w[ref_test1 ref_test2 ref_test3]) - end + it 'tracks a string as-is' do + expect(described_class.track!('test_string', collection: mock_collection)).to eq(1) + expect(mock_queue).to contain_exactly(['test_string']) end - context 'with empty input' do - it 'returns zero and does not modify queue' do - result = described_class.track!([], collection: mock_collection) - - expect(result).to eq(0) - expect(mock_collection.queue).to be_empty + it 'serializes ActiveContext::Reference objects' do + reference_class = Class.new(ActiveContext::Reference) do + def serialize + 'serialized_reference' + end end - end + reference = reference_class.new - context 'with custom queue' do - it 'uses provided queue instead of collection queue' do - result = described_class.track!('test', collection: mock_collection, queue: mock_queue) - - expect(result).to eq(1) - expect(mock_queue).to 
contain_exactly(['ref_test']) - expect(mock_collection.queue).to be_empty - end + expect(described_class.track!(reference, collection: mock_collection)).to eq(1) + expect(mock_queue).to contain_exactly(['serialized_reference']) end - context 'when collection does not implement queue method' do - let(:invalid_collection) do - Class.new do - include ActiveContext::Concerns::Collection + it 'uses collection.references for other objects' do + obj = double('SomeObject') + collection_instance = instance_double('CollectionInstance') + references = [instance_double(ActiveContext::Reference), instance_double(ActiveContext::Reference)] - def references - ["ref"] - end - end - end + allow(mock_collection).to receive(:new).with(obj).and_return(collection_instance) + allow(collection_instance).to receive(:references).and_return(references) - it 'raises NotImplementedError' do - expect do - described_class.track!('test', collection: invalid_collection) - end.to raise_error(NotImplementedError) - end + expect(described_class.track!(obj, collection: mock_collection)).to eq(2) + expect(mock_queue).to contain_exactly(references) end end