diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index d84d1f2c1fa88cbb363c0d6fbdecc4e274e784b8..d051346bbee967d19efedef703dabc2ae580c907 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -787,6 +787,8 @@
   - 1
 - - search_zoekt_namespace_initial_indexing
   - 1
+- - search_zoekt_node_with_negative_unclaimed_storage_event
+  - 1
 - - search_zoekt_orphaned_index_event
   - 1
 - - search_zoekt_orphaned_repo_event
diff --git a/ee/app/events/search/zoekt/node_with_negative_unclaimed_storage_event.rb b/ee/app/events/search/zoekt/node_with_negative_unclaimed_storage_event.rb
new file mode 100644
index 0000000000000000000000000000000000000000..fd8632a4435ef132f54ecb6d72bc924effbac564
--- /dev/null
+++ b/ee/app/events/search/zoekt/node_with_negative_unclaimed_storage_event.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    # Event carrying the ids of Zoekt nodes whose unclaimed storage is negative.
+    # Payload: `node_ids` (required), an array of integers.
+    class NodeWithNegativeUnclaimedStorageEvent < ::Gitlab::EventStore::Event
+      def schema
+        {
+          'type' => 'object',
+          'properties' => {
+            'node_ids' => { 'type' => 'array', 'items' => { 'type' => 'integer' } }
+          },
+          'required' => %w[node_ids]
+        }
+      end
+    end
+  end
+end
diff --git a/ee/app/models/search/zoekt/node.rb b/ee/app/models/search/zoekt/node.rb
index 1b12056594512561f2b5069e8aac6f817a8b5aca..4b6ce08a35672463ec2fc2173c5a03301de64a24 100644
--- a/ee/app/models/search/zoekt/node.rb
+++ b/ee/app/models/search/zoekt/node.rb
@@ -4,6 +4,7 @@ module Search
   module Zoekt
     class Node < ApplicationRecord
       self.table_name = 'zoekt_nodes'
+      include EachBatch
 
       DEFAULT_CONCURRENCY_LIMIT = 20
       MAX_CONCURRENCY_LIMIT = 200
@@ -16,6 +17,11 @@ class Node < ApplicationRecord
       TASK_PULL_FREQUENCY_INCREASED = '500ms'
       DEBOUNCE_DELAY = 5.seconds
 
+      # SQL expression for a node's unclaimed storage; requires zoekt_indices to be joined and grouped per node.
+      UNCLAIMED_STORAGE_BYTES_FORMULA = <<~SQL
+        (zoekt_nodes.total_bytes - zoekt_nodes.used_bytes + zoekt_nodes.indexed_bytes - COALESCE(sum(zoekt_indices.reserved_storage_bytes), 0))
+      SQL
+
       has_many :indices,
         foreign_key: :zoekt_node_id, inverse_of: :node, class_name: '::Search::Zoekt::Index'
       has_many :enabled_namespaces,
@@ -44,14 +50,20 @@ class Node < ApplicationRecord
           .merge(Repository.searchable)
           .where(zoekt_repositories: { project: project })
       end
-      scope :with_unclaimed_storage_bytes, -> do
+      # Nodes whose unclaimed storage is zero or more (zero is included despite "positive" in the name).
+      # The computed value is selected as `unclaimed_storage_bytes`.
+      scope :with_positive_unclaimed_storage_bytes, -> do
         sql = <<~SQL
-          zoekt_nodes.*, (zoekt_nodes.total_bytes - zoekt_nodes.used_bytes + zoekt_nodes.indexed_bytes - COALESCE(sum(zoekt_indices.reserved_storage_bytes), 0)) AS unclaimed_storage_bytes
+          zoekt_nodes.*, #{UNCLAIMED_STORAGE_BYTES_FORMULA} AS unclaimed_storage_bytes
         SQL
-        left_joins(:indices).group(:id).select(sql)
+        left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} >= 0").select(sql)
       end
       scope :order_by_unclaimed_space, -> do
-        with_unclaimed_storage_bytes.order('unclaimed_storage_bytes')
+        with_positive_unclaimed_storage_bytes.order('unclaimed_storage_bytes')
+      end
+      # Nodes whose unclaimed storage is strictly below zero; does not select the computed value.
+      scope :negative_unclaimed_storage_bytes, -> do
+        left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} < 0")
       end
 
       def self.find_or_initialize_by_task_request(params)
diff --git a/ee/app/services/search/zoekt/scheduling_service.rb b/ee/app/services/search/zoekt/scheduling_service.rb
index 6e2155dcef794035ef406066a67e1243abac7f03..8902663bd6298a3626f987f0c152e4e96baf61e9 100644
--- a/ee/app/services/search/zoekt/scheduling_service.rb
+++ b/ee/app/services/search/zoekt/scheduling_service.rb
@@ -18,6 +18,7 @@ class SchedulingService
         lost_nodes_check
         mark_indices_as_ready
         node_assignment
