diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example
index 03a453534656252e7219be18e307f16082fba899..9c0fa7832504f217b4279b2ef96de0bdcc3eb291 100644
--- a/config/gitlab.yml.example
+++ b/config/gitlab.yml.example
@@ -469,6 +469,11 @@ production: &base
     geo_file_download_dispatch_worker:
       cron: "*/1 * * * *"
 
+    # GitLab Geo registry sync worker (for backfilling)
+    # NOTE: This will only take effect if Geo is enabled (secondary nodes only)
+    geo_registry_sync_worker:
+      cron: "*/1 * * * *"
+
     # GitLab Geo migrated local files clean up worker
     # NOTE: This will only take effect if Geo is enabled (secondary nodes only)
     geo_migrated_local_files_clean_up_worker:
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 2699d0984dc9b4931fb96320000906e999034d37..df552e33fd06a21d85bc87bde32b51c97102aafb 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -513,6 +513,9 @@
   Settings.cron_jobs['geo_file_download_dispatch_worker'] ||= Settingslogic.new({})
   Settings.cron_jobs['geo_file_download_dispatch_worker']['cron'] ||= '*/1 * * * *'
   Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::FileDownloadDispatchWorker'
+  Settings.cron_jobs['geo_registry_sync_worker'] ||= Settingslogic.new({})
+  Settings.cron_jobs['geo_registry_sync_worker']['cron'] ||= '*/1 * * * *'
+  Settings.cron_jobs['geo_registry_sync_worker']['job_class'] ||= 'Geo::RegistrySyncWorker'
   Settings.cron_jobs['geo_metrics_update_worker'] ||= Settingslogic.new({})
   Settings.cron_jobs['geo_metrics_update_worker']['cron'] ||= '*/1 * * * *'
   Settings.cron_jobs['geo_metrics_update_worker']['job_class'] ||= 'Geo::MetricsUpdateWorker'
diff --git a/ee/app/models/geo/base_registry.rb b/ee/app/models/geo/base_registry.rb
index fafe75a16cd905587ac7bbb3e05632be8dfcfa58..2d4fb8ae4287d51c1bc200a0feb7bcffb8f8d44f 100644
--- a/ee/app/models/geo/base_registry.rb
+++ b/ee/app/models/geo/base_registry.rb
@@ -34,4 +34,21 @@ def self.insert_for_model_ids(ids)
   def self.delete_for_model_ids(ids)
     raise NotImplementedError, "#{self.class} does not implement #{__method__}"
   end
+
+  def self.find_unsynced_registries(batch_size:, except_ids: [])
+    pending
+      .model_id_not_in(except_ids)
+      .limit(batch_size)
+  end
+
+  def self.find_failed_registries(batch_size:, except_ids: [])
+    failed
+      .retry_due
+      .model_id_not_in(except_ids)
+      .limit(batch_size)
+  end
+
+  def model_record_id
+    read_attribute(self.class::MODEL_FOREIGN_KEY)
+  end
 end
diff --git a/ee/app/models/geo/package_file_registry.rb b/ee/app/models/geo/package_file_registry.rb
index 7d9744784ee65d5753b70adfa34622e500e07fbd..c7b3867b2b853723ff2dbdc00b0f72406719d9f3 100644
--- a/ee/app/models/geo/package_file_registry.rb
+++ b/ee/app/models/geo/package_file_registry.rb
@@ -4,6 +4,8 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
   include ::Delay
   include ShaAttribute
 
+  MODEL_FOREIGN_KEY = :package_file_id
+
   def self.declarative_policy_class
     'Geo::RegistryPolicy'
   end
@@ -20,6 +22,7 @@ def self.declarative_policy_class
   scope :never, -> { where(last_synced_at: nil) }
   scope :failed, -> { with_state(:failed) }
   scope :synced, -> { with_state(:synced) }
+  scope :pending, -> { with_state(:pending) }
   scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
   scope :ordered, -> { order(:id) }
 
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index c65cdbc5202cf82bed8d0e70c4975627c2e99109..a1db755a329b03f1473fe83363c6d4fb83d18723 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -75,6 +75,14 @@
   :weight: 1
   :idempotent: 
   :tags: []
+- :name: cronjob:geo_registry_sync
+  :feature_category: :geo_replication
+  :has_external_dependencies: 
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: cronjob:geo_repository_sync
   :feature_category: :geo_replication
   :has_external_dependencies: 
