diff --git a/app/workers/concerns/update_repository_storage_worker.rb b/app/workers/concerns/update_repository_storage_worker.rb
index 01744d1e57dc861f7efadd43ffe8eccf4a342d86..c92d498ee1ba6cbc0b42067c8633d1f71b532d9c 100644
--- a/app/workers/concerns/update_repository_storage_worker.rb
+++ b/app/workers/concerns/update_repository_storage_worker.rb
@@ -11,6 +11,8 @@ module UpdateRepositoryStorageWorker
     urgency :throttled
   end
 
+  LEASE_TIMEOUT = 30.minutes.to_i
+
   def perform(container_id, new_repository_storage_key, repository_storage_move_id = nil)
     repository_storage_move =
       if repository_storage_move_id
@@ -24,7 +26,37 @@ def perform(container_id, new_repository_storage_key, repository_storage_move_id
         )
       end
 
-    update_repository_storage(repository_storage_move)
+    if Feature.enabled?(:use_lock_for_update_repository_storage)
+      # Use exclusive lock to prevent multiple storage migrations at the same time
+      #
+      # Note: instead of using a randomly generated `uuid`, we provide a worker jid value.
+      # That will allow to track a worker that requested a lease.
+      lease_key = [self.class.name.underscore, container_id].join(':')
+      exclusive_lease = Gitlab::ExclusiveLease.new(lease_key, uuid: jid, timeout: LEASE_TIMEOUT)
+      lease = exclusive_lease.try_obtain
+
+      if lease
+        begin
+          update_repository_storage(repository_storage_move)
+        ensure
+          exclusive_lease.cancel
+        end
+      else
+        # If there is an ungoing storage migration, then the current one should be marked as failed
+        repository_storage_move.do_fail!
+
+        # A special case
+        # Sidekiq can receive an interrupt signal during the processing.
+        # It kills existing workers and reschedules their jobs using the same jid.
+        # But it can cause a situation when the migration is only half complete (see https://gitlab.com/gitlab-org/gitlab/-/issues/429049#note_1635650597)
+        #
+        # Here we detect this case and release the lock.
+        uuid = Gitlab::ExclusiveLease.get_uuid(lease_key)
+        exclusive_lease.cancel if uuid == jid
+      end
+    else
+      update_repository_storage(repository_storage_move)
+    end
   end
 
   private
diff --git a/config/feature_flags/development/use_lock_for_update_repository_storage.yml b/config/feature_flags/development/use_lock_for_update_repository_storage.yml
new file mode 100644
index 0000000000000000000000000000000000000000..8f08208bca000b3c5d18f8d1796b128e20738e80
--- /dev/null
+++ b/config/feature_flags/development/use_lock_for_update_repository_storage.yml
@@ -0,0 +1,8 @@
+---
+name: use_lock_for_update_repository_storage
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136169
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431198
+milestone: '16.6'
+type: development
+group: group::source code
+default_enabled: false
diff --git a/spec/support/shared_examples/workers/update_repository_move_shared_examples.rb b/spec/support/shared_examples/workers/update_repository_move_shared_examples.rb
index 9b7183a9eac2b24e40fa3c123dcf58302f5bca7d..5682725638f3df9e69514e0a6dc69484e98dff32 100644
--- a/spec/support/shared_examples/workers/update_repository_move_shared_examples.rb
+++ b/spec/support/shared_examples/workers/update_repository_move_shared_examples.rb
@@ -1,11 +1,15 @@
 # frozen_string_literal: true
 
 RSpec.shared_examples 'an update storage move worker' do
+  let(:worker) { described_class.new }
+
   it 'has the `until_executed` deduplicate strategy' do
     expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
   end
 
-  describe '#perform' do
+  describe '#perform', :clean_gitlab_redis_shared_state do
+    subject { worker.perform(container.id, 'test_second_storage', repository_storage_move_id) }
+
     let(:service) { double(:update_repository_storage_service) }
 
     before do
