From a18b4aeab0423aa18cddd28cd2f209607520e01e Mon Sep 17 00:00:00 2001
From: Dmitry Gruzd <dgruzd@gitlab.com>
Date: Wed, 31 Jan 2024 12:19:19 +0000
Subject: [PATCH] Refactor Zoekt::SchedulingWorker into a service

---
 .../search/zoekt/scheduling_service.rb        |  71 ++++++++
 .../workers/search/zoekt/scheduling_worker.rb |  58 +-----
 .../search/zoekt/scheduling_service_spec.rb   | 152 ++++++++++++++++
 .../search/zoekt/scheduling_worker_spec.rb    | 166 +++---------------
 4 files changed, 250 insertions(+), 197 deletions(-)
 create mode 100644 ee/app/services/search/zoekt/scheduling_service.rb
 create mode 100644 ee/spec/services/search/zoekt/scheduling_service_spec.rb

diff --git a/ee/app/services/search/zoekt/scheduling_service.rb b/ee/app/services/search/zoekt/scheduling_service.rb
new file mode 100644
index 0000000000000..e3450cce6a55e
--- /dev/null
+++ b/ee/app/services/search/zoekt/scheduling_service.rb
@@ -0,0 +1,71 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    class SchedulingService
+      include Gitlab::Loggable
+
+      TASKS = %i[
+        node_assignment
+      ].freeze
+
+      BUFFER_FACTOR = 3
+      WATERMARK_LIMIT = 0.8
+
+      attr_reader :task
+
+      def initialize(task)
+        @task = task
+      end
+
+      def execute
+        raise ArgumentError, "Unknown task: #{task}" unless TASKS.include?(task)
+        raise NotImplementedError unless respond_to?(task, true)
+
+        send(task) # rubocop:disable GitlabSecurity/PublicSend -- We control the list of tasks in the source code
+      end
+
+      def self.execute(task)
+        new(task).execute
+      end
+
+      private
+
+      def logger
+        @logger ||= ::Zoekt::Logger.build
+      end
+
+      def node_assignment
+        EnabledNamespace.with_missing_indices.preload_storage_statistics.find_each do |zoekt_enabled_namespace|
+          storage_statistics = zoekt_enabled_namespace.namespace.root_storage_statistics
+          unless storage_statistics
+            logger.error(build_structured_payload(task: :node_assignment,
+              message: "RootStorageStatistics isn't available", zoekt_enabled_namespace_id: zoekt_enabled_namespace.id))
+            next
+          end
+
+          Node.descending_order_by_free_bytes.each do |node|
+            space_required = BUFFER_FACTOR * storage_statistics.repository_size
+            if (node.used_bytes + space_required) <= node.total_bytes * WATERMARK_LIMIT
+              # TODO: Once we have the task which moves pending to ready then remove the state attribute from here
+              # https://gitlab.com/gitlab-org/gitlab/-/issues/439042
+              zoekt_index = Search::Zoekt::Index.new(namespace_id: zoekt_enabled_namespace.root_namespace_id,
+                zoekt_node_id: node.id, zoekt_enabled_namespace: zoekt_enabled_namespace, state: :ready)
+
+              unless zoekt_index.save
+                logger.error(build_structured_payload(task: :node_assignment,
+                  message: 'Could not save Search::Zoekt::Index', zoekt_index: zoekt_index.attributes.compact))
+              end
+
+              break
+            else
+              logger.error(build_structured_payload(task: :node_assignment, message: 'Space is not available in Node',
+                zoekt_enabled_namespace_id: zoekt_enabled_namespace.id, node_id: node.id))
+              next
+            end
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/workers/search/zoekt/scheduling_worker.rb b/ee/app/workers/search/zoekt/scheduling_worker.rb
index 4d64a3b4b7c5c..71831565dea0f 100644
--- a/ee/app/workers/search/zoekt/scheduling_worker.rb
+++ b/ee/app/workers/search/zoekt/scheduling_worker.rb
@@ -7,75 +7,27 @@ class SchedulingWorker
       include CronjobQueue
       prepend ::Geo::SkipSecondary
 
-      BUFFER_FACTOR = 3
-      WATERMARK_LIMIT = 0.8
-      TASKS = %i[node_assignment].freeze
-
       data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- It is a Cronjob
       feature_category :global_search
       idempotent!
       pause_control :zoekt
       urgency :low
 
