diff --git a/app/models/ci/catalog/resource.rb b/app/models/ci/catalog/resource.rb
index 8739450fb8e472b58d79a5d8a6344d380a223eaa..4be48f1bd1f618bf14d5c9fff1bd1cc7f7e27dda 100644
--- a/app/models/ci/catalog/resource.rb
+++ b/app/models/ci/catalog/resource.rb
@@ -17,6 +17,8 @@ class Resource < ::ApplicationRecord
         inverse_of: :catalog_resource
       has_many :versions, class_name: 'Ci::Catalog::Resources::Version', foreign_key: :catalog_resource_id,
         inverse_of: :catalog_resource
+      has_many :sync_events, class_name: 'Ci::Catalog::Resources::SyncEvent', foreign_key: :catalog_resource_id,
+        inverse_of: :catalog_resource
 
       scope :for_projects, ->(project_ids) { where(project_id: project_ids) }
 
@@ -37,6 +39,14 @@ class Resource < ::ApplicationRecord
 
       before_create :sync_with_project
 
+      class << self
+        # Used by Ci::ProcessSyncEventsService
+        def sync!(event)
+          # The event may reference a deleted catalog resource because p_catalog_resource_sync_events does not enforce FKs
+          event.catalog_resource&.sync_with_project!
+        end
+      end
+
       def to_param
         full_path
       end
@@ -54,17 +64,16 @@ def sync_with_project!
         save!
       end
 
-      # Triggered in Ci::Catalog::Resources::Version and Release model callbacks.
+      # Triggered in Ci::Catalog::Resources::Version and Release model callbacks
       def update_latest_released_at!
         update!(latest_released_at: versions.latest&.released_at)
       end
 
       private
 
-      # These columns are denormalized from the `projects` table. We first sync these
-      # columns when the catalog resource record is created. Then any updates to the
-      # `projects` columns will be synced to the `catalog_resources` table by a worker
-      # (to be implemented in https://gitlab.com/gitlab-org/gitlab/-/issues/429376.)
+      # These denormalized columns are first synced when a new catalog resource is created.
+      # After that, a PG trigger inserts a SyncEvent whenever any of these columns changes on
+      # the associated project, and a worker processes the SyncEvents with Ci::ProcessSyncEventsService.
       def sync_with_project
         self.name = project.name
         self.description = project.description
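
As a point of reference, a minimal sketch of the round trip described in the comment above, using the classes introduced in this change (the project id and attribute value are illustrative):

  project = Project.find(42)                   # a project that has a catalog resource
  project.update!(description: 'New text')     # the PG trigger inserts a pending SyncEvent row

  # Later, the worker drains the queue:
  Ci::ProcessSyncEventsService
    .new(Ci::Catalog::Resources::SyncEvent, Ci::Catalog::Resource)
    .execute
  # Each pending event calls Ci::Catalog::Resource.sync!(event), which re-reads
  # name, description and visibility_level from the `projects` row.
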
diff --git a/app/models/ci/catalog/resources/sync_event.rb b/app/models/ci/catalog/resources/sync_event.rb
new file mode 100644
index 0000000000000000000000000000000000000000..a28191028756210e6546a131b9ee6bd1ba586d11
--- /dev/null
+++ b/app/models/ci/catalog/resources/sync_event.rb
@@ -0,0 +1,88 @@
+# frozen_string_literal: true
+
+module Ci
+  module Catalog
+    module Resources
+      # This table is a queue of catalog resources whose denormalized columns need to be
+      # synchronized with their `projects` records. A PG trigger (DB function name:
+      # `insert_catalog_resource_sync_event`) adds a SyncEvent whenever any of the relevant columns
+      # referenced in `Ci::Catalog::Resource#sync_with_project` changes on the associated `projects` record.
+      class SyncEvent < ::ApplicationRecord
+        include PartitionedTable
+        include IgnorableColumns
+
+        PARTITION_DURATION = 1.day
+
+        self.table_name = 'p_catalog_resource_sync_events'
+        self.primary_key = :id
+        self.sequence_name = :p_catalog_resource_sync_events_id_seq
+
+        ignore_column :partition_id, remove_with: '3000.0', remove_after: '3000-01-01'
+
+        belongs_to :catalog_resource, class_name: 'Ci::Catalog::Resource', inverse_of: :sync_events
+        belongs_to :project, inverse_of: :catalog_resource_sync_events
+
+        scope :for_partition, ->(partition) { where(partition_id: partition) }
+        scope :select_with_partition,
+          -> { select(:id, :catalog_resource_id, arel_table[:partition_id].as('partition')) }
+
+        scope :unprocessed_events, -> { select_with_partition.status_pending }
+        scope :preload_synced_relation, -> { preload(catalog_resource: :project) }
+
+        enum status: { pending: 1, processed: 2 }, _prefix: :status
+
+        partitioned_by :partition_id, strategy: :sliding_list,
+          next_partition_if: ->(active_partition) do
+            oldest_record_in_partition = Ci::Catalog::Resources::SyncEvent
+              .select(:id, :created_at)
+              .for_partition(active_partition.value)
+              .order(:id)
+              .limit(1)
+              .take
+
+            oldest_record_in_partition.present? &&
+              oldest_record_in_partition.created_at < PARTITION_DURATION.ago
+          end,
+          detach_partition_if: ->(partition) do
+            !Ci::Catalog::Resources::SyncEvent
+              .for_partition(partition.value)
+              .status_pending
+              .exists?
+          end
+
+        class << self
+          def mark_records_processed(records)
+            update_by_partition(records) do |partitioned_scope|
+              partitioned_scope.update_all(status: :processed)
+            end
+          end
+
+          def enqueue_worker
+            return unless Feature.enabled?(:ci_process_catalog_resource_sync_events)
+
+            ::Ci::Catalog::Resources::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker -- Worker is scheduled in model callback functions
+          end
+
+          def upper_bound_count
+            select('COALESCE(MAX(id) - MIN(id) + 1, 0) AS upper_bound_count')
+              .status_pending.to_a.first.upper_bound_count
+          end
+
+          private
+
+          # You must use .select_with_partition before calling this method
+          # as it requires the partition to be explicitly selected.
+          def update_by_partition(records)
+            records.group_by(&:partition).each do |partition, records_within_partition|
+              partitioned_scope = status_pending
+                .for_partition(partition)
+                .where(id: records_within_partition.map(&:id))
+
+              yield(partitioned_scope)
+            end
+          end
+        end
+      end
+    end
+  end
+end
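
A minimal sketch of how the class-level helpers above are meant to be combined; this mirrors what Ci::ProcessSyncEventsService does with them (the batch size is illustrative):

  events = Ci::Catalog::Resources::SyncEvent
    .unprocessed_events          # pending events, with partition_id selected as `partition`
    .preload_synced_relation     # preloads catalog_resource => project
    .first(1000)

  events.each { |event| Ci::Catalog::Resource.sync!(event) }

  # Works because `partition` was selected via .select_with_partition above.
  Ci::Catalog::Resources::SyncEvent.mark_records_processed(events)

Note that .upper_bound_count is only an estimate of the pending backlog: with pending ids 5..12 it returns 12 - 5 + 1 = 8, even if some ids in that range are already processed or missing.
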
diff --git a/app/models/namespaces/sync_event.rb b/app/models/namespaces/sync_event.rb
index fbe047f2c5ab8256156cf478862c8fe3297b0764..c06d86cb2978e0fabf41506a8108cf7950017de8 100644
--- a/app/models/namespaces/sync_event.rb
+++ b/app/models/namespaces/sync_event.rb
@@ -7,9 +7,14 @@ class Namespaces::SyncEvent < ApplicationRecord
 
   belongs_to :namespace
 
+  scope :unprocessed_events, -> { all }
   scope :preload_synced_relation, -> { preload(:namespace) }
   scope :order_by_id_asc, -> { order(id: :asc) }
 
+  def self.mark_records_processed(records)
+    id_in(records).delete_all
+  end
+
   def self.enqueue_worker
     ::Namespaces::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker
   end
diff --git a/app/models/project.rb b/app/models/project.rb
index bb421fc7dc666f6e281d0bc6c18c2d22049b20a5..35e38cd37d1ef85e9e1a3c9a25b8e929b3e9ad53 100644
--- a/app/models/project.rb
+++ b/app/models/project.rb
@@ -142,9 +142,8 @@ class Project < ApplicationRecord
   after_create :set_timestamps_for_create
   after_create :check_repository_absence!
 
-  # TODO: Remove this callback after background syncing is implemented. See https://gitlab.com/gitlab-org/gitlab/-/issues/429376.
-  after_update :update_catalog_resource,
-    if: -> { (saved_change_to_name? || saved_change_to_description? || saved_change_to_visibility_level?) && catalog_resource }
+  after_update :enqueue_catalog_resource_sync_event_worker,
+    if: -> { catalog_resource && (saved_change_to_name? || saved_change_to_description? || saved_change_to_visibility_level?) }
 
   before_destroy :remove_private_deploy_keys
   after_destroy :remove_exports
@@ -187,6 +186,7 @@ class Project < ApplicationRecord
   has_one :catalog_resource, class_name: 'Ci::Catalog::Resource', inverse_of: :project
   has_many :ci_components, class_name: 'Ci::Catalog::Resources::Component', inverse_of: :project
   has_many :catalog_resource_versions, class_name: 'Ci::Catalog::Resources::Version', inverse_of: :project
+  has_many :catalog_resource_sync_events, class_name: 'Ci::Catalog::Resources::SyncEvent', inverse_of: :project
 
   has_one :last_event, -> { order 'events.created_at DESC' }, class_name: 'Event'
   has_many :boards
@@ -3468,8 +3468,13 @@ def pool_repository_shard_matches_repository?(pool)
     pool_repository_shard == repository_storage
   end
 
-  def update_catalog_resource
-    catalog_resource.sync_with_project!
+  # The catalog resource SyncEvents themselves are inserted by a PG trigger on the projects table
+  def enqueue_catalog_resource_sync_event_worker
+    catalog_resource.sync_with_project! if Feature.disabled?(:ci_process_catalog_resource_sync_events)
+
+    run_after_commit do
+      ::Ci::Catalog::Resources::SyncEvent.enqueue_worker
+    end
   end
 end
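
For clarity, the two paths this callback can take, depending on the ci_process_catalog_resource_sync_events feature flag added in this change (a sketch, with comments describing the behavior implemented above):

  project.update!(name: 'New name')
  # Flag enabled:  the PG trigger has already inserted a pending SyncEvent; after
  #                commit the callback enqueues Ci::Catalog::Resources::ProcessSyncEventsWorker.
  # Flag disabled: catalog_resource.sync_with_project! runs inline; SyncEvent.enqueue_worker
  #                is still called after commit but returns early because of its own flag guard.
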
 
diff --git a/app/models/projects/sync_event.rb b/app/models/projects/sync_event.rb
index 7af863c0cf0dffc5820f20da8afe8eb21d2f5c9e..f1688bcd19d11c858193203b18c2f13cc62da75c 100644
--- a/app/models/projects/sync_event.rb
+++ b/app/models/projects/sync_event.rb
@@ -7,9 +7,14 @@ class Projects::SyncEvent < ApplicationRecord
 
   belongs_to :project
 
+  scope :unprocessed_events, -> { all }
   scope :preload_synced_relation, -> { preload(:project) }
   scope :order_by_id_asc, -> { order(id: :asc) }
 
+  def self.mark_records_processed(records)
+    id_in(records).delete_all
+  end
+
   def self.enqueue_worker
     ::Projects::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker
   end
diff --git a/app/services/ci/process_sync_events_service.rb b/app/services/ci/process_sync_events_service.rb
index d90ee02b1c681adadb9f20e007c8a21405f9ca62..d3c699597b68ad3500f122824b8dbdb5b02bb1ef 100644
--- a/app/services/ci/process_sync_events_service.rb
+++ b/app/services/ci/process_sync_events_service.rb
@@ -13,7 +13,7 @@ def initialize(sync_event_class, sync_class)
     end
 
     def execute
-      # preventing parallel processing over the same event table
+      # Obtain an exclusive lease to prevent parallel processing over the same event table
       try_obtain_lease { process_events }
 
       enqueue_worker_if_there_still_event
@@ -26,7 +26,7 @@ def execute
     def process_events
       add_result(estimated_total_events: @sync_event_class.upper_bound_count)
 
-      events = @sync_event_class.preload_synced_relation.first(BATCH_SIZE)
+      events = @sync_event_class.unprocessed_events.preload_synced_relation.first(BATCH_SIZE)
 
       add_result(consumable_events: events.size)
 
@@ -42,12 +42,12 @@ def process_events
         end
       ensure
         add_result(processed_events: processed_events.size)
-        @sync_event_class.id_in(processed_events).delete_all
+        @sync_event_class.mark_records_processed(processed_events)
       end
     end
 
     def enqueue_worker_if_there_still_event
-      @sync_event_class.enqueue_worker if @sync_event_class.exists?
+      @sync_event_class.enqueue_worker if @sync_event_class.unprocessed_events.exists?
     end
 
     def lease_key
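
The service now delegates cleanup to the event class, so the two existing event tables keep deleting processed rows while the new partitioned table flips a status instead. A sketch of the two behaviors, both implemented in this diff, where `events` is a batch selected via the corresponding unprocessed_events scope:

  # Projects::SyncEvent and Namespaces::SyncEvent: processed rows are deleted.
  Projects::SyncEvent.mark_records_processed(events)            # id_in(events).delete_all

  # Ci::Catalog::Resources::SyncEvent: rows are kept; the batch is grouped by the
  # selected `partition` attribute and updated to status :processed.
  Ci::Catalog::Resources::SyncEvent.mark_records_processed(events)
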
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index db255f222e0ea148d0794c5028860e3a5eec0ce7..89047bda4456c9d2e3dc5ae201195eb545f0e6e3 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -2694,6 +2694,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: ci_catalog_resources_process_sync_events
+  :worker_name: Ci::Catalog::Resources::ProcessSyncEventsWorker
+  :feature_category: :pipeline_composition
+  :has_external_dependencies: false
+  :urgency: :high
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: ci_delete_objects
   :worker_name: Ci::DeleteObjectsWorker
   :feature_category: :continuous_integration
diff --git a/app/workers/ci/catalog/resources/process_sync_events_worker.rb b/app/workers/ci/catalog/resources/process_sync_events_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..a577f36858d35428793d21eddf96d4b08148e327
--- /dev/null
+++ b/app/workers/ci/catalog/resources/process_sync_events_worker.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+module Ci
+  module Catalog
+    module Resources
+      # Multiple instances of this worker can run simultaneously, but only one instance can process
+      # events at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`.
+      class ProcessSyncEventsWorker
+        include ApplicationWorker
+
+        feature_category :pipeline_composition
+
+        data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- We should not sync stale data
+        urgency :high
+
+        idempotent!
+        deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute
+
+        def perform
+          results = ::Ci::ProcessSyncEventsService.new(
+            ::Ci::Catalog::Resources::SyncEvent, ::Ci::Catalog::Resource
+          ).execute
+
+          results.each do |key, value|
+            log_extra_metadata_on_done(key, value)
+          end
+        end
+      end
+    end
+  end
+end
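
The hash returned by the service and logged above looks like this (keys come from the add_result calls in Ci::ProcessSyncEventsService; values are illustrative):

  { estimated_total_events: 3, consumable_events: 3, processed_events: 3 }
  # Each pair is emitted via log_extra_metadata_on_done(key, value).
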
diff --git a/config/feature_flags/development/ci_process_catalog_resource_sync_events.yml b/config/feature_flags/development/ci_process_catalog_resource_sync_events.yml
new file mode 100644
index 0000000000000000000000000000000000000000..374ccc4409a2ecd224855909eea046dea8e77967
--- /dev/null
+++ b/config/feature_flags/development/ci_process_catalog_resource_sync_events.yml
@@ -0,0 +1,8 @@
+---
+name: ci_process_catalog_resource_sync_events
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/137238
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/432963
+milestone: '16.7'
+type: development
+group: group::pipeline authoring
+default_enabled: false
diff --git a/config/initializers/postgres_partitioning.rb b/config/initializers/postgres_partitioning.rb
index 458feacba0d8e8edd970472925f2270a70b8bb18..5086f6f7da29f83dc2f57a75537a9615824e5a1e 100644
--- a/config/initializers/postgres_partitioning.rb
+++ b/config/initializers/postgres_partitioning.rb
@@ -12,7 +12,8 @@
     CommitStatus,
     BatchedGitRefUpdates::Deletion,
     Users::ProjectVisit,
-    Users::GroupVisit
+    Users::GroupVisit,
+    Ci::Catalog::Resources::SyncEvent
   ])
 
 if Gitlab.ee?
diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index 77503814158d3b2245343d598f02e3fe35bfe408..25a4299ab70148e4b038a153f620a3cd84331420 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -157,6 +157,8 @@
   - 1
 - - ci_cancel_redundant_pipelines
   - 1
+- - ci_catalog_resources_process_sync_events
+  - 1
 - - ci_delete_objects
   - 1
 - - ci_initialize_pipelines_iid_sequence
diff --git a/db/docs/p_catalog_resource_sync_events.yml b/db/docs/p_catalog_resource_sync_events.yml
new file mode 100644
index 0000000000000000000000000000000000000000..b74a644fd74bc0a6913cb062f37f8579f2232757
--- /dev/null
+++ b/db/docs/p_catalog_resource_sync_events.yml
@@ -0,0 +1,13 @@
+---
+table_name: p_catalog_resource_sync_events
+classes:
+- Ci::Catalog::Resources::SyncEvent
+feature_categories:
+- pipeline_composition
+description: A queue of catalog resources that need to be synchronized with data from
+  their associated `projects` records.
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/137238
+milestone: '16.7'
+gitlab_schema: gitlab_main_cell
+sharding_key:
+  project_id: projects
diff --git a/db/migrate/20231124191759_add_catalog_resource_sync_events_table.rb b/db/migrate/20231124191759_add_catalog_resource_sync_events_table.rb
new file mode 100644
index 0000000000000000000000000000000000000000..d4c628a17708f318ed4f754b81954f6e79d3a1dc
--- /dev/null
+++ b/db/migrate/20231124191759_add_catalog_resource_sync_events_table.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+class AddCatalogResourceSyncEventsTable < Gitlab::Database::Migration[2.2]
+  milestone '16.7'
+
+  enable_lock_retries!
+
+  def up
+    options = {
+      primary_key: [:id, :partition_id],
+      options: 'PARTITION BY LIST (partition_id)',
+      if_not_exists: true
+    }
+
+    create_table(:p_catalog_resource_sync_events, **options) do |t|
+      t.bigserial :id, null: false
+      # We skip foreign keys because of their performance cost; stale rows get cleaned up over time as old partitions are detached.
+      t.bigint :catalog_resource_id, null: false
+      t.bigint :project_id, null: false
+      t.bigint :partition_id, null: false, default: 1
+      t.integer :status, null: false, default: 1, limit: 2
+      t.timestamps_with_timezone null: false, default: -> { 'NOW()' }
+
+      t.index :id,
+        where: 'status = 1',
+        name: :index_p_catalog_resource_sync_events_on_id_where_pending
+    end
+
+    connection.execute(<<~SQL)
+      CREATE TABLE IF NOT EXISTS gitlab_partitions_dynamic.p_catalog_resource_sync_events_1
+        PARTITION OF p_catalog_resource_sync_events
+        FOR VALUES IN (1);
+    SQL
+  end
+
+  def down
+    drop_table :p_catalog_resource_sync_events
+  end
+end
diff --git a/db/migrate/20231124282441_add_catalog_resource_sync_event_triggers.rb b/db/migrate/20231124282441_add_catalog_resource_sync_event_triggers.rb
new file mode 100644
index 0000000000000000000000000000000000000000..01f87d61e023dfe064fccec4ed9f979c0a947609
--- /dev/null
+++ b/db/migrate/20231124282441_add_catalog_resource_sync_event_triggers.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+class AddCatalogResourceSyncEventTriggers < Gitlab::Database::Migration[2.2]
+  milestone '16.7'
+
+  include Gitlab::Database::SchemaHelpers
+
+  enable_lock_retries!
+
+  EVENTS_TABLE_NAME = 'p_catalog_resource_sync_events'
+  RESOURCES_TABLE_NAME = 'catalog_resources'
+  PROJECTS_TABLE_NAME = 'projects'
+
+  TRIGGER_FUNCTION_NAME = 'insert_catalog_resource_sync_event'
+  TRIGGER_NAME = 'trigger_catalog_resource_sync_event_on_project_update'
+
+  def up
+    create_trigger_function(TRIGGER_FUNCTION_NAME, replace: true) do
+      <<~SQL
+        INSERT INTO #{EVENTS_TABLE_NAME} (catalog_resource_id, project_id)
+        SELECT id, OLD.id FROM #{RESOURCES_TABLE_NAME}
+        WHERE project_id = OLD.id;
+        RETURN NULL;
+      SQL
+    end
+
+    create_trigger(
+      PROJECTS_TABLE_NAME, TRIGGER_NAME, TRIGGER_FUNCTION_NAME, fires: 'AFTER UPDATE'
+    ) do
+      <<~SQL
+        WHEN (
+          OLD.name IS DISTINCT FROM NEW.name OR
+          OLD.description IS DISTINCT FROM NEW.description OR
+          OLD.visibility_level IS DISTINCT FROM NEW.visibility_level
+        )
+      SQL
+    end
+  end
+
+  def down
+    drop_trigger(PROJECTS_TABLE_NAME, TRIGGER_NAME)
+    drop_function(TRIGGER_FUNCTION_NAME)
+  end
+end
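
A quick sketch of which project updates fire this trigger, matching the WHEN clause above (attribute values are illustrative):

  project.update!(name: 'New name')                                     # fires; inserts one sync event per matching catalog resource
  project.update!(description: 'New description')                      # fires
  project.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL) # fires
  project.update!(path: 'new-path')                                    # does not fire; `path` is not in the WHEN clause
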
diff --git a/db/schema_migrations/20231124191759 b/db/schema_migrations/20231124191759
new file mode 100644
index 0000000000000000000000000000000000000000..adbafd9b2bdbad31ecb8d8356e3332ac513b725e
--- /dev/null
+++ b/db/schema_migrations/20231124191759
@@ -0,0 +1 @@
+32a80f29a5a3511a8dfdea203874aecde5a58eab6665ba127379c9c2e01d254f
\ No newline at end of file
diff --git a/db/schema_migrations/20231124282441 b/db/schema_migrations/20231124282441
new file mode 100644
index 0000000000000000000000000000000000000000..78c0636635f2285f04d436c7859a33f387926baa
--- /dev/null
+++ b/db/schema_migrations/20231124282441
@@ -0,0 +1 @@
+2bdaabfe2fa23ce334af1878b1234618b4717f05a9b68f7f9839f48c7f38f410
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index 286cff2b88c6e99804c32a766d658bdcc867cd45..a6a80d9e7732c1d05d4af49a6af0bd6e866aec9a 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -290,6 +290,18 @@ BEGIN
 END
 $$;
 
+CREATE FUNCTION insert_catalog_resource_sync_event() RETURNS trigger
+    LANGUAGE plpgsql
+    AS $$
+BEGIN
+INSERT INTO p_catalog_resource_sync_events (catalog_resource_id, project_id)
+SELECT id, OLD.id FROM catalog_resources
+WHERE project_id = OLD.id;
+RETURN NULL;
+
+END
+$$;
+
 CREATE FUNCTION insert_into_loose_foreign_keys_deleted_records() RETURNS trigger
     LANGUAGE plpgsql
     AS $$
@@ -909,6 +921,17 @@ CREATE TABLE p_batched_git_ref_updates_deletions (
 )
 PARTITION BY LIST (partition_id);
 
+CREATE TABLE p_catalog_resource_sync_events (
+    id bigint NOT NULL,
+    catalog_resource_id bigint NOT NULL,
+    project_id bigint NOT NULL,
+    partition_id bigint DEFAULT 1 NOT NULL,
+    status smallint DEFAULT 1 NOT NULL,
+    created_at timestamp with time zone DEFAULT now() NOT NULL,
+    updated_at timestamp with time zone DEFAULT now() NOT NULL
+)
+PARTITION BY LIST (partition_id);
+
 CREATE TABLE p_ci_finished_build_ch_sync_events (
     build_id bigint NOT NULL,
     partition bigint DEFAULT 1 NOT NULL,
@@ -20069,6 +20092,15 @@ CREATE SEQUENCE p_batched_git_ref_updates_deletions_id_seq
 
 ALTER SEQUENCE p_batched_git_ref_updates_deletions_id_seq OWNED BY p_batched_git_ref_updates_deletions.id;
 
+CREATE SEQUENCE p_catalog_resource_sync_events_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+ALTER SEQUENCE p_catalog_resource_sync_events_id_seq OWNED BY p_catalog_resource_sync_events.id;
+
 CREATE SEQUENCE p_ci_job_annotations_id_seq
     START WITH 1
     INCREMENT BY 1
@@ -26906,6 +26938,8 @@ ALTER TABLE ONLY organizations ALTER COLUMN id SET DEFAULT nextval('organization
 
 ALTER TABLE ONLY p_batched_git_ref_updates_deletions ALTER COLUMN id SET DEFAULT nextval('p_batched_git_ref_updates_deletions_id_seq'::regclass);
 
+ALTER TABLE ONLY p_catalog_resource_sync_events ALTER COLUMN id SET DEFAULT nextval('p_catalog_resource_sync_events_id_seq'::regclass);
+
 ALTER TABLE ONLY p_ci_builds_metadata ALTER COLUMN id SET DEFAULT nextval('ci_builds_metadata_id_seq'::regclass);
 
 ALTER TABLE ONLY p_ci_job_annotations ALTER COLUMN id SET DEFAULT nextval('p_ci_job_annotations_id_seq'::regclass);
@@ -29265,6 +29299,9 @@ ALTER TABLE ONLY organizations
 ALTER TABLE ONLY p_batched_git_ref_updates_deletions
     ADD CONSTRAINT p_batched_git_ref_updates_deletions_pkey PRIMARY KEY (id, partition_id);
 
+ALTER TABLE ONLY p_catalog_resource_sync_events
+    ADD CONSTRAINT p_catalog_resource_sync_events_pkey PRIMARY KEY (id, partition_id);
+
 ALTER TABLE ONLY p_ci_finished_build_ch_sync_events
     ADD CONSTRAINT p_ci_finished_build_ch_sync_events_pkey PRIMARY KEY (build_id, partition);
 
@@ -33770,6 +33807,8 @@ CREATE INDEX index_organization_users_on_user_id ON organization_users USING btr
 
 CREATE UNIQUE INDEX index_organizations_on_unique_name_per_group ON customer_relations_organizations USING btree (group_id, lower(name), id);
 
+CREATE INDEX index_p_catalog_resource_sync_events_on_id_where_pending ON ONLY p_catalog_resource_sync_events USING btree (id) WHERE (status = 1);
+
 CREATE UNIQUE INDEX index_p_ci_job_annotations_on_partition_id_job_id_name ON ONLY p_ci_job_annotations USING btree (partition_id, job_id, name);
 
 CREATE INDEX index_p_ci_runner_machine_builds_on_runner_machine_id ON ONLY p_ci_runner_machine_builds USING btree (runner_machine_id);
@@ -37032,6 +37071,8 @@ CREATE TRIGGER trigger_10ee1357e825 BEFORE INSERT OR UPDATE ON p_ci_builds FOR E
 
 CREATE TRIGGER trigger_b2d852e1e2cb BEFORE INSERT OR UPDATE ON ci_pipelines FOR EACH ROW EXECUTE FUNCTION trigger_b2d852e1e2cb();
 
+CREATE TRIGGER trigger_catalog_resource_sync_event_on_project_update AFTER UPDATE ON projects FOR EACH ROW WHEN ((((old.name)::text IS DISTINCT FROM (new.name)::text) OR (old.description IS DISTINCT FROM new.description) OR (old.visibility_level IS DISTINCT FROM new.visibility_level))) EXECUTE FUNCTION insert_catalog_resource_sync_event();
+
 CREATE TRIGGER trigger_delete_project_namespace_on_project_delete AFTER DELETE ON projects FOR EACH ROW WHEN ((old.project_namespace_id IS NOT NULL)) EXECUTE FUNCTION delete_associated_project_namespace();
 
 CREATE TRIGGER trigger_eaec934fe6b2 BEFORE INSERT OR UPDATE ON system_note_metadata FOR EACH ROW EXECUTE FUNCTION trigger_eaec934fe6b2();
diff --git a/spec/db/schema_spec.rb b/spec/db/schema_spec.rb
index 92835d21bb6ab4e65a2f417ebd70b53c4cf6be0b..7e3f2a3b61ec05fc17fae1723cea45212398fe0c 100644
--- a/spec/db/schema_spec.rb
+++ b/spec/db/schema_spec.rb
@@ -93,6 +93,7 @@
     oauth_applications: %w[owner_id],
     p_ci_builds: %w[erased_by_id trigger_request_id partition_id auto_canceled_by_partition_id],
     p_batched_git_ref_updates_deletions: %w[project_id partition_id],
+    p_catalog_resource_sync_events: %w[catalog_resource_id project_id partition_id],
     p_ci_finished_build_ch_sync_events: %w[build_id],
     product_analytics_events_experimental: %w[event_id txn_id user_id],
     project_build_artifacts_size_refreshes: %w[last_job_artifact_id],
diff --git a/spec/factories/ci/catalog/resources/sync_events.rb b/spec/factories/ci/catalog/resources/sync_events.rb
new file mode 100644
index 0000000000000000000000000000000000000000..0579cec648ee8e80c131021fdbe04481702d215d
--- /dev/null
+++ b/spec/factories/ci/catalog/resources/sync_events.rb
@@ -0,0 +1,8 @@
+# frozen_string_literal: true
+
+FactoryBot.define do
+  factory :ci_catalog_resource_sync_event, class: 'Ci::Catalog::Resources::SyncEvent' do
+    catalog_resource factory: :ci_catalog_resource
+    project { catalog_resource.project }
+  end
+end
diff --git a/spec/lib/gitlab/import_export/all_models.yml b/spec/lib/gitlab/import_export/all_models.yml
index c91efadbf98f5199f1feafc027eafab6e0aac50d..0361e2967e4c8abebad51750614869e34ae59f25 100644
--- a/spec/lib/gitlab/import_export/all_models.yml
+++ b/spec/lib/gitlab/import_export/all_models.yml
@@ -523,6 +523,7 @@ container_repositories:
 - name
 project:
 - catalog_resource
+- catalog_resource_sync_events
 - catalog_resource_versions
 - ci_components
 - external_status_checks
@@ -1054,6 +1055,7 @@ catalog_resource:
   - project
   - catalog_resource_components
   - catalog_resource_versions
+  - catalog_resource_sync_events
 catalog_resource_versions:
   - project
   - release
diff --git a/spec/models/ci/catalog/resource_spec.rb b/spec/models/ci/catalog/resource_spec.rb
index a9054e92204b44b75663bb761c333d1f260f94d7..047ba135cd52ff7a4ea2ee2f0e56939c1c033724 100644
--- a/spec/models/ci/catalog/resource_spec.rb
+++ b/spec/models/ci/catalog/resource_spec.rb
@@ -3,7 +3,21 @@
 require 'spec_helper'
 
 RSpec.describe Ci::Catalog::Resource, feature_category: :pipeline_composition do
-  include_context 'when there are catalog resources with versions'
+  let_it_be(:current_user) { create(:user) }
+
+  let_it_be(:project_a) { create(:project, name: 'A') }
+  let_it_be(:project_b) { create(:project, name: 'B') }
+  let_it_be(:project_c) { create(:project, name: 'C', description: 'B') }
+
+  let_it_be_with_reload(:resource_a) do
+    create(:ci_catalog_resource, project: project_a, latest_released_at: '2023-02-01T00:00:00Z')
+  end
+
+  let_it_be(:resource_b) do
+    create(:ci_catalog_resource, project: project_b, latest_released_at: '2023-01-01T00:00:00Z')
+  end
+
+  let_it_be(:resource_c) { create(:ci_catalog_resource, project: project_c) }
 
   it { is_expected.to belong_to(:project) }
 
@@ -17,6 +31,11 @@
       have_many(:versions).class_name('Ci::Catalog::Resources::Version').with_foreign_key(:catalog_resource_id))
   end
 
+  it do
+    is_expected.to(
+      have_many(:sync_events).class_name('Ci::Catalog::Resources::SyncEvent').with_foreign_key(:catalog_resource_id))
+  end
+
   it { is_expected.to delegate_method(:avatar_path).to(:project) }
   it { is_expected.to delegate_method(:star_count).to(:project) }
 
@@ -24,17 +43,17 @@
 
   describe '.for_projects' do
     it 'returns catalog resources for the given project IDs' do
-      resources_for_projects = described_class.for_projects(project1.id)
+      resources_for_projects = described_class.for_projects(project_a.id)
 
-      expect(resources_for_projects).to contain_exactly(resource1)
+      expect(resources_for_projects).to contain_exactly(resource_a)
     end
   end
 
   describe '.search' do
     it 'returns catalog resources whose name or description match the search term' do
-      resources = described_class.search('Z')
+      resources = described_class.search('B')
 
-      expect(resources).to contain_exactly(resource2, resource3)
+      expect(resources).to contain_exactly(resource_b, resource_c)
     end
   end
 
@@ -42,7 +61,7 @@
     it 'returns catalog resources sorted by descending created at' do
       ordered_resources = described_class.order_by_created_at_desc
 
-      expect(ordered_resources.to_a).to eq([resource3, resource2, resource1])
+      expect(ordered_resources.to_a).to eq([resource_c, resource_b, resource_a])
     end
   end
 
@@ -50,7 +69,7 @@
     it 'returns catalog resources sorted by ascending created at' do
       ordered_resources = described_class.order_by_created_at_asc
 
-      expect(ordered_resources.to_a).to eq([resource1, resource2, resource3])
+      expect(ordered_resources.to_a).to eq([resource_a, resource_b, resource_c])
     end
   end
 
@@ -58,13 +77,13 @@
     subject(:ordered_resources) { described_class.order_by_name_desc }
 
     it 'returns catalog resources sorted by descending name' do
-      expect(ordered_resources.pluck(:name)).to eq(%w[Z L A])
+      expect(ordered_resources.pluck(:name)).to eq(%w[C B A])
     end
 
     it 'returns catalog resources sorted by descending name with nulls last' do
-      resource1.update!(name: nil)
+      resource_a.update!(name: nil)
 
-      expect(ordered_resources.pluck(:name)).to eq(['Z', 'L', nil])
+      expect(ordered_resources.pluck(:name)).to eq(['C', 'B', nil])
     end
   end
 
@@ -72,13 +91,13 @@
     subject(:ordered_resources) { described_class.order_by_name_asc }
 
     it 'returns catalog resources sorted by ascending name' do
-      expect(ordered_resources.pluck(:name)).to eq(%w[A L Z])
+      expect(ordered_resources.pluck(:name)).to eq(%w[A B C])
     end
 
     it 'returns catalog resources sorted by ascending name with nulls last' do
-      resource1.update!(name: nil)
+      resource_a.update!(name: nil)
 
-      expect(ordered_resources.pluck(:name)).to eq(['L', 'Z', nil])
+      expect(ordered_resources.pluck(:name)).to eq(['B', 'C', nil])
     end
   end
 
@@ -86,7 +105,7 @@
     it 'returns catalog resources sorted by latest_released_at descending with nulls last' do
       ordered_resources = described_class.order_by_latest_released_at_desc
 
-      expect(ordered_resources).to eq([resource2, resource1, resource3])
+      expect(ordered_resources).to eq([resource_a, resource_b, resource_c])
     end
   end
 
@@ -94,35 +113,35 @@
     it 'returns catalog resources sorted by latest_released_at ascending with nulls last' do
       ordered_resources = described_class.order_by_latest_released_at_asc
 
-      expect(ordered_resources).to eq([resource1, resource2, resource3])
+      expect(ordered_resources).to eq([resource_b, resource_a, resource_c])
     end
   end
 
   describe '#state' do
     it 'defaults to draft' do
-      expect(resource1.state).to eq('draft')
+      expect(resource_a.state).to eq('draft')
     end
   end
 
   describe '#publish!' do
     context 'when the catalog resource is in draft state' do
       it 'updates the state of the catalog resource to published' do
-        expect(resource1.state).to eq('draft')
+        expect(resource_a.state).to eq('draft')
 
-        resource1.publish!
+        resource_a.publish!
 
-        expect(resource1.reload.state).to eq('published')
+        expect(resource_a.reload.state).to eq('published')
       end
     end
 
     context 'when the catalog resource already has a published state' do
       it 'leaves the state as published' do
-        resource1.update!(state: :published)
-        expect(resource1.state).to eq('published')
+        resource_a.update!(state: :published)
+        expect(resource_a.state).to eq('published')
 
-        resource1.publish!
+        resource_a.publish!
 
-        expect(resource1.state).to eq('published')
+        expect(resource_a.state).to eq('published')
       end
     end
   end
@@ -130,61 +149,115 @@
   describe '#unpublish!' do
     context 'when the catalog resource is in published state' do
       it 'updates the state of the catalog resource to draft' do
-        resource1.update!(state: :published)
-        expect(resource1.state).to eq('published')
+        resource_a.update!(state: :published)
+        expect(resource_a.state).to eq('published')
 
-        resource1.unpublish!
+        resource_a.unpublish!
 
-        expect(resource1.reload.state).to eq('draft')
+        expect(resource_a.reload.state).to eq('draft')
       end
     end
 
     context 'when the catalog resource is already in draft state' do
       it 'leaves the state as draft' do
-        expect(resource1.state).to eq('draft')
+        expect(resource_a.state).to eq('draft')
 
-        resource1.unpublish!
+        resource_a.unpublish!
 
-        expect(resource1.reload.state).to eq('draft')
+        expect(resource_a.reload.state).to eq('draft')
       end
     end
   end
 
-  describe 'synchronizing denormalized columns with `projects` table' do
-    shared_examples 'denormalized columns of the catalog resource match the project' do
-      it do
-        resource1.reload
-        project1.reload
+  describe 'synchronizing denormalized columns with `projects` table', :sidekiq_inline do
+    let_it_be_with_reload(:project) { create(:project, name: 'Test project', description: 'Test description') }
+
+    context 'when the catalog resource is created' do
+      let(:resource) { build(:ci_catalog_resource, project: project) }
 
-        expect(resource1.name).to eq(project1.name)
-        expect(resource1.description).to eq(project1.description)
-        expect(resource1.visibility_level).to eq(project1.visibility_level)
+      it 'updates the catalog resource columns to match the project' do
+        resource.save!
+        resource.reload
+
+        expect(resource.name).to eq(project.name)
+        expect(resource.description).to eq(project.description)
+        expect(resource.visibility_level).to eq(project.visibility_level)
       end
     end
 
-    context 'when the catalog resource is created' do
-      it 'calls sync_with_project' do
-        new_project = create(:project)
-        new_resource = build(:ci_catalog_resource, project: new_project)
+    context 'when the project is updated' do
+      let_it_be(:resource) { create(:ci_catalog_resource, project: project) }
+
+      context 'when project name is updated' do
+        it 'updates the catalog resource name to match' do
+          project.update!(name: 'New name')
 
-        expect(new_resource).to receive(:sync_with_project).once
+          expect(resource.reload.name).to eq(project.name)
+        end
+      end
+
+      context 'when project description is updated' do
+        it 'updates the catalog resource description to match' do
+          project.update!(description: 'New description')
 
-        new_resource.save!
+          expect(resource.reload.description).to eq(project.description)
+        end
       end
 
-      it_behaves_like 'denormalized columns of the catalog resource match the project'
+      context 'when project visibility_level is updated' do
+        it 'updates the catalog resource visibility_level to match' do
+          project.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+
+          expect(resource.reload.visibility_level).to eq(project.visibility_level)
+        end
+      end
     end
 
-    context 'when the project attributes are updated' do
-      before_all do
-        project1.update!(
-          name: 'New name',
-          description: 'New description',
-          visibility_level: Gitlab::VisibilityLevel::INTERNAL
-        )
+    context 'when FF `ci_process_catalog_resource_sync_events` is disabled' do
+      before do
+        stub_feature_flags(ci_process_catalog_resource_sync_events: false)
+      end
+
+      context 'when the catalog resource is created' do
+        let(:resource) { build(:ci_catalog_resource, project: project) }
+
+        it 'updates the catalog resource columns to match the project' do
+          resource.save!
+          resource.reload
+
+          expect(resource.name).to eq(project.name)
+          expect(resource.description).to eq(project.description)
+          expect(resource.visibility_level).to eq(project.visibility_level)
+        end
       end
 
-      it_behaves_like 'denormalized columns of the catalog resource match the project'
+      context 'when the project is updated' do
+        let_it_be(:resource) { create(:ci_catalog_resource, project: project) }
+
+        context 'when project name is updated' do
+          it 'updates the catalog resource name to match' do
+            project.update!(name: 'New name')
+
+            expect(resource.reload.name).to eq(project.name)
+          end
+        end
+
+        context 'when project description is updated' do
+          it 'updates the catalog resource description to match' do
+            project.update!(description: 'New description')
+
+            expect(resource.reload.description).to eq(project.description)
+          end
+        end
+
+        context 'when project visibility_level is updated' do
+          it 'updates the catalog resource visibility_level to match' do
+            project.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+
+            expect(resource.reload.visibility_level).to eq(project.visibility_level)
+          end
+        end
+      end
     end
   end
 
diff --git a/spec/models/ci/catalog/resources/sync_event_spec.rb b/spec/models/ci/catalog/resources/sync_event_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..5d907aae9b60514a6228eccc8f88e18782f6d856
--- /dev/null
+++ b/spec/models/ci/catalog/resources/sync_event_spec.rb
@@ -0,0 +1,190 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Ci::Catalog::Resources::SyncEvent, type: :model, feature_category: :pipeline_composition do
+  let_it_be_with_reload(:project1) { create(:project) }
+  let_it_be_with_reload(:project2) { create(:project) }
+  let_it_be(:resource1) { create(:ci_catalog_resource, project: project1) }
+
+  it { is_expected.to belong_to(:catalog_resource).class_name('Ci::Catalog::Resource') }
+  it { is_expected.to belong_to(:project) }
+
+  describe 'PG triggers' do
+    context 'when the associated project of a catalog resource is updated' do
+      context 'when project name is updated' do
+        it 'creates a sync event record' do
+          expect do
+            project1.update!(name: 'New name')
+          end.to change { described_class.count }.by(1)
+        end
+      end
+
+      context 'when project description is updated' do
+        it 'creates a sync event record' do
+          expect do
+            project1.update!(description: 'New description')
+          end.to change { described_class.count }.by(1)
+        end
+      end
+
+      context 'when project visibility_level is updated' do
+        it 'creates a sync event record' do
+          expect do
+            project1.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+          end.to change { described_class.count }.by(1)
+        end
+      end
+    end
+
+    context 'when a project without an associated catalog resource is updated' do
+      it 'does not create a sync event record' do
+        expect do
+          project2.update!(name: 'New name')
+        end.not_to change { described_class.count }
+      end
+    end
+  end
+
+  describe 'when there are sync event records' do
+    let_it_be(:resource2) { create(:ci_catalog_resource, project: project2) }
+
+    before_all do
+      create(:ci_catalog_resource_sync_event, catalog_resource: resource1, status: :processed)
+      create(:ci_catalog_resource_sync_event, catalog_resource: resource1)
+      create_list(:ci_catalog_resource_sync_event, 2, catalog_resource: resource2)
+    end
+
+    describe '.unprocessed_events' do
+      it 'returns the events in pending status' do
+        # 1 pending event from resource1 + 2 pending events from resource2
+        expect(described_class.unprocessed_events.size).to eq(3)
+      end
+
+      it 'selects the partition attribute in the result' do
+        described_class.unprocessed_events.each do |event|
+          expect(event.partition).not_to be_nil
+        end
+      end
+    end
+
+    describe '.mark_records_processed' do
+      it 'updates the records to processed status' do
+        expect(described_class.status_pending.count).to eq(3)
+        expect(described_class.status_processed.count).to eq(1)
+
+        described_class.mark_records_processed(described_class.unprocessed_events)
+
+        expect(described_class.pluck(:status).uniq).to eq(['processed'])
+
+        expect(described_class.status_pending.count).to eq(0)
+        expect(described_class.status_processed.count).to eq(4)
+      end
+    end
+  end
+
+  describe '.upper_bound_count' do
+    it 'returns 0 when there are no records in the table' do
+      expect(described_class.upper_bound_count).to eq(0)
+    end
+
+    it 'returns an estimated number of unprocessed records' do
+      create_list(:ci_catalog_resource_sync_event, 5, catalog_resource: resource1)
+      described_class.order(:id).limit(2).update_all(status: :processed)
+
+      expect(described_class.upper_bound_count).to eq(3)
+    end
+  end
+
+  describe 'sliding_list partitioning' do
+    let(:partition_manager) { Gitlab::Database::Partitioning::PartitionManager.new(described_class) }
+
+    describe 'next_partition_if callback' do
+      let(:active_partition) { described_class.partitioning_strategy.active_partition }
+
+      subject(:value) { described_class.partitioning_strategy.next_partition_if.call(active_partition) }
+
+      context 'when the partition is empty' do
+        it { is_expected.to eq(false) }
+      end
+
+      context 'when the partition has records' do
+        before do
+          create(:ci_catalog_resource_sync_event, catalog_resource: resource1, status: :processed)
+          create(:ci_catalog_resource_sync_event, catalog_resource: resource1)
+        end
+
+        it { is_expected.to eq(false) }
+      end
+
+      context 'when the first record of the partition is older than PARTITION_DURATION' do
+        before do
+          create(:ci_catalog_resource_sync_event, catalog_resource: resource1)
+          described_class.first.update!(created_at: (described_class::PARTITION_DURATION + 1.day).ago)
+        end
+
+        it { is_expected.to eq(true) }
+      end
+    end
+
+    describe 'detach_partition_if callback' do
+      let(:active_partition) { described_class.partitioning_strategy.active_partition }
+
+      subject(:value) { described_class.partitioning_strategy.detach_partition_if.call(active_partition) }
+
+      before_all do
+        create(:ci_catalog_resource_sync_event, catalog_resource: resource1, status: :processed)
+        create(:ci_catalog_resource_sync_event, catalog_resource: resource1)
+      end
+
+      context 'when the partition contains unprocessed records' do
+        it { is_expected.to eq(false) }
+      end
+
+      context 'when the partition contains only processed records' do
+        before do
+          described_class.update_all(status: :processed)
+        end
+
+        it { is_expected.to eq(true) }
+      end
+    end
+
+    describe 'strategy behavior' do
+      it 'moves records to new partitions as time passes', :freeze_time do
+        # We start with partition 1
+        expect(described_class.partitioning_strategy.current_partitions.map(&:value)).to eq([1])
+
+        # Add one record so the initial partition is not empty
+        create(:ci_catalog_resource_sync_event, catalog_resource: resource1)
+
+        # It's not a day old yet so no new partitions are created
+        partition_manager.sync_partitions
+
+        expect(described_class.partitioning_strategy.current_partitions.map(&:value)).to eq([1])
+
+        # After traveling forward a day
+        travel(described_class::PARTITION_DURATION + 1.second)
+
+        # a new partition is created
+        partition_manager.sync_partitions
+
+        expect(described_class.partitioning_strategy.current_partitions.map(&:value)).to contain_exactly(1, 2)
+
+        # and we can insert to the new partition
+        create(:ci_catalog_resource_sync_event, catalog_resource: resource1)
+
+        # After processing records in partition 1
+        described_class.mark_records_processed(described_class.for_partition(1).select_with_partition)
+
+        partition_manager.sync_partitions
+
+        # partition 1 is removed
+        expect(described_class.partitioning_strategy.current_partitions.map(&:value)).to eq([2])
+
+        # and we only have the newly created partition left.
+        expect(described_class.count).to eq(1)
+      end
+    end
+  end
+end
diff --git a/spec/models/project_spec.rb b/spec/models/project_spec.rb
index dcd2e634ce3ea50b2f3d3294f40994ca8a32afc5..bbff7e5ee406dd100884cc60b8459d0bd71e0b44 100644
--- a/spec/models/project_spec.rb
+++ b/spec/models/project_spec.rb
@@ -50,6 +50,7 @@
     it { is_expected.to have_one(:catalog_resource) }
     it { is_expected.to have_many(:ci_components).class_name('Ci::Catalog::Resources::Component') }
     it { is_expected.to have_many(:catalog_resource_versions).class_name('Ci::Catalog::Resources::Version') }
+    it { is_expected.to have_many(:catalog_resource_sync_events).class_name('Ci::Catalog::Resources::SyncEvent') }
     it { is_expected.to have_one(:microsoft_teams_integration) }
     it { is_expected.to have_one(:mattermost_integration) }
     it { is_expected.to have_one(:hangouts_chat_integration) }
@@ -8937,62 +8938,68 @@ def create_hook
     end
   end
 
-  # TODO: Remove/update this spec after background syncing is implemented. See https://gitlab.com/gitlab-org/gitlab/-/issues/429376.
-  describe '#update_catalog_resource' do
-    let_it_be_with_reload(:project) { create(:project, name: 'My project name', description: 'My description') }
-    let_it_be_with_reload(:resource) { create(:ci_catalog_resource, project: project) }
+  describe 'catalog resource process sync events worker' do
+    let_it_be_with_reload(:project) { create(:project, name: 'Test project', description: 'Test description') }
 
-    shared_examples 'name, description, and visibility_level of the catalog resource match the project' do
-      it do
-        expect(project).to receive(:update_catalog_resource).once.and_call_original
+    context 'when the project has a catalog resource' do
+      let_it_be(:resource) { create(:ci_catalog_resource, project: project) }
 
-        project.save!
+      context 'when project name is updated' do
+        it 'enqueues Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).to receive(:perform_async).once
 
-        expect(resource.name).to eq(project.name)
-        expect(resource.description).to eq(project.description)
-        expect(resource.visibility_level).to eq(project.visibility_level)
+          project.update!(name: 'New name')
+        end
       end
-    end
 
-    context 'when the project name is updated' do
-      before do
-        project.name = 'My new project name'
+      context 'when project description is updated' do
+        it 'enqueues Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).to receive(:perform_async).once
+
+          project.update!(description: 'New description')
+        end
       end
 
-      it_behaves_like 'name, description, and visibility_level of the catalog resource match the project'
-    end
+      context 'when project visibility_level is updated' do
+        it 'enqueues Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).to receive(:perform_async).once
 
-    context 'when the project description is updated' do
-      before do
-        project.description = 'My new description'
+          project.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+        end
       end
 
-      it_behaves_like 'name, description, and visibility_level of the catalog resource match the project'
-    end
+      context 'when neither the project name, description, nor visibility_level are updated' do
+        it 'does not enqueue Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).not_to receive(:perform_async)
 
-    context 'when the project visibility_level is updated' do
-      before do
-        project.visibility_level = 10
+          project.update!(path: 'path')
+        end
       end
 
-      it_behaves_like 'name, description, and visibility_level of the catalog resource match the project'
-    end
+      context 'when FF `ci_process_catalog_resource_sync_events` is disabled' do
+        before do
+          stub_feature_flags(ci_process_catalog_resource_sync_events: false)
+        end
 
-    context 'when neither the project name, description, nor visibility_level are updated' do
-      it 'does not call update_catalog_resource' do
-        expect(project).not_to receive(:update_catalog_resource)
+        it 'does not enqueue Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).not_to receive(:perform_async)
 
-        project.update!(path: 'path')
+          project.update!(
+            name: 'New name',
+            description: 'New description',
+            visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+        end
       end
     end
 
     context 'when the project does not have a catalog resource' do
-      let_it_be(:project2) { create(:project) }
-
-      it 'does not call update_catalog_resource' do
-        expect(project2).not_to receive(:update_catalog_resource)
+      it 'does not enqueue Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+        expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).not_to receive(:perform_async)
 
-        project.update!(name: 'name')
+        project.update!(
+          name: 'New name',
+          description: 'New description',
+          visibility_level: Gitlab::VisibilityLevel::INTERNAL)
       end
     end
   end
diff --git a/spec/services/ci/process_sync_events_service_spec.rb b/spec/services/ci/process_sync_events_service_spec.rb
index c58d73815b0ea11cee0b43846e0e1be15faf44ec..48b70eb38c997a4940dc7ce77e0f3d46333a16c3 100644
--- a/spec/services/ci/process_sync_events_service_spec.rb
+++ b/spec/services/ci/process_sync_events_service_spec.rb
@@ -163,5 +163,84 @@ def service_results(total, consumable, processed)
         execute
       end
     end
+
+    context 'for Ci::Catalog::Resources::SyncEvent' do
+      let(:sync_event_class) { Ci::Catalog::Resources::SyncEvent }
+      let(:hierarchy_class) { Ci::Catalog::Resource }
+
+      let_it_be(:project1) { create(:project) }
+      let_it_be(:project2) { create(:project) }
+      let_it_be_with_refind(:resource1) { create(:ci_catalog_resource, project: project1) }
+      let_it_be(:resource2) { create(:ci_catalog_resource, project: project2) }
+
+      before_all do
+        create(:ci_catalog_resource_sync_event, catalog_resource: resource1, status: :processed)
+        # PG trigger adds an event for each update
+        project1.update!(name: 'Name 1', description: 'Test 1')
+        project1.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+        project2.update!(name: 'Name 2', description: 'Test 2')
+        project2.update!(visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      end
+
+      it 'processes the events', :aggregate_failures do
+        # 2 pending events from resource1 + 2 pending events from resource2
+        expect { execute }.to change(Ci::Catalog::Resources::SyncEvent.status_pending, :count).from(4).to(0)
+
+        expect(resource1.reload.name).to eq(project1.name)
+        expect(resource2.reload.name).to eq(project2.name)
+        expect(resource1.reload.description).to eq(project1.description)
+        expect(resource2.reload.description).to eq(project2.description)
+        expect(resource1.reload.visibility_level).to eq(project1.visibility_level)
+        expect(resource2.reload.visibility_level).to eq(project2.visibility_level)
+      end
+
+      context 'when there are no remaining unprocessed events' do
+        it 'does not enqueue Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          stub_const("#{described_class}::BATCH_SIZE", 4)
+
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).not_to receive(:perform_async)
+
+          execute
+        end
+      end
+
+      context 'when there are remaining unprocessed events' do
+        it 'enqueues Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+          stub_const("#{described_class}::BATCH_SIZE", 1)
+
+          expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).to receive(:perform_async)
+
+          execute
+        end
+
+        context 'when FF `ci_process_catalog_resource_sync_events` is disabled' do
+          before do
+            stub_feature_flags(ci_process_catalog_resource_sync_events: false)
+          end
+
+          it 'does not enqueue Ci::Catalog::Resources::ProcessSyncEventsWorker' do
+            stub_const("#{described_class}::BATCH_SIZE", 1)
+
+            expect(Ci::Catalog::Resources::ProcessSyncEventsWorker).not_to receive(:perform_async)
+
+            execute
+          end
+        end
+      end
+
+      # The `p_catalog_resource_sync_events` table does not enforce an FK on catalog_resource_id
+      context 'when there are orphaned sync events' do
+        it 'processes the events', :aggregate_failures do
+          resource1.destroy!
+
+          # 2 pending events from resource1 + 2 pending events from resource2
+          expect { execute }.to change(Ci::Catalog::Resources::SyncEvent.status_pending, :count).from(4).to(0)
+
+          expect(resource2.reload.name).to eq(project2.name)
+          expect(resource2.reload.description).to eq(project2.description)
+          expect(resource2.reload.visibility_level).to eq(project2.visibility_level)
+        end
+      end
+    end
   end
 end
diff --git a/spec/workers/ci/catalog/resources/process_sync_events_worker_spec.rb b/spec/workers/ci/catalog/resources/process_sync_events_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..3c5f7bc0bf92a68673d95c929a2ae029f227e1cf
--- /dev/null
+++ b/spec/workers/ci/catalog/resources/process_sync_events_worker_spec.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Ci::Catalog::Resources::ProcessSyncEventsWorker, feature_category: :pipeline_composition do
+  subject(:worker) { described_class.new }
+
+  include_examples 'an idempotent worker'
+
+  it 'has the `until_executed` deduplicate strategy' do
+    expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
+  end
+
+  it 'has the option to reschedule once if deduplicated and a TTL of 1 minute' do
+    expect(described_class.get_deduplication_options).to include({ if_deduplicated: :reschedule_once, ttl: 1.minute })
+  end
+
+  describe '#perform' do
+    let_it_be(:project) { create(:project) }
+    let_it_be(:resource) { create(:ci_catalog_resource, project: project) }
+
+    before_all do
+      create(:ci_catalog_resource_sync_event, catalog_resource: resource, status: :processed)
+      create_list(:ci_catalog_resource_sync_event, 2, catalog_resource: resource)
+      # PG trigger adds an event for this update
+      project.update!(name: 'Name', description: 'Test', visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+    end
+
+    subject(:perform) { worker.perform }
+
+    it 'consumes all sync events' do
+      expect { perform }.to change { Ci::Catalog::Resources::SyncEvent.status_pending.count }
+        .from(3).to(0)
+    end
+
+    it 'syncs the denormalized columns of catalog resource with the project' do
+      perform
+
+      expect(resource.reload.name).to eq(project.name)
+      expect(resource.reload.description).to eq(project.description)
+      expect(resource.reload.visibility_level).to eq(project.visibility_level)
+    end
+
+    it 'logs the service result', :aggregate_failures do
+      expect(worker).to receive(:log_extra_metadata_on_done).with(:estimated_total_events, 3)
+      expect(worker).to receive(:log_extra_metadata_on_done).with(:consumable_events, 3)
+      expect(worker).to receive(:log_extra_metadata_on_done).with(:processed_events, 3)
+
+      perform
+    end
+  end
+end