From 682b6f200eb595383aca11fc917ca944ca2ba2fd Mon Sep 17 00:00:00 2001
From: Max Fan <mfan@gitlab.com>
Date: Thu, 5 Oct 2023 21:02:37 +0000
Subject: [PATCH] Add cache strategy for Pipeline Runner and caching to
 NdJsonPipeline

This prevents duplication of records when workers stop mid-process and are retried
---
 .../rspec/missing_feature_category.yml        |   1 -
 .../bulk_import_idempotent_workers.yml        |   8 ++
 .../groups/pipelines/epics_pipeline_spec.rb   |   2 +-
 .../common/pipelines/badges_pipeline.rb       |   9 ++
 lib/bulk_imports/ndjson_pipeline.rb           |   9 ++
 lib/bulk_imports/pipeline/extracted_data.rb   |   6 +-
 lib/bulk_imports/pipeline/runner.rb           |  20 +++-
 .../common/pipelines/badges_pipeline_spec.rb  |  18 +++
 spec/lib/bulk_imports/ndjson_pipeline_spec.rb |  30 ++++-
 .../pipeline/extracted_data_spec.rb           |  14 +++
 spec/lib/bulk_imports/pipeline/runner_spec.rb | 109 ++++++++++++++++++
 .../pipelines/ci_pipelines_pipeline_spec.rb   |   2 +-
 .../pipelines/issues_pipeline_spec.rb         |   2 +-
 .../pipelines/merge_requests_pipeline_spec.rb |   2 +-
 .../pipeline_schedules_pipeline_spec.rb       |   2 +-
 .../pipelines/references_pipeline_spec.rb     |   5 +-
 .../pipelines/releases_pipeline_spec.rb       |   2 +-
 .../pipelines/snippets_pipeline_spec.rb       |   2 +-
 18 files changed, 223 insertions(+), 20 deletions(-)
 create mode 100644 config/feature_flags/development/bulk_import_idempotent_workers.yml

diff --git a/.rubocop_todo/rspec/missing_feature_category.yml b/.rubocop_todo/rspec/missing_feature_category.yml
index 39d1c8eb3930..91fdd34f18bf 100644
--- a/.rubocop_todo/rspec/missing_feature_category.yml
+++ b/.rubocop_todo/rspec/missing_feature_category.yml
@@ -2761,7 +2761,6 @@ RSpec/MissingFeatureCategory:
     - 'spec/lib/bulk_imports/projects/pipelines/external_pull_requests_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb'
-    - 'spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/project_attributes_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/project_feature_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/protected_branches_pipeline_spec.rb'
diff --git a/config/feature_flags/development/bulk_import_idempotent_workers.yml b/config/feature_flags/development/bulk_import_idempotent_workers.yml
new file mode 100644
index 000000000000..83d5b7f65c70
--- /dev/null
+++ b/config/feature_flags/development/bulk_import_idempotent_workers.yml
@@ -0,0 +1,8 @@
+---
+name: bulk_import_idempotent_workers
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132702
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/426480
+milestone: '16.5'
+type: development
+group: group::import and integrate
+default_enabled: false
diff --git a/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb b/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
index 78c456be796b..c37ad97e06ac 100644
--- a/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
+++ b/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
@@ -31,7 +31,7 @@
 
   subject { described_class.new(context) }
 
-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       allow(Dir).to receive(:mktmpdir).and_return(tmpdir)
       allow_next_instance_of(BulkImports::FileDownloadService) do |service|
diff --git a/lib/bulk_imports/common/pipelines/badges_pipeline.rb b/lib/bulk_imports/common/pipelines/badges_pipeline.rb
index 33a24e61a3ff..f7dfb72124d2 100644
--- a/lib/bulk_imports/common/pipelines/badges_pipeline.rb
+++ b/lib/bulk_imports/common/pipelines/badges_pipeline.rb
@@ -33,6 +33,15 @@ def load(context, data)
           end
         end
 
+        def already_processed?(data, _)
+          values = Gitlab::Cache::Import::Caching.values_from_set(cache_key)
+          values.include?(OpenSSL::Digest::SHA256.hexdigest(data.to_s))
+        end
+
+        def save_processed_entry(data, _)
+          Gitlab::Cache::Import::Caching.set_add(cache_key, OpenSSL::Digest::SHA256.hexdigest(data.to_s))
+        end
+
         private
 
         def group_badge?(data)