-      def perform(task = :initiate)
+      def perform(task = nil)
         return false if Feature.disabled?(:zoekt_scheduling_worker, type: :beta)
 
-        task = task&.to_sym
-        raise ArgumentError, "Unknown task: #{task}" unless supported_tasks.include?(task)
+        return initiate if task.nil?
 
-        case task
-        when :initiate
-          initiate
-        when :node_assignment
-          assign_node_to_zoekt_namespace
-        end
+        SchedulingService.execute(task)
       end
 
       private
 
-      def supported_tasks
-        TASKS + Array(:initiate)
-      end
-
       def initiate
-        TASKS.each { |task| with_context(related_class: self.class) { self.class.perform_async(task) } }
-      end
-
-      def assign_node_to_zoekt_namespace
-        EnabledNamespace.with_missing_indices.preload_storage_statistics.find_each do |zoekt_enabled_namespace|
-          storage_statistics = zoekt_enabled_namespace.namespace.root_storage_statistics
-          unless storage_statistics
-            logger.error(build_structured_payload(task: :node_assignment,
-              message: "RootStorageStatistics isn't available", zoekt_enabled_namespace_id: zoekt_enabled_namespace.id))
-            next
-          end
-
-          Node.descending_order_by_free_bytes.each do |node|
-            space_required = BUFFER_FACTOR * storage_statistics.repository_size
-            if (node.used_bytes + space_required) <= node.total_bytes * WATERMARK_LIMIT
-              # TODO: Once we have the task which moves pending to ready then remove the state attribute from here
-              # https://gitlab.com/gitlab-org/gitlab/-/issues/439042
-              zoekt_index = Search::Zoekt::Index.new(namespace_id: zoekt_enabled_namespace.root_namespace_id,
-                zoekt_node_id: node.id, zoekt_enabled_namespace: zoekt_enabled_namespace, state: :ready)
-
-              unless zoekt_index.save
-                logger.error(build_structured_payload(task: :node_assignment,
-                  message: 'Could not save Search::Zoekt::Index', zoekt_index: zoekt_index.attributes.compact))
-              end
-
-              break
-            else
-              logger.error(build_structured_payload(task: :node_assignment, message: 'Space is not available in Node',
-                zoekt_enabled_namespace_id: zoekt_enabled_namespace.id, node_id: node.id))
-              next
-            end
-          end
+        SchedulingService::TASKS.each do |task|
+          with_context(related_class: self.class) { self.class.perform_async(task) }
         end
       end
-
-      def logger
-        @logger ||= ::Zoekt::Logger.build
-      end
     end
   end
 end
