diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 0cf9726e1ca657696aa5e846773ccb835c184101..efa9b7626b286b2f60b8f9a75a11bb78133abc00 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -984,6 +984,9 @@ Settings.cron_jobs['delete_expired_dependency_exports_worker'] ||= {} Settings.cron_jobs['delete_expired_dependency_exports_worker']['cron'] ||= '0 4 * * *' Settings.cron_jobs['delete_expired_dependency_exports_worker']['job_class'] = 'Sbom::DeleteExpiredExportsWorker' + Settings.cron_jobs['analytics_dump_ai_user_metrics_database_write_buffer_cron_worker'] ||= {} + Settings.cron_jobs['analytics_dump_ai_user_metrics_database_write_buffer_cron_worker']['cron'] ||= "*/10 * * * *" + Settings.cron_jobs['analytics_dump_ai_user_metrics_database_write_buffer_cron_worker']['job_class'] = 'Analytics::DumpAiUserMetricsWriteBufferCronWorker' Gitlab.com do Settings.cron_jobs['disable_legacy_open_source_license_for_inactive_projects'] ||= {} diff --git a/ee/app/models/ai/user_metrics.rb b/ee/app/models/ai/user_metrics.rb index 02c1b0df2d34abb04ea4f99df6c7eec54f4de361..7bab16efdde13af02d1effec736b7760dc671abc 100644 --- a/ee/app/models/ai/user_metrics.rb +++ b/ee/app/models/ai/user_metrics.rb @@ -2,8 +2,18 @@ module Ai class UserMetrics < ApplicationRecord + include Analytics::HasWriteBuffer + self.table_name = 'ai_user_metrics' + self.write_buffer_options = { class: Analytics::AiUserMetricsDatabaseWriteBuffer } + belongs_to :user, optional: false + + validates :last_duo_activity_on, presence: true + + def self.refresh_last_activity_on(user, last_duo_activity_on: Time.current) + write_buffer.add({ user_id: user.id, last_duo_activity_on: last_duo_activity_on }) + end end end diff --git a/ee/app/models/concerns/analytics/has_write_buffer.rb b/ee/app/models/concerns/analytics/has_write_buffer.rb new file mode 100644 index 0000000000000000000000000000000000000000..5f779cad4677e42f784dfb041b2f464bfd409ff7 --- /dev/null +++ b/ee/app/models/concerns/analytics/has_write_buffer.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Analytics + module HasWriteBuffer + extend ActiveSupport::Concern + + class_methods do + def write_buffer + @buffer ||= write_buffer_options[:class].new(buffer_key: name.underscore) + end + + attr_writer :write_buffer_options + + def write_buffer_options + default_write_buffer_options.merge(@write_buffer_options || {}) + end + + def default_write_buffer_options + { + class: Analytics::DatabaseWriteBuffer + } + end + end + end +end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index 4f9bada39629695055e2ca8f6fa5258534f9d41f..0b04fbe7c9d33f9e238360b2ca46df645b22aa53 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -102,6 +102,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:analytics_dump_ai_user_metrics_write_buffer_cron + :worker_name: Analytics::DumpAiUserMetricsWriteBufferCronWorker + :feature_category: :database + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:analytics_value_stream_dashboard_count :worker_name: Analytics::ValueStreamDashboard::CountWorker :feature_category: :value_stream_management diff --git a/ee/app/workers/analytics/dump_ai_user_metrics_write_buffer_cron_worker.rb b/ee/app/workers/analytics/dump_ai_user_metrics_write_buffer_cron_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..69de012ff110d5548b780cac9c537fe0168227dd --- /dev/null +++ b/ee/app/workers/analytics/dump_ai_user_metrics_write_buffer_cron_worker.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Analytics + class DumpAiUserMetricsWriteBufferCronWorker + include ApplicationWorker + include WriteBufferProcessorWorker + + idempotent! + queue_namespace :cronjob + data_consistency :delayed + feature_category :database + + MAX_RUNTIME = 200.seconds + BATCH_SIZE = 1000 + + def perform + @current_model = Ai::UserMetrics + super + end + + private + + def upsert_options + { + unique_by: %i[user_id], + on_duplicate: Arel.sql(<<~SQL.squish) + last_duo_activity_on = GREATEST(excluded.last_duo_activity_on, ai_user_metrics.last_duo_activity_on) + SQL + } + end + end +end diff --git a/ee/app/workers/concerns/analytics/write_buffer_processor_worker.rb b/ee/app/workers/concerns/analytics/write_buffer_processor_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..2b094d4a72865cf4d51dbe2265b9d1311b58379b --- /dev/null +++ b/ee/app/workers/concerns/analytics/write_buffer_processor_worker.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +module Analytics + module WriteBufferProcessorWorker + extend ActiveSupport::Concern + include LoopWithRuntimeLimit + + included do + attr_reader :current_model + end + + def perform + total_inserted_rows = 0 + + status = loop_with_runtime_limit(self.class::MAX_RUNTIME) do + inserted_rows = process_next_batch + break :processed if inserted_rows == 0 + + total_inserted_rows += inserted_rows + end + + log_extra_metadata_on_done(:result, { + status: status, + inserted_rows: total_inserted_rows + }) + end + + private + + def process_next_batch + valid_objects = prepare_batch_objects(next_batch) + + return 0 if valid_objects.empty? + + grouped_attributes = prepare_attributes(valid_objects).group_by(&:keys).values + + grouped_attributes.sum do |attributes| + res = current_model.upsert_all(attributes, **upsert_options) + + res ? res.rows.size : 0 + end + end + + def next_batch + current_model.write_buffer.pop(self.class::BATCH_SIZE) + end + + def prepare_attributes(valid_objects) + valid_objects.map { |obj| obj.attributes.compact }.reject(&:empty?) + end + + def prepare_batch_objects(batch) + batch.map { |attrs| current_model.new(attrs) }.select(&:valid?) + end + end +end diff --git a/ee/app/workers/usage_events/dump_write_buffer_cron_worker.rb b/ee/app/workers/usage_events/dump_write_buffer_cron_worker.rb index d507a3f99360ce3f2a1b6da23fcb60b8ad51cb8f..1fad6ac2ca708b57722f1267454feabb670c1be5 100644 --- a/ee/app/workers/usage_events/dump_write_buffer_cron_worker.rb +++ b/ee/app/workers/usage_events/dump_write_buffer_cron_worker.rb @@ -3,7 +3,7 @@ module UsageEvents class DumpWriteBufferCronWorker include ApplicationWorker - include LoopWithRuntimeLimit + include ::Analytics::WriteBufferProcessorWorker idempotent! queue_namespace :cronjob @@ -24,7 +24,7 @@ def perform status = loop_with_runtime_limit(MAX_RUNTIME) do inserted_rows = process_next_batch if inserted_rows == 0 - break :processed if @current_model == MODELS.last + break :processed if current_model == MODELS.last current_model_index += 1 @current_model = MODELS[current_model_index] @@ -41,25 +41,14 @@ def perform private - def process_next_batch - valid_attributes = next_batch.filter_map do |attributes| - event = @current_model.new(attributes) - next unless event.valid? - - event.attributes.compact - end - - grouped_attributes = valid_attributes.group_by(&:keys).values - - grouped_attributes.sum do |attributes| - res = @current_model.insert_all(attributes, unique_by: %i[id timestamp]) unless attributes.empty? - - res ? res.rows.size : 0 - end + def next_batch + Ai::UsageEventWriteBuffer.pop(current_model.name, BATCH_SIZE) end - def next_batch - Ai::UsageEventWriteBuffer.pop(@current_model.name, BATCH_SIZE) + def upsert_options + { + unique_by: %i[id timestamp] + } end end end diff --git a/ee/lib/ai/usage_event_write_buffer.rb b/ee/lib/ai/usage_event_write_buffer.rb index fb34db1c724ac3c1d68c656aa5bb550172fa5856..a94de52ec15a1c7a3ad61da73b02f1d6f5da5088 100644 --- a/ee/lib/ai/usage_event_write_buffer.rb +++ b/ee/lib/ai/usage_event_write_buffer.rb @@ -2,7 +2,7 @@ module Ai module UsageEventWriteBuffer - include Gitlab::Redis::BackwardsCompatibility + extend Gitlab::Redis::BackwardsCompatibility BUFFER_KEY_PREFIX = 'usage_event_write_buffer_' diff --git a/ee/lib/analytics/ai_user_metrics_database_write_buffer.rb b/ee/lib/analytics/ai_user_metrics_database_write_buffer.rb new file mode 100644 index 0000000000000000000000000000000000000000..347f5a31303770cb3d48b3510ec0b9c272c66740 --- /dev/null +++ b/ee/lib/analytics/ai_user_metrics_database_write_buffer.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Analytics + # Stores last attributes set for each `user_id` in Redis hash. + class AiUserMetricsDatabaseWriteBuffer < DatabaseWriteBuffer + def add(attributes) + hkey = attributes[:user_id] + + Gitlab::Redis::SharedState.with do |redis| + redis.hset(buffer_key, hkey.to_s, attributes.to_json) + end + end + + def pop(limit) + Gitlab::Redis::SharedState.with do |redis| + keys = redis.hkeys(buffer_key)[0..(limit - 1)] + + next [] if keys.empty? + + attributes, _deletes = redis.pipelined do |pipeline| + pipeline.hmget(buffer_key, *keys) + pipeline.hdel(buffer_key, keys) + end + + attributes.compact.map { |attrs| Gitlab::Json.parse(attrs) } + end + end + end +end diff --git a/ee/lib/analytics/database_write_buffer.rb b/ee/lib/analytics/database_write_buffer.rb new file mode 100644 index 0000000000000000000000000000000000000000..d244b09999d9adad0c145d5f217ca998e54c6f00 --- /dev/null +++ b/ee/lib/analytics/database_write_buffer.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +# This class is used to put all writes to a buffer to avoid individual frequent writes +# and offload main DB. +# It's used mainly for frequent events like code suggestions telemetry. +module Analytics + class DatabaseWriteBuffer + include Gitlab::Redis::BackwardsCompatibility + + BUFFER_KEY_POSTFIX = "_db_write_buffer" + + def initialize(buffer_key:) + @buffer_key = buffer_key + end + + def add(attributes) + Gitlab::Redis::SharedState.with do |redis| + redis.rpush(buffer_key, attributes.to_json) + end + end + + def pop(limit) + Array.wrap(lpop_with_limit(buffer_key, limit)).map { |hash| Gitlab::Json.parse(hash) } + end + + private + + def buffer_key + @buffer_key + BUFFER_KEY_POSTFIX + end + end +end diff --git a/ee/spec/lib/analytics/ai_user_metrics_database_write_buffer_spec.rb b/ee/spec/lib/analytics/ai_user_metrics_database_write_buffer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..23af20d3cb176c4958fd5233c5ffe5756317b96c --- /dev/null +++ b/ee/spec/lib/analytics/ai_user_metrics_database_write_buffer_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Analytics::AiUserMetricsDatabaseWriteBuffer, feature_category: :devops_reports do + let(:model_name) { 'test_model' } + + subject(:buffer) { described_class.new(buffer_key: model_name) } + + describe '#add', :clean_gitlab_redis_shared_state do + it 'adds given attributes json to test_model_db_write_buffer redis hash' do + buffer.add({ user_id: '1', foo: 'bar' }) + + Gitlab::Redis::SharedState.with do |redis| + expect(redis.hgetall('test_model_db_write_buffer')).to eq({ '1' => { user_id: '1', foo: 'bar' }.to_json }) + end + end + + it 'refreshes attributes json if hash already exists' do + buffer.add({ user_id: 1, foo: 'bar' }) + buffer.add({ user_id: 1, foo: 'baz' }) + + Gitlab::Redis::SharedState.with do |redis| + expect(redis.hgetall('test_model_db_write_buffer')).to eq({ '1' => { user_id: 1, foo: 'baz' }.to_json }) + end + end + end + + describe '#pop', :clean_gitlab_redis_shared_state do + it 'pops limited array of elements from test_model_db_write_buffer key' do + buffer.add({ user_id: 1, foo: 'bar' }) + buffer.add({ user_id: 2, foo: 'baz' }) + buffer.add({ user_id: 3, foo: 'bad' }) + + Gitlab::Redis::SharedState.with do |redis| + expect(redis.hlen('test_model_db_write_buffer')).to eq(3) + end + expect(buffer.pop(2)).to eq([{ 'user_id' => 1, 'foo' => 'bar' }, { 'user_id' => 2, 'foo' => 'baz' }]) + Gitlab::Redis::SharedState.with do |redis| + expect(redis.hlen('test_model_db_write_buffer')).to eq(1) + end + expect(buffer.pop(2)).to eq([{ 'user_id' => 3, 'foo' => 'bad' }]) + Gitlab::Redis::SharedState.with do |redis| + expect(redis.hlen('test_model_db_write_buffer')).to eq(0) + end + end + end +end diff --git a/ee/spec/lib/analytics/database_write_buffer_spec.rb b/ee/spec/lib/analytics/database_write_buffer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..603c29d14ecbf3c46630a1d3afdac76442fd49a2 --- /dev/null +++ b/ee/spec/lib/analytics/database_write_buffer_spec.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Analytics::DatabaseWriteBuffer, feature_category: :devops_reports do + let(:model_name) { 'test_model' } + + subject(:buffer) { described_class.new(buffer_key: model_name) } + + describe '#add', :clean_gitlab_redis_shared_state do + it 'adds given attributes json to test_model_db_write_buffer redis list' do + buffer.add({ foo: 'bar' }) + + Gitlab::Redis::SharedState.with do |redis| + expect(redis.lindex('test_model_db_write_buffer', 0)).to eq({ foo: 'bar' }.to_json) + end + end + end + + describe '#pop', :clean_gitlab_redis_shared_state do + it 'pops limited array of elements from test_model_db_write_buffer key' do + buffer.add({ foo: 'bar' }) + buffer.add({ foo: 'baz' }) + buffer.add({ foo: 'bad' }) + + Gitlab::Redis::SharedState.with do |redis| + expect(redis.llen('test_model_db_write_buffer')).to eq(3) + end + expect(buffer.pop(2)).to eq([{ 'foo' => 'bar' }, { 'foo' => 'baz' }]) + Gitlab::Redis::SharedState.with do |redis| + expect(redis.llen('test_model_db_write_buffer')).to eq(1) + end + expect(buffer.pop(2)).to eq([{ 'foo' => 'bad' }]) + Gitlab::Redis::SharedState.with do |redis| + expect(redis.llen('test_model_db_write_buffer')).to eq(0) + end + end + end +end diff --git a/ee/spec/models/ai/user_metrics_spec.rb b/ee/spec/models/ai/user_metrics_spec.rb index 5e0566b99904e4b903fceed430d076a629575345..b6b71273bb82e627e37bed8b31594326d436bec1 100644 --- a/ee/spec/models/ai/user_metrics_spec.rb +++ b/ee/spec/models/ai/user_metrics_spec.rb @@ -4,4 +4,30 @@ RSpec.describe Ai::UserMetrics, feature_category: :ai_abstraction_layer do it { is_expected.to belong_to(:user).required } + + it { is_expected.to validate_presence_of(:last_duo_activity_on) } + + describe '.write_buffer' do + it 'returns instance of AiUserMetricsDatabaseWriteBuffer' do + expect(described_class.write_buffer).to be_instance_of(Analytics::AiUserMetricsDatabaseWriteBuffer) + end + end + + describe '.refresh_last_activity_on', :freeze_time do + let_it_be(:user) { build_stubbed(:user) } + + it 'adds current timestamp to model buffer' do + expect(described_class.write_buffer).to receive(:add).with({ user_id: user.id, +last_duo_activity_on: Time.current }) + + described_class.refresh_last_activity_on(user) + end + + it 'respects custom timestamp if provided' do + expect(described_class.write_buffer).to receive(:add).with({ user_id: user.id, +last_duo_activity_on: 1.minute.ago }) + + described_class.refresh_last_activity_on(user, last_duo_activity_on: 1.minute.ago) + end + end end diff --git a/ee/spec/workers/analytics/dump_ai_user_metrics_write_buffer_cron_worker_spec.rb b/ee/spec/workers/analytics/dump_ai_user_metrics_write_buffer_cron_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..b7cee9667f792176b2e06f92dd33c4568f12d901 --- /dev/null +++ b/ee/spec/workers/analytics/dump_ai_user_metrics_write_buffer_cron_worker_spec.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Analytics::DumpAiUserMetricsWriteBufferCronWorker, :clean_gitlab_redis_shared_state, feature_category: :value_stream_management do + let(:job) { described_class.new } + let(:perform) { job.perform } + + let_it_be(:user) { create(:user) } + let_it_be(:user2) { create(:user) } + let_it_be(:user3) { create(:user) } + + let(:inserted_records) { Ai::UserMetrics.all.map(&:attributes) } + + describe "#perform", :freeze_time do + it 'does not insert anything' do + perform + + expect(inserted_records).to be_empty + end + + context "when buffer has data" do + before do + Ai::UserMetrics.write_buffer.add({ user_id: user.id, last_duo_activity_on: 1.day.ago.to_date }) + Ai::UserMetrics.write_buffer.add({ user_id: user2.id, last_duo_activity_on: 2.days.ago.to_date }) + Ai::UserMetrics.write_buffer.add({ user_id: user3.id, last_duo_activity_on: 3.days.ago.to_date }) + end + + it 'upserts all rows' do + status = perform + + expect(status).to eq({ status: :processed, inserted_rows: 3 }) + expect(inserted_records).to match_array([ + hash_including('user_id' => user.id, 'last_duo_activity_on' => 1.day.ago.to_date), + hash_including('user_id' => user2.id, 'last_duo_activity_on' => 2.days.ago.to_date), + hash_including('user_id' => user3.id, 'last_duo_activity_on' => 3.days.ago.to_date) + ]) + end + + context 'when DB has preexisting data' do + before do + Ai::UserMetrics.create!(user_id: user.id, last_duo_activity_on: 10.days.ago.to_date) + Ai::UserMetrics.create!(user_id: user2.id, last_duo_activity_on: 11.days.ago.to_date) + Ai::UserMetrics.create!(user_id: user3.id, last_duo_activity_on: 1.minute.ago.to_date) + end + + it 'updates all rows with older last_duo_activity_on' do + status = perform + + expect(status).to eq({ status: :processed, inserted_rows: 3 }) + expect(inserted_records).to match_array([ + hash_including('user_id' => user.id, 'last_duo_activity_on' => 1.day.ago.to_date), + hash_including('user_id' => user2.id, 'last_duo_activity_on' => 2.days.ago.to_date), + hash_including('user_id' => user3.id, 'last_duo_activity_on' => 1.minute.ago.to_date) + ]) + end + end + + context 'when looping twice' do + it 'upserts all rows' do + stub_const("#{described_class.name}::BATCH_SIZE", 2) + + expect(perform).to eq({ status: :processed, inserted_rows: 3 }) + end + end + + context 'when time limit is up' do + it 'returns over_time status' do + stub_const("#{described_class.name}::BATCH_SIZE", 1) + + allow_next_instance_of(Gitlab::Metrics::RuntimeLimiter) do |limiter| + allow(limiter).to receive(:over_time?).and_return(false, true) + end + + status = perform + + expect(status).to eq({ status: :over_time, inserted_rows: 2 }) + expect(inserted_records).to match([ + hash_including('user_id' => user.id, 'last_duo_activity_on' => 1.day.ago.to_date), + hash_including('user_id' => user2.id, 'last_duo_activity_on' => 2.days.ago.to_date) + ]) + end + end + end + end +end diff --git a/ee/spec/workers/usage_events/dump_write_buffer_cron_worker_spec.rb b/ee/spec/workers/usage_events/dump_write_buffer_cron_worker_spec.rb index 7ababd8c7f7523fa01e3fdee9aa26dae7066449c..1cbef5a4582f57f6703c479d45920b45ad710e2a 100644 --- a/ee/spec/workers/usage_events/dump_write_buffer_cron_worker_spec.rb +++ b/ee/spec/workers/usage_events/dump_write_buffer_cron_worker_spec.rb @@ -2,7 +2,7 @@ require 'spec_helper' -RSpec.describe UsageEvents::DumpWriteBufferCronWorker, :clean_gitlab_redis_cache, feature_category: :value_stream_management do +RSpec.describe UsageEvents::DumpWriteBufferCronWorker, :clean_gitlab_redis_shared_state, feature_category: :value_stream_management do let_it_be(:organization) { create(:organization) } let(:job) { described_class.new } let(:perform) { job.perform } @@ -86,7 +86,7 @@ def add_to_buffer(attributes, model = Ai::CodeSuggestionEvent) end it 'inserts all rows by attribute groups' do - expect(Ai::DuoChatEvent).to receive(:insert_all).twice.and_call_original + expect(Ai::DuoChatEvent).to receive(:upsert_all).twice.and_call_original expect(perform).to eq({ status: :processed, inserted_rows: 3 }) end end diff --git a/lib/click_house/write_buffer.rb b/lib/click_house/write_buffer.rb index a5e8f7460fb0a727301fd31b71b1622e4481fb1a..08d4e0b0d3e36a1ab5bd98c9ad91205122af9b9d 100644 --- a/lib/click_house/write_buffer.rb +++ b/lib/click_house/write_buffer.rb @@ -2,7 +2,7 @@ module ClickHouse module WriteBuffer - include Gitlab::Redis::BackwardsCompatibility + extend Gitlab::Redis::BackwardsCompatibility BUFFER_KEY_PREFIX = 'clickhouse_write_buffer_' diff --git a/lib/gitlab/redis/backwards_compatibility.rb b/lib/gitlab/redis/backwards_compatibility.rb index d922e980611d1b0712c7a7ce312090e5869426ad..413c1fbfff67ef9f3aeb5c0c53dea9e058e24b00 100644 --- a/lib/gitlab/redis/backwards_compatibility.rb +++ b/lib/gitlab/redis/backwards_compatibility.rb @@ -3,18 +3,14 @@ module Gitlab module Redis module BackwardsCompatibility - extend ActiveSupport::Concern - - class_methods do - def lpop_with_limit(key, limit) - Gitlab::Redis::SharedState.with do |redis| - # To keep this compatible with Redis 6.0 - # use a Redis pipeline to pop all objects - # instead of using lpop with limit. - redis.pipelined do |pipeline| - limit.times { pipeline.lpop(key) } - end.compact - end + def lpop_with_limit(key, limit) + Gitlab::Redis::SharedState.with do |redis| + # To keep this compatible with Redis 6.0 + # use a Redis pipeline to pop all objects + # instead of using lpop with limit. + redis.pipelined do |pipeline| + limit.times { pipeline.lpop(key) } + end.compact end end end