+        node_with_negative_unclaimed_storage_bytes_check
         remove_expired_subscriptions
         repo_should_be_marked_as_orphaned_check
         repo_to_delete_check
@@ -273,6 +274,18 @@ def node_assignment
       end
       # rubocop: enable Metrics/AbcSize
 
+      # Every hour, publishes one NodeWithNegativeUnclaimedStorageEvent per batch of
+      # node ids returned by Node.negative_unclaimed_storage_bytes.
+      def node_with_negative_unclaimed_storage_bytes_check
+        execute_every 1.hour, cache_key: :node_with_negative_unclaimed_storage_bytes_check do
+          Search::Zoekt::Node.negative_unclaimed_storage_bytes.each_batch do |batch|
+            Gitlab::EventStore.publish(
+              Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent.new(data: { node_ids: batch.pluck_primary_key })
+            )
+          end
+        end
+      end
+
       # indices that don't have zoekt_repositories are already in `ready` state
       def mark_indices_as_ready
         execute_every 10.minutes, cache_key: :mark_indices_as_ready do
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index df82f837924450dae7ffead4da00270a168ce29d..7644a490bb9589ce65004ec785d6911052b52881 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -2388,6 +2388,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: search_zoekt_node_with_negative_unclaimed_storage_event
+  :worker_name: Search::Zoekt::NodeWithNegativeUnclaimedStorageEventWorker
+  :feature_category: :global_search
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: search_zoekt_orphaned_index_event
   :worker_name: Search::Zoekt::OrphanedIndexEventWorker
   :feature_category: :global_search
diff --git a/ee/app/workers/search/zoekt/node_with_negative_unclaimed_storage_event_worker.rb b/ee/app/workers/search/zoekt/node_with_negative_unclaimed_storage_event_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..4b195b44bf473a11da4fd55c65a0861381f9ce32
--- /dev/null
+++ b/ee/app/workers/search/zoekt/node_with_negative_unclaimed_storage_event_worker.rb
@@ -0,0 +1,76 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    # Subscriber for NodeWithNegativeUnclaimedStorageEvent.
+    #
+    # For each node in the event that has negative unclaimed storage, picks indices
+    # whose reserved storage adds up to the deficit and publishes IndexToEvictEvent
+    # for them in slices of BATCH_SIZE ids.
+    class NodeWithNegativeUnclaimedStorageEventWorker
+      include Gitlab::EventStore::Subscriber
+      include Search::Worker
+      prepend ::Geo::SkipSecondary
+
+      deduplicate :until_executed
+      idempotent!
+
+      # Maximum number of index ids carried by one IndexToEvictEvent.
+      BATCH_SIZE = 1_000
+
+      # @param event [Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent]
+      def handle_event(event)
+        node_ids = event.data[:node_ids]
+
+        # `blank?` also covers a nil payload value, not only an empty array.
+        return if node_ids.blank?
+
+        Search::Zoekt::Node.negative_unclaimed_storage_bytes.id_in(node_ids).find_each do |node|
+          unclaimed_storage_bytes = node.unclaimed_storage_bytes
+          # The scope already filters on the SQL value; this is a defensive re-check.
+          next unless unclaimed_storage_bytes < 0
+
+          logger.info(build_structured_payload(
+            message: 'Processing node with negative unclaimed storage bytes',
+            'zoekt.node_id': node.id,
+            'zoekt.unclaimed_storage_bytes': unclaimed_storage_bytes))
+
+          index_ids_to_evict(node, unclaimed_storage_bytes.abs).each_slice(BATCH_SIZE) do |index_ids|
+            Gitlab::EventStore.publish(
+              Search::Zoekt::IndexToEvictEvent.new(data: { index_ids: index_ids })
+            )
+          end
+        end
+      end
+
+      private
+
+      # Collects ids of the node's indices until their reserved storage covers
+      # `deficit_bytes`.
+      #
+      # Indices without a positive reservation are skipped: the unclaimed storage
+      # formula only subtracts reserved bytes, so evicting them reclaims nothing.
+      def index_ids_to_evict(node, deficit_bytes)
+        reclaimed_bytes = 0
+        index_ids = []
+
+        node.indices.find_each do |index|
+          # `to_i` keeps a nil reservation from raising.
+          reserved_bytes = index.reserved_storage_bytes.to_i
+          next unless reserved_bytes > 0
+
+          reclaimed_bytes += reserved_bytes
+          index_ids << index.id
+
+          break if reclaimed_bytes >= deficit_bytes
+        end
+
+        index_ids
+      end
+
+      def logger
+        @logger ||= ::Search::Zoekt::Logger.build
+      end
+    end
+  end
+end
diff --git a/ee/lib/ee/gitlab/event_store.rb b/ee/lib/ee/gitlab/event_store.rb
index f6b4acaf10e9f9033b85a6f6002de666e3a70c2c..2304b6ea2159a5ba981d6642284df0e881880313 100644
--- a/ee/lib/ee/gitlab/event_store.rb
+++ b/ee/lib/ee/gitlab/event_store.rb
@@ -209,6 +209,9 @@ def subscribe_to_zoekt_events(store)
 
           store.subscribe ::Search::Zoekt::AdjustIndicesReservedStorageBytesEventWorker,
             to: ::Search::Zoekt::AdjustIndicesReservedStorageBytesEvent