diff --git a/ee/spec/services/search/zoekt/scheduling_service_spec.rb b/ee/spec/services/search/zoekt/scheduling_service_spec.rb
new file mode 100644
index 0000000000000..b1306986b510d
--- /dev/null
+++ b/ee/spec/services/search/zoekt/scheduling_service_spec.rb
@@ -0,0 +1,152 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ::Search::Zoekt::SchedulingService, feature_category: :global_search do
+  let(:logger) { instance_double('Logger') }
+  let(:service) { described_class.new(task) }
+  let_it_be(:node) { create(:zoekt_node, :enough_free_space) }
+
+  subject(:execute_task) { service.execute }
+
+  before do
+    allow(described_class).to receive(:logger).and_return(logger)
+  end
+
+  describe '.execute' do
+    let(:task) { :foo }
+
+    it 'executes the task' do
+      expect(described_class).to receive(:new).with(task).and_return(service)
+      expect(service).to receive(:execute)
+
+      described_class.execute(task)
+    end
+  end
+
+  describe '#execute' do
+    let(:task) { :foo }
+
+    it 'raises an exception when unknown task is provided' do
+      expect { service.execute }.to raise_error(ArgumentError)
+    end
+
+    it 'raises an exception when the task is not implemented' do
+      stub_const('::Search::Zoekt::SchedulingService::TASKS', [:foo])
+
+      expect { service.execute }.to raise_error(NotImplementedError)
+    end
+  end
+
+  describe '#node_assignment' do
+    let(:task) { :node_assignment }
+
+    let_it_be(:namespace) { create(:group) }
+    let_it_be(:namespace_statistics) { create(:namespace_root_storage_statistics, repository_size: 1000) }
+    let_it_be(:namespace_with_statistics) { create(:group, root_storage_statistics: namespace_statistics) }
+
+    context 'when some zoekt enabled namespaces missing zoekt index' do
+      let(:logger) { instance_double(::Zoekt::Logger) }
+      let_it_be(:zkt_enabled_namespace) { create(:zoekt_enabled_namespace, namespace: namespace.root_ancestor) }
+      let_it_be(:zkt_enabled_namespace2) do
+        create(:zoekt_enabled_namespace, namespace: namespace_with_statistics.root_ancestor)
+      end
+
+      before do
+        allow(::Zoekt::Logger).to receive(:build).and_return(logger)
+      end
+
+      context 'when there is not enough space in any nodes' do
+        before do
+          node.update_column(:total_bytes, 100)
+        end
+
+        it 'does not creates a record of Search::Zoekt::Index for the namespace' do
+          node_free_space = node.total_bytes - node.used_bytes
+          expect(namespace_statistics.repository_size).to be > node_free_space
+          expect(zkt_enabled_namespace.indices).to be_empty
+          expect(zkt_enabled_namespace2.indices).to be_empty
+          expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
+          expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
+                                                          'message' => "RootStorageStatistics isn't available",
+                                                          'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
+          )
+          expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
+                                                          'node_id' => node.id,
+                                                          'message' => 'Space is not available in Node',
+                                                          'zoekt_enabled_namespace_id' => zkt_enabled_namespace2.id }
+          )
+          expect { execute_task }.not_to change { Search::Zoekt::Index.count }
+          expect(zkt_enabled_namespace.indices).to be_empty
+          expect(zkt_enabled_namespace2.indices).to be_empty
+        end
+
+        context 'when there is space for the repository but not for the WATERMARK_LIMIT' do
+          before do
+            node.update_column(:total_bytes,
+              (namespace_statistics.repository_size * described_class::BUFFER_FACTOR) + node.used_bytes)
+          end
+
+          it 'does not creates a record of Search::Zoekt::Index for the namespace' do
+            node_free_space = node.total_bytes - node.used_bytes
+            # Assert that node's free space is equal to the repository_size times BUFFER_FACTOR
+            expect(namespace_statistics.repository_size * described_class::BUFFER_FACTOR).to eq node_free_space
+            expect(zkt_enabled_namespace.indices).to be_empty
+            expect(zkt_enabled_namespace2.indices).to be_empty
+            expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
+            expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
+                                                    'message' => "RootStorageStatistics isn't available",
+                                                    'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
+            )
+            expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
+                                                    'node_id' => node.id,
+                                                    'message' => 'Space is not available in Node',
+                                                    'zoekt_enabled_namespace_id' => zkt_enabled_namespace2.id }
+            )
+            expect { execute_task }.not_to change { Search::Zoekt::Index.count }
+            expect(zkt_enabled_namespace.indices).to be_empty
+            expect(zkt_enabled_namespace2.indices).to be_empty
+          end
+        end
+      end
+
+      context 'when there is enough space in the node' do
+        context 'when a new record of Search::Zoekt::Index could not be saved' do
+          it 'logs error' do
+            expect(zkt_enabled_namespace.indices).to be_empty
+            expect(zkt_enabled_namespace2.indices).to be_empty
+            expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
+            expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
+                                                    'message' => "RootStorageStatistics isn't available",
+                                                    'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
+            )
+            allow_next_instance_of(Search::Zoekt::Index) do |instance|
+              allow(instance).to receive(:valid?).and_return(false)
+            end
+            expect(logger).to receive(:error).with(hash_including('zoekt_index', 'class' => described_class.to_s,
+              'task' => task, 'message' => 'Could not save Search::Zoekt::Index'))
+            expect { execute_task }.not_to change { Search::Zoekt::Index.count }
+            expect(zkt_enabled_namespace.indices).to be_empty
+            expect(zkt_enabled_namespace2.indices).to be_empty
+          end
+        end
+
+        it 'creates a record of Search::Zoekt::Index for the namespace which has statistics' do
+          expect(zkt_enabled_namespace.indices).to be_empty
+          expect(zkt_enabled_namespace2.indices).to be_empty
+          expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
+          expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
+                                                  'message' => "RootStorageStatistics isn't available",
+                                                  'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
+          )
+          expect { execute_task }.to change { Search::Zoekt::Index.count }.by(1)
+          expect(zkt_enabled_namespace.indices).to be_empty
+          index = zkt_enabled_namespace2.indices.last
+          expect(index).not_to be_nil
+          expect(index.namespace_id).to eq zkt_enabled_namespace2.root_namespace_id
+          expect(index).to be_ready
+        end
+      end
+    end
+  end
+end
diff --git a/ee/spec/workers/search/zoekt/scheduling_worker_spec.rb b/ee/spec/workers/search/zoekt/scheduling_worker_spec.rb
index 0dccccbad1260..934e4df2e8712 100644
--- a/ee/spec/workers/search/zoekt/scheduling_worker_spec.rb
+++ b/ee/spec/workers/search/zoekt/scheduling_worker_spec.rb
@@ -6,165 +6,43 @@
   it_behaves_like 'worker with data consistency', described_class, data_consistency: :always
 
   describe '#perform' do
