From a6b610aa5b09f68629ac6e76b5b7074360b96aa0 Mon Sep 17 00:00:00 2001
From: John Mason <9717668-johnmason@users.noreply.gitlab.com>
Date: Wed, 2 Oct 2024 02:40:06 +0000
Subject: [PATCH] Add scheduling worker to update storage used on zoekt index

Changelog: changed
EE: true
---
 config/sidekiq_queues.yml                     |  2 +
 .../search/zoekt/task_succeeded_event.rb      | 18 +++++++++
 ee/app/models/search/zoekt/index.rb           | 18 ++++++++-
 .../services/search/zoekt/callback_service.rb | 13 ++++++-
 ee/app/workers/all_queues.yml                 |  9 +++++
 .../zoekt/update_index_used_bytes_worker.rb   | 21 ++++++++++
 ee/lib/ee/gitlab/event_store.rb               |  3 ++
 ee/spec/lib/ee/gitlab/event_store_spec.rb     |  1 +
 ee/spec/models/search/zoekt/index_spec.rb     | 38 ++++++++++++++++---
 .../search/zoekt/callback_service_spec.rb     | 24 +++++++++++-
 .../update_index_used_bytes_worker_spec.rb    | 37 ++++++++++++++++++
 11 files changed, 173 insertions(+), 11 deletions(-)
 create mode 100644 ee/app/events/search/zoekt/task_succeeded_event.rb
 create mode 100644 ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb
 create mode 100644 ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb

diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index 11eda113650f0..a30d0ae5ef846 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -765,6 +765,8 @@
   - 1
 - - search_zoekt_task_failed_event
   - 1
+- - search_zoekt_update_index_used_bytes
+  - 1
 - - secrets_management_provision_project_secrets_manager
   - 1
 - - security_create_security_policy_project
diff --git a/ee/app/events/search/zoekt/task_succeeded_event.rb b/ee/app/events/search/zoekt/task_succeeded_event.rb
new file mode 100644
index 0000000000000..e5a4cca6fe47c
--- /dev/null
+++ b/ee/app/events/search/zoekt/task_succeeded_event.rb
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    # Event published after a zoekt task succeeds. The payload must carry integer
+    # `zoekt_repository_id` and `task_id` keys.
+    class TaskSucceededEvent < ::Gitlab::EventStore::Event
+      def schema
+        id_type = { 'type' => 'integer' }
+        {
+          'type' => 'object',
+          'properties' => { 'zoekt_repository_id' => id_type, 'task_id' => id_type },
+          'required' => %w[zoekt_repository_id task_id]
+        }
+      end
+    end
+  end
+end
diff --git a/ee/app/models/search/zoekt/index.rb b/ee/app/models/search/zoekt/index.rb
index aad2cd8482f3b..fcca11169b0c6 100644
--- a/ee/app/models/search/zoekt/index.rb
+++ b/ee/app/models/search/zoekt/index.rb
@@ -6,6 +6,7 @@ class Index < ApplicationRecord
       self.table_name = 'zoekt_indices'
       include EachBatch
       include NamespaceValidateable
+      include Gitlab::Loggable
 
       SEARCHEABLE_STATES = %i[ready].freeze
 
@@ -65,8 +66,17 @@ class Index < ApplicationRecord
         where(state: [:orphaned, :pending_deletion])
       end
 
-      def used_storage_bytes
-        zoekt_repositories.sum(:size_bytes)
+      # Stores the sum of size_bytes across zoekt_repositories; logs and re-raises any failure.
+      def update_used_storage_bytes!
+        update!(used_storage_bytes: zoekt_repositories.sum(:size_bytes))
+      rescue StandardError => e
+        error_payload = build_structured_payload(
+          message: 'Error attempting to update used_storage_bytes',
+          index_id: id,
+          error: e.message
+        )
+        logger.error(error_payload)
+        raise
+      end
 
       def free_storage_bytes
@@ -84,6 +94,10 @@ def index
       def delete_from_index
         ::Search::Zoekt::NamespaceIndexerWorker.perform_async(namespace_id, :delete, zoekt_node_id)
       end
+
+      def logger
+        @logger ||= ::Search::Zoekt::Logger.build # memoized; NOTE(review): shadows ActiveRecord's #logger?
+      end
     end
   end
 end
diff --git a/ee/app/services/search/zoekt/callback_service.rb b/ee/app/services/search/zoekt/callback_service.rb
index b1b2c3fbacf6b..105140fe828fa 100644
--- a/ee/app/services/search/zoekt/callback_service.rb
+++ b/ee/app/services/search/zoekt/callback_service.rb
@@ -49,6 +49,8 @@ def process_success
 
           task.done!
         end
+
+        publish_task_succeeded_event_for(task)
       end
 
       def process_failure
@@ -59,9 +61,16 @@ def process_failure
         publish_task_failed_event_for(task)
       end
 
+      def publish_task_succeeded_event_for(task)
+        publish_event(TaskSucceededEvent, data: { zoekt_repository_id: task.zoekt_repository_id, task_id: task.id })
+      end
+
       def publish_task_failed_event_for(task)
