From c795e22a3b55ebb6105e4567776cf1338b42af7f Mon Sep 17 00:00:00 2001
From: Rodrigo Tomonari <rtomonari@gitlab.com>
Date: Tue, 10 Sep 2024 12:11:01 +0000
Subject: [PATCH] Retry source user creation

Although SourceUserMapper#find_or_create_source_user implements a lock
to prevent duplicated source users from being created, occasionally the
lock does not work and a duplicated error is raised.

This change updates SourceUserMapper to use the Retriable GEM for
retrying the creation of the source user. It also updates BulkImport
pipelines to re-enqueue if the number of retries is exhausted.
---
 app/models/import/source_user.rb              |  1 +
 .../bulk_imports/pipeline_batch_worker.rb     |  4 ++
 lib/bulk_imports/pipeline/runner.rb           |  7 ++--
 lib/bulk_imports/retry_pipeline_error.rb      |  2 +-
 lib/gitlab/import/source_user_mapper.rb       | 41 ++++++++++++++++---
 spec/lib/bulk_imports/pipeline/runner_spec.rb | 31 +++++++-------
 .../gitlab/import/source_user_mapper_spec.rb  | 31 ++++++++++++++
 ...user_member_attributes_transformer_spec.rb | 12 ++++++
 spec/models/import/source_user_spec.rb        |  7 ++++
 9 files changed, 112 insertions(+), 24 deletions(-)

diff --git a/app/models/import/source_user.rb b/app/models/import/source_user.rb
index 85ea02be1f87..52e7cd68955f 100644
--- a/app/models/import/source_user.rb
+++ b/app/models/import/source_user.rb
@@ -20,6 +20,7 @@ class SourceUser < ApplicationRecord
     belongs_to :namespace
 
     validates :namespace_id, :import_type, :source_hostname, :source_user_identifier, :status, presence: true
+    validates :source_user_identifier, uniqueness: { scope: [:namespace_id, :source_hostname, :import_type] }
     validates :placeholder_user_id, presence: true, unless: :completed?
     validates :reassign_to_user_id, presence: true, if: -> {
                                                           awaiting_approval? || reassignment_in_progress? || completed?
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
index 0d8f5df9cca0..07487a0e83c9 100644
--- a/app/workers/bulk_imports/pipeline_batch_worker.rb
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -109,6 +109,10 @@ def context
     def retry_batch(exception)
       batch.retry!
 
+      logger.error(log_attributes(
+        message: "Retrying pipeline", exception: { message: exception.message, class: exception.class.name }
+      ))
+
       re_enqueue(exception.retry_delay)
     end
 
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index f4dd06648a05..8d937dcf4535 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -80,11 +80,12 @@ def run_pipeline_step(step, class_name = nil, entry = nil)
           importer: 'gitlab_migration'
         )
       rescue BulkImports::NetworkError => e
-        raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay) if e.retriable?(context.tracker)
+        raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay), cause: e if e.retriable?(context.tracker)
 
         log_and_fail(e, step, entry)
-      rescue Gitlab::Import::SourceUserMapper::FailedToObtainLockError => e
-        raise BulkImports::RetryPipelineError.new(e.message, nil)
+      rescue Gitlab::Import::SourceUserMapper::FailedToObtainLockError,
+        Gitlab::Import::SourceUserMapper::DuplicatedSourceUserError => e
+        raise BulkImports::RetryPipelineError.new(e.message), cause: e
       rescue BulkImports::RetryPipelineError
         raise
       rescue StandardError => e
diff --git a/lib/bulk_imports/retry_pipeline_error.rb b/lib/bulk_imports/retry_pipeline_error.rb
index a1b02addf45e..4e052ae1766e 100644
--- a/lib/bulk_imports/retry_pipeline_error.rb
+++ b/lib/bulk_imports/retry_pipeline_error.rb
@@ -4,7 +4,7 @@ module BulkImports
   class RetryPipelineError < Error
     attr_reader :retry_delay
 
-    def initialize(message, retry_delay)
+    def initialize(message, retry_delay = nil)
       super(message)
 
       @retry_delay = retry_delay