-    subject(:execute_worker) { described_class.new.perform(task) }
+    context 'when no arguments are provided' do
+      subject(:worker) { described_class.new }
 
-    let_it_be(:node) { create(:zoekt_node, :enough_free_space) }
-
-    context 'when feature flag zoekt_scheduling_worker is disabled' do
-      let(:task) { :initate }
-
-      it_behaves_like 'an idempotent worker' do
-        before do
-          stub_feature_flags(zoekt_scheduling_worker: false)
-        end
-
-        it 'returns false' do
-          expect(execute_worker).to be false
-        end
-      end
-    end
-
-    context 'when task is initiate' do
-      let(:task) { :initiate }
-
-      it_behaves_like 'an idempotent worker' do
-        it 'calls the worker with each supported tasks' do
-          described_class::TASKS.each { |t| expect(described_class).to receive(:perform_async).with(t) }
-          execute_worker
-        end
-      end
-    end
-
-    context 'when task is not supported' do
-      let(:task) { :dummy }
-
-      it_behaves_like 'an idempotent worker' do
-        it 'calls the worker with each supported tasks' do
-          expect { execute_worker }.to raise_error(ArgumentError, "Unknown task: #{task}")
-        end
-      end
-    end
-
-    context 'when task is node_assignment' do
-      let(:task) { :node_assignment }
-      let_it_be(:namespace) { create(:group) }
-      let_it_be(:namespace_statistics) { create(:namespace_root_storage_statistics, repository_size: 1000) }
-      let_it_be(:namespace_with_statistics) { create(:group, root_storage_statistics: namespace_statistics) }
-
-      context 'when all zoekt enabled namespaces has the index' do
+      context 'when feature flag zoekt_scheduling_worker is disabled' do
         it_behaves_like 'an idempotent worker' do
           before do
-            [namespace, namespace_with_statistics].each { |n| zoekt_ensure_namespace_indexed!(n) }
+            stub_feature_flags(zoekt_scheduling_worker: false)
           end
 
-          it 'does not creates Search::Zoekt::Index record' do
-            expect(Search::Zoekt::Node).not_to receive(:descending_order_and_select_by_free_bytes)
-            expect { execute_worker }.not_to change { Search::Zoekt::Index.count }
+          it 'does not call the service' do
+            expect(worker.perform).to be false
+            expect(Search::Zoekt::SchedulingWorker).not_to receive(:new)
           end
         end
       end
 
-      context 'when some zoekt enabled namespaces missing zoekt index' do
-        let(:logger) { instance_double(::Zoekt::Logger) }
-        let_it_be(:zkt_enabled_namespace) { create(:zoekt_enabled_namespace, namespace: namespace.root_ancestor) }
-        let_it_be(:zkt_enabled_namespace2) do
-          create(:zoekt_enabled_namespace, namespace: namespace_with_statistics.root_ancestor)
-        end
-
-        before do
-          allow(::Zoekt::Logger).to receive(:build).and_return(logger)
-        end
-
-        context 'when there is not enough space in any nodes' do
-          before do
-            node.update_column(:total_bytes, 100)
+      it_behaves_like 'an idempotent worker' do
+        it 'calls the worker with each supported tasks' do
+          Search::Zoekt::SchedulingService::TASKS.each do |t|
+            expect(described_class).to receive(:perform_async).with(t)
           end
 