-        event = TaskFailedEvent.new(data: { zoekt_repository_id: task.zoekt_repository_id })
-        Gitlab::EventStore.publish(event)
+        publish_event(TaskFailedEvent, data: { zoekt_repository_id: task.zoekt_repository_id })
+      end
+
+      def publish_event(event, data:)
+        Gitlab::EventStore.publish(event.new(data: data)) # `event` is an event class, not an instance
+      end
     end
   end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index fce13ece96765..e30410df60897 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -2280,6 +2280,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: search_zoekt_update_index_used_bytes
+  :worker_name: Search::Zoekt::UpdateIndexUsedBytesWorker
+  :feature_category: :global_search
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: secrets_management_provision_project_secrets_manager
   :worker_name: SecretsManagement::ProvisionProjectSecretsManagerWorker
   :feature_category: :secrets_management
diff --git a/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb b/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb
new file mode 100644
index 0000000000000..444cc89825fbb
--- /dev/null
+++ b/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    # Subscriber that refreshes an index's used_storage_bytes from an event's zoekt_repository_id.
+    class UpdateIndexUsedBytesWorker
+      include Gitlab::EventStore::Subscriber
+      prepend ::Geo::SkipSecondary
+
+      feature_category :global_search
+      urgency :low
+      idempotent!
+
+      def handle_event(event)
+        zoekt_repository = ::Search::Zoekt::Repository.find_by_id(event.data[:zoekt_repository_id])
+        # NOTE(review): missing repos are skipped, so a completed delete may not shrink the total - confirm.
+        zoekt_repository.zoekt_index.update_used_storage_bytes! if zoekt_repository
+      end
+    end
+  end
+end
diff --git a/ee/lib/ee/gitlab/event_store.rb b/ee/lib/ee/gitlab/event_store.rb
index 53f3359397cf9..e9bf9f5e2c89e 100644
--- a/ee/lib/ee/gitlab/event_store.rb
+++ b/ee/lib/ee/gitlab/event_store.rb
@@ -178,6 +178,9 @@ def subscribe_to_zoekt_events(store)
           store.subscribe ::Search::Zoekt::InitialIndexingEventWorker,
             to: ::Search::Zoekt::InitialIndexingEvent
 
+          store.subscribe ::Search::Zoekt::UpdateIndexUsedBytesWorker,
+            to: ::Search::Zoekt::TaskSucceededEvent
+
           store.subscribe ::Search::Zoekt::LostNodeEventWorker,
             to: ::Search::Zoekt::LostNodeEvent
         end
diff --git a/ee/spec/lib/ee/gitlab/event_store_spec.rb b/ee/spec/lib/ee/gitlab/event_store_spec.rb
index 7ac073a451003..e2f897bca179c 100644
--- a/ee/spec/lib/ee/gitlab/event_store_spec.rb
+++ b/ee/spec/lib/ee/gitlab/event_store_spec.rb
@@ -51,6 +51,7 @@
         Search::Zoekt::OrphanedIndexEvent,
         Search::Zoekt::OrphanedRepoEvent,
         Search::Zoekt::RepoMarkedAsToDeleteEvent,
+        Search::Zoekt::TaskSucceededEvent,
         Search::Zoekt::TaskFailedEvent,
         Search::Zoekt::LostNodeEvent,
         Security::PolicyDeletedEvent,
diff --git a/ee/spec/models/search/zoekt/index_spec.rb b/ee/spec/models/search/zoekt/index_spec.rb
index 058bf9d891fe9..2454e46672486 100644
--- a/ee/spec/models/search/zoekt/index_spec.rb
+++ b/ee/spec/models/search/zoekt/index_spec.rb
@@ -201,12 +201,40 @@
     end
   end
 
-  describe '#used_storage_bytes' do
-    let_it_be(:size_bytes) { 100.megabytes }
-    let_it_be(:repos) { create_list(:zoekt_repository, 5, zoekt_index: zoekt_index, size_bytes: size_bytes) }
+  describe '#update_used_storage_bytes!' do
+    let_it_be(:idx) { create(:zoekt_index, reserved_storage_bytes: 100.megabytes) }
+    let_it_be(:idx_project) { create(:project, namespace_id: idx.namespace_id) }
+    let_it_be(:idx_project_2) { create(:project, namespace_id: idx.namespace_id) }
 