diff --git a/lib/gitlab/import/source_user_mapper.rb b/lib/gitlab/import/source_user_mapper.rb
index 70b7c6064c86..2424731da2e9 100644
--- a/lib/gitlab/import/source_user_mapper.rb
+++ b/lib/gitlab/import/source_user_mapper.rb
@@ -10,19 +10,37 @@ class SourceUserMapper
       LOCK_SLEEP = 0.3.seconds.freeze
       LOCK_RETRIES = 100
 
+      DuplicatedSourceUserError = Class.new(StandardError)
+
       def initialize(namespace:, import_type:, source_hostname:)
         @namespace = namespace.root_ancestor
         @import_type = import_type
         @source_hostname = source_hostname
       end
 
+      # Finds a source user by the provided `source_user_identifier`.
+      #
+      # This method first checks an in-memory LRU (Least Recently Used) cache,
+      # stored in `SafeRequestStore`, to avoid unnecessary database queries.
+      # If the source user is not present in the cache, it will query the database
+      # and store the result in the cache for future use.
+      #
+      # Since jobs may create source users concurrently, the ActiveRecord query
+      # cache is explicitly disabled when querying the database to ensure that
+      # we always get the latest data.
+      #
+      # @param [String] source_user_identifier The identifier for the source user to find.
+      # @return [Import::SourceUser, nil] The found source user object, or `nil` if no match is found.
+      #
       def find_source_user(source_user_identifier)
-        cache_from_request_store[source_user_identifier] ||= ::Import::SourceUser.find_source_user(
-          source_user_identifier: source_user_identifier,
-          namespace: namespace,
-          source_hostname: source_hostname,
-          import_type: import_type
-        )
+        cache_from_request_store[source_user_identifier] ||= ::Import::SourceUser.uncached do
+          ::Import::SourceUser.find_source_user(
+            source_user_identifier: source_user_identifier,
+            namespace: namespace,
+            source_hostname: source_hostname,
+            import_type: import_type
+          )
+        end
       end
 
       def find_or_create_source_user(source_name:, source_username:, source_user_identifier:)
