diff --git a/Gemfile.next.checksum b/Gemfile.next.checksum index 1872875af3d0add258dac52e8148da51c87e05d1..3c26bab39dd809084d28ccdc8e05369004d06eb4 100644 --- a/Gemfile.next.checksum +++ b/Gemfile.next.checksum @@ -570,7 +570,7 @@ {"name":"rbs","version":"3.6.1","platform":"ruby","checksum":"ed7273d018556844583d1785ac54194e67eec594d68e317d57fa90ad035532c0"}, {"name":"rbtrace","version":"0.5.1","platform":"ruby","checksum":"e8cba64d462bfb8ba102d7be2ecaacc789247d52ac587d8003549d909cb9c5dc"}, {"name":"rchardet","version":"1.8.0","platform":"ruby","checksum":"693acd5253d5ade81a51940697955f6dd4bb2f0d245bda76a8e23deec70a52c7"}, -{"name":"rdoc","version":"6.10.0","platform":"ruby","checksum":"db665021883ac9df3ba29cdf71aece960749888db1bf9615b4a584cfa3fa3eda"}, +{"name":"rdoc","version":"6.11.0","platform":"ruby","checksum":"bec66fb9b019be64f7ba7d2cd2aecb283a3a01fef23a95b33e2349c6d1aa0040"}, {"name":"re2","version":"2.7.0","platform":"aarch64-linux","checksum":"778921298b6e8aba26a6230dd298c9b361b92e45024f81fa6aee788060fa307c"}, {"name":"re2","version":"2.7.0","platform":"arm-linux","checksum":"d328b5286d83ae265e13b855da8e348a976f80f91b748045b52073a570577954"}, {"name":"re2","version":"2.7.0","platform":"arm64-darwin","checksum":"7d993f27a1afac4001c539a829e2af211ced62604930c90df32a307cf74cb4a4"}, diff --git a/Gemfile.next.lock b/Gemfile.next.lock index d39bb391a0ee0365c405bfcd3269704dc4e4d014..9c9c0f31a32fa716aa94af8e705399c0eafddaf1 100644 --- a/Gemfile.next.lock +++ b/Gemfile.next.lock @@ -1568,7 +1568,7 @@ GEM msgpack (>= 0.4.3) optimist (>= 3.0.0) rchardet (1.8.0) - rdoc (6.10.0) + rdoc (6.11.0) psych (>= 4.0.0) re2 (2.7.0) mini_portile2 (~> 2.8.5) diff --git a/gems/gitlab-active-context/Gemfile.lock b/gems/gitlab-active-context/Gemfile.lock index 65542b80a33e68415182dcff14a4bf759db4f6bf..6f5a395e2bfb623d607e3ae3428f954ffb08b98e 100644 --- a/gems/gitlab-active-context/Gemfile.lock +++ b/gems/gitlab-active-context/Gemfile.lock @@ -158,6 +158,10 @@ GEM rake (13.2.1) rdoc (6.8.1) psych (>= 4.0.0) + redis (5.3.0) + redis-client (>= 0.22.0) + redis-client (0.23.1) + connection_pool regexp_parser (2.9.3) reline (0.5.12) io-console (~> 0.5) @@ -241,6 +245,7 @@ DEPENDENCIES gitlab-active-context! gitlab-styles rake (~> 13.0) + redis rspec (~> 3.0) rspec-rails rubocop diff --git a/gems/gitlab-active-context/README.md b/gems/gitlab-active-context/README.md index aadb343db1c4c5a66ad3466a85021f925fe3faa5..39b12e4842d1772c3bbab351e4a5282aa30acf5f 100644 --- a/gems/gitlab-active-context/README.md +++ b/gems/gitlab-active-context/README.md @@ -87,6 +87,63 @@ ActiveContext.raw_queues => ["ai_context_queues:{merge_request}:0", "ai_context_queues:{merge_request}:1"] ``` +### Adding a new collection + +A collection maps data to references and specifies a queue to track its references. + +To add a new collection: + +1. Create a new file in the appropriate directory +1. Define a class that `includes ActiveContext::Concerns::Collection` +1. Implement the `self.queue` class method to return the associated queue +1. Implement the `references` instance method to return the references for an object + +Example: + +```ruby +module Ai + module Context + module Collections + class MergeRequest + include ActiveContext::Concerns::Collection + + def self.queue + Queues::MergeRequest + end + + def references + [Search::Elastic::References::Embedding.serialize(object)] + end + end + end + end +end +``` + +Adding references to the queue can be done a few ways: + +```ruby +Ai::Context::Collections::MergeRequest.track!(MergeRequest.first) +``` + +```ruby +Ai::Context::Collections::MergeRequest.track!(MergeRequest.take(10)) +``` + +```ruby +ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest) +``` + +```ruby +ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest, queue: Ai::Context::Queues::Default) +``` + +To view all tracked references: + +```ruby +ActiveContext::Queues.all_queued_items +``` + ## Contributing ### Development guidelines diff --git a/gems/gitlab-active-context/gitlab-active-context.gemspec b/gems/gitlab-active-context/gitlab-active-context.gemspec index 85dcb86381caa6697d97c540a351f0ff2523437d..9ff6c3b07ee703535d329695a070d3c076246c50 100644 --- a/gems/gitlab-active-context/gitlab-active-context.gemspec +++ b/gems/gitlab-active-context/gitlab-active-context.gemspec @@ -29,6 +29,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'aws-sdk-core' spec.add_development_dependency 'faraday_middleware-aws-sigv4' spec.add_development_dependency 'gitlab-styles' + spec.add_development_dependency 'redis' spec.add_development_dependency 'rspec-rails' spec.add_development_dependency 'rubocop-rspec' spec.add_development_dependency 'webmock' diff --git a/gems/gitlab-active-context/lib/active_context.rb b/gems/gitlab-active-context/lib/active_context.rb index 076f9c4a5f9fc954d66630a9dff70f3aa16e5d52..7b7c649ef2a7d159f53d47dfb1a55c9b3cc14005 100644 --- a/gems/gitlab-active-context/lib/active_context.rb +++ b/gems/gitlab-active-context/lib/active_context.rb @@ -28,4 +28,8 @@ def self.queues def self.raw_queues ActiveContext::Queues.raw_queues end + + def self.track!(*objects, collection:, queue: nil) + ActiveContext::Tracker.track!(*objects, collection: collection, queue: queue) + end end diff --git a/gems/gitlab-active-context/lib/active_context/concerns/collection.rb b/gems/gitlab-active-context/lib/active_context/concerns/collection.rb new file mode 100644 index 0000000000000000000000000000000000000000..cb9f9d23913060f99aba9f8d0912405e6ebb723d --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/concerns/collection.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module ActiveContext + module Concerns + module Collection + extend ActiveSupport::Concern + + class_methods do + def track!(*objects) + ActiveContext::Tracker.track!(objects, collection: self) + end + + def queue + raise NotImplementedError + end + end + + attr_reader :object + + def initialize(object) + @object = object + end + + def references + raise NotImplementedError + end + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/concerns/queue.rb b/gems/gitlab-active-context/lib/active_context/concerns/queue.rb index f32e59bd62fd561b5819cb383760d508b9b25036..d7ee7f7cbf26e34f6fc599c61988bd46bb1e7404 100644 --- a/gems/gitlab-active-context/lib/active_context/concerns/queue.rb +++ b/gems/gitlab-active-context/lib/active_context/concerns/queue.rb @@ -9,6 +9,9 @@ def self.included(base) end module ClassMethods + SLICE_SIZE = 1_000 + SHARD_LIMIT = 1_000 + def number_of_shards raise NotImplementedError end @@ -17,10 +20,84 @@ def register! ActiveContext::Queues.register!(redis_key, shards: number_of_shards) end + def push(references) + refs_by_shard = references.group_by { |ref| ActiveContext::Shard.shard_number(number_of_shards, ref) } + + ActiveContext::Redis.with_redis do |redis| + refs_by_shard.each do |shard_number, shard_items| + set_key = redis_set_key(shard_number) + + max = redis.incrby(redis_score_key(shard_number), shard_items.size) + min = (max - shard_items.size) + 1 + + (min..max).zip(shard_items).each_slice(SLICE_SIZE) do |group| + redis.zadd(set_key, group) + end + end + end + end + + def queue_size + ActiveContext::Redis.with_redis do |redis| + queue_shards.sum do |shard_number| + redis.zcard(redis_set_key(shard_number)) + end + end + end + + def queued_items + {}.tap do |hash| + ActiveContext::Redis.with_redis do |redis| + each_queued_items_by_shard(redis) do |shard_number, specs| + hash[shard_number] = specs unless specs.empty? + end + end + end + end + + def each_queued_items_by_shard(redis, shards: queue_shards) + (shards & queue_shards).each do |shard_number| + set_key = redis_set_key(shard_number) + specs = redis.zrangebyscore(set_key, '-inf', '+inf', limit: [0, shard_limit], with_scores: true) + + yield shard_number, specs + end + end + + def clear_tracking! + ActiveContext::Redis.with_redis do |redis| + ::Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do + keys = queue_shards.map { |m| [redis_set_key(m), redis_score_key(m)] }.flatten # rubocop:disable Performance/FlatMap -- more than one level + + if ::Gitlab::Redis::ClusterUtil.cluster?(redis) + ::Gitlab::Redis::ClusterUtil.batch_unlink(keys, redis) + else + redis.unlink(*keys) + end + end + end + end + + def queue_shards + 0.upto(number_of_shards - 1).to_a + end + + def shard_limit + SHARD_LIMIT + end + def redis_key "#{prefix}:{#{queue_name}}" end + def redis_set_key(shard_number) + "#{redis_key}:#{shard_number}:zset" + end + + def redis_score_key(shard_number) + "#{redis_key}:#{shard_number}:score" + end + def queue_name name_elements[-1].underscore end diff --git a/gems/gitlab-active-context/lib/active_context/queues.rb b/gems/gitlab-active-context/lib/active_context/queues.rb index 6e8a770fe4cc638419c1023e7b60c5e4f10ac2ce..d2c04e45b99be746b79b8d097a476c53e5f9a88e 100644 --- a/gems/gitlab-active-context/lib/active_context/queues.rb +++ b/gems/gitlab-active-context/lib/active_context/queues.rb @@ -22,5 +22,17 @@ def self.register!(key, shards:) @raw_queues << "#{key}:#{shard}" end end + + def self.all_queued_items + {}.tap do |hash| + @raw_queues&.each do |queue_key| + references = ActiveContext::Redis.with_redis do |redis| + queue_key = "#{queue_key}:zset" + redis.zrangebyscore(queue_key, '-inf', '+inf') + end + hash[queue_key] = references if references.present? + end + end + end end end diff --git a/gems/gitlab-active-context/lib/active_context/redis.rb b/gems/gitlab-active-context/lib/active_context/redis.rb new file mode 100644 index 0000000000000000000000000000000000000000..fb4a4a37772d5f721c68eab67f73cee2c6ecfc4d --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/redis.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module ActiveContext + class Redis + def self.with_redis(&blk) + Gitlab::Redis::SharedState.with(&blk) + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/shard.rb b/gems/gitlab-active-context/lib/active_context/shard.rb new file mode 100644 index 0000000000000000000000000000000000000000..c3a628436b7975caab29badcec56cd6c34a43406 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/shard.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module ActiveContext + class Shard + def self.shard_number(number_of_shards, data) + Digest::SHA256.hexdigest(data).hex % number_of_shards # rubocop: disable Fips/OpenSSL -- used for data distribution, not for security + end + end +end diff --git a/gems/gitlab-active-context/lib/active_context/tracker.rb b/gems/gitlab-active-context/lib/active_context/tracker.rb new file mode 100644 index 0000000000000000000000000000000000000000..85acf90a5fe66c14d6f3d7cf4ed70abacc8d5509 --- /dev/null +++ b/gems/gitlab-active-context/lib/active_context/tracker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module ActiveContext + class Tracker + class << self + def track!(*objects, collection:, queue: nil) + references = collect_references(objects.flatten, collection) + + return 0 if references.empty? + + queue_to_use = queue || collection.queue + + queue_to_use.push(references) + + references.count + end + + private + + def collect_references(objects, collection) + objects.flat_map do |obj| + collection.new(obj).references + end + end + end + end +end diff --git a/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..4bc54b611f366a0764d7ab43f51349bb0d2550b6 --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/concerns/queue_spec.rb @@ -0,0 +1,126 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::Concerns::Queue do + let(:mock_queue_class) do + Class.new do + def self.name + 'MockModule::TestQueue' + end + + def self.number_of_shards + 2 + end + + include ActiveContext::Concerns::Queue + end + end + + let(:redis_double) { instance_double(Redis) } + + before do + clear_all_queues! + allow(ActiveContext::Redis).to receive(:with_redis).and_yield(redis_double) + end + + describe '.register!' do + it 'registers the queue with ActiveContext::Queues' do + expect(ActiveContext::Queues).to receive(:register!).with('mockmodule:{test_queue}', shards: 2) + mock_queue_class + end + end + + describe '.push' do + it 'pushes references to Redis' do + references = %w[ref1 ref2 ref3] + + allow(ActiveContext::Shard).to receive(:shard_number).and_return(0, 1, 0) + expect(redis_double).to receive(:incrby).with('mockmodule:{test_queue}:0:score', 2).and_return(2) + expect(redis_double).to receive(:incrby).with('mockmodule:{test_queue}:1:score', 1).and_return(1) + expect(redis_double).to receive(:zadd).with('mockmodule:{test_queue}:0:zset', [[1, 'ref1'], [2, 'ref3']]) + expect(redis_double).to receive(:zadd).with('mockmodule:{test_queue}:1:zset', [[1, 'ref2']]) + + mock_queue_class.push(references) + end + end + + describe '.queue_size' do + it 'returns the total size of all shards' do + expect(redis_double).to receive(:zcard).with('mockmodule:{test_queue}:0:zset').and_return(5) + expect(redis_double).to receive(:zcard).with('mockmodule:{test_queue}:1:zset').and_return(3) + + expect(mock_queue_class.queue_size).to eq(8) + end + end + + describe '.queued_items' do + it 'returns items from all non-empty shards' do + expect(redis_double).to receive(:zrangebyscore) + .with('mockmodule:{test_queue}:0:zset', '-inf', '+inf', limit: [0, anything], with_scores: true) + .and_return([['ref1', 1.0], ['ref2', 2.0]]) + expect(redis_double).to receive(:zrangebyscore) + .with('mockmodule:{test_queue}:1:zset', '-inf', '+inf', limit: [0, anything], with_scores: true) + .and_return([]) + + expect(mock_queue_class.queued_items).to eq({ + 0 => [['ref1', 1.0], ['ref2', 2.0]] + }) + end + end + + describe '.clear_tracking!' do + # rubocop: disable RSpec/VerifiedDoubleReference -- stubbing GitLab logic + let(:redis_cluster_validator) { class_double("Gitlab::Instrumentation::RedisClusterValidator").as_stubbed_const } + let(:redis_cluster_util) { class_double("Gitlab::Redis::ClusterUtil").as_stubbed_const } + # rubocop: enable RSpec/VerifiedDoubleReference + + before do + allow(redis_cluster_validator).to receive(:allow_cross_slot_commands).and_yield + end + + context 'when Redis is not in cluster mode' do + before do + allow(redis_cluster_util).to receive(:cluster?).and_return(false) + end + + it 'calls unlink directly on redis' do + expect(redis_double).to receive(:unlink) + .with( + 'mockmodule:{test_queue}:0:zset', 'mockmodule:{test_queue}:0:score', + 'mockmodule:{test_queue}:1:zset', 'mockmodule:{test_queue}:1:score' + ) + + mock_queue_class.clear_tracking! + end + end + + context 'when Redis is in cluster mode' do + before do + allow(redis_cluster_util).to receive(:cluster?).and_return(true) + end + + it 'calls batch_unlink on ClusterUtil' do + expect(redis_cluster_util).to receive(:batch_unlink) + .with( + [ + 'mockmodule:{test_queue}:0:zset', 'mockmodule:{test_queue}:0:score', + 'mockmodule:{test_queue}:1:zset', 'mockmodule:{test_queue}:1:score' + ], + redis_double + ) + + mock_queue_class.clear_tracking! + end + end + end + + describe '.redis_key' do + it 'returns the correct Redis key' do + expect(mock_queue_class.redis_key).to eq('mockmodule:{test_queue}') + end + end + + def clear_all_queues! + ActiveContext::Queues.instance_variable_set(:@queues, Set.new) + ActiveContext::Queues.instance_variable_set(:@raw_queues, []) + end +end diff --git a/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb b/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..5e940030037274a4d2380ed6b011b737cdb69233 --- /dev/null +++ b/gems/gitlab-active-context/spec/lib/active_context/tracker_spec.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +RSpec.describe ActiveContext::Tracker do + let(:mock_collection) do + Class.new do + include ActiveContext::Concerns::Collection + + def self.queue + @queue ||= [] + end + + def references + ["ref_#{object}"] + end + end + end + + let(:mock_queue) { [] } + + describe '.track!' do + context 'with single object' do + it 'tracks references and returns count' do + result = described_class.track!('test', collection: mock_collection) + + expect(result).to eq(1) + expect(mock_collection.queue).to contain_exactly(['ref_test']) + end + end + + context 'with multiple objects' do + it 'tracks references for all objects and returns total count' do + result = described_class.track!('test1', 'test2', collection: mock_collection) + + expect(result).to eq(2) + expect(mock_collection.queue).to contain_exactly(%w[ref_test1 ref_test2]) + end + end + + context 'with nested arrays' do + it 'flattens arrays and tracks all references' do + result = described_class.track!(['test1', %w[test2 test3]], collection: mock_collection) + + expect(result).to eq(3) + expect(mock_collection.queue).to contain_exactly(%w[ref_test1 ref_test2 ref_test3]) + end + end + + context 'with empty input' do + it 'returns zero and does not modify queue' do + result = described_class.track!([], collection: mock_collection) + + expect(result).to eq(0) + expect(mock_collection.queue).to be_empty + end + end + + context 'with custom queue' do + it 'uses provided queue instead of collection queue' do + result = described_class.track!('test', collection: mock_collection, queue: mock_queue) + + expect(result).to eq(1) + expect(mock_queue).to contain_exactly(['ref_test']) + expect(mock_collection.queue).to be_empty + end + end + + context 'when collection does not implement queue method' do + let(:invalid_collection) do + Class.new do + include ActiveContext::Concerns::Collection + + def references + ["ref"] + end + end + end + + it 'raises NotImplementedError' do + expect do + described_class.track!('test', collection: invalid_collection) + end.to raise_error(NotImplementedError) + end + end + end + + describe '.collect_references' do + it 'is a private method' do + expect(described_class.private_methods).to include(:collect_references) + end + end +end diff --git a/gems/gitlab-active-context/spec/spec_helper.rb b/gems/gitlab-active-context/spec/spec_helper.rb index dcfec747cbbb1e1e1b94a35487cd5e2cb08a7608..97e31cc879efcb1f7eb777bec6f7b49c569329d1 100644 --- a/gems/gitlab-active-context/spec/spec_helper.rb +++ b/gems/gitlab-active-context/spec/spec_helper.rb @@ -5,6 +5,8 @@ require 'elasticsearch' require 'opensearch' require 'aws-sdk-core' +require 'active_support/concern' +require 'redis' RSpec.configure do |config| # Enable flags like --only-failures and --next-failure