-    it 'returns the sum of size_bytes for all repository associated with this index' do
-      expect(zoekt_index.used_storage_bytes).to eq(repos.length * size_bytes)
+    it 'updates indices with the sum of size_bytes for all associated repositories' do
+      # Two ready repositories of 50.megabytes each must add up to 100.megabytes on the index.
+      [idx_project, idx_project_2].each do |project|
+        idx.zoekt_repositories.create!(zoekt_index: idx, project: project, state: :ready, size_bytes: 50.megabytes)
+      end
+
+      expect { idx.update_used_storage_bytes! }.to change {
+        described_class.find(idx.id).used_storage_bytes
+      }.from(0).to(100.megabytes)
+    end
+
+    context 'when an exception occurs' do
+      it 'logs the error and re-raises the exception' do
+        expected_error_message = "Ka-Boom"
+        expected_payload = {
+          class: "Search::Zoekt::Index",
+          message: 'Error attempting to update used_storage_bytes',
+          error: expected_error_message,
+          index_id: idx.id
+        }.with_indifferent_access
+
+        # The model builds its logger lazily, so stub the factory before triggering the failure.
+        stubbed_logger = instance_double(::Search::Zoekt::Logger)
+        expect(::Search::Zoekt::Logger).to receive(:build).and_return stubbed_logger
+        expect(stubbed_logger).to receive(:error).with(expected_payload)
+        expect(idx).to receive(:update!).and_raise expected_error_message
+
+        expect { idx.update_used_storage_bytes! }.to raise_error expected_error_message
+      end
     end
   end
 
diff --git a/ee/spec/services/search/zoekt/callback_service_spec.rb b/ee/spec/services/search/zoekt/callback_service_spec.rb
index 46f4b3bf7362e..c42be6883f909 100644
--- a/ee/spec/services/search/zoekt/callback_service_spec.rb
+++ b/ee/spec/services/search/zoekt/callback_service_spec.rb
@@ -29,10 +29,11 @@
       }
     end
 
+    let(:success) { true }
+
     context 'when task is not found' do
       let(:task_id) { non_existing_record_id }
       let(:task_type) { 'index' }
-      let(:success) { true }
 
       it 'does not performs anything' do
         expect(execute).to be nil
@@ -41,7 +42,14 @@
 
     context 'for successful operation' do
       let_it_be_with_reload(:index_zoekt_task) { create(:zoekt_task, node: node) }
-      let(:success) { true }
+
+      # Expects the including context to define `zoekt_task`, `task_id` and `service`.
+      shared_examples 'successful zoekt task that publishes to event store' do
+        it 'publishes the correct event to GitLab::EventStore' do
+          payload = { zoekt_repository_id: zoekt_task.zoekt_repository_id, task_id: task_id }
+          expect { service.execute }.to publish_event(Search::Zoekt::TaskSucceededEvent).with(payload)
+        end
+      end
 
       context 'when task is already done' do
         let(:task_type) { 'index' }
@@ -64,6 +72,10 @@
         let(:task_type) { 'index' }
         let(:task_id) { index_zoekt_task.id }
 
+        it_behaves_like 'successful zoekt task that publishes to event store' do
+          let_it_be(:zoekt_task) { index_zoekt_task }
+        end
+
         it 'updates the task state, zoekt_repository data' do
           freeze_time do
             expect { execute }.to change { index_zoekt_task.reload.state }.to('done')
@@ -86,6 +98,10 @@
           expect(delete_zoekt_task.zoekt_repository).to be nil
         end
 
+        it_behaves_like 'successful zoekt task that publishes to event store' do
+          let_it_be(:zoekt_task) { delete_zoekt_task }
+        end
+
         context 'when repository is already deleted' do
           before do
             delete_zoekt_task.zoekt_repository.destroy!
@@ -94,6 +110,10 @@
           it 'moves the task to done' do
             expect { execute }.to change { delete_zoekt_task.reload.state }.to('done')
           end
+
+          it_behaves_like 'successful zoekt task that publishes to event store' do
+            let_it_be(:zoekt_task) { delete_zoekt_task }
+          end
         end
       end
     end
diff --git a/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb b/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb
new file mode 100644
index 0000000000000..2b14c14266de7
--- /dev/null
+++ b/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Search::Zoekt::UpdateIndexUsedBytesWorker, :zoekt_settings_enabled, feature_category: :global_search do
+  let(:event) { Search::Zoekt::TaskSucceededEvent.new(data: data) }
+
+  let_it_be(:zoekt_task) { create(:zoekt_task) }
+  # One index owning two repositories; the worker is expected to store the sum of both sizes.
+  let_it_be(:index) { create(:zoekt_index) }
+  let_it_be(:repo) { create(:zoekt_repository, zoekt_index: index) }
+  let_it_be(:another_repo) { create(:zoekt_repository, zoekt_index: index) }
+
+  let(:data) do
+    { zoekt_repository_id: repo.id, task_id: zoekt_task.id }
+  end
+
+  it_behaves_like 'worker with data consistency', described_class, data_consistency: :always
+
+  it_behaves_like 'subscribes to event'
+
+  it_behaves_like 'an idempotent worker' do
+    let(:logger) { instance_double(::Search::Zoekt::Logger) }
+
+    before do
+      allow(::Search::Zoekt::Logger).to receive(:build).and_return(logger)
+    end
+
+    it 'resizes an index used_storage_bytes' do
+      expect do
+        consume_event(subscriber: described_class, event: event)
+      end.to change {
+        Search::Zoekt::Index.find(index.id).used_storage_bytes
+      }.from(0).to(repo.size_bytes + another_repo.size_bytes)
+    end
+  end
+end
-- 
GitLab