@@ -75,6 +93,12 @@ def create_source_user_mapping(source_name, source_username, source_user_identif
           import_source_user.save!
           import_source_user
         end
+      rescue PG::UniqueViolation, ActiveRecord::RecordNotUnique => e
+        raise DuplicatedSourceUserError.new(e.message), cause: e
+      rescue ActiveRecord::RecordInvalid => e
+        raise DuplicatedSourceUserError.new(e.message), cause: e if user_has_duplicated_errors?(e.record)
+
+        raise
       end
 
       def create_placeholder_user(import_source_user)
@@ -94,6 +118,11 @@ def placeholder_user_limit_exceeded?
       def lock_key(source_user_identifier)
         "import:source_user_mapper:#{namespace.id}:#{import_type}:#{source_hostname}:#{source_user_identifier}"
       end
+
+      def user_has_duplicated_errors?(record)
+        attributes = %i[email username]
+        record.errors.filter { |error| error.type == :taken && attributes.include?(error.attribute) }.any?
+      end
     end
   end
 end
diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb
index 14df1e380652..b61cbc568b2f 100644
--- a/spec/lib/bulk_imports/pipeline/runner_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb
@@ -306,22 +306,25 @@ def load(context, data); end
         end
       end
 
-      context 'when the exception Gitlab::Import::SourceUserMapper::FailedToObtainLockError is raised' do
-        it 'raises the exception BulkImports::RetryPipelineError' do
-          allow_next_instance_of(BulkImports::Extractor) do |extractor|
-            allow(extractor)
-              .to receive(:extract)
-              .with(context)
-              .and_return(extracted_data)
-          end
+      [Gitlab::Import::SourceUserMapper::FailedToObtainLockError,
+        Gitlab::Import::SourceUserMapper::DuplicatedSourceUserError].each do |exception_class|
+        context "when #{exception_class} is raised" do
+          it 'raises the exception BulkImports::RetryPipelineError' do
+            allow_next_instance_of(BulkImports::Extractor) do |extractor|
+              allow(extractor)
+                .to receive(:extract)
+                .with(context)
+                .and_return(extracted_data)
+            end
 
-          allow_next_instance_of(BulkImports::Transformer) do |transformer|
-            allow(transformer)
-              .to receive(:transform)
-              .and_raise(Gitlab::Import::SourceUserMapper::FailedToObtainLockError)
-          end
+            allow_next_instance_of(BulkImports::Transformer) do |transformer|
+              allow(transformer)
+                .to receive(:transform)
+                .and_raise(exception_class)
+            end
 
-          expect { subject.run }.to raise_error(BulkImports::RetryPipelineError)
+            expect { subject.run }.to raise_error(BulkImports::RetryPipelineError)
+          end
         end
       end
 
diff --git a/spec/lib/gitlab/import/source_user_mapper_spec.rb b/spec/lib/gitlab/import/source_user_mapper_spec.rb
index b61bf5ec9fdc..b5cf77d15e74 100644
--- a/spec/lib/gitlab/import/source_user_mapper_spec.rb
+++ b/spec/lib/gitlab/import/source_user_mapper_spec.rb
@@ -148,6 +148,37 @@
         expect(new_import_source_user.placeholder_user).to eq(import_user)
       end
     end
+
+    context 'when ActiveRecord::RecordNotUnique exception is raised during the source user creation' do
+      before do
+        allow_next_instance_of(::Import::SourceUser) do |source_user|
+          allow(source_user).to receive(:save!).and_raise(ActiveRecord::RecordNotUnique)
+        end
+      end
+
+      it 'raises DuplicatedSourceUserError' do
+        expect { find_or_create_source_user }.to raise_error(described_class::DuplicatedSourceUserError)
+      end
+    end
+
+    context 'when ActiveRecord::RecordInvalid exception because the placeholder user email or username is taken' do
+      it 'rescue the exception and raises DuplicatedSourceUserError' do
+        create(:user, email: 'user@example.com')
+        user = build(:user, email: 'user@example.com').tap(&:valid?)
+        allow(User).to receive(:new).and_return(user)
+
+        expect { find_or_create_source_user }.to raise_error(described_class::DuplicatedSourceUserError)
+      end
+    end
+
+    context 'when ActiveRecord::RecordInvalid exception raises for another reason' do
+      it 'bubbles up the ActiveRecord::RecordInvalid exception' do
+        user = build(:user, email: nil)
+        allow(User).to receive(:new).and_return(user)
+
+        expect { find_or_create_source_user }.to raise_error(ActiveRecord::RecordInvalid)
+      end
+    end
   end
 
   describe '#find_source_user' do
diff --git a/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb b/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb
index b84f2def3792..48392b8f5a8d 100644
--- a/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb
+++ b/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb
@@ -125,6 +125,18 @@
         expect(subject.transform(context, nil)).to eq(nil)
       end
     end
+
+    context 'when ActiveRecord::RecordNotUnique is raised when creating the source user' do
+      before do
+        allow_next_instance_of(Gitlab::Import::SourceUserMapper) do |mapper|
+          allow(mapper).to receive(:find_or_create_source_user).and_raise(ActiveRecord::RecordNotUnique)
+        end
+      end
+
+      it 'raises BulkImports::RetryPipelineError' do
+        expect { subject.transform(context, data) }.to raise_error { BulkImports::RetryPipelineError }
+      end
+    end
   end
 
   context 'with a project' do
diff --git a/spec/models/import/source_user_spec.rb b/spec/models/import/source_user_spec.rb
index 49a8e27fdb5c..c8add9d37f73 100644
--- a/spec/models/import/source_user_spec.rb
+++ b/spec/models/import/source_user_spec.rb
@@ -23,6 +23,13 @@
         .with_message('already assigned to another placeholder')
     end
 
+    it 'validates uniqueness of source_user_identifier' do
+      create(:import_source_user)
+
+      is_expected.to validate_uniqueness_of(:source_user_identifier)
+        .scoped_to(:namespace_id, :source_hostname, :import_type)
+    end
+
     context 'when completed' do
       subject { build(:import_source_user, :completed) }
 
-- 
GitLab