From 7b5c8b0d5066e539a1a09cc21ac473a0c8be5cc3 Mon Sep 17 00:00:00 2001
From: Carla Drago <cdrago@gitlab.com>
Date: Fri, 10 Nov 2023 18:09:10 +0000
Subject: [PATCH] Ensure after_run ops execute only once

This change ensures that any operations designed to execute
after a bulk_import pipeline has run are executed only once,
whether or not the pipeline is batched. It corrects the
error where after_run operations ran after each batch
completed, rather than after all batches had completed. The
method/pipeline step has been named 'on_finish' for
clarity.

Changelog: fixed
---
 app/workers/all_queues.yml                    |  9 ++
 .../finish_batched_pipeline_worker.rb         |  2 +
 .../after_import_merge_requests_worker.rb     | 21 +++++
 config/sidekiq_queues.yml                     |  2 +
 lib/bulk_imports/pipeline/runner.rb           | 10 ++
 .../pipelines/merge_requests_pipeline.rb      |  4 +-
 .../projects/pipelines/releases_pipeline.rb   |  4 +-
 spec/lib/bulk_imports/pipeline/runner_spec.rb | 32 +++++++
 .../pipelines/merge_requests_pipeline_spec.rb | 11 ++-
 .../pipelines/releases_pipeline_spec.rb       |  2 +-
 .../finish_batched_pipeline_worker_spec.rb    | 92 ++++++++++++++-----
 ...after_import_merge_requests_worker_spec.rb | 23 +++++
 12 files changed, 182 insertions(+), 30 deletions(-)
 create mode 100644 app/workers/projects/import_export/after_import_merge_requests_worker.rb
 create mode 100644 spec/workers/projects/import_export/after_import_merge_requests_worker_spec.rb

diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index cd166750fdd38..0bb88efe1836b 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -3567,6 +3567,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: projects_import_export_after_import_merge_requests
+  :worker_name: Projects::ImportExport::AfterImportMergeRequestsWorker
+  :feature_category: :importers
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: projects_import_export_create_relation_exports
   :worker_name: Projects::ImportExport::CreateRelationExportsWorker
   :feature_category: :importers
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 973889e1411cf..40d26e14dc1f6 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -16,6 +16,7 @@ class FinishBatchedPipelineWorker
 
     def perform(pipeline_tracker_id)
       @tracker = Tracker.find(pipeline_tracker_id)
+      @context = ::BulkImports::Pipeline::Context.new(tracker)
 
       return unless tracker.batched?
       return unless tracker.started?
@@ -26,6 +27,7 @@ def perform(pipeline_tracker_id)
         tracker.batches.map(&:fail_op!)
         tracker.fail_op!
       else
+        tracker.pipeline_class.new(@context).on_finish
         logger.info(log_attributes(message: 'Tracker finished'))
         tracker.finish!
       end
diff --git a/app/workers/projects/import_export/after_import_merge_requests_worker.rb b/app/workers/projects/import_export/after_import_merge_requests_worker.rb
new file mode 100644
index 0000000000000..b40e0ca5f096a
--- /dev/null
+++ b/app/workers/projects/import_export/after_import_merge_requests_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Projects
+  module ImportExport
+    class AfterImportMergeRequestsWorker
+      include ApplicationWorker
+
+      idempotent!
+      data_consistency :delayed
+      urgency :low
+      feature_category :importers
+
+      def perform(project_id)
+        project = Project.find_by_id(project_id)
+        return unless project
+
+        project.merge_requests.set_latest_merge_request_diff_ids!
+      end
+    end
+  end
+end
diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index f8396cb6d0f15..210a246978ae1 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -563,6 +563,8 @@
   - 1
 - - projects_git_garbage_collect
   - 1
+- - projects_import_export_after_import_merge_requests
+  - 1
 - - projects_import_export_create_relation_exports
   - 1
 - - projects_import_export_parallel_project_export
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 6037104ef033a..e2a14c35e791e 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -40,6 +40,14 @@ def run
           run_pipeline_step(:after_run) do
             after_run(extracted_data)
           end
+
+          # For batches, `#on_finish` is called once within `FinishBatchedPipelineWorker`
+          # after all batches have completed.
+          unless tracker.batched?
+            run_pipeline_step(:on_finish) do
+              on_finish
+            end
+          end
         end
 
         info(message: 'Pipeline finished')
@@ -47,6 +55,8 @@ def run
         skip!('Skipping pipeline due to failed entity')
       end
 
+      def on_finish; end
+
       private # rubocop:disable Lint/UselessAccessModifier
 
       def run_pipeline_step(step, class_name = nil, entry = nil)
diff --git a/lib/bulk_imports/projects/pipelines/merge_requests_pipeline.rb b/lib/bulk_imports/projects/pipelines/merge_requests_pipeline.rb
index 264bda6e654cf..fe5c61e81a308 100644
--- a/lib/bulk_imports/projects/pipelines/merge_requests_pipeline.rb
+++ b/lib/bulk_imports/projects/pipelines/merge_requests_pipeline.rb
@@ -10,8 +10,8 @@ class MergeRequestsPipeline
 
         extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation
 
