diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 2f71ccd9e19e1a2935f059370bc1119c68fa9810..9d0d3a0d473d508ed9e57a8e0efbf05c836a8418 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -709,6 +709,8 @@ - 1 - - search_zoekt_namespace_indexer - 1 +- - search_zoekt_namespace_initial_indexing + - 1 - - search_zoekt_project_transfer - 1 - - security_delete_orchestration_configuration diff --git a/ee/app/models/search/zoekt/index.rb b/ee/app/models/search/zoekt/index.rb index 52564a0634c0b0b1e1130915fd722ed062288abc..cd3ccd7c37407f6500165887c3ad1bc4f6670de3 100644 --- a/ee/app/models/search/zoekt/index.rb +++ b/ee/app/models/search/zoekt/index.rb @@ -21,7 +21,8 @@ class Index < ApplicationRecord enum state: { pending: 0, - initializing: 1, + in_progress: 1, + initializing: 2, ready: 10 } @@ -50,6 +51,9 @@ class Index < ApplicationRecord .where_exists(Repository.where(Repository.arel_table[:zoekt_index_id].eq(Index.arel_table[:id]))) end + scope :preload_zoekt_enabled_namespace_and_namespace, -> { includes(zoekt_enabled_namespace: :namespace) } + scope :preload_node, -> { includes(:node) } + private def zoekt_enabled_root_namespace_matches_namespace_id diff --git a/ee/app/services/search/zoekt/indexing_task_service.rb b/ee/app/services/search/zoekt/indexing_task_service.rb index fe504e774cd6ea5480c5e6a5268621e778ab33d3..81c754b84bee7f50585733911ea69522aa53430e 100644 --- a/ee/app/services/search/zoekt/indexing_task_service.rb +++ b/ee/app/services/search/zoekt/indexing_task_service.rb @@ -5,6 +5,10 @@ module Zoekt class IndexingTaskService REINDEXING_CHANCE_PERCENTAGE = 0.5 + def self.execute(...) + new(...).execute + end + def initialize(project, task_type, node_id: nil, root_namespace_id: nil, force: false, delay: nil) @project = project @task_type = task_type.to_sym @@ -28,10 +32,6 @@ def execute end end - def self.execute(...) - new(...).execute - end - private attr_reader :project, :node_id, :root_namespace_id, :force, :task_type, :delay @@ -40,9 +40,7 @@ def preflight_check? return false if Feature.disabled?(:zoekt_create_indexing_tasks, project) # should return true even the project is nil but the task_type is :delete_repo return true if task_type == :delete_repo - return false unless project - return false if project.empty_repo? true end diff --git a/ee/app/services/search/zoekt/scheduling_service.rb b/ee/app/services/search/zoekt/scheduling_service.rb index 675eef37f3a442b6cb107a341b157d7774ae3c3c..2344bd02e97253b78ba344c5cb68a8b70bb416d0 100644 --- a/ee/app/services/search/zoekt/scheduling_service.rb +++ b/ee/app/services/search/zoekt/scheduling_service.rb @@ -11,6 +11,7 @@ class SchedulingService remove_expired_subscriptions node_assignment mark_indices_as_ready + initial_indexing ].freeze BUFFER_FACTOR = ::Gitlab::Saas.feature_available?(:exact_code_search) ? 2 : 3 @@ -25,6 +26,10 @@ class SchedulingService attr_reader :task + def self.execute(task) + new(task).execute + end + def initialize(task) @task = task.to_sym end @@ -36,10 +41,6 @@ def execute send(task) # rubocop:disable GitlabSecurity/PublicSend -- We control the list of tasks in the source code end - def self.execute(task) - new(task).execute - end - private def logger @@ -263,6 +264,38 @@ def mark_indices_as_ready end logger.info(build_structured_payload(task: :mark_indices_as_ready, message: 'Set indices ready', count: count)) end + + def initial_indexing + return false if Feature.disabled?(:zoekt_initial_indexing_task) + + Index.in_progress.preload_zoekt_enabled_namespace_and_namespace.preload_node.find_each do |index| + namespace = index.zoekt_enabled_namespace&.namespace + next unless namespace + + count = namespace.all_project_ids.count + repo_count = index.zoekt_repositories.count + if repo_count >= count + index.initializing! + node = index.node + log_data = build_structured_payload( + meta: { + 'zoekt.node_name' => node.metadata['name'], 'zoekt.node_id' => node.id, 'zoekt.index_id' => index.id + }, + namespace_id: namespace.id, message: 'index moved to initializing', + repo_count: repo_count, project_count: count, task: :initial_indexing + ) + logger.info(log_data) + end + end + + Index.pending.each_batch do |batch, i| + NamespaceInitialIndexingWorker.bulk_perform_in_with_contexts( + i * 5.minutes, batch.preload_zoekt_enabled_namespace_and_namespace, + arguments_proc: ->(zoekt_index) { zoekt_index.id }, + context_proc: ->(zoekt_index) { { namespace: zoekt_index.zoekt_enabled_namespace&.namespace } } + ) + end + end end end end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index 32b753bb8b7c349bd184dbec37b5e9014da9af9b..f546b8e772eb8a1cd45d55b3d20878ad6927983b 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -2037,6 +2037,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: search_zoekt_namespace_initial_indexing + :worker_name: Search::Zoekt::NamespaceInitialIndexingWorker + :feature_category: :global_search + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: search_zoekt_project_transfer :worker_name: Search::Zoekt::ProjectTransferWorker :feature_category: :global_search diff --git a/ee/app/workers/search/zoekt/namespace_initial_indexing_worker.rb b/ee/app/workers/search/zoekt/namespace_initial_indexing_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..d690563f603f3e7dbad18df9b980e691fc655737 --- /dev/null +++ b/ee/app/workers/search/zoekt/namespace_initial_indexing_worker.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Search + module Zoekt + class NamespaceInitialIndexingWorker + include ApplicationWorker + prepend ::Geo::SkipSecondary + + feature_category :global_search + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- always otherwise we risk race condition where it doesn't think that indexing is enabled yet for the namespace. + idempotent! + pause_control :zoekt + urgency :low + + DELAY_INTERVAL = 1.hour.freeze + + def perform(zoekt_index_id, options = {}) + return unless ::License.feature_available?(:zoekt_code_search) + + index = Index.find_by_id(zoekt_index_id) + return unless index + + options = options.with_indifferent_access + namespace = if options[:namespace_id] + Namespace.find_by_id(options[:namespace_id]) + else + index.zoekt_enabled_namespace&.namespace + end + + return unless namespace + + namespace.children.each_batch do |relation| + relation.pluck_primary_key.each do |id| + self.class.perform_in(rand(DELAY_INTERVAL).seconds, zoekt_index_id, namespace_id: id) + end + end + + namespace.projects.each_batch do |relation| + relation.pluck_primary_key.each { |id| Search::Zoekt.index_in(rand(DELAY_INTERVAL), id) } + end + + index.in_progress! if index.pending? + end + end + end +end diff --git a/ee/config/feature_flags/ops/zoekt_initial_indexing_task.yml b/ee/config/feature_flags/ops/zoekt_initial_indexing_task.yml new file mode 100644 index 0000000000000000000000000000000000000000..0db8080a36d252c596ac4da0dff7b411a06fdcaf --- /dev/null +++ b/ee/config/feature_flags/ops/zoekt_initial_indexing_task.yml @@ -0,0 +1,10 @@ +--- +name: zoekt_initial_indexing_task +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/442883 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/149365 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463049 +milestone: '17.1' +group: group::global search +type: ops +default_enabled: false + diff --git a/ee/spec/models/search/zoekt/index_spec.rb b/ee/spec/models/search/zoekt/index_spec.rb index 95cb2168df9de10f1ced98b4cc2c4401cba536df..3e62353cff287dad71474cf007b60530766594e8 100644 --- a/ee/spec/models/search/zoekt/index_spec.rb +++ b/ee/spec/models/search/zoekt/index_spec.rb @@ -131,5 +131,21 @@ expect(described_class.searchable).to contain_exactly(zoekt_index_ready) end end + + describe '#preload_zoekt_enabled_namespace_and_namespace' do + it 'preloads the project and avoids N+1 queries' do + index = described_class.preload_zoekt_enabled_namespace_and_namespace.first + recorder = ActiveRecord::QueryRecorder.new { index.zoekt_enabled_namespace.namespace } + expect(recorder.count).to be_zero + end + end + + describe '#preload_node' do + it 'preloads the node and avoids N+1 queries' do + index = described_class.preload_node.first + recorder = ActiveRecord::QueryRecorder.new { index.node } + expect(recorder.count).to be_zero + end + end end end diff --git a/ee/spec/services/search/zoekt/indexing_task_service_spec.rb b/ee/spec/services/search/zoekt/indexing_task_service_spec.rb index 5a54657dd96a0a705902eaeaba6344758dde5a41..4a8979bcc9c625d80de44fbf9f5f395db408b42d 100644 --- a/ee/spec/services/search/zoekt/indexing_task_service_spec.rb +++ b/ee/spec/services/search/zoekt/indexing_task_service_spec.rb @@ -58,14 +58,6 @@ end end - context 'if project has an empty repo' do - let_it_be(:project) { create(:project, :empty_repo) } - - it 'does not creates Search::Zoekt::Task record' do - expect { service.execute }.not_to change { Search::Zoekt::Task.count } - end - end - context 'if project does not exists' do let_it_be(:project) { create(:project, :empty_repo) } diff --git a/ee/spec/services/search/zoekt/scheduling_service_spec.rb b/ee/spec/services/search/zoekt/scheduling_service_spec.rb index 51531206370a918ae0af5f6c080069c7c4fcf66c..1228fbfbc573536a5095e724271102bd9494ce3e 100644 --- a/ee/spec/services/search/zoekt/scheduling_service_spec.rb +++ b/ee/spec/services/search/zoekt/scheduling_service_spec.rb @@ -441,4 +441,69 @@ end end end + + describe '#initial_indexing' do + let(:task) { :initial_indexing } + + context 'when feature flag zoekt_initial_indexing_task is disabled' do + before do + stub_feature_flags(zoekt_initial_indexing_task: false) + end + + it 'returns false' do + expect(execute_task).to eq(false) + end + end + + context 'when there are no zoekt_indices in_progress' do + let_it_be(:index) { create(:zoekt_index, state: :pending) } + + it 'does not moves the index to initializing and calls NamespaceInitialIndexingWorker on the index' do + expect(Search::Zoekt::NamespaceInitialIndexingWorker).to receive(:bulk_perform_in_with_contexts) + .with(anything, [index], hash_including(:arguments_proc, :context_proc)) + expect { execute_task }.not_to change { index.reload.state } + end + end + + context 'when all zoekt_indices are already in progress' do + let_it_be(:idx_in_progress) { create(:zoekt_index, state: :in_progress) } + let_it_be(:namespace) { idx_in_progress.zoekt_enabled_namespace.namespace } + + context 'when there are no pending indices' do + context 'when zoekt_repositories count is less than all the projects within the namespace' do + before do + create(:project, namespace: namespace) + end + + it 'does not moves the index to initializing' do + expect { execute_task }.not_to change { idx_in_progress.reload.state } + end + end + + context 'when zoekt_repositories count is equal to all the projects within the namespace' do + let(:logger) { instance_double(::Search::Zoekt::Logger) } + let_it_be(:project) { create(:project, namespace: namespace) } + + before do + allow(Search::Zoekt::Logger).to receive(:build).and_return(logger) + create(:zoekt_repository, zoekt_index: idx_in_progress, project_id: project.id, + project_identifier: project.id) + end + + it 'moves the index to initializing and do the logging' do + node = idx_in_progress.node + expect(logger).to receive(:info).with({ 'class' => described_class.to_s, 'namespace_id' => namespace.id, + 'message' => 'index moved to initializing', + 'meta' => { 'zoekt.index_id' => idx_in_progress.id, + 'zoekt.node_id' => node.id, + 'zoekt.node_name' => node.metadata['name'] }, + 'repo_count' => idx_in_progress.zoekt_repositories.count, + 'project_count' => namespace.all_projects.count, 'task' => task } + ) + expect { execute_task }.to change { idx_in_progress.reload.state }.from('in_progress').to('initializing') + end + end + end + end + end end diff --git a/ee/spec/workers/search/zoekt/namespace_initial_indexing_worker_spec.rb b/ee/spec/workers/search/zoekt/namespace_initial_indexing_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..7664c1f522cba6876ec61e90f11aeebe5e403bc8 --- /dev/null +++ b/ee/spec/workers/search/zoekt/namespace_initial_indexing_worker_spec.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Zoekt::NamespaceInitialIndexingWorker, feature_category: :global_search do + it_behaves_like 'worker with data consistency', described_class, data_consistency: :always + + describe '#perform' do + let_it_be(:namespace) { create(:group, :with_hierarchy, children: 1, depth: 3) } + let_it_be(:zoekt_enabled_namespace) { create(:zoekt_enabled_namespace, namespace: namespace) } + let_it_be(:zoekt_index) do + create(:zoekt_index, zoekt_enabled_namespace: zoekt_enabled_namespace, namespace_id: namespace.id) + end + + let_it_be(:job_args) { [zoekt_index.id] } + + before do + [namespace, namespace.children.first].each { |n| create(:project, namespace: n) } + end + + subject(:perform_worker) { described_class.new.perform(*job_args) } + + context 'when license zoekt_code_search is not available' do + before do + stub_licensed_features(zoekt_code_search: false) + end + + it_behaves_like 'an idempotent worker' do + it 'does not call the NamespaceInitialIndexingWorker on any child of the namespace' do + expect(described_class).not_to receive(:perform_in) + perform_worker + end + + it 'does not call the Search::Zoekt.index_in and does not change the zoekt_index state to in_progress' do + expect(Search::Zoekt).not_to receive(:index_in) + expect { perform_worker }.not_to change { zoekt_index.reload.state } + end + end + end + + context 'when namespace_id is also passed in the options' do + context 'when invalid namespace_id is passed' do + let(:job_args) { [zoekt_index.id, { namespace_id: non_existing_record_id }] } + + it_behaves_like 'an idempotent worker' do + it 'does not call the NamespaceInitialIndexingWorker on any child of the namespace' do + expect(described_class).not_to receive(:perform_in) + perform_worker + end + + it 'does not call the Search::Zoekt.index_in and does not change the zoekt_index state to in_progress' do + expect(Search::Zoekt).not_to receive(:index_in) + expect { perform_worker }.not_to change { zoekt_index.reload.state } + end + end + end + + it_behaves_like 'an idempotent worker' do + let(:passed_namespace) { namespace.children.first } + let(:job_args) { [zoekt_index.id, { namespace_id: passed_namespace.id }] } + + it 'calls NamespaceInitialIndexingWorker on all the children of the passed namespace' do + passed_namespace.children.each do |child| + expect(described_class).to receive(:perform_in).with(be_between(0, described_class::DELAY_INTERVAL), + zoekt_index.id, { namespace_id: child.id }) + end + perform_worker + end + + it 'calls the Search::Zoekt.index_in and changes the zoekt_index state to in_progress' do + passed_namespace.projects.each do |project| + expect(Search::Zoekt).to receive(:index_in) + .with(be_between(0, described_class::DELAY_INTERVAL), project.id) + end + expect { perform_worker }.to change { zoekt_index.reload.state }.from('pending').to('in_progress') + end + end + end + + it_behaves_like 'an idempotent worker' do + it 'calls the NamespaceInitialIndexingWorker on all the children of the namespace' do + namespace.children.each do |child| + expect(described_class).to receive(:perform_in) + .with(be_between(0, described_class::DELAY_INTERVAL), zoekt_index.id, { namespace_id: child.id }) + end + perform_worker + end + + it 'calls the Search::Zoekt.index_in and changes the zoekt_index state to in_progress' do + namespace.projects.each do |project| + expect(Search::Zoekt).to receive(:index_in) + .with(be_between(0, described_class::DELAY_INTERVAL), project.id) + end + expect { perform_worker }.to change { zoekt_index.reload.state }.from('pending').to('in_progress') + end + end + end +end