diff --git a/lib/bulk_imports/ndjson_pipeline.rb b/lib/bulk_imports/ndjson_pipeline.rb
index 3c392910c1f5..ad990c8be070 100644
--- a/lib/bulk_imports/ndjson_pipeline.rb
+++ b/lib/bulk_imports/ndjson_pipeline.rb
@@ -128,6 +128,15 @@ def relation_definition
         import_export_config.top_relation_tree(relation)
       end
 
+      def already_processed?(_, index)
+        last_index = Gitlab::Cache::Import::Caching.read(cache_key)
+        last_index && last_index.to_i >= index
+      end
+
+      def save_processed_entry(_, index)
+        Gitlab::Cache::Import::Caching.write(cache_key, index)
+      end
+
       def capture_invalid_subrelations(invalid_subrelations)
         invalid_subrelations.each do |record|
           BulkImports::Failure.create(
diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb
index 0b36c0682981..e4640db08735 100644
--- a/lib/bulk_imports/pipeline/extracted_data.rb
+++ b/lib/bulk_imports/pipeline/extracted_data.rb
@@ -5,6 +5,8 @@ module Pipeline
     class ExtractedData
       attr_reader :data
 
+      delegate :each, :each_with_index, to: :data
+
       def initialize(data: nil, page_info: {})
         @data = data.is_a?(Enumerator) ? data : Array.wrap(data)
         @page_info = page_info
@@ -20,10 +22,6 @@ def has_next_page?
       def next_page
         @page_info&.dig('next_page')
       end
-
-      def each(&block)
-        data.each(&block)
-      end
     end
   end
 end
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 1e2d91520473..e6005478bd90 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -15,7 +15,10 @@ def run
         extracted_data = extracted_data_from
 
         if extracted_data
-          extracted_data.each do |entry|
+          extracted_data.each_with_index do |entry, index|
+            raw_entry = entry.dup
+            next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index)
+
             transformers.each do |transformer|
               entry = run_pipeline_step(:transformer, transformer.class.name) do
                 transformer.transform(context, entry)
@@ -25,6 +28,8 @@ def run
             run_pipeline_step(:loader, loader.class.name) do
               loader.load(context, entry)
             end
+
+            save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers)
           end
 
           tracker.update!(
@@ -73,6 +78,19 @@ def extracted_data_from
         end
       end
 
+      def cache_key
+        batch_number = context.extra[:batch_number] || 0
+
+        "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
+      end
+
+      # Overridden by child pipelines with different caching strategies
+      def already_processed?(*)
+        false
+      end
+
+      def save_processed_entry(*); end
+
       def after_run(extracted_data)
         run if extracted_data.has_next_page?
       end
diff --git a/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb b/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb
index 6c5465c8a667..a18d26bedf3d 100644
--- a/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb
@@ -33,6 +33,24 @@
       expect(badge.image_url).to eq(badge_data['image_url'])
     end
 
+    it 'skips already imported records' do
+      expect { pipeline.run }.to change(Badge, :count).by(2)
+
+      expect { pipeline.run }.to not_change(Badge, :count)
+    end
+
+    context 'with FF bulk_import_idempotent_workers disabled' do
+      before do
+        stub_feature_flags(bulk_import_idempotent_workers: false)
+      end
+
+      it 'creates duplicated badges' do
+        expect { pipeline.run }.to change(Badge, :count).by(2)
+
+        expect { pipeline.run }.to change(Badge, :count)
+      end
+    end
+
     context 'when project entity' do
       let(:first_page) { extracted_data(has_next_page: true) }
       let(:last_page) { extracted_data(name: 'badge2', kind: 'project') }
diff --git a/spec/lib/bulk_imports/ndjson_pipeline_spec.rb b/spec/lib/bulk_imports/ndjson_pipeline_spec.rb
index 29f42ab33661..5611879868d9 100644
--- a/spec/lib/bulk_imports/ndjson_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/ndjson_pipeline_spec.rb
@@ -6,6 +6,8 @@
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project) }
   let_it_be(:user) { create(:user) }
+  let(:tracker) { instance_double(BulkImports::Tracker, bulk_import_entity_id: 1) }
+  let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: tracker, extra: { batch_number: 1 }) }
 
   let(:klass) do
     Class.new do
@@ -13,11 +15,12 @@
 
       relation_name 'test'
 
-      attr_reader :portable, :current_user
+      attr_reader :portable, :current_user, :context
 
-      def initialize(portable, user)
+      def initialize(portable, user, context)
         @portable = portable
         @current_user = user