-        def after_run(_)
-          context.portable.merge_requests.set_latest_merge_request_diff_ids!
+        def on_finish
+          ::Projects::ImportExport::AfterImportMergeRequestsWorker.perform_async(context.portable.id)
         end
       end
     end
diff --git a/lib/bulk_imports/projects/pipelines/releases_pipeline.rb b/lib/bulk_imports/projects/pipelines/releases_pipeline.rb
index c77e53b9aec98..433419f4c5cdd 100644
--- a/lib/bulk_imports/projects/pipelines/releases_pipeline.rb
+++ b/lib/bulk_imports/projects/pipelines/releases_pipeline.rb
@@ -10,9 +10,7 @@ class ReleasesPipeline
 
         extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation
 
-        def after_run(_context)
-          super
-
+        def on_finish
           portable.releases.find_each do |release|
             create_release_evidence(release)
           end
diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb
index 2f7bdf88a60fb..4540408990c55 100644
--- a/spec/lib/bulk_imports/pipeline/runner_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb
@@ -194,6 +194,8 @@ def load(context, data); end
             .with(context, extracted_data.data.first)
         end
 
+        expect(subject).to receive(:on_finish)
+
         expect_next_instance_of(Gitlab::Import::Logger) do |logger|
           expect(logger).to receive(:info)
             .with(
@@ -230,6 +232,14 @@ def load(context, data); end
                 step_class: 'BulkImports::Loader'
               )
             )