+
+          store.subscribe ::Search::Zoekt::NodeWithNegativeUnclaimedStorageEventWorker,
+            to: ::Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent
         end
 
         def subscribe_to_members_added_event(store)
diff --git a/ee/spec/lib/ee/gitlab/event_store_spec.rb b/ee/spec/lib/ee/gitlab/event_store_spec.rb
index 7ebc71726f7424ab1282cfce1e0229c4347a2423..8c5eb1cd5a8bc34af076ac1dce8834f1e104b295 100644
--- a/ee/spec/lib/ee/gitlab/event_store_spec.rb
+++ b/ee/spec/lib/ee/gitlab/event_store_spec.rb
@@ -61,6 +61,7 @@
         Search::Zoekt::LostNodeEvent,
         Search::Zoekt::IndexWatermarkChangedEvent,
         Search::Zoekt::AdjustIndicesReservedStorageBytesEvent,
+        Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent,
         Security::PolicyCreatedEvent,
         Security::PolicyUpdatedEvent,
         Security::PolicyDeletedEvent,
diff --git a/ee/spec/models/search/zoekt/node_spec.rb b/ee/spec/models/search/zoekt/node_spec.rb
index 3fed64e74597e64fd1054ebdb2a9d483046e698b..a827d3156e29cf9e8dfbeca68197dc061ca644c6 100644
--- a/ee/spec/models/search/zoekt/node_spec.rb
+++ b/ee/spec/models/search/zoekt/node_spec.rb
@@ -108,6 +108,93 @@
         end
       end
     end
+
+    describe '.negative_unclaimed_storage_bytes' do
+      let_it_be(:negative_node) { create(:zoekt_node, :enough_free_space) }
+      let_it_be(:_negative_index) do
+        create(:zoekt_index, reserved_storage_bytes: negative_node.total_bytes * 2, node: negative_node)
+      end
+
+      let_it_be(:positive_node) { create(:zoekt_node, :enough_free_space) }
+      let_it_be(:_positive_index) { create(:zoekt_index, node: positive_node) }
+
+      it 'includes only nodes with negative unclaimed storage' do
+        expect(described_class.negative_unclaimed_storage_bytes).to contain_exactly(node, negative_node)
+      end
+
+      it 'does not include nodes with positive unclaimed storage' do
+        expect(described_class.negative_unclaimed_storage_bytes).not_to include(positive_node)
+      end
+    end
+
+    describe '.with_positive_unclaimed_storage_bytes' do
+      let_it_be(:node_with_positive_storage) { create(:zoekt_node, :enough_free_space) }
+      let_it_be(:node_with_zero_storage) { create(:zoekt_node, total_bytes: 1000, used_bytes: 1000) }
+      let_it_be(:node_with_negative_storage) { create(:zoekt_node, :enough_free_space) }
+
+      before do
+        # Scenario with positive unclaimed storage
+        create(:zoekt_index,
+          node: node_with_positive_storage,
+          reserved_storage_bytes: node_with_positive_storage.total_bytes / 2
+        )
+
+        # Scenario with negative unclaimed storage
+        create(:zoekt_index,
+          node: node_with_negative_storage,
+          reserved_storage_bytes: node_with_negative_storage.total_bytes * 2
+        )
+      end
+
+      it 'returns only nodes with non-negative unclaimed storage bytes' do
+        positive_nodes = described_class.with_positive_unclaimed_storage_bytes
+
+        expect(positive_nodes).to include(node_with_positive_storage)
+        expect(positive_nodes).to include(node_with_zero_storage)
+        expect(positive_nodes).not_to include(node_with_negative_storage)
+      end
+
+      it 'adds unclaimed_storage_bytes attribute to returned nodes' do
+        result = described_class.with_positive_unclaimed_storage_bytes.find(node_with_positive_storage.id)
+
+        expect(result).to respond_to(:unclaimed_storage_bytes)
+        expect(result.unclaimed_storage_bytes).to be >= 0
+      end
+
+      it 'calculates unclaimed_storage_bytes correctly' do
+        result = described_class.with_positive_unclaimed_storage_bytes.find(node_with_positive_storage.id)
+
+        # Manual calculation to verify the scope's calculation
+        expected_unclaimed_bytes = node_with_positive_storage.total_bytes -
+          node_with_positive_storage.used_bytes +
+          node_with_positive_storage.indexed_bytes -
+          node_with_positive_storage.indices.sum(:reserved_storage_bytes)
+
+        expect(result.unclaimed_storage_bytes).to eq(expected_unclaimed_bytes)
+      end
+
+      it 'groups results by node id to handle multiple indices' do
+        # Create multiple indices for the same node
+        create(:zoekt_index,
+          node: node_with_positive_storage,
+          reserved_storage_bytes: node_with_positive_storage.total_bytes / 4
+        )
+
+        results = described_class.with_positive_unclaimed_storage_bytes
+
+        expect(results).to include(node_with_positive_storage)
+      end
+
+      context 'when no indices exist' do
+        let_it_be(:node_without_indices) { create(:zoekt_node, :enough_free_space) }
+
+        it 'includes nodes without indices if they have positive unclaimed storage' do
+          results = described_class.with_positive_unclaimed_storage_bytes
+
+          expect(results).to include(node_without_indices)
+        end
+      end
+    end
   end
 
   describe 'validations' do