@@ -13,12 +17,14 @@
     end
 
     context 'without repository storage move' do
+      let(:repository_storage_move_id) { nil }
+
       it 'calls the update repository storage service' do
         expect(service_klass).to receive(:new).and_return(service)
         expect(service).to receive(:execute)
 
         expect do
-          subject.perform(container.id, 'test_second_storage')
+          worker.perform(container.id, 'test_second_storage')
         end.to change { repository_storage_move_klass.count }.by(1)
 
         storage_move = container.repository_storage_moves.last
@@ -30,14 +36,77 @@
     end
 
     context 'with repository storage move' do
+      let(:repository_storage_move_id) { repository_storage_move.id }
+
+      before do
+        allow(service_klass).to receive(:new).and_return(service)
+      end
+
       it 'calls the update repository storage service' do
-        expect(service_klass).to receive(:new).and_return(service)
         expect(service).to receive(:execute)
 
         expect do
-          subject.perform(nil, nil, repository_storage_move.id)
+          subject
         end.not_to change { repository_storage_move_klass.count }
       end
+
+      context 'when repository storage move raises an exception' do
+        let(:exception) { RuntimeError.new('boom') }
+
+        it 'releases the exclusive lock' do
+          expect(service).to receive(:execute).and_raise(exception)
+
+          allow_next_instance_of(Gitlab::ExclusiveLease) do |lease|
+            expect(lease).to receive(:cancel)
+          end
+
+          expect { subject }.to raise_error(exception)
+        end
+      end
+
+      context 'when exclusive lease already set' do
+        let(:lease_key) { [described_class.name.underscore, container.id].join(':') }
+        let(:exclusive_lease) { Gitlab::ExclusiveLease.new(lease_key, uuid: uuid, timeout: 1.minute) }
+        let(:uuid) { 'other_worker_jid' }
+
+        it 'does not call the update repository storage service' do
+          expect(exclusive_lease.try_obtain).to eq(uuid)
+          expect(service).not_to receive(:execute)
+
+          subject
+
+          expect(repository_storage_move.reload).to be_failed
+        end
+
+        context 'when exclusive lease was taken by the current worker' do
+          let(:uuid) { 'existing_worker_jid' }
+
+          before do
+            allow(worker).to receive(:jid).and_return(uuid)
+          end
+
+          it 'marks storage migration as failed' do
+            expect(exclusive_lease.try_obtain).to eq(worker.jid)
+            expect(service).not_to receive(:execute)
+
+            subject
+
+            expect(repository_storage_move.reload).to be_failed
+          end
+        end
+
+        context 'when feature flag "use_lock_for_update_repository_storage" is disabled' do
+          before do
+            stub_feature_flags(use_lock_for_update_repository_storage: false)
+          end
+
+          it 'ignores lock and calls the update repository storage service' do
+            expect(service).to receive(:execute)
+
+            subject
+          end
+        end
+      end
     end
   end
 end
diff --git a/spec/workers/projects/update_repository_storage_worker_spec.rb b/spec/workers/projects/update_repository_storage_worker_spec.rb
index 91445c2bbf6ae2cb6ba39cffa3fdfdbacbf7a0fc..44c2dc41b2b37b3e106014420d7daffab14c12e2 100644
--- a/spec/workers/projects/update_repository_storage_worker_spec.rb
+++ b/spec/workers/projects/update_repository_storage_worker_spec.rb
@@ -7,7 +7,7 @@
 
   it_behaves_like 'an update storage move worker' do
     let_it_be_with_refind(:container) { create(:project, :repository) }
-    let_it_be(:repository_storage_move) { create(:project_repository_storage_move) }
+    let_it_be_with_reload(:repository_storage_move) { create(:project_repository_storage_move) }
 
     let(:service_klass) { Projects::UpdateRepositoryStorageService }
     let(:repository_storage_move_klass) { Projects::RepositoryStorageMove }