+        @context = context
       end
     end
   end
@@ -26,12 +29,29 @@ def initialize(portable, user)
     stub_const('NdjsonPipelineClass', klass)
   end
 
-  subject { NdjsonPipelineClass.new(group, user) }
+  subject { NdjsonPipelineClass.new(group, user, context) }
 
   it 'marks pipeline as ndjson' do
     expect(NdjsonPipelineClass.file_extraction_pipeline?).to eq(true)
   end
 
+  describe 'caching' do
+    it 'saves completed entry in cache' do
+      subject.save_processed_entry("entry", 10)
+
+      expected_cache_key = "ndjson_pipeline_class/1/1"
+      expect(Gitlab::Cache::Import::Caching.read(expected_cache_key)).to eq("10")
+    end
+
+    it 'identifies completed entries' do
+      subject.save_processed_entry("entry", 10)
+
+      expect(subject.already_processed?("entry", 11)).to be_falsy
+      expect(subject.already_processed?("entry", 10)).to be_truthy
+      expect(subject.already_processed?("entry", 9)).to be_truthy
+    end
+  end
+
   describe '#deep_transform_relation!' do
     it 'transforms relation hash' do
       transformed = subject.deep_transform_relation!({}, 'test', {}) do |key, hash|
@@ -238,7 +258,7 @@ def initialize(portable, user)
     end
 
     context 'when portable is project' do
-      subject { NdjsonPipelineClass.new(project, user) }
+      subject { NdjsonPipelineClass.new(project, user, context) }
 
       it 'returns group relation name override' do
         expect(subject.relation_key_override('labels')).to eq('project_labels')
@@ -254,7 +274,7 @@ def initialize(portable, user)
     end
 
     context 'when portable is project' do
-      subject { NdjsonPipelineClass.new(project, user) }
+      subject { NdjsonPipelineClass.new(project, user, context) }
 
       it 'returns project relation factory' do
         expect(subject.relation_factory).to eq(Gitlab::ImportExport::Project::RelationFactory)
diff --git a/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb b/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb
index 045908de5c45..108ce05cb66d 100644
--- a/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb
@@ -50,4 +50,18 @@
       end
     end
   end
+
+  describe '#each_with_index' do
+    context 'when block is present' do
+      it 'yields each data item with index' do
+        expect { |b| subject.each_with_index(&b) }.to yield_control
+      end
+    end
+
+    context 'when block is not present' do
+      it 'returns enumerator' do
+        expect(subject.each_with_index).to be_instance_of(Enumerator)
+      end
+    end
+  end
 end
diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb
index 2f54ab111c86..f1742f104079 100644
--- a/spec/lib/bulk_imports/pipeline/runner_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb
@@ -277,6 +277,115 @@ def load(context, data); end
 
         it_behaves_like 'failed pipeline', 'StandardError', 'Error!'
       end
