diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index ba820a4eaa6971880502c7775b6bf54c56dbb41c..c4f8ae50ced4ba7107df36d9612a60b9edb52206 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -705,6 +705,8 @@ - 1 - - search_elastic_default_branch_changed - 1 +- - search_elastic_delete + - 1 - - search_elastic_group_association_deletion - 1 - - search_elastic_trigger_indexing diff --git a/ee/app/services/ee/projects/transfer_service.rb b/ee/app/services/ee/projects/transfer_service.rb index 5be57d2ac9c2b2dfb5aad6fb4b1abb1f8c05ceb8..899b7e34befd549da5a48e70d20372177fb17af6 100644 --- a/ee/app/services/ee/projects/transfer_service.rb +++ b/ee/app/services/ee/projects/transfer_service.rb @@ -25,6 +25,11 @@ def transfer_missing_group_resources(group) def post_update_hooks(project, old_group) super + ::Search::Elastic::DeleteWorker.perform_async( + task: :project_transfer, + traversal_id: project.namespace.elastic_namespace_ancestry, + project_id: project.id + ) ::Elastic::ProjectTransferWorker.perform_async(project.id, old_namespace.id, new_namespace.id) ::Search::Zoekt::ProjectTransferWorker.perform_async(project.id, old_namespace.id) diff --git a/ee/app/services/search/elastic/delete/project_transfer_service.rb b/ee/app/services/search/elastic/delete/project_transfer_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..f53a11e13d1ef52577f747b1151411a585db76a6 --- /dev/null +++ b/ee/app/services/search/elastic/delete/project_transfer_service.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +module Search + module Elastic + module Delete + class ProjectTransferService + include Gitlab::Loggable + + attr_reader :options + + def self.execute(options) + new(options).execute + end + + def initialize(options) + @options = options.with_indifferent_access + end + + def execute + project_id = options[:project_id] + traversal_id = options[:traversal_id] + remove_work_item_documents(project_id, traversal_id) + end + + private + + def logger + @logger ||= ::Gitlab::Elasticsearch::Logger.build + end + + def work_item_index_available? + ::Feature.enabled?(:elastic_index_work_items) && # rubocop:disable Gitlab/FeatureFlagWithoutActor -- We do not need an actor here + ::Elastic::DataMigrationService.migration_has_finished?(:create_work_items_index) + end + + def remove_work_item_documents(project_id, traversal_id) + return unless work_item_index_available? + + response = client.delete_by_query( + { + index: ::Search::Elastic::Types::WorkItem.index_name, + conflicts: 'proceed', + timeout: '10m', + body: { + query: { + bool: { + filter: [ + { term: { project_id: project_id } }, + { bool: { must_not: { prefix: { traversal_ids: { value: traversal_id } } } } } + ] + } + } + } + } + ) + + log_payload = build_structured_payload( + project_id: project_id, + traversal_id: traversal_id, + index: ::Search::Elastic::Types::WorkItem.index_name + ) + + if !response['failure'].nil? + log_payload[:failure] = response['failure'] + log_payload[:message] = "Failed to delete data for project transfer" + else + log_payload[:deleted] = response['deleted'] + log_payload[:message] = "Sucesfully deleted duplicate data for project transfer" + end + + if log_payload[:failure].present? + logger.error(log_payload) + else + logger.info(log_payload) + end + end + + def client + @client ||= ::Gitlab::Search::Client.new + end + end + end + end +end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index cddb498b3df8f121afcab50ca89681e8a7a2af51..f70ef67cf7778a2e2216ea94397e29ce08dbdd7f 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -2010,6 +2010,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: search_elastic_delete + :worker_name: Search::Elastic::DeleteWorker + :feature_category: :global_search + :has_external_dependencies: false + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: search_elastic_group_association_deletion :worker_name: Search::ElasticGroupAssociationDeletionWorker :feature_category: :global_search diff --git a/ee/app/workers/search/elastic/delete_worker.rb b/ee/app/workers/search/elastic/delete_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..2c396aa1a095f206d978ac9b502d1a8677007735 --- /dev/null +++ b/ee/app/workers/search/elastic/delete_worker.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +module Search + module Elastic + class DeleteWorker + include ApplicationWorker + prepend ::Elastic::IndexingControl + prepend ::Geo::SkipSecondary + + sidekiq_options retry: 3 + data_consistency :delayed + feature_category :global_search + urgency :throttled + idempotent! + + TASKS = { + project_transfer: ::Search::Elastic::Delete::ProjectTransferService + }.freeze + + def perform(options = {}) + return false unless Gitlab::CurrentSettings.elasticsearch_indexing? + + options = options.with_indifferent_access + task = options[:task] + return run_all_tasks(options) if task.to_sym == :all + + raise ArgumentError, "Unknown task: #{task.inspect}" unless TASKS.key?(task.to_sym) + + TASKS[task.to_sym].execute(options) + end + + private + + def run_all_tasks(options) + TASKS.each_key do |task| + with_context(related_class: self.class) do + self.class.perform_async(options.merge(task: task)) + end + end + end + end + end +end diff --git a/ee/spec/services/projects/transfer_service_spec.rb b/ee/spec/services/projects/transfer_service_spec.rb index 991ac7799996ebff49e423bc44f0511c9be112e4..a83182a23a07bdc09373baed7da80764f2881a91 100644 --- a/ee/spec/services/projects/transfer_service_spec.rb +++ b/ee/spec/services/projects/transfer_service_spec.rb @@ -54,6 +54,77 @@ def operation end describe 'elasticsearch indexing' do + context 'when we transfer from group_namespace to group_namespace' do + let_it_be(:new_group) { create(:group) } + let_it_be(:project) { create(:project, namespace: group) } + + before do + new_group.add_owner(user) + end + + it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do + expect(project.namespace).to eq(group) + expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({ + task: :project_transfer, + project_id: project.id, + traversal_id: new_group.elastic_namespace_ancestry + }).once + + subject.execute(new_group) + expect(project.namespace).to eq(new_group) + end + end + + context 'when we transfer from group_namespace to user_namespace' do + let_it_be(:project) { create(:project, namespace: group) } + + it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do + expect(project.namespace).to eq(group) + expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({ + task: :project_transfer, + project_id: project.id, + traversal_id: user.namespace.elastic_namespace_ancestry + }).once + + subject.execute(user.namespace) + expect(project.namespace).to eq(user.namespace) + end + end + + context 'when we transfer from user_namespace to group_namespace' do + it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do + expect(project.namespace).to eq(user.namespace) + expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({ + task: :project_transfer, + project_id: project.id, + traversal_id: group.elastic_namespace_ancestry + }).once + + subject.execute(group) + expect(project.namespace).to eq(group) + end + end + + context 'when we transfer from user_namespace to user_namespace' do + let_it_be(:new_user) { create(:user) } + + before do + project.add_owner(new_user) + end + + it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do + expect(project.namespace).to eq(user.namespace) + expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({ + task: :project_transfer, + project_id: project.id, + traversal_id: new_user.namespace.elastic_namespace_ancestry + }).once + + described_class.new(project, new_user).execute(new_user.namespace) + expect(project.namespace).to eq(new_user.namespace) + end + end + it 'delegates transfer to Elastic::ProjectTransferWorker and ::Search::Zoekt::ProjectTransferWorker' do expect(::Elastic::ProjectTransferWorker).to receive(:perform_async).with(project.id, project.namespace.id, group.id).once expect(::Search::Zoekt::ProjectTransferWorker).to receive(:perform_async).with(project.id, project.namespace.id).once diff --git a/ee/spec/services/search/elastic/delete/project_transfer_service_spec.rb b/ee/spec/services/search/elastic/delete/project_transfer_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..0e1ffe2ef0c8c13812546983a4c63858845be7fe --- /dev/null +++ b/ee/spec/services/search/elastic/delete/project_transfer_service_spec.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ::Search::Elastic::Delete::ProjectTransferService, :elastic_helpers, feature_category: :global_search do + describe '#execute' do + subject(:execute) do + described_class.execute({ task: :project_transfer, + project_id: project.id, traversal_id: 'random-' }) + end + + let(:work_item_index) { ::Search::Elastic::Types::WorkItem.index_name } + let_it_be(:group) { create(:group) } + let_it_be(:project) { create(:project, group: group) } + let(:work_item) { create(:work_item, project: project) } + + before do + set_elasticsearch_migration_to :create_work_items_index, including: true + end + + context 'when Elasticsearch is enabled', :elastic_delete_by_query do + before do + stub_ee_application_setting(elasticsearch_indexing: true) + + work_item + ensure_elasticsearch_index! + end + + context 'when there is a failure in delete' do + let(:client) { instance_double('Elasticsearch::Transport::Client') } + let(:logger) { ::Gitlab::Elasticsearch::Logger.build } + + before do + allow(::Gitlab::Search::Client).to receive(:new).and_return(client) + allow(client).to receive(:delete_by_query).and_return({ 'failure' => ['failed'] }) + allow(::Gitlab::Elasticsearch::Logger).to receive(:build).and_return(logger) + end + + it 'logs the error' do + expect(logger).to receive(:error).with(hash_including(message: "Failed to delete data for project transfer")) + execute + end + end + + context 'when work_item index is available' do + it 'deletes work items not belonging to the passed traversal_id' do + # items are present already + expect(items_in_index(work_item_index).count).to eq(1) + expect(items_in_index(work_item_index)).to include(work_item.id) + + execute + es_helper.refresh_index(index_name: work_item_index) + + # items are deleted + expect(items_in_index(work_item_index).count).to eq(0) + end + end + + context 'when migration is not complete' do + before do + set_elasticsearch_migration_to :create_work_items_index, including: false + end + + it 'does not remove work items' do + # items are present already + expect(items_in_index(work_item_index)).to include(work_item.id) + expect(items_in_index(work_item_index).count).to eq(1) + + execute + es_helper.refresh_index(index_name: work_item_index) + + # work items not removed + expect(items_in_index(work_item_index).count).to eq(1) + expect(items_in_index(work_item_index)).to include(work_item.id) + end + end + + context 'when elastic_index_work_items is disabled' do + before do + stub_feature_flags(elastic_index_work_items: false) + end + + it 'does not remove work items' do + # items are present already + expect(items_in_index(work_item_index)).to include(work_item.id) + expect(items_in_index(work_item_index).count).to eq(1) + + execute + es_helper.refresh_index(index_name: work_item_index) + + # work items not removed + expect(items_in_index(work_item_index).count).to eq(1) + expect(items_in_index(work_item_index)).to include(work_item.id) + end + end + end + end +end diff --git a/ee/spec/support/helpers/elasticsearch_helpers.rb b/ee/spec/support/helpers/elasticsearch_helpers.rb index 7ce2f5c01ad04e189b5bdf9533c1f672c36c209a..8245186f5a5411591303679a91865f38e585ca7f 100644 --- a/ee/spec/support/helpers/elasticsearch_helpers.rb +++ b/ee/spec/support/helpers/elasticsearch_helpers.rb @@ -100,4 +100,8 @@ def elastic_delete_group_wiki_worker_random_delay_range def elastic_group_association_deletion_worker_random_delay_range a_value_between(0, Search::ElasticGroupAssociationDeletionWorker::MAX_JOBS_PER_HOUR.pred) end + + def items_in_index(index_name) + es_helper.client.search(index: index_name).dig('hits', 'hits').map { |hit| hit['_source']['id'] } + end end diff --git a/ee/spec/workers/search/elastic/delete_worker_spec.rb b/ee/spec/workers/search/elastic/delete_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..66446ed61280799c8cdaba9fa88b484a6cdcd5e9 --- /dev/null +++ b/ee/spec/workers/search/elastic/delete_worker_spec.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Elastic::DeleteWorker, :elastic_helpers, feature_category: :global_search do + describe '#perform' do + subject(:perform) do + described_class.new.perform({ task: :project_transfer }) + end + + context 'when Elasticsearch is disabled' do + before do + stub_ee_application_setting(elasticsearch_indexing: false) + end + + it 'does not do anything' do + expect(perform).to be_falsey + end + end + + context 'when Elasticsearch is enabled' do + before do + stub_ee_application_setting(elasticsearch_indexing: true) + end + + context 'when we pass :all' do + it 'queues all tasks' do + Search::Elastic::DeleteWorker::TASKS.each_key do |t| + expect(described_class).to receive(:perform_async).with({ + task: t + }) + end + described_class.new.perform({ task: :all }) + end + end + + context 'when we pass valid task' do + it 'call the corresponding service' do + expect(::Search::Elastic::Delete::ProjectTransferService).to receive(:execute) + perform + end + end + + context 'when we pass invalid task' do + it 'raises ArgumentError' do + expect { described_class.new.perform({ task: :unknown_task }) }.to raise_error(ArgumentError) + end + end + end + end +end diff --git a/spec/workers/every_sidekiq_worker_spec.rb b/spec/workers/every_sidekiq_worker_spec.rb index 58c54ab92120f12d9f71c88ef0be39304ad5d77a..80df0e0bc9a468d772bf143cfbed95d3cb96ecd0 100644 --- a/spec/workers/every_sidekiq_worker_spec.rb +++ b/spec/workers/every_sidekiq_worker_spec.rb @@ -439,6 +439,7 @@ 'RunPipelineScheduleWorker' => 3, 'ScanSecurityReportSecretsWorker' => 17, 'Search::ElasticGroupAssociationDeletionWorker' => 3, + 'Search::Elastic::DeleteWorker' => 3, 'Security::StoreScansWorker' => 3, 'Security::TrackSecureScansWorker' => 1, 'ServiceDeskEmailReceiverWorker' => 3,