-          it 'does not creates a record of Search::Zoekt::Index for the namespace' do
-            node_free_space = node.total_bytes - node.used_bytes
-            expect(namespace_statistics.repository_size).to be > node_free_space
-            expect(zkt_enabled_namespace.indices).to be_empty
-            expect(zkt_enabled_namespace2.indices).to be_empty
-            expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
-            expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
-                                                            'message' => "RootStorageStatistics isn't available",
-                                                            'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
-            )
-            expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
-                                                            'node_id' => node.id,
-                                                            'message' => 'Space is not available in Node',
-                                                            'zoekt_enabled_namespace_id' => zkt_enabled_namespace2.id }
-            )
-            expect { execute_worker }.not_to change { Search::Zoekt::Index.count }
-            expect(zkt_enabled_namespace.indices).to be_empty
-            expect(zkt_enabled_namespace2.indices).to be_empty
-          end
+          worker.perform
+        end
+      end
+    end
 
-          context 'when there is space for the repository but not for the WATERMARK_LIMIT' do
-            before do
-              node.update_column(:total_bytes,
-                (namespace_statistics.repository_size * described_class::BUFFER_FACTOR) + node.used_bytes)
-            end
+    context 'when task is provided' do
+      subject(:worker) { described_class.new }
 
-            it 'does not creates a record of Search::Zoekt::Index for the namespace' do
-              node_free_space = node.total_bytes - node.used_bytes
-              # Assert that node's free space is equal to the repository_size times BUFFER_FACTOR
-              expect(namespace_statistics.repository_size * described_class::BUFFER_FACTOR).to eq node_free_space
-              expect(zkt_enabled_namespace.indices).to be_empty
-              expect(zkt_enabled_namespace2.indices).to be_empty
-              expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
-              expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
-                                                      'message' => "RootStorageStatistics isn't available",
-                                                      'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
-              )
-              expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
-                                                      'node_id' => node.id,
-                                                      'message' => 'Space is not available in Node',
-                                                      'zoekt_enabled_namespace_id' => zkt_enabled_namespace2.id }
-              )
-              expect { execute_worker }.not_to change { Search::Zoekt::Index.count }
-              expect(zkt_enabled_namespace.indices).to be_empty
-              expect(zkt_enabled_namespace2.indices).to be_empty
-            end
-          end
-        end
+      let(:task) { :node_assignment }
 
-        context 'when there is enough space in the node' do
-          context 'when a new record of Search::Zoekt::Index could not be saved' do
-            it 'logs error' do
-              expect(zkt_enabled_namespace.indices).to be_empty
-              expect(zkt_enabled_namespace2.indices).to be_empty
-              expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
-              expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
-                                                      'message' => "RootStorageStatistics isn't available",
-                                                      'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
-              )
-              allow_next_instance_of(Search::Zoekt::Index) do |instance|
-                allow(instance).to receive(:valid?).and_return(false)
-              end
-              expect(logger).to receive(:error).with(hash_including('zoekt_index', 'class' => described_class.to_s,
-                'task' => task, 'message' => 'Could not save Search::Zoekt::Index'))
-              expect { execute_worker }.not_to change { Search::Zoekt::Index.count }
-              expect(zkt_enabled_namespace.indices).to be_empty
-              expect(zkt_enabled_namespace2.indices).to be_empty
-            end
-          end
+      it_behaves_like 'an idempotent worker' do
+        it 'calls the service with the task' do
+          expect(Search::Zoekt::SchedulingService).to receive(:execute).with(task)
 
-          it 'creates a record of Search::Zoekt::Index for the namespace which has statistics' do
-            expect(zkt_enabled_namespace.indices).to be_empty
-            expect(zkt_enabled_namespace2.indices).to be_empty
-            expect(Search::Zoekt::Node).to receive(:descending_order_by_free_bytes).and_call_original
-            expect(logger).to receive(:error).with({ 'class' => described_class.to_s, 'task' => task,
-                                                    'message' => "RootStorageStatistics isn't available",
-                                                    'zoekt_enabled_namespace_id' => zkt_enabled_namespace.id }
-            )
-            expect { execute_worker }.to change { Search::Zoekt::Index.count }.by(1)
-            expect(zkt_enabled_namespace.indices).to be_empty
-            index = zkt_enabled_namespace2.indices.last
-            expect(index).not_to be_nil
-            expect(index.namespace_id).to eq zkt_enabled_namespace2.root_namespace_id
-            expect(index).to be_ready
-          end
+          worker.perform(task)
         end
       end
     end
-- 
GitLab