+
+      it 'saves entry in cache for de-duplication' do
+        expect_next_instance_of(BulkImports::Extractor) do |extractor|
+          expect(extractor)
+            .to receive(:extract)
+            .with(context)
+            .and_return(extracted_data)
+        end
+
+        expect_next_instance_of(BulkImports::Transformer) do |transformer|
+          expect(transformer)
+            .to receive(:transform)
+            .with(context, extracted_data.data.first)
+            .and_return(extracted_data.data.first)
+        end
+
+        expect_next_instance_of(BulkImports::MyPipeline) do |klass|
+          expect(klass).to receive(:save_processed_entry).with(extracted_data.data.first, anything)
+        end
+
+        subject.run
+      end
+
+      context 'with FF bulk_import_idempotent_workers disabled' do
+        before do
+          stub_feature_flags(bulk_import_idempotent_workers: false)
+        end
+
+        it 'does not touch the cache' do
+          expect_next_instance_of(BulkImports::Extractor) do |extractor|
+            expect(extractor)
+              .to receive(:extract)
+              .with(context)
+              .and_return(extracted_data)
+          end
+
+          expect_next_instance_of(BulkImports::Transformer) do |transformer|
+            expect(transformer)
+              .to receive(:transform)
+              .with(context, extracted_data.data.first)
+              .and_return(extracted_data.data.first)
+          end
+
+          expect_next_instance_of(BulkImports::MyPipeline) do |klass|
+            expect(klass).not_to receive(:save_processed_entry)
+          end
+
+          subject.run
+        end
+      end
+    end
+
+    context 'when the entry is already processed' do
+      before do
+        allow_next_instance_of(BulkImports::MyPipeline) do |klass|
+          allow(klass).to receive(:already_processed?).and_return true
+        end
+      end
+
+      it 'runs pipeline extractor, but not transformer or loader' do
+        expect_next_instance_of(BulkImports::Extractor) do |extractor|
+          expect(extractor)
+            .to receive(:extract)
+            .with(context)
+            .and_return(extracted_data)
+        end
+
+        allow_next_instance_of(BulkImports::Transformer) do |transformer|
+          expect(transformer)
+            .not_to receive(:transform)
+        end
+
+        allow_next_instance_of(BulkImports::Loader) do |loader|
+          expect(loader)
+            .not_to receive(:load)
+        end
+
+        subject.run
+      end
+
+      context 'with FF bulk_import_idempotent_workers disabled' do
+        before do
+          stub_feature_flags(bulk_import_idempotent_workers: false)
+        end
+
+        it 'calls extractor, transformer, and loader' do
+          expect_next_instance_of(BulkImports::Extractor) do |extractor|
+            expect(extractor)
+              .to receive(:extract)
+              .with(context)
+              .and_return(extracted_data)
+          end
+
+          expect_next_instance_of(BulkImports::Transformer) do |transformer|
+            expect(transformer)
+              .to receive(:transform)
+              .with(context, extracted_data.data.first)
+              .and_return(extracted_data.data.first)
+          end
+
+          expect_next_instance_of(BulkImports::Loader) do |loader|
+            expect(loader)
+              .to receive(:load)
+              .with(context, extracted_data.data.first)
+          end
+
+          subject.run
+        end
+      end
     end
 
     context 'when entity is marked as failed' do
diff --git a/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb
index 63e7cdf2e5af..0d32af27d4f7 100644
--- a/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb
@@ -43,7 +43,7 @@
 
   subject(:pipeline) { described_class.new(context) }
 
-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
 
diff --git a/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb
index fd13c10d61e2..625078b1b2aa 100644
--- a/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb
@@ -36,7 +36,7 @@
 
   subject(:pipeline) { described_class.new(context) }
 
-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       issue_with_index = [issue, 0]
diff --git a/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
index 5b85b3eee791..3fb7e28036ee 100644
--- a/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
@@ -83,7 +83,7 @@
 
   subject(:pipeline) { described_class.new(context) }
 
-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       group.add_maintainer(another_user)
diff --git a/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb
index 0bfd9410808d..6ba555aa328d 100644
--- a/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb
@@ -2,7 +2,7 @@
 
 require 'spec_helper'
 
-RSpec.describe BulkImports::Projects::Pipelines::PipelineSchedulesPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::PipelineSchedulesPipeline, :clean_gitlab_redis_cache, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project, group: group) }
diff --git a/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb
index af8bce47c3dc..e2b99fe4db4a 100644
--- a/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb
@@ -134,7 +134,7 @@ def create_username_project_data
     end
   end
 
-  describe '#transform' do
+  describe '#transform', :clean_gitlab_redis_cache do
     it 'updates matching urls and usernames with new ones' do
       transformed_mr = subject.transform(context, mr)
       transformed_note = subject.transform(context, mr_note)
@@ -154,7 +154,8 @@ def create_username_project_data
       expect(transformed_system_note.note).not_to include("@old_username")
       expect(transformed_username_system_note.note).not_to include("@source_username")
 
-      expect(transformed_issue.description).to eq('http://localhost:80/namespace1/project-1/-/issues/1')
+      expect(transformed_issue.description)
+        .to eq("http://localhost:80/#{transformed_issue.namespace.full_path}/-/issues/1")
       expect(transformed_mr.description).to eq("#{expected_url} @destination_username? @alice-gdk, @bob-gdk!")
       expect(transformed_note.note).to eq("#{expected_url} @same_username")
       expect(transformed_issue_note.note).to include("@newer_username, not_a@username, and @new_username.")
diff --git a/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
index 339ca727b57c..9e0b5af6bfea 100644
--- a/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
@@ -38,7 +38,7 @@
 
   subject(:pipeline) { described_class.new(context) }
 
-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       with_index = [release, 0]
diff --git a/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb
index 41b3ea378040..1e3cfe20bf55 100644
--- a/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb
@@ -40,7 +40,7 @@
 
   subject(:pipeline) { described_class.new(context) }
 
-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       snippet_with_index = [exported_snippet.dup, 0]
-- 
GitLab