diff --git a/ee/spec/services/search/zoekt/scheduling_service_spec.rb b/ee/spec/services/search/zoekt/scheduling_service_spec.rb
index fb0b07784a748dbeaa542b4e6a4508be6f22fce0..12341dfc0c0b5bc0dc6a1b1b405faf2eecb34d80 100644
--- a/ee/spec/services/search/zoekt/scheduling_service_spec.rb
+++ b/ee/spec/services/search/zoekt/scheduling_service_spec.rb
@@ -825,4 +825,22 @@
       expect { execute_task }.to publish_event(Search::Zoekt::AdjustIndicesReservedStorageBytesEvent).with(expected)
     end
   end
+
+  describe '#node_with_negative_unclaimed_storage_bytes_check' do
+    let(:task) { :node_with_negative_unclaimed_storage_bytes_check }
+    let_it_be(:negative_node) { create(:zoekt_node, :enough_free_space) }
+    let_it_be(:_negative_index) do
+      create(:zoekt_index, reserved_storage_bytes: negative_node.total_bytes * 2, node: negative_node)
+    end
+
+    let_it_be(:positive_node) { create(:zoekt_node, :enough_free_space) }
+    let_it_be(:_positive_index) { create(:zoekt_index, node: positive_node) }
+
+    it 'publishes a NodeWithNegativeUnclaimedStorageEvent for required nodes' do
+      expected = {
+        node_ids: [negative_node.id]
+      }
+      expect { execute_task }.to publish_event(Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent).with(expected)
+    end
+  end
 end
diff --git a/ee/spec/workers/search/zoekt/node_with_negative_unclaimed_storage_event_worker_spec.rb b/ee/spec/workers/search/zoekt/node_with_negative_unclaimed_storage_event_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..75062d82218c8ff753efa413bb03a3751c96a4b4
--- /dev/null
+++ b/ee/spec/workers/search/zoekt/node_with_negative_unclaimed_storage_event_worker_spec.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Search::Zoekt::NodeWithNegativeUnclaimedStorageEventWorker, :zoekt_settings_enabled,
+  feature_category: :global_search do
+  let(:event) { Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent.new(data: data) }
+
+  let_it_be(:node) { create(:zoekt_node, :enough_free_space) }
+  let_it_be(:index) { create(:zoekt_index, node: node) }
+  let_it_be(:negative_node) { create(:zoekt_node, :enough_free_space) }
+  let_it_be(:negative_index) do
+    create(:zoekt_index, reserved_storage_bytes: negative_node.total_bytes * 2, node: negative_node)
+  end
+
+  let(:data) do
+    { node_ids: [node.id, negative_node.id] }
+  end
+
+  it_behaves_like 'subscribes to event'
+
+  it 'has the `until_executed` deduplicate strategy' do
+    expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
+  end
+
+  it_behaves_like 'an idempotent worker' do
+    it 'processes nodes with negative unclaimed storage bytes' do
+      expect do
+        consume_event(subscriber: described_class, event: event)
+      end.to publish_event(Search::Zoekt::IndexToEvictEvent).with({ index_ids: [negative_index.id] })
+    end
+  end
+end