From f3d15a6f59052a753837e2d7ef18ac86295183f3 Mon Sep 17 00:00:00 2001
From: John Mason <9717668-johnmason@users.noreply.gitlab.com>
Date: Sun, 3 Nov 2024 02:07:02 +0000
Subject: [PATCH] Update zoekt index used_storage_bytes on interval instead of
 callbacks

Changelog: changed
EE: true
---
 .../search/zoekt/task_succeeded_event.rb      | 18 ---------
 ee/app/models/search/zoekt/index.rb           | 17 ++++-----
 .../services/search/zoekt/callback_service.rb |  6 ---
 .../search/zoekt/scheduling_service.rb        |  7 ++++
 .../zoekt/update_index_used_bytes_worker.rb   |  8 +---
 ee/lib/ee/gitlab/event_store.rb               |  3 --
 ee/spec/lib/ee/gitlab/event_store_spec.rb     |  1 -
 ee/spec/models/search/zoekt/index_spec.rb     | 37 -------------------
 .../search/zoekt/callback_service_spec.rb     | 20 ----------
 .../search/zoekt/scheduling_service_spec.rb   | 15 ++++++++
 .../update_index_used_bytes_worker_spec.rb    | 37 -------------------
 11 files changed, 32 insertions(+), 137 deletions(-)
 delete mode 100644 ee/app/events/search/zoekt/task_succeeded_event.rb
 delete mode 100644 ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb

diff --git a/ee/app/events/search/zoekt/task_succeeded_event.rb b/ee/app/events/search/zoekt/task_succeeded_event.rb
deleted file mode 100644
index e5a4cca6fe47c..0000000000000
--- a/ee/app/events/search/zoekt/task_succeeded_event.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-module Search
-  module Zoekt
-    class TaskSucceededEvent < ::Gitlab::EventStore::Event
-      def schema
-        {
-          'type' => 'object',
-          'properties' => {
-            'zoekt_repository_id' => { 'type' => 'integer' },
-            'task_id' => { 'type' => 'integer' }
-          },
-          'required' => %w[zoekt_repository_id task_id]
-        }
-      end
-    end
-  end
-end
diff --git a/ee/app/models/search/zoekt/index.rb b/ee/app/models/search/zoekt/index.rb
index e3bc7a5a85fc0..78285667043af 100644
--- a/ee/app/models/search/zoekt/index.rb
+++ b/ee/app/models/search/zoekt/index.rb
@@ -96,17 +96,16 @@ class Index < ApplicationRecord
           .where('(used_storage_bytes / reserved_storage_bytes::double precision) < ?', percent)
       end
 
-      def update_used_storage_bytes!
-        update!(used_storage_bytes: zoekt_repositories.sum(:size_bytes))
+      def self.update_used_storage_bytes!
+        all.find_each do |zoekt_index|
+          sum_for_index = 0
 
-      rescue StandardError => err
-        logger.error(build_structured_payload(
-          message: 'Error attempting to update used_storage_bytes',
-          index_id: id,
-          error: err.message
-        ))
+          Search::Zoekt::Repository.where(zoekt_index_id: zoekt_index.id).each_batch do |repo_batch|
+            sum_for_index += repo_batch.sum(:size_bytes)
+          end
 
-        raise err
+          zoekt_index.update!(used_storage_bytes: sum_for_index) if sum_for_index != zoekt_index.used_storage_bytes
+        end
       end
 
       def free_storage_bytes
diff --git a/ee/app/services/search/zoekt/callback_service.rb b/ee/app/services/search/zoekt/callback_service.rb
index 105140fe828fa..09f1e48254f5a 100644
--- a/ee/app/services/search/zoekt/callback_service.rb
+++ b/ee/app/services/search/zoekt/callback_service.rb
@@ -49,8 +49,6 @@ def process_success
 
           task.done!
         end
-
-        publish_task_succeeded_event_for(task)
       end
 
       def process_failure
@@ -61,10 +59,6 @@ def process_failure
         publish_task_failed_event_for(task)
       end
 
-      def publish_task_succeeded_event_for(task)
-        publish_event(TaskSucceededEvent, data: { zoekt_repository_id: task.zoekt_repository_id, task_id: task.id })
-      end
-
       def publish_task_failed_event_for(task)
         publish_event(TaskFailedEvent, data: { zoekt_repository_id: task.zoekt_repository_id })
       end
diff --git a/ee/app/services/search/zoekt/scheduling_service.rb b/ee/app/services/search/zoekt/scheduling_service.rb
index 52c28e40f1bca..f209da8ab8278 100644
--- a/ee/app/services/search/zoekt/scheduling_service.rb
+++ b/ee/app/services/search/zoekt/scheduling_service.rb
@@ -20,6 +20,7 @@ class SchedulingService
         repo_should_be_marked_as_orphaned_check
         repo_to_delete_check
         report_metrics
+        update_index_used_bytes
         update_replica_states
       ].freeze
 
@@ -310,6 +311,12 @@ def update_replica_states
         end
       end
 
