Skip to content
代码片段 群组 项目
未验证 提交 010dc111 编辑于 作者: Madelein van Niekerk's avatar Madelein van Niekerk 提交者: GitLab
浏览文件

ActiveContext Find partition by serializing collection

Changelog: added
上级 db635af7
No related branches found
No related tags found
无相关合并请求
显示
175 个添加47 个删除
...@@ -8,6 +8,10 @@ class Collection < ApplicationRecord ...@@ -8,6 +8,10 @@ class Collection < ApplicationRecord
validates :name, presence: true, length: { maximum: 255 } validates :name, presence: true, length: { maximum: 255 }
validates :metadata, json_schema: { filename: 'ai_active_context_collection_metadata' } validates :metadata, json_schema: { filename: 'ai_active_context_collection_metadata' }
validates :number_of_partitions, presence: true, numericality: { greater_than_or_equal_to: 1, only_integer: true } validates :number_of_partitions, presence: true, numericality: { greater_than_or_equal_to: 1, only_integer: true }
def partition_for(routing_value)
::ActiveContext::Hash.consistent_hash(number_of_partitions, routing_value)
end
end end
end end
end end
...@@ -25,4 +25,29 @@ ...@@ -25,4 +25,29 @@
expect(collection.errors[:metadata]).to include('must be a valid json schema') expect(collection.errors[:metadata]).to include('must be a valid json schema')
end end
end end
describe '.partition_for' do
using RSpec::Parameterized::TableSyntax
let(:collection) { create(:ai_active_context_collection, number_of_partitions: 5) }
where(:routing_value, :partition_number) do
1 | 0
2 | 1
3 | 3
4 | 2
5 | 3
6 | 3
7 | 4
8 | 4
9 | 2
10 | 2
end
with_them do
it 'always returns the same partition for a routing value' do
expect(collection.partition_for(routing_value)).to eq(partition_number)
end
end
end
end end
# frozen_string_literal: true
module ActiveContext
module CollectionCache
class << self
TTL = 1.minute
def collections
refresh_cache if cache_expired?
@collections ||= {}
end
def fetch(value)
by_id(value) || by_name(value)
end
def by_id(id)
collections[id]
end
def by_name(name)
collections.values.find { |collection| collection.name == name.to_s }
end
private
def cache_expired?
return true unless @last_refreshed_at
Time.current - @last_refreshed_at > TTL
end
def refresh_cache
new_collections = {}
Config.collection_model.find_each do |record|
new_collections[record.id] = record
end
@collections = new_collections
@last_refreshed_at = Time.current
end
end
end
end
...@@ -10,6 +10,10 @@ def track!(*objects) ...@@ -10,6 +10,10 @@ def track!(*objects)
ActiveContext::Tracker.track!(objects, collection: self) ActiveContext::Tracker.track!(objects, collection: self)
end end
def collection_name
raise NotImplementedError
end
def queue def queue
raise NotImplementedError raise NotImplementedError
end end
...@@ -27,6 +31,10 @@ def reference_klasses ...@@ -27,6 +31,10 @@ def reference_klasses
def reference_klass def reference_klass
nil nil
end end
def collection_record
ActiveContext::CollectionCache.fetch(collection_name)
end
end end
attr_reader :object attr_reader :object
...@@ -38,9 +46,10 @@ def initialize(object) ...@@ -38,9 +46,10 @@ def initialize(object)
def references def references
reference_klasses = Array.wrap(self.class.reference_klasses) reference_klasses = Array.wrap(self.class.reference_klasses)
routing = self.class.routing(object) routing = self.class.routing(object)
collection_id = self.class.collection_record.id
reference_klasses.map do |reference_klass| reference_klasses.map do |reference_klass|
reference_klass.serialize(object, routing) reference_klass.serialize(collection_id, routing, object)
end end
end end
end end
......
...@@ -31,7 +31,7 @@ def register! ...@@ -31,7 +31,7 @@ def register!
end end
def push(references) def push(references)
refs_by_shard = references.group_by { |ref| ActiveContext::Shard.shard_number(number_of_shards, ref) } refs_by_shard = references.group_by { |ref| ActiveContext::Hash.consistent_hash(number_of_shards, ref) }
ActiveContext::Redis.with_redis do |redis| ActiveContext::Redis.with_redis do |redis|
refs_by_shard.each do |shard_number, shard_items| refs_by_shard.each do |shard_number, shard_items|
......
...@@ -2,7 +2,15 @@ ...@@ -2,7 +2,15 @@
module ActiveContext module ActiveContext
class Config class Config
Cfg = Struct.new(:enabled, :databases, :logger, :indexing_enabled, :re_enqueue_indexing_workers, :migrations_path) Cfg = Struct.new(
:enabled,
:databases,
:logger,
:indexing_enabled,
:re_enqueue_indexing_workers,
:migrations_path,
:collection_model
)
class << self class << self
def configure(&block) def configure(&block)
...@@ -25,6 +33,10 @@ def migrations_path ...@@ -25,6 +33,10 @@ def migrations_path
current.migrations_path || Rails.root.join('ee/db/active_context/migrate') current.migrations_path || Rails.root.join('ee/db/active_context/migrate')
end end
def collection_model
current.collection_model || ::Ai::ActiveContext::Collection
end
def logger def logger
current.logger || ::Logger.new($stdout) current.logger || ::Logger.new($stdout)
end end
......
...@@ -20,10 +20,18 @@ def create_collection(name, number_of_partitions:, &block) ...@@ -20,10 +20,18 @@ def create_collection(name, number_of_partitions:, &block)
number_of_partitions: number_of_partitions, number_of_partitions: number_of_partitions,
fields: builder.fields fields: builder.fields
) )
create_collection_record(full_name, number_of_partitions)
end end
private private
def create_collection_record(name, number_of_partitions)
collection = Config.collection_model.find_or_initialize_by(name: name)
collection.update(number_of_partitions: number_of_partitions)
collection.save!
end
def do_create_collection(...) def do_create_collection(...)
raise NotImplementedError raise NotImplementedError
end end
......
# frozen_string_literal: true
module ActiveContext
class Hash
def self.consistent_hash(number, data)
data = data.to_s unless data.is_a?(String)
Digest::SHA256.hexdigest(data).hex % number # rubocop: disable Fips/OpenSSL -- used for data distribution, not for security
end
end
end
...@@ -9,20 +9,18 @@ class Reference ...@@ -9,20 +9,18 @@ class Reference
class << self class << self
def deserialize(string) def deserialize(string)
ref_klass = ref_klass(string) ref_klass(string)&.instantiate(string)
if ref_klass
ref_klass.instantiate(string)
else
Search::Elastic::Reference.deserialize(string)
end
end end
def instantiate(string) def instantiate(string)
new(*deserialize_string(string)) new(*deserialize_string(string))
end end
def serialize def serialize(collection_id, routing, data)
new(collection_id, routing, *serialize_data(data)).serialize
end
def serialize_data
raise NotImplementedError raise NotImplementedError
end end
...@@ -35,23 +33,37 @@ def preprocess_references(refs) ...@@ -35,23 +33,37 @@ def preprocess_references(refs)
end end
end end
attr_reader :collection_id, :collection, :routing, :serialized_args
def initialize(collection_id, routing, *serialized_args)
@collection_id = collection_id.to_i
@collection = ActiveContext::CollectionCache.fetch(@collection_id)
@routing = routing
@serialized_args = serialized_args
init
end
def klass def klass
self.class.klass self.class.klass
end end
def serialize def serialize
self.class.join_delimited([collection_id, routing, serialize_arguments].flatten.compact)
end
def init
raise NotImplementedError raise NotImplementedError
end end
def as_indexed_json def serialize_arguments
raise NotImplementedError raise NotImplementedError
end end
def operation def as_indexed_json
raise NotImplementedError raise NotImplementedError
end end
def partition_name def operation
raise NotImplementedError raise NotImplementedError
end end
...@@ -59,8 +71,12 @@ def identifier ...@@ -59,8 +71,12 @@ def identifier
raise NotImplementedError raise NotImplementedError
end end
def routing def partition_name
nil collection.name
end
def partition_number
collection.partition_for(routing)
end end
end end
end end
# frozen_string_literal: true
module ActiveContext
class Shard
def self.shard_number(number_of_shards, data)
Digest::SHA256.hexdigest(data).hex % number_of_shards # rubocop: disable Fips/OpenSSL -- used for data distribution, not for security
end
end
end
...@@ -36,7 +36,7 @@ def self.number_of_shards ...@@ -36,7 +36,7 @@ def self.number_of_shards
it 'pushes references to Redis' do it 'pushes references to Redis' do
references = %w[ref1 ref2 ref3] references = %w[ref1 ref2 ref3]
allow(ActiveContext::Shard).to receive(:shard_number).and_return(0, 1, 0) allow(ActiveContext::Hash).to receive(:consistent_hash).and_return(0, 1, 0)
expect(redis_double).to receive(:incrby).with('mockmodule:{test_queue}:0:score', 2).and_return(2) expect(redis_double).to receive(:incrby).with('mockmodule:{test_queue}:0:score', 2).and_return(2)
expect(redis_double).to receive(:incrby).with('mockmodule:{test_queue}:1:score', 1).and_return(1) expect(redis_double).to receive(:incrby).with('mockmodule:{test_queue}:1:score', 1).and_return(1)
expect(redis_double).to receive(:zadd).with('mockmodule:{test_queue}:0:zset', [[1, 'ref1'], [2, 'ref3']]) expect(redis_double).to receive(:zadd).with('mockmodule:{test_queue}:0:zset', [[1, 'ref1'], [2, 'ref3']])
......
...@@ -72,6 +72,32 @@ ...@@ -72,6 +72,32 @@
end end
end end
describe '.collection_model' do
before do
stub_const('Ai::ActiveContext::Collection', Class.new)
end
context 'when collection_model is not set' do
it 'returns the default model' do
expect(described_class.collection_model).to eq(::Ai::ActiveContext::Collection)
end
end
context 'when collection_model is set' do
let(:custom_model) { Class.new }
before do
described_class.configure do |config|
config.collection_model = custom_model
end
end
it 'returns the configured collection model' do
expect(described_class.collection_model).to eq(custom_model)
end
end
end
describe '.logger' do describe '.logger' do
context 'when logger is not set' do context 'when logger is not set' do
it 'returns a default stdout logger' do it 'returns a default stdout logger' do
......
...@@ -23,9 +23,8 @@ ...@@ -23,9 +23,8 @@
stub_const('Search::Elastic::Reference', Class.new) stub_const('Search::Elastic::Reference', Class.new)
end end
it 'falls back to Search::Elastic::Reference.deserialize' do it 'returns nil' do
expect(Search::Elastic::Reference).to receive(:deserialize).with('test|string') expect(described_class.deserialize('test|string')).to be_nil
described_class.deserialize('test|string')
end end
end end
end end
...@@ -45,12 +44,6 @@ ...@@ -45,12 +44,6 @@
end end
end end
describe '#klass' do
it 'returns the demodulized class name' do
expect(described_class.new.klass).to eq('Reference')
end
end
describe 'ReferenceUtils methods' do describe 'ReferenceUtils methods' do
describe '.delimit' do describe '.delimit' do
it 'splits the string by the delimiter' do it 'splits the string by the delimiter' do
......
...@@ -30,18 +30,6 @@ def references ...@@ -30,18 +30,6 @@ def references
expect(mock_queue).to contain_exactly(['test_string']) expect(mock_queue).to contain_exactly(['test_string'])
end end
it 'serializes ActiveContext::Reference objects' do
reference_class = Class.new(ActiveContext::Reference) do
def serialize
'serialized_reference'
end
end
reference = reference_class.new
expect(described_class.track!(reference, collection: mock_collection)).to eq(1)
expect(mock_queue).to contain_exactly(['serialized_reference'])
end
it 'uses collection.references for other objects' do it 'uses collection.references for other objects' do
obj = double('SomeObject') obj = double('SomeObject')
collection_instance = instance_double('CollectionInstance') collection_instance = instance_double('CollectionInstance')
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册