+          expect(logger).to receive(:info)
+            .with(
+              log_params(
+                context,
+                pipeline_class: 'BulkImports::MyPipeline',
+                pipeline_step: :on_finish
+              )
+            )
           expect(logger).to receive(:info)
             .with(
               log_params(
@@ -251,6 +261,28 @@ def load(context, data); end
         subject.run
       end
 
+      context 'when the pipeline is batched' do
+        let(:tracker) { create(:bulk_import_tracker, :batched, entity: entity) }
+
+        before do
+          allow_next_instance_of(BulkImports::Extractor) do |extractor|
+            allow(extractor).to receive(:extract).and_return(extracted_data)
+          end
+        end
+
+        it 'calls after_run' do
+          expect(subject).to receive(:after_run)
+
+          subject.run
+        end
+
+        it 'does not call on_finish' do
+          expect(subject).not_to receive(:on_finish)
+
+          subject.run
+        end
+      end
+
       context 'when extracted data has multiple pages' do
         it 'updates tracker information and runs pipeline again' do
           first_page = extracted_data(has_next_page: true)
diff --git a/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
index 3fb7e28036ee9..b9e424f4a7dcc 100644
--- a/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
@@ -2,7 +2,7 @@
 
 require 'spec_helper'
 
-RSpec.describe BulkImports::Projects::Pipelines::MergeRequestsPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::MergeRequestsPipeline, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:another_user) { create(:user) }
   let_it_be(:group) { create(:group) }
@@ -43,6 +43,7 @@
         'base_commit_sha' => 'ae73cb07c9eeaf35924a10f713b364d32b2dd34f',
         'head_commit_sha' => 'a97f74ddaa848b707bea65441c903ae4bf5d844d',
         'start_commit_sha' => '9eea46b5c72ead701c22f516474b95049c9d9462',
+        'diff_type' => 1,
         'merge_request_diff_commits' => [
           {
             'sha' => 'COMMIT1',
@@ -99,6 +100,8 @@
       allow(project.repository).to receive(:branch_exists?).and_return(false)
       allow(project.repository).to receive(:create_branch)
 
+      allow(::Projects::ImportExport::AfterImportMergeRequestsWorker).to receive(:perform_async)
+
       pipeline.run
     end
 
@@ -244,8 +247,10 @@
         expect(imported_mr.merge_request_diff).to be_present
       end
 
-      it 'has the correct data for merge request latest_merge_request_diff' do
-        expect(imported_mr.latest_merge_request_diff_id).to eq(imported_mr.merge_request_diffs.maximum(:id))
+      it 'enqueues AfterImportMergeRequestsWorker worker' do
+        expect(::Projects::ImportExport::AfterImportMergeRequestsWorker)
+          .to have_received(:perform_async)
+          .with(project.id)
       end
 
       it 'imports diff files' do
diff --git a/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
index 9e0b5af6bfea4..fa85e24189ce3 100644
--- a/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
@@ -2,7 +2,7 @@
 
 require 'spec_helper'
 
-RSpec.describe BulkImports::Projects::Pipelines::ReleasesPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::ReleasesPipeline, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project, group: group) }
diff --git a/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb b/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb
index 610138c2723a3..59ae4205c0f30 100644
--- a/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb
@@ -5,16 +5,48 @@
 RSpec.describe BulkImports::FinishBatchedPipelineWorker, feature_category: :importers do
   let_it_be(:bulk_import) { create(:bulk_import) }
   let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
-  let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
+  let_it_be(:project) { create(:project, :repository) }
+  let_it_be(:entity) do
+    create(
+      :bulk_import_entity,
+      :project_entity,
+      project: project,
+      bulk_import: bulk_import
+    )
+  end
+
+  let(:pipeline_class) do
+    Class.new do
+      def initialize(_); end
+
+      def on_finish; end
+    end
+  end
 
-  let(:status_event) { :finish }
-  let(:pipeline_tracker) { create(:bulk_import_tracker, :started, :batched, entity: entity) }
+  let(:pipeline_tracker) do
+    create(
+      :bulk_import_tracker,
+      :started,
+      :batched,
+      entity: entity,
+      pipeline_name: 'FakePipeline'
+    )
+  end
 
   subject(:worker) { described_class.new }
 
   describe '#perform' do
+    before do
+      stub_const('FakePipeline', pipeline_class)
+
+      allow_next_instance_of(BulkImports::Projects::Stage) do |instance|
+        allow(instance).to receive(:pipelines)
+          .and_return([{ stage: 0, pipeline: pipeline_class }])
+      end
+    end
+
     context 'when import is in progress' do
-      it 'marks the pipeline as finished' do
+      it 'marks the tracker as finished' do
         expect_next_instance_of(BulkImports::Logger) do |logger|
           expect(logger).to receive(:info).with(
             a_hash_including('message' => 'Tracker finished')
@@ -26,6 +58,14 @@
           .from(false).to(true)
       end
 
+      it "calls the pipeline's `#on_finish`" do
+        expect_next_instance_of(pipeline_class) do |pipeline|
+          expect(pipeline).to receive(:on_finish)
+        end
+
+        subject.perform(pipeline_tracker.id)
+      end
+
       it 're-enqueues for any started batches' do
         create(:bulk_import_batch_tracker, :started, tracker: pipeline_tracker)
 
@@ -65,29 +105,39 @@
         expect(pipeline_tracker.batches.first.reload.failed?).to eq(true)
       end
     end
+  end
 
-    context 'when pipeline is not batched' do
-      let(:pipeline_tracker) { create(:bulk_import_tracker, :started, entity: entity) }
+  shared_examples 'does nothing' do
+    it "does not call the tracker's `#finish!`" do
+      expect_next_found_instance_of(BulkImports::Tracker) do |instance|
+        expect(instance).not_to receive(:finish!)
+      end
 
-      it 'returns' do
-        expect_next_instance_of(BulkImports::Tracker) do |instance|
-          expect(instance).not_to receive(:finish!)
-        end
+      subject.perform(pipeline_tracker.id)
+    end
 
-        subject.perform(pipeline_tracker.id)
-      end
+    it "does not call the pipeline's `#on_finish`" do
+      expect(pipeline_class).not_to receive(:new)
+
+      subject.perform(pipeline_tracker.id)
     end
+  end
 
-    context 'when pipeline is not started' do
-      let(:status_event) { :start }
+  context 'when tracker is not batched' do
+    let(:pipeline_tracker) { create(:bulk_import_tracker, :started, entity: entity, batched: false) }
 
-      it 'returns' do
-        expect_next_instance_of(BulkImports::Tracker) do |instance|
-          expect(instance).not_to receive(:finish!)
-        end
+    include_examples 'does nothing'
+  end
 
-        described_class.new.perform(pipeline_tracker.id)
-      end
-    end
+  context 'when tracker is not started' do
+    let(:pipeline_tracker) { create(:bulk_import_tracker, :batched, :finished, entity: entity) }
+
+    include_examples 'does nothing'
+  end
+
+  context 'when pipeline is enqueued' do
+    let(:pipeline_tracker) { create(:bulk_import_tracker, status: 3, entity: entity) }
+
+    include_examples 'does nothing'
   end
 end
diff --git a/spec/workers/projects/import_export/after_import_merge_requests_worker_spec.rb b/spec/workers/projects/import_export/after_import_merge_requests_worker_spec.rb
new file mode 100644
index 0000000000000..42b67a0941a3e
--- /dev/null
+++ b/spec/workers/projects/import_export/after_import_merge_requests_worker_spec.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Projects::ImportExport::AfterImportMergeRequestsWorker, feature_category: :importers do
+  let_it_be(:project) { create(:project) }
+  let_it_be(:merge_requests) { project.merge_requests }
+
+  let(:worker) { described_class.new }
+
+  describe '#perform' do
+    it 'sets the latest merge request diff ids' do
+      expect(project.class).to receive(:find_by_id).and_return(project)
+      expect(merge_requests).to receive(:set_latest_merge_request_diff_ids!)
+
+      worker.perform(project.id)
+    end
+
+    it_behaves_like 'an idempotent worker' do
+      let(:job_args) { [project.id] }
+    end
+  end
+end
-- 
GitLab