diff --git a/ee/app/workers/geo/registry_sync_worker.rb b/ee/app/workers/geo/registry_sync_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6ccf3ece8bc7688471ada432c324a1b0478b93fc
--- /dev/null
+++ b/ee/app/workers/geo/registry_sync_worker.rb
@@ -0,0 +1,84 @@
+# frozen_string_literal: true
+
+module Geo
+  class RegistrySyncWorker < Geo::Scheduler::Secondary::SchedulerWorker
+    # This worker does not perform work scoped to a context
+    include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+    idempotent!
+
+    private
+
+    # We use inexpensive queries now so we don't need a backoff time
+    #
+    # Overrides Geo::Scheduler::SchedulerWorker#should_apply_backoff?
+    def should_apply_backoff?
+      false
+    end
+
+    def max_capacity
+      # Transition-period-solution.
+      # Explained in https://gitlab.com/gitlab-org/gitlab/-/issues/213872#note_336828581
+      [current_node.files_max_capacity / 4, 1].max
+    end
+
+    def schedule_job(replicable_name, model_record_id)
+      job_id = ::Geo::EventWorker.perform_async(replicable_name, :created, model_record_id: model_record_id)
+
+      { model_record_id: model_record_id, replicable_name: replicable_name, job_id: job_id } if job_id
+    end
+
+    # Pools for new resources to be transferred
+    #
+    # @return [Array] resources to be transferred
+    def load_pending_resources
+      resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size)
+      remaining_capacity = db_retrieve_batch_size - resources.count
+
+      if remaining_capacity.zero?
+        resources
+      else
+        resources + find_low_priority_jobs(batch_size: remaining_capacity)
+      end
+    end
+
+    # Get a batch of unsynced resources, taking equal parts from each resource.
+    #
+    # @return [Array] job arguments of unsynced resources
+    def find_unsynced_jobs(batch_size:)
+      jobs = replicator_classes.reduce([]) do |jobs, replicator_class|
+        except_ids = scheduled_replicable_ids(replicator_class.replicable_name)
+
+        jobs << replicator_class
+                  .find_unsynced_registries(batch_size: batch_size, except_ids: except_ids)
+                  .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
+      end
+
+      take_batch(*jobs, batch_size: batch_size)
+    end
+
+    # Get a batch of failed and synced-but-missing-on-primary resources, taking
+    # equal parts from each resource.
+    #
+    # @return [Array] job arguments of low priority resources
+    def find_low_priority_jobs(batch_size:)
+      jobs = replicator_classes.reduce([]) do |jobs, replicator_class|
+        except_ids = scheduled_replicable_ids(replicator_class.replicable_name)
+
+        jobs << replicator_class
+                  .find_failed_registries(batch_size: batch_size, except_ids: except_ids)
+                  .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
+      end
+
+      take_batch(*jobs, batch_size: batch_size)
+    end
+
+    def scheduled_replicable_ids(replicable_name)
+      scheduled_jobs.select { |data| data[:replicable_name] == replicable_name }.map { |data| data[:model_record_id] }
+    end
+
+    def replicator_classes
+      Gitlab::Geo::ReplicableModel.replicators
+    end
+  end
+end
diff --git a/ee/lib/gitlab/geo/cron_manager.rb b/ee/lib/gitlab/geo/cron_manager.rb
index 07c45a768746f9954a66769fe04663fdf101ee86..f6b2d2feb8698d2e4f1ec0d67b9b8ed4a1768520 100644
--- a/ee/lib/gitlab/geo/cron_manager.rb
+++ b/ee/lib/gitlab/geo/cron_manager.rb
@@ -15,6 +15,7 @@ class CronManager
 
       SECONDARY_JOBS = %w[
         geo_file_download_dispatch_worker
+        geo_registry_sync_worker
         geo_migrated_local_files_clean_up_worker
         geo_repository_sync_worker
         geo_container_repository_sync_worker
diff --git a/ee/lib/gitlab/geo/replicable_model.rb b/ee/lib/gitlab/geo/replicable_model.rb
index 7117bd58d16e2717119d94611ddcb7cdd429e8c0..6c2b3d02bb9c402109d3f91dc3339b2ef550f89b 100644
--- a/ee/lib/gitlab/geo/replicable_model.rb
+++ b/ee/lib/gitlab/geo/replicable_model.rb
@@ -24,6 +24,8 @@ module ReplicableModel
         def with_replicator(klass)
           raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
 
+          Gitlab::Geo::ReplicableModel.add_replicator(klass)
+
           class_eval <<-RUBY, __FILE__, __LINE__ + 1
             define_method :replicator do
               @_replicator ||= klass.new(model_record: self)
@@ -32,6 +34,16 @@ def with_replicator(klass)
         end
       end
 
+      def self.add_replicator(klass)
+        @_replicators ||= []
+        @_replicators << klass
+      end
+
+      def self.replicators
+        @_replicators ||= []
+        @_replicators.filter { |replicator| const_defined?(replicator.to_s) }
+      end
+
       # Geo Replicator
       #
       # @abstract
diff --git a/ee/lib/gitlab/geo/replicator.rb b/ee/lib/gitlab/geo/replicator.rb
index 1b24edf3c9ba8e0943dca52822df678f8c07ebbf..9754d80db8f6ee27ceff3e77650214c09f1ca089 100644
--- a/ee/lib/gitlab/geo/replicator.rb
+++ b/ee/lib/gitlab/geo/replicator.rb
@@ -17,8 +17,13 @@ class Replicator
       CLASS_SUFFIXES = %w(RegistryFinder RegistriesResolver).freeze
 
       attr_reader :model_record_id
+
       delegate :model, to: :class
 
+      class << self
+        delegate :find_unsynced_registries, :find_failed_registries, to: :registry_class
+      end
+
       # Declare supported event
       #
       # @example Declaring support for :update and :delete events
diff --git a/ee/spec/lib/gitlab/geo/cron_manager_spec.rb b/ee/spec/lib/gitlab/geo/cron_manager_spec.rb
index 01e1fea22c2b1572d4c8d50c8599ef36df8b68f5..75b95f0f9fd47be22b01b5dbded3b091af6d388f 100644
--- a/ee/spec/lib/gitlab/geo/cron_manager_spec.rb
+++ b/ee/spec/lib/gitlab/geo/cron_manager_spec.rb
@@ -11,6 +11,7 @@
     geo_repository_verification_primary_batch_worker
     geo_repository_sync_worker
     geo_file_download_dispatch_worker
+    geo_registry_sync_worker
     geo_container_repository_sync_worker
     geo_repository_verification_secondary_scheduler_worker
     geo_metrics_update_worker
@@ -35,6 +36,7 @@ def job(name)
     let(:secondary_jobs) do
       [
         job('geo_file_download_dispatch_worker'),
+        job('geo_registry_sync_worker'),
         job('geo_repository_sync_worker'),
         job('geo_container_repository_sync_worker'),
         job('geo_repository_verification_secondary_scheduler_worker'),
diff --git a/ee/spec/models/geo/package_file_registry_spec.rb b/ee/spec/models/geo/package_file_registry_spec.rb
index 8429ef06b0c16641c6361b2cac61e03ab2b34005..b757b13f9d2416dd518c71b6f47edcef4d941018 100644
--- a/ee/spec/models/geo/package_file_registry_spec.rb
+++ b/ee/spec/models/geo/package_file_registry_spec.rb
@@ -7,4 +7,6 @@
     let(:valid_items_for_bulk_insertion) { build_list(:package_file_registry, 10, created_at: Time.zone.now) }
     let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
   end
+
+  include_examples 'a Geo framework registry'
 end
diff --git a/ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb b/ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb
new file mode 100644
index 0000000000000000000000000000000000000000..4e3397368bf58c2a030d9518eeac86fdbc17788b
--- /dev/null
+++ b/ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+shared_examples 'a Geo framework registry' do
+  let(:registry_class_factory) { described_class.underscore.tr('/', '_').sub('geo_', '').to_sym }
+
+  let!(:failed_item1) { create(registry_class_factory, :failed) }
+  let!(:failed_item2) { create(registry_class_factory, :failed) }
+  let!(:unsynced_item1) { create(registry_class_factory) }
+  let!(:unsynced_item2) { create(registry_class_factory) }
+
+  describe '.find_unsynced_registries' do
+    it 'returns unsynced items' do
+      result = described_class.find_unsynced_registries(batch_size: 10)
+
+      expect(result).to include(unsynced_item1, unsynced_item2)
+    end
+
+    it 'returns unsynced items except some specific item ID' do
+      except_id = unsynced_item1.model_record_id
+
+      result = described_class.find_unsynced_registries(batch_size: 10, except_ids: [except_id])
+
+      expect(result).to include(unsynced_item2)
+      expect(result).not_to include(unsynced_item1)
+    end
+  end
+
+  describe '.find_failed_registries' do
+    it 'returns failed items' do
+      result = described_class.find_failed_registries(batch_size: 10)
+
+      expect(result).to include(failed_item1, failed_item2)
+    end
+
+    it 'returns failed items except some specific item ID' do
+      except_id = failed_item1.model_record_id
+
+      result = described_class.find_failed_registries(batch_size: 10, except_ids: [except_id])
+
+      expect(result).to include(failed_item2)
+      expect(result).not_to include(failed_item1)
+    end
+  end
+end
diff --git a/ee/spec/workers/geo/registry_sync_worker_spec.rb b/ee/spec/workers/geo/registry_sync_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..0174f3fa754db4647744519fbeee210550263b77
--- /dev/null
+++ b/ee/spec/workers/geo/registry_sync_worker_spec.rb
@@ -0,0 +1,123 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
+  include ::EE::GeoHelpers
+  include ExclusiveLeaseHelpers
+
+  let(:primary)   { create(:geo_node, :primary) }
+  let(:secondary) { create(:geo_node) }
+
+  before do
+    stub_current_geo_node(secondary)
+    stub_exclusive_lease(renew: true)
+
+    allow_next_instance_of(described_class) do |instance|
+      allow(instance).to receive(:over_time?).and_return(false)
+    end
+  end
+
+  it 'does not schedule anything when tracking database is not configured' do
+    create(:package_file_registry)
+
+    expect(::Geo::EventWorker).not_to receive(:perform_async)
+
+    with_no_geo_database_configured do
+      subject.perform
+    end
+  end
+
+  it 'does not schedule anything when node is disabled' do
+    create(:package_file_registry)
+
+    secondary.enabled = false
+    secondary.save!
+
+    expect(::Geo::EventWorker).not_to receive(:perform_async)
+
+    subject.perform
+  end
+
+  it 'does not schedule duplicated jobs' do
+    package_file_1 = create(:package_file_registry)
+    package_file_2 = create(:package_file_registry)
+
+    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
+    secondary.update!(files_max_capacity: 8)
+    allow(Gitlab::SidekiqStatus).to receive(:job_status).with([]).and_return([]).twice
+    allow(Gitlab::SidekiqStatus).to receive(:job_status).with(array_including('123', '456')).and_return([true, true], [true, true], [false, false])
+
+    expect(::Geo::EventWorker)
+      .to receive(:perform_async)
+      .with('package_file', :created, { model_record_id: package_file_1.package_file.id })
+      .once
+      .and_return('123')
+    expect(::Geo::EventWorker)
+      .to receive(:perform_async)
+      .with('package_file', :created, { model_record_id: package_file_2.package_file.id })
+      .once
+      .and_return('456')
+
+    subject.perform
+  end
+
+  it 'does not schedule duplicated jobs because of query cache' do
+    package_file_1 = create(:package_file_registry)
+    package_file_2 = create(:package_file_registry)
+
+    # We retrieve all the items in a single batch
+    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 2)
+    # 8 / 4 = 2 We use one quarter of common files_max_capacity in the Geo::RegistrySyncWorker
+    secondary.update!(files_max_capacity: 8)
+
+    expect(Geo::EventWorker).to receive(:perform_async).with('package_file', :created, { model_record_id: package_file_1.package_file.id }).once do
+      Thread.new do
+        # Rails will invalidate the query cache if the update happens in the same thread
+        Geo::PackageFileRegistry.update(state: Geo::PackageFileRegistry::STATE_VALUES[:synced])
+      end
+    end
+
+    expect(Geo::EventWorker).to receive(:perform_async)
+                                  .with('package_file', :created, { model_record_id: package_file_2.package_file.id })
+                                  .once
+
+    subject.perform
+  end
+
+  # Test the case where we have:
+  #
+  # 1. A total of 10 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
+  # 2. We send 2, wait for 1 to finish, and then send again.
+  it 'attempts to load a new batch without pending downloads' do
+    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
+    # 8 / 4 = 2 We use one quarter of common files_max_capacity in the Geo::RegistrySyncWorker
+    secondary.update!(files_max_capacity: 8)
+
+    result_object = double(
+      :result,
+      success: true,
+      bytes_downloaded: 100,
+      primary_missing_file: false,
+      reason: '',
+      extra_details: {}
+    )
+
+    allow_any_instance_of(::Gitlab::Geo::Replication::BlobDownloader).to receive(:execute).and_return(result_object)
+
+    create_list(:package_file_registry, 10)
+
+    expect(::Geo::EventWorker).to receive(:perform_async).exactly(10).times.and_call_original
+    # For 10 downloads, we expect four database reloads:
+    # 1. Load the first batch of 5.
+    # 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the next 5.
+    # 3. Those 4 get sent out, and 1 remains.
+    # 3. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
+    #    zero are left.
+    expect(subject).to receive(:load_pending_resources).exactly(4).times.and_call_original
+
+    Sidekiq::Testing.inline! do
+      subject.perform
+    end
+  end
+end