# Patch summary (preamble text placed before the first `diff --git` header).
#
# Purpose: keep Search::Zoekt::Replica state in step with the state of its indices, and let the
# Zoekt scheduling worker be triggered every minute while individual tasks throttle themselves.
#
# Changes carried by this patch:
# * config/initializers/1_settings.rb - the default cron for `search_zoekt_scheduling_worker` goes
#   from '*/10 * * * *' to '*/1 * * * *'.
# * ee/app/models/search/zoekt/replica.rb - two new scopes:
#   - `with_all_ready_indices`: joins `indices`, groups by `id`, and keeps groups whose count of
#     indices in the `ready` state equals `count(*)`.
#   - `with_non_ready_indices`: replicas whose id appears as `zoekt_replica_id` on an index whose
#     state is any `Search::Zoekt::Index.states` value other than `ready`.
# * ee/app/services/search/zoekt/replica_state_service.rb (new) - `execute` sets pending replicas
#   with all-ready indices to `ready` (only if a pending replica exists), returns early unless a
#   ready replica exists, then sets ready replicas that have a non-ready index to `pending`.
# * ee/app/services/search/zoekt/scheduling_service.rb:
#   - `update_replica_states` is appended to the frozen task list.
#   - The bodies of `reallocation`, `remove_expired_subscriptions`, `mark_indices_as_ready`,
#     `initial_indexing` and `auto_index_self_managed` are wrapped in
#     `execute_every 10.minutes, cache_key: <task name>`.
#   - The early exits in `reallocation` and `mark_indices_as_ready` change from `return` to `break`.
#   - New `update_replica_states` returns false when the `zoekt_replica_state_updates` flag is
#     disabled; otherwise it calls `ReplicaStateService.execute` inside
#     `execute_every 2.minutes, cache_key: :update_replica_states`.
# * ee/config/feature_flags/ops/zoekt_replica_state_updates.yml (new) - ops flag, milestone '17.2',
#   `default_enabled: false`.
# * Specs: scope specs in replica_spec.rb, a new replica_state_service_spec.rb, and an
#   `#update_replica_states` example group in scheduling_service_spec.rb.
#
# NOTE(review): `execute_every` is not defined in this patch. In Ruby, `break` inside a block ends
#   the call to the method that yielded, so nothing `execute_every` does after `yield` would run on
#   those early exits - confirm that is intended; `next` would only leave the block.
# NOTE(review): tasks this patch does not wrap (e.g. `node_assignment`, `dot_com_rollout`, whose
#   bodies are not shown) presumably now run on the every-minute cron unless they already throttle
#   themselves - TODO confirm.
# NOTE(review): the `Replica.pending.exists?` / `Replica.ready.exists?` guards add a query before
#   each `update_all`; presumably a cheap short-circuit - confirm they are wanted.
# NOTE(review): in this copy of the patch the original line breaks appear to have been collapsed
#   into spaces, so it should not be expected to apply as-is.
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index a17b75922977c50ebd2b1f4786fddcad5ae83a26..c38415601ec6c2a3a021bb9652d51357d7c299ff 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -807,7 +807,7 @@ Settings.cron_jobs['elastic_migration_worker']['cron'] ||= '*/5 * * * *' Settings.cron_jobs['elastic_migration_worker']['job_class'] ||= 'Elastic::MigrationWorker' Settings.cron_jobs['search_zoekt_scheduling_worker'] ||= {} - Settings.cron_jobs['search_zoekt_scheduling_worker']['cron'] ||= '*/10 * * * *' + Settings.cron_jobs['search_zoekt_scheduling_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['search_zoekt_scheduling_worker']['job_class'] ||= 'Search::Zoekt::SchedulingWorker' Settings.cron_jobs['search_index_curation_worker'] ||= {} Settings.cron_jobs['search_index_curation_worker']['cron'] ||= '*/1 * * * *' diff --git a/ee/app/models/search/zoekt/replica.rb b/ee/app/models/search/zoekt/replica.rb index aea76b6ac4269fb7e5a30fc39fe87d7283f2df57..f7f8e9ea07f7ac80ae35eef0f7d6b845329aca83 100644 --- a/ee/app/models/search/zoekt/replica.rb +++ b/ee/app/models/search/zoekt/replica.rb @@ -18,6 +18,16 @@ class Replica < ApplicationRecord validate :project_can_not_assigned_to_same_replica_unless_index_is_reallocating + scope :with_all_ready_indices, -> do + raw_sql = 'sum(case when zoekt_indices.state != :state then 0 else 1 end) = count(*)' + joins(:indices).group(:id).having(raw_sql, state: Search::Zoekt::Index.states[:ready]) + end + + scope :with_non_ready_indices, -> do + non_ready_index_states = Search::Zoekt::Index.states.values - [Search::Zoekt::Index.states[:ready]] + where(id: Search::Zoekt::Index.select(:zoekt_replica_id).where(state: non_ready_index_states).distinct) + end + scope :for_namespace, ->(id) { where(namespace_id: id) } def self.for_enabled_namespace!(zoekt_enabled_namespace) diff --git a/ee/app/services/search/zoekt/replica_state_service.rb
b/ee/app/services/search/zoekt/replica_state_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..c62f8ff10db5adda2bc9feeaf533d39699ce57be --- /dev/null +++ b/ee/app/services/search/zoekt/replica_state_service.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Search + module Zoekt + class ReplicaStateService + def self.execute + new.execute + end + + def execute + pending_replicas_with_all_ready_indices.update_all(state: :ready) if Replica.pending.exists? + + return unless Replica.ready.exists? + + ready_replicas_with_non_ready_indices.update_all(state: :pending) + end + + private + + def pending_replicas_with_all_ready_indices + Replica.pending.with_all_ready_indices + end + + def ready_replicas_with_non_ready_indices + Replica.ready.with_non_ready_indices + end + end + end +end diff --git a/ee/app/services/search/zoekt/scheduling_service.rb b/ee/app/services/search/zoekt/scheduling_service.rb index 2f0eb171b828061d71767fa98ed5d5371ac8b3be..915272990afd56a9452e2a17267e494c5b9ac4bb 100644 --- a/ee/app/services/search/zoekt/scheduling_service.rb +++ b/ee/app/services/search/zoekt/scheduling_service.rb @@ -13,6 +13,7 @@ class SchedulingService mark_indices_as_ready initial_indexing auto_index_self_managed + update_replica_states ].freeze BUFFER_FACTOR = 3 @@ -67,43 +68,45 @@ def reallocation return false unless ::Gitlab::Saas.feature_available?(:exact_code_search) return false if Feature.disabled?(:zoekt_reallocation_task) - nodes = ::Search::Zoekt::Node.online.find_each.to_a - over_watermark_nodes = nodes.select { |n| (n.used_bytes / n.total_bytes.to_f) >= WATERMARK_LIMIT_HIGH } + execute_every 10.minutes, cache_key: :reallocation do + nodes = ::Search::Zoekt::Node.online.find_each.to_a + over_watermark_nodes = nodes.select { |n| (n.used_bytes / n.total_bytes.to_f) >= WATERMARK_LIMIT_HIGH } - return if over_watermark_nodes.empty? + break if over_watermark_nodes.empty?
- info(:reallocation, message: 'Detected nodes over watermark', - watermark_limit_high: WATERMARK_LIMIT_HIGH, - count: over_watermark_nodes.count) + info(:reallocation, message: 'Detected nodes over watermark', + watermark_limit_high: WATERMARK_LIMIT_HIGH, + count: over_watermark_nodes.count) - over_watermark_nodes.each do |node| - sizes = {} + over_watermark_nodes.each do |node| + sizes = {} - node.indices.each_batch do |batch| - scope = Namespace.includes(:root_storage_statistics) # rubocop:disable CodeReuse/ActiveRecord -- this is a temporary incident mitigation task - .by_parent(nil) - .id_in(batch.select(:namespace_id)) + node.indices.each_batch do |batch| + scope = Namespace.includes(:root_storage_statistics) # rubocop:disable CodeReuse/ActiveRecord -- this is a temporary incident mitigation task + .by_parent(nil) + .id_in(batch.select(:namespace_id)) - scope.each do |group| - sizes[group.id] = group.root_storage_statistics&.repository_size || 0 + scope.each do |group| + sizes[group.id] = group.root_storage_statistics&.repository_size || 0 + end end - end - sorted = sizes.to_a.sort_by { |_k, v| v } + sorted = sizes.to_a.sort_by { |_k, v| v } - namespaces_to_move = [] - total_repository_size = 0 - node_original_used_bytes = node.used_bytes - sorted.each do |namespace_id, repository_size| - node.used_bytes -= repository_size + namespaces_to_move = [] + total_repository_size = 0 + node_original_used_bytes = node.used_bytes + sorted.each do |namespace_id, repository_size| + node.used_bytes -= repository_size - break if (node.used_bytes / node.total_bytes.to_f) < WATERMARK_LIMIT_LOW + break if (node.used_bytes / node.total_bytes.to_f) < WATERMARK_LIMIT_LOW - namespaces_to_move << namespace_id - total_repository_size += repository_size - end + namespaces_to_move << namespace_id + total_repository_size += repository_size + end - unassign_namespaces_from_node(node, namespaces_to_move, node_original_used_bytes, total_repository_size) +
unassign_namespaces_from_node(node, namespaces_to_move, node_original_used_bytes, total_repository_size) + end end end @@ -188,7 +191,9 @@ def dot_com_rollout def remove_expired_subscriptions return false unless ::Gitlab::Saas.feature_available?(:exact_code_search) - Search::Zoekt::EnabledNamespace.destroy_namespaces_with_expired_subscriptions! + execute_every 10.minutes, cache_key: :remove_expired_subscriptions do + Search::Zoekt::EnabledNamespace.destroy_namespaces_with_expired_subscriptions! + end end def node_assignment @@ -249,51 +254,56 @@ def node_assignment end def mark_indices_as_ready - initializing_indices = Search::Zoekt::Index.initializing - if initializing_indices.empty? - logger.info(build_structured_payload(task: :mark_indices_as_ready, message: 'Set indices ready', count: 0)) - return - end + execute_every 10.minutes, cache_key: :mark_indices_as_ready do + initializing_indices = Search::Zoekt::Index.initializing + if initializing_indices.empty? + logger.info(build_structured_payload(task: :mark_indices_as_ready, message: 'Set indices ready', count: 0)) + break + end - count = 0 - initializing_indices.each_batch do |batch| - records = batch.with_all_repositories_ready - next if records.empty? + count = 0 + initializing_indices.each_batch do |batch| + records = batch.with_all_repositories_ready + next if records.empty?
- count += records.update_all(state: :ready) + count += records.update_all(state: :ready) + end + logger.info(build_structured_payload(task: :mark_indices_as_ready, message: 'Set indices ready', + count: count)) end - logger.info(build_structured_payload(task: :mark_indices_as_ready, message: 'Set indices ready', count: count)) end def initial_indexing return false if Feature.disabled?(:zoekt_initial_indexing_task) - Index.in_progress.preload_zoekt_enabled_namespace_and_namespace.preload_node.find_each do |index| - namespace = index.zoekt_enabled_namespace&.namespace - next unless namespace - - count = namespace.all_project_ids.count - repo_count = index.zoekt_repositories.count - if repo_count >= count - index.initializing! - node = index.node - log_data = build_structured_payload( - meta: { - 'zoekt.node_name' => node.metadata['name'], 'zoekt.node_id' => node.id, 'zoekt.index_id' => index.id - }, - namespace_id: namespace.id, message: 'index moved to initializing', - repo_count: repo_count, project_count: count, task: :initial_indexing - ) - logger.info(log_data) + execute_every 10.minutes, cache_key: :initial_indexing do + Index.in_progress.preload_zoekt_enabled_namespace_and_namespace.preload_node.find_each do |index| + namespace = index.zoekt_enabled_namespace&.namespace + next unless namespace + + count = namespace.all_project_ids.count + repo_count = index.zoekt_repositories.count + if repo_count >= count + index.initializing!
+ node = index.node + log_data = build_structured_payload( + meta: { + 'zoekt.node_name' => node.metadata['name'], 'zoekt.node_id' => node.id, 'zoekt.index_id' => index.id + }, + namespace_id: namespace.id, message: 'index moved to initializing', + repo_count: repo_count, project_count: count, task: :initial_indexing + ) + logger.info(log_data) + end end - end - Index.pending.each_batch do |batch, i| - NamespaceInitialIndexingWorker.bulk_perform_in_with_contexts( - i * 5.minutes, batch.preload_zoekt_enabled_namespace_and_namespace, - arguments_proc: ->(zoekt_index) { zoekt_index.id }, - context_proc: ->(zoekt_index) { { namespace: zoekt_index.zoekt_enabled_namespace&.namespace } } - ) + Index.pending.each_batch do |batch, i| + NamespaceInitialIndexingWorker.bulk_perform_in_with_contexts( + i * 5.minutes, batch.preload_zoekt_enabled_namespace_and_namespace, + arguments_proc: ->(zoekt_index) { zoekt_index.id }, + context_proc: ->(zoekt_index) { { namespace: zoekt_index.zoekt_enabled_namespace&.namespace } } + ) + end end end @@ -302,9 +312,19 @@ def auto_index_self_managed return if Gitlab::Saas.feature_available?(:exact_code_search) return unless Gitlab::CurrentSettings.zoekt_auto_index_root_namespace?
- Namespace.group_namespaces.root_namespaces_without_zoekt_enabled_namespace.each_batch do |batch| - data = batch.pluck_primary_key.map { |id| { root_namespace_id: id } } - Search::Zoekt::EnabledNamespace.insert_all(data) + execute_every 10.minutes, cache_key: :auto_index_self_managed do + Namespace.group_namespaces.root_namespaces_without_zoekt_enabled_namespace.each_batch do |batch| + data = batch.pluck_primary_key.map { |id| { root_namespace_id: id } } + Search::Zoekt::EnabledNamespace.insert_all(data) + end + end + end + + def update_replica_states + return false if Feature.disabled?(:zoekt_replica_state_updates) + + execute_every 2.minutes, cache_key: :update_replica_states do + ReplicaStateService.execute end end end diff --git a/ee/config/feature_flags/ops/zoekt_replica_state_updates.yml b/ee/config/feature_flags/ops/zoekt_replica_state_updates.yml new file mode 100644 index 0000000000000000000000000000000000000000..56f3152e237ed18698dccf3dccb9956767177a7c --- /dev/null +++ b/ee/config/feature_flags/ops/zoekt_replica_state_updates.yml @@ -0,0 +1,9 @@ +--- +name: zoekt_replica_state_updates +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/467406 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/158100 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/470364 +milestone: '17.2' +group: group::global search +type: ops +default_enabled: false diff --git a/ee/spec/models/search/zoekt/replica_spec.rb b/ee/spec/models/search/zoekt/replica_spec.rb index 8fabd5dfdd43333b15226d1b68fdb8b61f4398ca..bd7f5469ea1a3b3674cb58be85542474ad001553 100644 --- a/ee/spec/models/search/zoekt/replica_spec.rb +++ b/ee/spec/models/search/zoekt/replica_spec.rb @@ -128,6 +128,42 @@ end describe 'scopes' do + let_it_be_with_reload(:replica_1_idx_1) do + create(:zoekt_index, replica: zoekt_replica, zoekt_enabled_namespace: zoekt_replica.zoekt_enabled_namespace) + end + + let_it_be_with_reload(:replica_1_idx_2) do +
create(:zoekt_index, replica: zoekt_replica, zoekt_enabled_namespace: zoekt_replica.zoekt_enabled_namespace) + end + + describe '.with_all_ready_indices' do + subject(:scope) { described_class.with_all_ready_indices } + + it 'returns replicas where all their indices are marked as ready' do + replica_1_idx_1.ready! + replica_1_idx_2.pending! + + expect(scope).to be_empty + + replica_1_idx_2.ready! + expect(scope).to match_array(zoekt_replica) + end + end + + describe '.with_non_ready_indices' do + subject(:scope) { described_class.with_non_ready_indices } + + it 'returns replicas that have at least one index that is not ready' do + replica_1_idx_1.ready! + replica_1_idx_2.ready! + + expect(scope).to be_empty + + replica_1_idx_2.pending! + expect(scope).to match_array(zoekt_replica) + end + end + describe '.for_namespace' do before do create(:zoekt_replica) diff --git a/ee/spec/services/search/zoekt/replica_state_service_spec.rb b/ee/spec/services/search/zoekt/replica_state_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..e04391264dbfc2dafbf4c119bc0bfeb3e29684cf --- /dev/null +++ b/ee/spec/services/search/zoekt/replica_state_service_spec.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Zoekt::ReplicaStateService, feature_category: :global_search do + subject(:service) { described_class.new } + + let_it_be_with_reload(:replica) { create(:zoekt_replica) } + let_it_be(:enabled_namespace) { replica.zoekt_enabled_namespace } + let_it_be(:node_1) { create(:zoekt_node) } + let_it_be(:node_2) { create(:zoekt_node) } + let_it_be_with_reload(:idx_1) do + create(:zoekt_index, zoekt_enabled_namespace: enabled_namespace, node: node_1, replica: replica) + end + + let_it_be_with_reload(:idx_2) do + create(:zoekt_index, zoekt_enabled_namespace: enabled_namespace, node: node_2, replica: replica) + end + + describe '.execute' do + let(:replica_state_service) {
instance_double(::Search::Zoekt::ReplicaStateService) } + + it 'delegates to a new instance' do + expect(described_class).to receive(:new).and_return(replica_state_service) + expect(replica_state_service).to receive(:execute) + + described_class.execute + end + end + + describe '#execute' do + context 'when all the indices for a replica are marked as ready' do + before do + replica.indices.update_all(state: :ready) + end + + it 'marks the replica as ready' do + replica.pending! + expect { service.execute }.to change { replica.reload.state }.from('pending').to('ready') + end + end + + context 'when one of the indices for the replica is not ready' do + before do + idx_1.ready! + idx_2.pending! + end + + it 'marks the replica as pending' do + replica.ready! + expect { service.execute }.to change { replica.reload.state }.from('ready').to('pending') + end + end + end +end diff --git a/ee/spec/services/search/zoekt/scheduling_service_spec.rb b/ee/spec/services/search/zoekt/scheduling_service_spec.rb index 44c1434b19811a9a189d405f1a040709d564cd3b..ca5ecf0acc546b39311b042b0c717c3b677b32e5 100644 --- a/ee/spec/services/search/zoekt/scheduling_service_spec.rb +++ b/ee/spec/services/search/zoekt/scheduling_service_spec.rb @@ -524,4 +524,24 @@ expect(top_group.reload.zoekt_enabled_namespace).not_to be_nil end end + + describe '#update_replica_states' do + let(:task) { :update_replica_states } + + it 'calls ReplicaStateService.execute' do + expect(::Search::Zoekt::ReplicaStateService).to receive(:execute) + execute_task + end + + context 'when zoekt replica state updates FF is disabled' do + before do + stub_feature_flags(zoekt_replica_state_updates: false) + end + + it 'returns false and does not do anything' do + expect(::Search::Zoekt::ReplicaStateService).not_to receive(:execute) + expect(execute_task).to eq(false) + end + end + end end