diff --git a/app/models/import/source_user.rb b/app/models/import/source_user.rb index 85ea02be1f87c79fd56ed0fc83fab24f8a096450..52e7cd68955f70711a82067b22197549e11a5142 100644 --- a/app/models/import/source_user.rb +++ b/app/models/import/source_user.rb @@ -20,6 +20,7 @@ class SourceUser < ApplicationRecord belongs_to :namespace validates :namespace_id, :import_type, :source_hostname, :source_user_identifier, :status, presence: true + validates :source_user_identifier, uniqueness: { scope: [:namespace_id, :source_hostname, :import_type] } validates :placeholder_user_id, presence: true, unless: :completed? validates :reassign_to_user_id, presence: true, if: -> { awaiting_approval? || reassignment_in_progress? || completed? diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 0d8f5df9cca0620c4088becfb270f7e5006e3df6..07487a0e83c945e070c8933238fc4d791af055b9 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -109,6 +109,10 @@ def context def retry_batch(exception) batch.retry! + logger.error(log_attributes( + message: "Retrying pipeline", exception: { message: exception.message, class: exception.class.name } + )) + re_enqueue(exception.retry_delay) end diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index f4dd06648a0551c71430137f4ea568ec769c171e..8d937dcf453586cf7ee65f4a35b5d1233b130e45 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -80,11 +80,12 @@ def run_pipeline_step(step, class_name = nil, entry = nil) importer: 'gitlab_migration' ) rescue BulkImports::NetworkError => e - raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay) if e.retriable?(context.tracker) + raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay), cause: e if e.retriable?(context.tracker) log_and_fail(e, step, entry) - rescue Gitlab::Import::SourceUserMapper::FailedToObtainLockError => e - raise BulkImports::RetryPipelineError.new(e.message, nil) + rescue Gitlab::Import::SourceUserMapper::FailedToObtainLockError, + Gitlab::Import::SourceUserMapper::DuplicatedSourceUserError => e + raise BulkImports::RetryPipelineError.new(e.message), cause: e rescue BulkImports::RetryPipelineError raise rescue StandardError => e diff --git a/lib/bulk_imports/retry_pipeline_error.rb b/lib/bulk_imports/retry_pipeline_error.rb index a1b02addf45e6f0707f81cd9fd5affb477b9a639..4e052ae1766ea38b9d37c58db712246210faa1c2 100644 --- a/lib/bulk_imports/retry_pipeline_error.rb +++ b/lib/bulk_imports/retry_pipeline_error.rb @@ -4,7 +4,7 @@ module BulkImports class RetryPipelineError < Error attr_reader :retry_delay - def initialize(message, retry_delay) + def initialize(message, retry_delay = nil) super(message) @retry_delay = retry_delay diff --git a/lib/gitlab/import/source_user_mapper.rb b/lib/gitlab/import/source_user_mapper.rb index 70b7c6064c869506cc1f95f55011e62e6ed8793f..2424731da2e99de2911dd79c57d4766ce710b9b4 100644 --- a/lib/gitlab/import/source_user_mapper.rb +++ b/lib/gitlab/import/source_user_mapper.rb @@ -10,19 +10,37 @@ class SourceUserMapper LOCK_SLEEP = 0.3.seconds.freeze LOCK_RETRIES = 100 + DuplicatedSourceUserError = Class.new(StandardError) + def initialize(namespace:, import_type:, source_hostname:) @namespace = namespace.root_ancestor @import_type = import_type @source_hostname = source_hostname end + # Finds a source user by the provided `source_user_identifier`. + # + # This method first checks an in-memory LRU (Least Recently Used) cache, + # stored in `SafeRequestStore`, to avoid unnecessary database queries. + # If the source user is not present in the cache, it will query the database + # and store the result in the cache for future use. + # + # Since jobs may create source users concurrently, the ActiveRecord query + # cache is explicitly disabled when querying the database to ensure that + # we always get the latest data. + # + # @param [String] source_user_identifier The identifier for the source user to find. + # @return [Import::SourceUser, nil] The found source user object, or `nil` if no match is found. + # def find_source_user(source_user_identifier) - cache_from_request_store[source_user_identifier] ||= ::Import::SourceUser.find_source_user( - source_user_identifier: source_user_identifier, - namespace: namespace, - source_hostname: source_hostname, - import_type: import_type - ) + cache_from_request_store[source_user_identifier] ||= ::Import::SourceUser.uncached do + ::Import::SourceUser.find_source_user( + source_user_identifier: source_user_identifier, + namespace: namespace, + source_hostname: source_hostname, + import_type: import_type + ) + end end def find_or_create_source_user(source_name:, source_username:, source_user_identifier:) @@ -75,6 +93,12 @@ def create_source_user_mapping(source_name, source_username, source_user_identif import_source_user.save! import_source_user end + rescue PG::UniqueViolation, ActiveRecord::RecordNotUnique => e + raise DuplicatedSourceUserError.new(e.message), cause: e + rescue ActiveRecord::RecordInvalid => e + raise DuplicatedSourceUserError.new(e.message), cause: e if user_has_duplicated_errors?(e.record) + + raise end def create_placeholder_user(import_source_user) @@ -94,6 +118,11 @@ def placeholder_user_limit_exceeded? def lock_key(source_user_identifier) "import:source_user_mapper:#{namespace.id}:#{import_type}:#{source_hostname}:#{source_user_identifier}" end + + def user_has_duplicated_errors?(record) + attributes = %i[email username] + record.errors.filter { |error| error.type == :taken && attributes.include?(error.attribute) }.any? + end end end end diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb index 14df1e380652f8dd1a8428ded17436b2e9a595e0..b61cbc568b2f82fb749664d01f6053d338a23db3 100644 --- a/spec/lib/bulk_imports/pipeline/runner_spec.rb +++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb @@ -306,22 +306,25 @@ def load(context, data); end end end - context 'when the exception Gitlab::Import::SourceUserMapper::FailedToObtainLockError is raised' do - it 'raises the exception BulkImports::RetryPipelineError' do - allow_next_instance_of(BulkImports::Extractor) do |extractor| - allow(extractor) - .to receive(:extract) - .with(context) - .and_return(extracted_data) - end + [Gitlab::Import::SourceUserMapper::FailedToObtainLockError, + Gitlab::Import::SourceUserMapper::DuplicatedSourceUserError].each do |exception_class| + context "when #{exception_class} is raised" do + it 'raises the exception BulkImports::RetryPipelineError' do + allow_next_instance_of(BulkImports::Extractor) do |extractor| + allow(extractor) + .to receive(:extract) + .with(context) + .and_return(extracted_data) + end - allow_next_instance_of(BulkImports::Transformer) do |transformer| - allow(transformer) - .to receive(:transform) - .and_raise(Gitlab::Import::SourceUserMapper::FailedToObtainLockError) - end + allow_next_instance_of(BulkImports::Transformer) do |transformer| + allow(transformer) + .to receive(:transform) + .and_raise(exception_class) + end - expect { subject.run }.to raise_error(BulkImports::RetryPipelineError) + expect { subject.run }.to raise_error(BulkImports::RetryPipelineError) + end end end diff --git a/spec/lib/gitlab/import/source_user_mapper_spec.rb b/spec/lib/gitlab/import/source_user_mapper_spec.rb index b61bf5ec9fdc25049338bbb050cd85c2706e33c7..b5cf77d15e74775352a8723349d32fa12fa1d431 100644 --- a/spec/lib/gitlab/import/source_user_mapper_spec.rb +++ b/spec/lib/gitlab/import/source_user_mapper_spec.rb @@ -148,6 +148,37 @@ expect(new_import_source_user.placeholder_user).to eq(import_user) end end + + context 'when ActiveRecord::RecordNotUnique exception is raised during the source user creation' do + before do + allow_next_instance_of(::Import::SourceUser) do |source_user| + allow(source_user).to receive(:save!).and_raise(ActiveRecord::RecordNotUnique) + end + end + + it 'raises DuplicatedSourceUserError' do + expect { find_or_create_source_user }.to raise_error(described_class::DuplicatedSourceUserError) + end + end + + context 'when ActiveRecord::RecordInvalid exception because the placeholder user email or username is taken' do + it 'rescue the exception and raises DuplicatedSourceUserError' do + create(:user, email: 'user@example.com') + user = build(:user, email: 'user@example.com').tap(&:valid?) + allow(User).to receive(:new).and_return(user) + + expect { find_or_create_source_user }.to raise_error(described_class::DuplicatedSourceUserError) + end + end + + context 'when ActiveRecord::RecordInvalid exception raises for another reason' do + it 'bubbles up the ActiveRecord::RecordInvalid exception' do + user = build(:user, email: nil) + allow(User).to receive(:new).and_return(user) + + expect { find_or_create_source_user }.to raise_error(ActiveRecord::RecordInvalid) + end + end end describe '#find_source_user' do diff --git a/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb b/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb index b84f2def3792e828e765ed5a6bd06a98c1ec0583..48392b8f5a8d193149b711e750929d7332146473 100644 --- a/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb +++ b/spec/lib/import/bulk_imports/common/transformers/source_user_member_attributes_transformer_spec.rb @@ -125,6 +125,18 @@ expect(subject.transform(context, nil)).to eq(nil) end end + + context 'when ActiveRecord::RecordNotUnique is raised when creating the source user' do + before do + allow_next_instance_of(Gitlab::Import::SourceUserMapper) do |mapper| + allow(mapper).to receive(:find_or_create_source_user).and_raise(ActiveRecord::RecordNotUnique) + end + end + + it 'raises BulkImports::RetryPipelineError' do + expect { subject.transform(context, data) }.to raise_error { BulkImports::RetryPipelineError } + end + end end context 'with a project' do diff --git a/spec/models/import/source_user_spec.rb b/spec/models/import/source_user_spec.rb index 49a8e27fdb5cd82124f4da76306368c1a33882d7..c8add9d37f73c01e1fcf4b7dcda3556822ad217c 100644 --- a/spec/models/import/source_user_spec.rb +++ b/spec/models/import/source_user_spec.rb @@ -23,6 +23,13 @@ .with_message('already assigned to another placeholder') end + it 'validates uniqueness of source_user_identifier' do + create(:import_source_user) + + is_expected.to validate_uniqueness_of(:source_user_identifier) + .scoped_to(:namespace_id, :source_hostname, :import_type) + end + context 'when completed' do subject { build(:import_source_user, :completed) }