+      def update_index_used_bytes
+        execute_every 5.minutes, cache_key: :update_index_used_bytes do
+          Search::Zoekt::Index.update_used_storage_bytes!
+        end
+      end
+
       def report_metrics
         ::Search::Zoekt::Node.online.find_each do |node|
           log_data = build_structured_payload(
diff --git a/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb b/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb
index a539eb748ead2..393b1ba278e5d 100644
--- a/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb
+++ b/ee/app/workers/search/zoekt/update_index_used_bytes_worker.rb
@@ -10,12 +10,8 @@ class UpdateIndexUsedBytesWorker
       urgency :low
       idempotent!
 
-      def handle_event(event)
-        repo = ::Search::Zoekt::Repository.find_by_id(event.data[:zoekt_repository_id])
-        return if repo.nil?
-
-        repo.zoekt_index.update_used_storage_bytes!
-      end
+      # https://gitlab.com/gitlab-org/gitlab/-/issues/499620
+      def handle_event(event); end
     end
   end
 end
diff --git a/ee/lib/ee/gitlab/event_store.rb b/ee/lib/ee/gitlab/event_store.rb
index 3d8c6d136c43b..c9c3624829563 100644
--- a/ee/lib/ee/gitlab/event_store.rb
+++ b/ee/lib/ee/gitlab/event_store.rb
@@ -198,9 +198,6 @@ def subscribe_to_zoekt_events(store)
           store.subscribe ::Search::Zoekt::InitialIndexingEventWorker,
             to: ::Search::Zoekt::InitialIndexingEvent
 
-          store.subscribe ::Search::Zoekt::UpdateIndexUsedBytesWorker,
-            to: ::Search::Zoekt::TaskSucceededEvent
-
           store.subscribe ::Search::Zoekt::LostNodeEventWorker,
             to: ::Search::Zoekt::LostNodeEvent
         end
diff --git a/ee/spec/lib/ee/gitlab/event_store_spec.rb b/ee/spec/lib/ee/gitlab/event_store_spec.rb
index 6b45da8249e87..c89c333d869d7 100644
--- a/ee/spec/lib/ee/gitlab/event_store_spec.rb
+++ b/ee/spec/lib/ee/gitlab/event_store_spec.rb
@@ -54,7 +54,6 @@
         Search::Zoekt::OrphanedIndexEvent,
         Search::Zoekt::OrphanedRepoEvent,
         Search::Zoekt::RepoMarkedAsToDeleteEvent,
-        Search::Zoekt::TaskSucceededEvent,
         Search::Zoekt::TaskFailedEvent,
         Search::Zoekt::LostNodeEvent,
         Search::Zoekt::IndexOverWatermarkEvent,
diff --git a/ee/spec/models/search/zoekt/index_spec.rb b/ee/spec/models/search/zoekt/index_spec.rb
index cf074188d07c1..ea8cd182bcf7a 100644
--- a/ee/spec/models/search/zoekt/index_spec.rb
+++ b/ee/spec/models/search/zoekt/index_spec.rb
@@ -245,43 +245,6 @@
     end
   end
 
-  describe '#update_used_storage_bytes!' do
-    let_it_be(:idx) { create(:zoekt_index, reserved_storage_bytes: 100.megabytes) }
-    let_it_be(:idx_project) { create(:project, namespace_id: idx.namespace_id) }
-    let_it_be(:idx_project_2) { create(:project, namespace_id: idx.namespace_id) }
-
-    it 'updates indices with the sum of size_bytes for all all associated repositories' do
-      idx.zoekt_repositories.create!(zoekt_index: idx, project: idx_project, state: :ready,
-        size_bytes: 50.megabytes)
-      idx.zoekt_repositories.create!(zoekt_index: idx, project: idx_project_2, state: :ready,
-        size_bytes: 50.megabytes)
-
-      expect { idx.update_used_storage_bytes! }.to change {
-        described_class.find(idx.id).used_storage_bytes
-      }.from(0).to(100.megabytes)
-    end
-
-    context 'when an exception occurs' do
-      it 'logs the error and re-raises the exception' do
-        stubbed_logger = instance_double(::Search::Zoekt::Logger)
-        expect(::Search::Zoekt::Logger).to receive(:build).and_return stubbed_logger
-
-        expected_error_message = "Ka-Boom"
-
-        expect(stubbed_logger).to receive(:error).with({
-          class: "Search::Zoekt::Index",
-          message: 'Error attempting to update used_storage_bytes',
-          error: expected_error_message,
-          index_id: idx.id
-        }.with_indifferent_access)
-
-        expect(idx).to receive(:update!).and_raise expected_error_message
-
-        expect { idx.update_used_storage_bytes! }.to raise_error expected_error_message
-      end
-    end
-  end
-
   describe '#free_storage_bytes' do
     it 'is difference between reserved bytes and used bytes' do
       allow(zoekt_index).to receive(:reserved_storage_bytes).and_return(100)
diff --git a/ee/spec/services/search/zoekt/callback_service_spec.rb b/ee/spec/services/search/zoekt/callback_service_spec.rb
index c42be6883f909..df803d1aac271 100644
--- a/ee/spec/services/search/zoekt/callback_service_spec.rb
+++ b/ee/spec/services/search/zoekt/callback_service_spec.rb
@@ -43,14 +43,6 @@
     context 'for successful operation' do
       let_it_be_with_reload(:index_zoekt_task) { create(:zoekt_task, node: node) }
 
-      shared_examples 'successful zoekt task that publishes to event store' do
-        it 'publishes the correct event to GitLab::EventStore' do
-          expected_data = { zoekt_repository_id: zoekt_task.zoekt_repository_id, task_id: task_id }
-
-          expect { service.execute }.to publish_event(Search::Zoekt::TaskSucceededEvent).with(expected_data)
-        end
-      end
-
       context 'when task is already done' do
         let(:task_type) { 'index' }
         let(:task_id) { index_zoekt_task.id }
@@ -72,10 +64,6 @@
         let(:task_type) { 'index' }
         let(:task_id) { index_zoekt_task.id }
 
-        it_behaves_like 'successful zoekt task that publishes to event store' do
-          let_it_be(:zoekt_task) { index_zoekt_task }
-        end
-
         it 'updates the task state, zoekt_repository data' do
           freeze_time do
             expect { execute }.to change { index_zoekt_task.reload.state }.to('done')
@@ -98,10 +86,6 @@
           expect(delete_zoekt_task.zoekt_repository).to be nil
         end
 
-        it_behaves_like 'successful zoekt task that publishes to event store' do
-          let_it_be(:zoekt_task) { delete_zoekt_task }
-        end
-
         context 'when repository is already deleted' do
           before do
             delete_zoekt_task.zoekt_repository.destroy!
@@ -110,10 +94,6 @@
           it 'moves the task to done' do
             expect { execute }.to change { delete_zoekt_task.reload.state }.to('done')
           end
-
-          it_behaves_like 'successful zoekt task that publishes to event store' do
-            let_it_be(:zoekt_task) { delete_zoekt_task }
-          end
         end
       end
     end
diff --git a/ee/spec/services/search/zoekt/scheduling_service_spec.rb b/ee/spec/services/search/zoekt/scheduling_service_spec.rb
index 1ea1d2b1d879f..1fd6822e6850c 100644
--- a/ee/spec/services/search/zoekt/scheduling_service_spec.rb
+++ b/ee/spec/services/search/zoekt/scheduling_service_spec.rb
@@ -500,6 +500,21 @@
     end
   end
 
+  describe '#update_index_used_bytes' do
+    let(:task) { :update_index_used_bytes }
+    let_it_be(:index) { create(:zoekt_index, :ready) }
+    let_it_be(:repo) { create(:zoekt_repository, zoekt_index: index) }
+    let_it_be(:another_repo) { create(:zoekt_repository, zoekt_index: index) }
+
+    it 'resizes ready indices used_storage_bytes' do
+      expect do
+        execute_task
+      end.to change {
+        index.reload.used_storage_bytes
+      }.from(0).to(repo.size_bytes + another_repo.size_bytes)
+    end
+  end
+
   describe '#report_metrics' do
     let(:logger) { instance_double(::Search::Zoekt::Logger) }
     let(:task) { :report_metrics }
diff --git a/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb b/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb
deleted file mode 100644
index 2b14c14266de7..0000000000000
--- a/ee/spec/workers/search/zoekt/update_index_used_bytes_worker_spec.rb
+++ /dev/null
@@ -1,37 +0,0 @@
-# frozen_string_literal: true
-
-require 'spec_helper'
-
-RSpec.describe Search::Zoekt::UpdateIndexUsedBytesWorker, :zoekt_settings_enabled, feature_category: :global_search do
-  let(:event) { Search::Zoekt::TaskSucceededEvent.new(data: data) }
-
-  let_it_be(:zoekt_task) { create(:zoekt_task) }
-
-  let_it_be(:index) { create(:zoekt_index) }
-  let_it_be(:repo) { create(:zoekt_repository, zoekt_index: index) }
-  let_it_be(:another_repo) { create(:zoekt_repository, zoekt_index: index) }
-
-  let(:data) do
-    { zoekt_repository_id: repo.id, task_id: zoekt_task.id }
-  end
-
-  it_behaves_like 'worker with data consistency', described_class, data_consistency: :always
-
-  it_behaves_like 'subscribes to event'
-
-  it_behaves_like 'an idempotent worker' do
-    let(:logger) { instance_double(::Search::Zoekt::Logger) }
-
-    before do
-      allow(::Search::Zoekt::Logger).to receive(:build).and_return(logger)
-    end
-
-    it 'resizes an index used_storage_bytes' do
-      expect do
-        consume_event(subscriber: described_class, event: event)
-      end.to change {
-        Search::Zoekt::Index.find(index.id).used_storage_bytes
-      }.from(0).to(repo.size_bytes + another_repo.size_bytes)
-    end
-  end
-end
-- 
GitLab