diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 11eda113650f0a1c3ba2313a709da4184386e5ea..a30d0ae5ef8469493cf67c2c03a7bc34f4f1573a 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -765,6 +765,8 @@ - 1 - - search_zoekt_task_failed_event - 1 +- - search_zoekt_update_index_used_bytes + - 1 - - secrets_management_provision_project_secrets_manager - 1 - - security_create_security_policy_project diff --git a/ee/app/events/search/zoekt/task_succeeded_event.rb b/ee/app/events/search/zoekt/task_succeeded_event.rb new file mode 100644 index 0000000000000000000000000000000000000000..e5a4cca6fe47c562a802c452c1276eddac169f5a --- /dev/null +++ b/ee/app/events/search/zoekt/task_succeeded_event.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Search + module Zoekt + class TaskSucceededEvent < ::Gitlab::EventStore::Event + def schema + { + 'type' => 'object', + 'properties' => { + 'zoekt_repository_id' => { 'type' => 'integer' }, + 'task_id' => { 'type' => 'integer' } + }, + 'required' => %w[zoekt_repository_id task_id] + } + end + end + end +end diff --git a/ee/app/models/search/zoekt/index.rb b/ee/app/models/search/zoekt/index.rb index aad2cd8482f3b7cfd7a8b7b86f0a285efe95506a..fcca11169b0c670d1b3279136b9f71dd36a6e32c 100644 --- a/ee/app/models/search/zoekt/index.rb +++ b/ee/app/models/search/zoekt/index.rb @@ -6,6 +6,7 @@ class Index < ApplicationRecord self.table_name = 'zoekt_indices' include EachBatch include NamespaceValidateable + include Gitlab::Loggable SEARCHEABLE_STATES = %i[ready].freeze @@ -65,8 +66,17 @@ class Index < ApplicationRecord where(state: [:orphaned, :pending_deletion]) end - def used_storage_bytes - zoekt_repositories.sum(:size_bytes) + def update_used_storage_bytes! + update!(used_storage_bytes: zoekt_repositories.sum(:size_bytes)) + + rescue StandardError => err + logger.error(build_structured_payload( + message: 'Error attempting to update used_storage_bytes', + index_id: id, + error: err.message + )) + + raise err end def free_storage_bytes @@ -84,6 +94,10 @@ def index def delete_from_index ::Search::Zoekt::NamespaceIndexerWorker.perform_async(namespace_id, :delete, zoekt_node_id) end + + def logger + @logger ||= ::Search::Zoekt::Logger.build + end end end end diff --git a/ee/app/services/search/zoekt/callback_service.rb b/ee/app/services/search/zoekt/callback_service.rb index b1b2c3fbacf6b9092803f3be422738cfedae9f5e..105140fe828fa5f50ac979de1540d3242156e557 100644 --- a/ee/app/services/search/zoekt/callback_service.rb +++ b/ee/app/services/search/zoekt/callback_service.rb @@ -49,6 +49,8 @@ def process_success task.done! end + + publish_task_succeeded_event_for(task) end def process_failure @@ -59,9 +61,16 @@ def process_failure publish_task_failed_event_for(task) end + def publish_task_succeeded_event_for(task) + publish_event(TaskSucceededEvent, data: { zoekt_repository_id: task.zoekt_repository_id, task_id: task.id }) + end + def publish_task_failed_event_for(task) - event = TaskFailedEvent.new(data: { zoekt_repository_id: task.zoekt_repository_id }) - Gitlab::EventStore.publish(event) + publish_event(TaskFailedEvent, data: { zoekt_repository_id: task.zoekt_repository_id }) + end + + def publish_event(event, data:) + Gitlab::EventStore.publish(event.new(data: data)) end end end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index fce13ece967653af4cd1b50cfd28403ec8141c61..e30410df6089784b42492bed50907a252d1a038a 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -2280,6 +2280,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: search_zoekt_update_index_used_bytes + :worker_name: Search::Zoekt::UpdateIndexUsedBytesWorker + :feature_category: :global_search + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: secrets_management_provision_project_secrets_manager :worker_name: SecretsManagement::ProvisionProjectSecretsManagerWorker :feature_category: :secrets_management diff --git a/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb b/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..444cc89825fbbca0f786d297f6205dff2aaae482 --- /dev/null +++ b/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Search + module Zoekt + class UpdateIndexUsedBytesWorker + include Gitlab::EventStore::Subscriber + prepend ::Geo::SkipSecondary + + feature_category :global_search + urgency :low + idempotent! + + def handle_event(event) + repo = ::Search::Zoekt::Repository.find_by_id(event.data[:zoekt_repository_id]) + return if repo.nil? + + repo.zoekt_index.update_used_storage_bytes! + end + end + end +end diff --git a/ee/lib/ee/gitlab/event_store.rb b/ee/lib/ee/gitlab/event_store.rb index 53f3359397cf976408e3031396567a3574e01945..e9bf9f5e2c89ea0b2f5686f0259d2e2ab49f9f57 100644 --- a/ee/lib/ee/gitlab/event_store.rb +++ b/ee/lib/ee/gitlab/event_store.rb @@ -178,6 +178,9 @@ def subscribe_to_zoekt_events(store) store.subscribe ::Search::Zoekt::InitialIndexingEventWorker, to: ::Search::Zoekt::InitialIndexingEvent + store.subscribe ::Search::Zoekt::UpdateIndexUsedBytesWorker, + to: ::Search::Zoekt::TaskSucceededEvent + store.subscribe ::Search::Zoekt::LostNodeEventWorker, to: ::Search::Zoekt::LostNodeEvent end diff --git a/ee/spec/lib/ee/gitlab/event_store_spec.rb b/ee/spec/lib/ee/gitlab/event_store_spec.rb index 7ac073a451003920474029a8763cccc5a0018c23..e2f897bca179cf90f7310c7f2e22268726611f5f 100644 --- a/ee/spec/lib/ee/gitlab/event_store_spec.rb +++ b/ee/spec/lib/ee/gitlab/event_store_spec.rb @@ -51,6 +51,7 @@ Search::Zoekt::OrphanedIndexEvent, Search::Zoekt::OrphanedRepoEvent, Search::Zoekt::RepoMarkedAsToDeleteEvent, + Search::Zoekt::TaskSucceededEvent, Search::Zoekt::TaskFailedEvent, Search::Zoekt::LostNodeEvent, Security::PolicyDeletedEvent, diff --git a/ee/spec/models/search/zoekt/index_spec.rb b/ee/spec/models/search/zoekt/index_spec.rb index 058bf9d891fe9ae1b9a9bfb98331d023a00da635..2454e46672486ea79e76507bd5561cff4093dde5 100644 --- a/ee/spec/models/search/zoekt/index_spec.rb +++ b/ee/spec/models/search/zoekt/index_spec.rb @@ -201,12 +201,40 @@ end end - describe '#used_storage_bytes' do - let_it_be(:size_bytes) { 100.megabytes } - let_it_be(:repos) { create_list(:zoekt_repository, 5, zoekt_index: zoekt_index, size_bytes: size_bytes) } + describe '#update_used_storage_bytes!' do + let_it_be(:idx) { create(:zoekt_index, reserved_storage_bytes: 100.megabytes) } + let_it_be(:idx_project) { create(:project, namespace_id: idx.namespace_id) } + let_it_be(:idx_project_2) { create(:project, namespace_id: idx.namespace_id) } - it 'returns the sum of size_bytes for all repository associated with this index' do - expect(zoekt_index.used_storage_bytes).to eq(repos.length * size_bytes) + it 'updates indices with the sum of size_bytes for all all associated repositories' do + idx.zoekt_repositories.create!(zoekt_index: idx, project: idx_project, state: :ready, + size_bytes: 50.megabytes) + idx.zoekt_repositories.create!(zoekt_index: idx, project: idx_project_2, state: :ready, + size_bytes: 50.megabytes) + + expect { idx.update_used_storage_bytes! }.to change { + described_class.find(idx.id).used_storage_bytes + }.from(0).to(100.megabytes) + end + + context 'when an exception occurs' do + it 'logs the error and re-raises the exception' do + stubbed_logger = instance_double(::Search::Zoekt::Logger) + expect(::Search::Zoekt::Logger).to receive(:build).and_return stubbed_logger + + expected_error_message = "Ka-Boom" + + expect(stubbed_logger).to receive(:error).with({ + class: "Search::Zoekt::Index", + message: 'Error attempting to update used_storage_bytes', + error: expected_error_message, + index_id: idx.id + }.with_indifferent_access) + + expect(idx).to receive(:update!).and_raise expected_error_message + + expect { idx.update_used_storage_bytes! }.to raise_error expected_error_message + end end end diff --git a/ee/spec/services/search/zoekt/callback_service_spec.rb b/ee/spec/services/search/zoekt/callback_service_spec.rb index 46f4b3bf7362e1cb8ee7b74509ac14ca922949d2..c42be6883f909d3a82f74cb95257716188a2cc96 100644 --- a/ee/spec/services/search/zoekt/callback_service_spec.rb +++ b/ee/spec/services/search/zoekt/callback_service_spec.rb @@ -29,10 +29,11 @@ } end + let(:success) { true } + context 'when task is not found' do let(:task_id) { non_existing_record_id } let(:task_type) { 'index' } - let(:success) { true } it 'does not performs anything' do expect(execute).to be nil @@ -41,7 +42,14 @@ context 'for successful operation' do let_it_be_with_reload(:index_zoekt_task) { create(:zoekt_task, node: node) } - let(:success) { true } + + shared_examples 'successful zoekt task that publishes to event store' do + it 'publishes the correct event to GitLab::EventStore' do + expected_data = { zoekt_repository_id: zoekt_task.zoekt_repository_id, task_id: task_id } + + expect { service.execute }.to publish_event(Search::Zoekt::TaskSucceededEvent).with(expected_data) + end + end context 'when task is already done' do let(:task_type) { 'index' } @@ -64,6 +72,10 @@ let(:task_type) { 'index' } let(:task_id) { index_zoekt_task.id } + it_behaves_like 'successful zoekt task that publishes to event store' do + let_it_be(:zoekt_task) { index_zoekt_task } + end + it 'updates the task state, zoekt_repository data' do freeze_time do expect { execute }.to change { index_zoekt_task.reload.state }.to('done') @@ -86,6 +98,10 @@ expect(delete_zoekt_task.zoekt_repository).to be nil end + it_behaves_like 'successful zoekt task that publishes to event store' do + let_it_be(:zoekt_task) { delete_zoekt_task } + end + context 'when repository is already deleted' do before do delete_zoekt_task.zoekt_repository.destroy! @@ -94,6 +110,10 @@ it 'moves the task to done' do expect { execute }.to change { delete_zoekt_task.reload.state }.to('done') end + + it_behaves_like 'successful zoekt task that publishes to event store' do + let_it_be(:zoekt_task) { delete_zoekt_task } + end end end end diff --git a/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb b/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..2b14c14266de7afd67ac6b0edf790cd32e4ad76d --- /dev/null +++ b/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Zoekt::UpdateIndexUsedBytesWorker, :zoekt_settings_enabled, feature_category: :global_search do + let(:event) { Search::Zoekt::TaskSucceededEvent.new(data: data) } + + let_it_be(:zoekt_task) { create(:zoekt_task) } + + let_it_be(:index) { create(:zoekt_index) } + let_it_be(:repo) { create(:zoekt_repository, zoekt_index: index) } + let_it_be(:another_repo) { create(:zoekt_repository, zoekt_index: index) } + + let(:data) do + { zoekt_repository_id: repo.id, task_id: zoekt_task.id } + end + + it_behaves_like 'worker with data consistency', described_class, data_consistency: :always + + it_behaves_like 'subscribes to event' + + it_behaves_like 'an idempotent worker' do + let(:logger) { instance_double(::Search::Zoekt::Logger) } + + before do + allow(::Search::Zoekt::Logger).to receive(:build).and_return(logger) + end + + it 'resizes an index used_storage_bytes' do + expect do + consume_event(subscriber: described_class, event: event) + end.to change { + Search::Zoekt::Index.find(index.id).used_storage_bytes + }.from(0).to(repo.size_bytes + another_repo.size_bytes) + end + end +end