Commit 7b5c8b0d, authored by Carla Drago, committed by George Koltsov

Ensure after_run ops execute once only

This change ensures that any operations designed to execute
after a bulk_import pipeline has run are executed exactly
once, whether the pipeline is batched or not. It corrects a
bug where after_run operations ran after each batch
completed, rather than once after all batches had completed.
The method/pipeline step has been named 'on_finish' for
clarity.

Changelog: fixed
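
The resulting contract, in brief: after_run still runs at the end of each extraction run, so once per batch on a batched pipeline, while on_finish runs exactly once per pipeline: inline in the runner when not batched, or from FinishBatchedPipelineWorker after the last batch otherwise. Below is a minimal sketch of that ordering; SketchPipeline, run_unbatched, and run_batched are hypothetical stand-ins, not GitLab code.

# Hypothetical stand-ins for the real BulkImports runner and
# FinishBatchedPipelineWorker; this illustrates call ordering only.
class SketchPipeline
  attr_reader :events

  def initialize
    @events = []
  end

  def after_run(_extracted_data)
    events << :after_run
  end

  def on_finish
    events << :on_finish
  end
end

# Non-batched: the runner calls on_finish itself, right after after_run.
def run_unbatched(pipeline, data)
  pipeline.after_run(data)
  pipeline.on_finish
end

# Batched: after_run fires per batch; on_finish fires exactly once,
# after every batch has completed (the FinishBatchedPipelineWorker role).
def run_batched(pipeline, batches)
  batches.each { |batch| pipeline.after_run(batch) }
  pipeline.on_finish
end

pipeline = SketchPipeline.new
run_batched(pipeline, [1, 2, 3])
p pipeline.events # => [:after_run, :after_run, :after_run, :on_finish]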
Parent f66ca4ce
Showing 182 additions and 30 deletions
@@ -3567,6 +3567,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: projects_import_export_after_import_merge_requests
+  :worker_name: Projects::ImportExport::AfterImportMergeRequestsWorker
+  :feature_category: :importers
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: projects_import_export_create_relation_exports
   :worker_name: Projects::ImportExport::CreateRelationExportsWorker
   :feature_category: :importers
...
@@ -16,6 +16,7 @@ class FinishBatchedPipelineWorker
     def perform(pipeline_tracker_id)
       @tracker = Tracker.find(pipeline_tracker_id)
+      @context = ::BulkImports::Pipeline::Context.new(tracker)

       return unless tracker.batched?
       return unless tracker.started?
@@ -26,6 +27,7 @@ def perform(pipeline_tracker_id)
         tracker.batches.map(&:fail_op!)
         tracker.fail_op!
       else
+        tracker.pipeline_class.new(@context).on_finish
         logger.info(log_attributes(message: 'Tracker finished'))
         tracker.finish!
       end
...
+# frozen_string_literal: true
+
+module Projects
+  module ImportExport
+    class AfterImportMergeRequestsWorker
+      include ApplicationWorker
+
+      idempotent!
+      data_consistency :delayed
+      urgency :low
+      feature_category :importers
+
+      def perform(project_id)
+        project = Project.find_by_id(project_id)
+        return unless project
+
+        project.merge_requests.set_latest_merge_request_diff_ids!
+      end
+    end
+  end
+end
@@ -563,6 +563,8 @@
   - 1
 - - projects_git_garbage_collect
   - 1
+- - projects_import_export_after_import_merge_requests
+  - 1
 - - projects_import_export_create_relation_exports
   - 1
 - - projects_import_export_parallel_project_export
...
@@ -40,6 +40,14 @@ def run
         run_pipeline_step(:after_run) do
           after_run(extracted_data)
         end
+
+        # For batches, `#on_finish` is called once within `FinishBatchedPipelineWorker`
+        # after all batches have completed.
+        unless tracker.batched?
+          run_pipeline_step(:on_finish) do
+            on_finish
+          end
+        end
       end

       info(message: 'Pipeline finished')
@@ -47,6 +55,8 @@ def run
       skip!('Skipping pipeline due to failed entity')
     end

+    def on_finish; end
+
     private # rubocop:disable Lint/UselessAccessModifier

     def run_pipeline_step(step, class_name = nil, entry = nil)
...
@@ -10,8 +10,8 @@ class MergeRequestsPipeline
         extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation

-        def after_run(_)
-          context.portable.merge_requests.set_latest_merge_request_diff_ids!
+        def on_finish
+          ::Projects::ImportExport::AfterImportMergeRequestsWorker.perform_async(context.portable.id)
         end
       end
...
@@ -10,9 +10,7 @@ class ReleasesPipeline
         extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation

-        def after_run(_context)
-          super
-
+        def on_finish
           portable.releases.find_each do |release|
             create_release_evidence(release)
           end
...
@@ -194,6 +194,8 @@ def load(context, data); end
           .with(context, extracted_data.data.first)
       end

+      expect(subject).to receive(:on_finish)
+
       expect_next_instance_of(Gitlab::Import::Logger) do |logger|
         expect(logger).to receive(:info)
           .with(
@@ -230,6 +232,14 @@ def load(context, data); end
               step_class: 'BulkImports::Loader'
             )
           )
+        expect(logger).to receive(:info)
+          .with(
+            log_params(
+              context,
+              pipeline_class: 'BulkImports::MyPipeline',
+              pipeline_step: :on_finish
+            )
+          )
         expect(logger).to receive(:info)
           .with(
             log_params(
@@ -251,6 +261,28 @@ def load(context, data); end
       subject.run
     end

+    context 'when the pipeline is batched' do
+      let(:tracker) { create(:bulk_import_tracker, :batched, entity: entity) }
+
+      before do
+        allow_next_instance_of(BulkImports::Extractor) do |extractor|
+          allow(extractor).to receive(:extract).and_return(extracted_data)
+        end
+      end
+
+      it 'calls after_run' do
+        expect(subject).to receive(:after_run)
+
+        subject.run
+      end
+
+      it 'does not call on_finish' do
+        expect(subject).not_to receive(:on_finish)
+
+        subject.run
+      end
+    end
+
     context 'when extracted data has multiple pages' do
       it 'updates tracker information and runs pipeline again' do
         first_page = extracted_data(has_next_page: true)
...
@@ -2,7 +2,7 @@
 require 'spec_helper'

-RSpec.describe BulkImports::Projects::Pipelines::MergeRequestsPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::MergeRequestsPipeline, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:another_user) { create(:user) }
   let_it_be(:group) { create(:group) }
@@ -43,6 +43,7 @@
         'base_commit_sha' => 'ae73cb07c9eeaf35924a10f713b364d32b2dd34f',
         'head_commit_sha' => 'a97f74ddaa848b707bea65441c903ae4bf5d844d',
         'start_commit_sha' => '9eea46b5c72ead701c22f516474b95049c9d9462',
+        'diff_type' => 1,
         'merge_request_diff_commits' => [
           {
             'sha' => 'COMMIT1',
@@ -99,6 +100,8 @@
       allow(project.repository).to receive(:branch_exists?).and_return(false)
       allow(project.repository).to receive(:create_branch)

+      allow(::Projects::ImportExport::AfterImportMergeRequestsWorker).to receive(:perform_async)
+
       pipeline.run
     end
@@ -244,8 +247,10 @@
       expect(imported_mr.merge_request_diff).to be_present
     end

-    it 'has the correct data for merge request latest_merge_request_diff' do
-      expect(imported_mr.latest_merge_request_diff_id).to eq(imported_mr.merge_request_diffs.maximum(:id))
+    it 'enqueues AfterImportMergeRequestsWorker worker' do
+      expect(::Projects::ImportExport::AfterImportMergeRequestsWorker)
+        .to have_received(:perform_async)
+        .with(project.id)
     end

     it 'imports diff files' do
...
@@ -2,7 +2,7 @@
 require 'spec_helper'

-RSpec.describe BulkImports::Projects::Pipelines::ReleasesPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::ReleasesPipeline, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project, group: group) }
...
@@ -5,16 +5,48 @@
 RSpec.describe BulkImports::FinishBatchedPipelineWorker, feature_category: :importers do
   let_it_be(:bulk_import) { create(:bulk_import) }
   let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
-  let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
+  let_it_be(:project) { create(:project, :repository) }
+  let_it_be(:entity) do
+    create(
+      :bulk_import_entity,
+      :project_entity,
+      project: project,
+      bulk_import: bulk_import
+    )
+  end
+
+  let(:pipeline_class) do
+    Class.new do
+      def initialize(_); end
+
+      def on_finish; end
+    end
+  end

-  let(:status_event) { :finish }
-  let(:pipeline_tracker) { create(:bulk_import_tracker, :started, :batched, entity: entity) }
+  let(:pipeline_tracker) do
+    create(
+      :bulk_import_tracker,
+      :started,
+      :batched,
+      entity: entity,
+      pipeline_name: 'FakePipeline'
+    )
+  end

   subject(:worker) { described_class.new }

   describe '#perform' do
+    before do
+      stub_const('FakePipeline', pipeline_class)
+
+      allow_next_instance_of(BulkImports::Projects::Stage) do |instance|
+        allow(instance).to receive(:pipelines)
+          .and_return([{ stage: 0, pipeline: pipeline_class }])
+      end
+    end
+
     context 'when import is in progress' do
-      it 'marks the pipeline as finished' do
+      it 'marks the tracker as finished' do
         expect_next_instance_of(BulkImports::Logger) do |logger|
           expect(logger).to receive(:info).with(
             a_hash_including('message' => 'Tracker finished')
@@ -26,6 +58,14 @@
           .from(false).to(true)
       end

+      it "calls the pipeline's `#on_finish`" do
+        expect_next_instance_of(pipeline_class) do |pipeline|
+          expect(pipeline).to receive(:on_finish)
+        end
+
+        subject.perform(pipeline_tracker.id)
+      end
+
       it 're-enqueues for any started batches' do
         create(:bulk_import_batch_tracker, :started, tracker: pipeline_tracker)
@@ -65,29 +105,39 @@
         expect(pipeline_tracker.batches.first.reload.failed?).to eq(true)
       end
     end

-    context 'when pipeline is not batched' do
-      let(:pipeline_tracker) { create(:bulk_import_tracker, :started, entity: entity) }
-
-      it 'returns' do
-        expect_next_instance_of(BulkImports::Tracker) do |instance|
-          expect(instance).not_to receive(:finish!)
-        end
-
-        subject.perform(pipeline_tracker.id)
-      end
-    end
+    shared_examples 'does nothing' do
+      it "does not call the tracker's `#finish!`" do
+        expect_next_found_instance_of(BulkImports::Tracker) do |instance|
+          expect(instance).not_to receive(:finish!)
+        end
+
+        subject.perform(pipeline_tracker.id)
+      end
+
+      it "does not call the pipeline's `#on_finish`" do
+        expect(pipeline_class).not_to receive(:new)
+
+        subject.perform(pipeline_tracker.id)
+      end
+    end

-    context 'when pipeline is not started' do
-      let(:status_event) { :start }
-
-      it 'returns' do
-        expect_next_instance_of(BulkImports::Tracker) do |instance|
-          expect(instance).not_to receive(:finish!)
-        end
-
-        described_class.new.perform(pipeline_tracker.id)
-      end
-    end
+    context 'when tracker is not batched' do
+      let(:pipeline_tracker) { create(:bulk_import_tracker, :started, entity: entity, batched: false) }
+
+      include_examples 'does nothing'
+    end
+
+    context 'when tracker is not started' do
+      let(:pipeline_tracker) { create(:bulk_import_tracker, :batched, :finished, entity: entity) }
+
+      include_examples 'does nothing'
+    end
+
+    context 'when pipeline is enqueued' do
+      let(:pipeline_tracker) { create(:bulk_import_tracker, status: 3, entity: entity) }
+
+      include_examples 'does nothing'
+    end
   end
 end
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Projects::ImportExport::AfterImportMergeRequestsWorker, feature_category: :importers do
+  let_it_be(:project) { create(:project) }
+  let_it_be(:merge_requests) { project.merge_requests }
+
+  let(:worker) { described_class.new }
+
+  describe '#perform' do
+    it 'sets the latest merge request diff ids' do
+      expect(project.class).to receive(:find_by_id).and_return(project)
+      expect(merge_requests).to receive(:set_latest_merge_request_diff_ids!)
+
+      worker.perform(project.id)
+    end
+
+    it_behaves_like 'an idempotent worker' do
+      let(:job_args) { [project.id] }
+    end
+  end
+end