Skip to content
代码片段 群组 项目
未验证 提交 35596c9c 编辑于 作者: Siddharth Dungarwal's avatar Siddharth Dungarwal 提交者: GitLab
浏览文件

Apply 1 suggestion(s) to 1 file(s)


Co-authored-by: default avatarTerri Chu <tchu@gitlab.com>
上级 06fd45b7
No related branches found
No related tags found
无相关合并请求
...@@ -705,6 +705,8 @@ ...@@ -705,6 +705,8 @@
- 1 - 1
- - search_elastic_default_branch_changed - - search_elastic_default_branch_changed
- 1 - 1
- - search_elastic_delete
- 1
- - search_elastic_group_association_deletion - - search_elastic_group_association_deletion
- 1 - 1
- - search_elastic_trigger_indexing - - search_elastic_trigger_indexing
......
...@@ -25,6 +25,11 @@ def transfer_missing_group_resources(group) ...@@ -25,6 +25,11 @@ def transfer_missing_group_resources(group)
def post_update_hooks(project, old_group) def post_update_hooks(project, old_group)
super super
::Search::Elastic::DeleteWorker.perform_async(
task: :project_transfer,
traversal_id: project.namespace.elastic_namespace_ancestry,
project_id: project.id
)
::Elastic::ProjectTransferWorker.perform_async(project.id, old_namespace.id, new_namespace.id) ::Elastic::ProjectTransferWorker.perform_async(project.id, old_namespace.id, new_namespace.id)
::Search::Zoekt::ProjectTransferWorker.perform_async(project.id, old_namespace.id) ::Search::Zoekt::ProjectTransferWorker.perform_async(project.id, old_namespace.id)
......
# frozen_string_literal: true
module Search
module Elastic
module Delete
class ProjectTransferService
include Gitlab::Loggable
attr_reader :options
def self.execute(options)
new(options).execute
end
def initialize(options)
@options = options.with_indifferent_access
end
def execute
project_id = options[:project_id]
traversal_id = options[:traversal_id]
remove_work_item_documents(project_id, traversal_id)
end
private
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
def work_item_index_available?
::Feature.enabled?(:elastic_index_work_items) && # rubocop:disable Gitlab/FeatureFlagWithoutActor -- We do not need an actor here
::Elastic::DataMigrationService.migration_has_finished?(:create_work_items_index)
end
def remove_work_item_documents(project_id, traversal_id)
return unless work_item_index_available?
response = client.delete_by_query(
{
index: ::Search::Elastic::Types::WorkItem.index_name,
conflicts: 'proceed',
timeout: '10m',
body: {
query: {
bool: {
filter: [
{ term: { project_id: project_id } },
{ bool: { must_not: { prefix: { traversal_ids: { value: traversal_id } } } } }
]
}
}
}
}
)
log_payload = build_structured_payload(
project_id: project_id,
traversal_id: traversal_id,
index: ::Search::Elastic::Types::WorkItem.index_name
)
if !response['failure'].nil?
log_payload[:failure] = response['failure']
log_payload[:message] = "Failed to delete data for project transfer"
else
log_payload[:deleted] = response['deleted']
log_payload[:message] = "Sucesfully deleted duplicate data for project transfer"
end
if log_payload[:failure].present?
logger.error(log_payload)
else
logger.info(log_payload)
end
end
def client
@client ||= ::Gitlab::Search::Client.new
end
end
end
end
end
...@@ -2010,6 +2010,15 @@ ...@@ -2010,6 +2010,15 @@
:weight: 1 :weight: 1
:idempotent: true :idempotent: true
:tags: [] :tags: []
- :name: search_elastic_delete
:worker_name: Search::Elastic::DeleteWorker
:feature_category: :global_search
:has_external_dependencies: false
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: search_elastic_group_association_deletion - :name: search_elastic_group_association_deletion
:worker_name: Search::ElasticGroupAssociationDeletionWorker :worker_name: Search::ElasticGroupAssociationDeletionWorker
:feature_category: :global_search :feature_category: :global_search
......
# frozen_string_literal: true
module Search
module Elastic
class DeleteWorker
include ApplicationWorker
prepend ::Elastic::IndexingControl
prepend ::Geo::SkipSecondary
sidekiq_options retry: 3
data_consistency :delayed
feature_category :global_search
urgency :throttled
idempotent!
TASKS = {
project_transfer: ::Search::Elastic::Delete::ProjectTransferService
}.freeze
def perform(options = {})
return false unless Gitlab::CurrentSettings.elasticsearch_indexing?
options = options.with_indifferent_access
task = options[:task]
return run_all_tasks(options) if task.to_sym == :all
raise ArgumentError, "Unknown task: #{task.inspect}" unless TASKS.key?(task.to_sym)
TASKS[task.to_sym].execute(options)
end
private
def run_all_tasks(options)
TASKS.each_key do |task|
with_context(related_class: self.class) do
self.class.perform_async(options.merge(task: task))
end
end
end
end
end
end
...@@ -54,6 +54,77 @@ def operation ...@@ -54,6 +54,77 @@ def operation
end end
describe 'elasticsearch indexing' do describe 'elasticsearch indexing' do
context 'when we transfer from group_namespace to group_namespace' do
let_it_be(:new_group) { create(:group) }
let_it_be(:project) { create(:project, namespace: group) }
before do
new_group.add_owner(user)
end
it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do
expect(project.namespace).to eq(group)
expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({
task: :project_transfer,
project_id: project.id,
traversal_id: new_group.elastic_namespace_ancestry
}).once
subject.execute(new_group)
expect(project.namespace).to eq(new_group)
end
end
context 'when we transfer from group_namespace to user_namespace' do
let_it_be(:project) { create(:project, namespace: group) }
it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do
expect(project.namespace).to eq(group)
expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({
task: :project_transfer,
project_id: project.id,
traversal_id: user.namespace.elastic_namespace_ancestry
}).once
subject.execute(user.namespace)
expect(project.namespace).to eq(user.namespace)
end
end
context 'when we transfer from user_namespace to group_namespace' do
it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do
expect(project.namespace).to eq(user.namespace)
expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({
task: :project_transfer,
project_id: project.id,
traversal_id: group.elastic_namespace_ancestry
}).once
subject.execute(group)
expect(project.namespace).to eq(group)
end
end
context 'when we transfer from user_namespace to user_namespace' do
let_it_be(:new_user) { create(:user) }
before do
project.add_owner(new_user)
end
it 'call to ::Search::Elastic::DeleteWorker to remove duplicate work items' do
expect(project.namespace).to eq(user.namespace)
expect(::Search::Elastic::DeleteWorker).to receive(:perform_async).with({
task: :project_transfer,
project_id: project.id,
traversal_id: new_user.namespace.elastic_namespace_ancestry
}).once
described_class.new(project, new_user).execute(new_user.namespace)
expect(project.namespace).to eq(new_user.namespace)
end
end
it 'delegates transfer to Elastic::ProjectTransferWorker and ::Search::Zoekt::ProjectTransferWorker' do it 'delegates transfer to Elastic::ProjectTransferWorker and ::Search::Zoekt::ProjectTransferWorker' do
expect(::Elastic::ProjectTransferWorker).to receive(:perform_async).with(project.id, project.namespace.id, group.id).once expect(::Elastic::ProjectTransferWorker).to receive(:perform_async).with(project.id, project.namespace.id, group.id).once
expect(::Search::Zoekt::ProjectTransferWorker).to receive(:perform_async).with(project.id, project.namespace.id).once expect(::Search::Zoekt::ProjectTransferWorker).to receive(:perform_async).with(project.id, project.namespace.id).once
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ::Search::Elastic::Delete::ProjectTransferService, :elastic_helpers, feature_category: :global_search do
describe '#execute' do
subject(:execute) do
described_class.execute({ task: :project_transfer,
project_id: project.id, traversal_id: 'random-' })
end
let(:work_item_index) { ::Search::Elastic::Types::WorkItem.index_name }
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
let(:work_item) { create(:work_item, project: project) }
before do
set_elasticsearch_migration_to :create_work_items_index, including: true
end
context 'when Elasticsearch is enabled', :elastic_delete_by_query do
before do
stub_ee_application_setting(elasticsearch_indexing: true)
work_item
ensure_elasticsearch_index!
end
context 'when there is a failure in delete' do
let(:client) { instance_double('Elasticsearch::Transport::Client') }
let(:logger) { ::Gitlab::Elasticsearch::Logger.build }
before do
allow(::Gitlab::Search::Client).to receive(:new).and_return(client)
allow(client).to receive(:delete_by_query).and_return({ 'failure' => ['failed'] })
allow(::Gitlab::Elasticsearch::Logger).to receive(:build).and_return(logger)
end
it 'logs the error' do
expect(logger).to receive(:error).with(hash_including(message: "Failed to delete data for project transfer"))
execute
end
end
context 'when work_item index is available' do
it 'deletes work items not belonging to the passed traversal_id' do
# items are present already
expect(items_in_index(work_item_index).count).to eq(1)
expect(items_in_index(work_item_index)).to include(work_item.id)
execute
es_helper.refresh_index(index_name: work_item_index)
# items are deleted
expect(items_in_index(work_item_index).count).to eq(0)
end
end
context 'when migration is not complete' do
before do
set_elasticsearch_migration_to :create_work_items_index, including: false
end
it 'does not remove work items' do
# items are present already
expect(items_in_index(work_item_index)).to include(work_item.id)
expect(items_in_index(work_item_index).count).to eq(1)
execute
es_helper.refresh_index(index_name: work_item_index)
# work items not removed
expect(items_in_index(work_item_index).count).to eq(1)
expect(items_in_index(work_item_index)).to include(work_item.id)
end
end
context 'when elastic_index_work_items is disabled' do
before do
stub_feature_flags(elastic_index_work_items: false)
end
it 'does not remove work items' do
# items are present already
expect(items_in_index(work_item_index)).to include(work_item.id)
expect(items_in_index(work_item_index).count).to eq(1)
execute
es_helper.refresh_index(index_name: work_item_index)
# work items not removed
expect(items_in_index(work_item_index).count).to eq(1)
expect(items_in_index(work_item_index)).to include(work_item.id)
end
end
end
end
end
...@@ -100,4 +100,8 @@ def elastic_delete_group_wiki_worker_random_delay_range ...@@ -100,4 +100,8 @@ def elastic_delete_group_wiki_worker_random_delay_range
def elastic_group_association_deletion_worker_random_delay_range def elastic_group_association_deletion_worker_random_delay_range
a_value_between(0, Search::ElasticGroupAssociationDeletionWorker::MAX_JOBS_PER_HOUR.pred) a_value_between(0, Search::ElasticGroupAssociationDeletionWorker::MAX_JOBS_PER_HOUR.pred)
end end
def items_in_index(index_name)
es_helper.client.search(index: index_name).dig('hits', 'hits').map { |hit| hit['_source']['id'] }
end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Search::Elastic::DeleteWorker, :elastic_helpers, feature_category: :global_search do
describe '#perform' do
subject(:perform) do
described_class.new.perform({ task: :project_transfer })
end
context 'when Elasticsearch is disabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: false)
end
it 'does not do anything' do
expect(perform).to be_falsey
end
end
context 'when Elasticsearch is enabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: true)
end
context 'when we pass :all' do
it 'queues all tasks' do
Search::Elastic::DeleteWorker::TASKS.each_key do |t|
expect(described_class).to receive(:perform_async).with({
task: t
})
end
described_class.new.perform({ task: :all })
end
end
context 'when we pass valid task' do
it 'call the corresponding service' do
expect(::Search::Elastic::Delete::ProjectTransferService).to receive(:execute)
perform
end
end
context 'when we pass invalid task' do
it 'raises ArgumentError' do
expect { described_class.new.perform({ task: :unknown_task }) }.to raise_error(ArgumentError)
end
end
end
end
end
...@@ -439,6 +439,7 @@ ...@@ -439,6 +439,7 @@
'RunPipelineScheduleWorker' => 3, 'RunPipelineScheduleWorker' => 3,
'ScanSecurityReportSecretsWorker' => 17, 'ScanSecurityReportSecretsWorker' => 17,
'Search::ElasticGroupAssociationDeletionWorker' => 3, 'Search::ElasticGroupAssociationDeletionWorker' => 3,
'Search::Elastic::DeleteWorker' => 3,
'Security::StoreScansWorker' => 3, 'Security::StoreScansWorker' => 3,
'Security::TrackSecureScansWorker' => 1, 'Security::TrackSecureScansWorker' => 1,
'ServiceDeskEmailReceiverWorker' => 3, 'ServiceDeskEmailReceiverWorker' => 3,
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册