diff --git a/lib/gitlab/database/background_migration_job.rb b/lib/gitlab/database/background_migration_job.rb
index c0e3016fd3dcd70e1baafbc76d1612fa3e9398d7..5141dd05e4e461c9fd3334874fd33766e71bb262 100644
--- a/lib/gitlab/database/background_migration_job.rb
+++ b/lib/gitlab/database/background_migration_job.rb
@@ -13,10 +13,6 @@ class BackgroundMigrationJob < SharedModel
         for_migration_class(class_name).where('arguments = ?', arguments.to_json) # rubocop:disable Rails/WhereEquals
       end
 
-      scope :for_partitioning_migration, -> (class_name, table_name) do
-        for_migration_class(class_name).where('arguments ->> 2 = ?', table_name)
-      end
-
       enum status: {
         pending: 0,
         succeeded: 1
diff --git a/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb b/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb
index e3cf1298df646071fa05741b0b203bf5c6772007..8b49cb00bdfc5edd44d892c3c1d8827712a3e1cd 100644
--- a/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb
+++ b/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb
@@ -17,12 +17,6 @@ module TableManagementHelpers
         BATCH_SIZE = 50_000
         SUB_BATCH_SIZE = 2_500
 
-        JobArguments = Struct.new(:start_id, :stop_id, :source_table_name, :partitioned_table_name, :source_column) do
-          def self.from_array(arguments)
-            self.new(*arguments)
-          end
-        end
-
         # Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column.
         # One partition is created per month between the given `min_date` and `max_date`. Also installs a trigger on
         # the original table to copy writes into the partitioned table. To copy over historic data from before creation
@@ -124,7 +118,7 @@ def enqueue_partitioning_data_migration(table_name)
 
         # Cleanup a previously enqueued background migration to copy data into a partitioned table. This will not
         # prevent the enqueued jobs from executing, but instead cleans up information in the database used to track the
-        # state of the background migration. It should be safe to also remove the partitioned table even if the
+        # state of the batched background migration. It should be safe to also remove the partitioned table even if the
         # background jobs are still in-progress, as the absence of the table will cause them to safely exit.
         #
         # Example:
@@ -134,7 +128,10 @@ def enqueue_partitioning_data_migration(table_name)
         def cleanup_partitioning_data_migration(table_name)
           assert_table_is_allowed(table_name)
 
-          cleanup_migration_jobs(table_name)
+          partitioned_table_name = make_partitioned_table_name(table_name)
+          primary_key = connection.primary_key(table_name)
+
+          delete_batched_background_migration(MIGRATION, table_name, primary_key, [partitioned_table_name])
         end
 
         def create_hash_partitions(table_name, number_of_partitions)
@@ -154,11 +151,8 @@ def create_hash_partitions(table_name, number_of_partitions)
           end
         end
 
-        # Executes cleanup tasks from a previous BackgroundMigration to backfill a partitioned table by finishing
-        # pending jobs and performing a final data synchronization.
-        # This performs two steps:
-        #   1. Wait to finish any pending BackgroundMigration jobs that have not succeeded
-        #   2. Inline copy any missed rows from the original table to the partitioned table
+        # Executes jobs from previous BatchedBackgroundMigration to backfill the partitioned table by finishing
+        # pending jobs.
         #
         # **NOTE** Migrations using this method cannot be scheduled in the same release as the migration that
         # schedules the background migration using the `enqueue_partitioning_data_migration` helper, or else the
@@ -169,23 +163,21 @@ def create_hash_partitions(table_name, number_of_partitions)
         #   finalize_backfilling_partitioned_table :audit_events
         #
         def finalize_backfilling_partitioned_table(table_name)
-          Gitlab::Database::QueryAnalyzers::RestrictAllowedSchemas.require_dml_mode!
-
           assert_table_is_allowed(table_name)
-          assert_not_in_transaction_block(scope: ERROR_SCOPE)
 
           partitioned_table_name = make_partitioned_table_name(table_name)
+
           unless table_exists?(partitioned_table_name)
             raise "could not find partitioned table for #{table_name}, " \
               "this could indicate the previous partitioning migration has been rolled back."
           end
 
-          Gitlab::BackgroundMigration.steal(MIGRATION_CLASS_NAME) do |background_job|
-            JobArguments.from_array(background_job.args.second).source_table_name == table_name.to_s
-          end
-
-          primary_key = connection.primary_key(table_name)
-          copy_missed_records(table_name, partitioned_table_name, primary_key)
+          ensure_batched_background_migration_is_finished(
+            job_class_name: MIGRATION,
+            table_name: table_name,
+            column_name: connection.primary_key(table_name),
+            job_arguments: [partitioned_table_name]
+          )
 
           Gitlab::Database::QueryAnalyzers::RestrictAllowedSchemas.with_suppressed do
             disable_statement_timeout do
@@ -456,33 +448,6 @@ def create_sync_trigger(table_name, trigger_name, function_name)
           create_trigger(table_name, trigger_name, function_name, fires: 'AFTER INSERT OR UPDATE OR DELETE')
         end
 
-        def cleanup_migration_jobs(table_name)
-          ::Gitlab::Database::BackgroundMigrationJob.for_partitioning_migration(MIGRATION_CLASS_NAME, table_name).delete_all
-        end
-
-        def copy_missed_records(source_table_name, partitioned_table_name, source_column)
-          backfill_table = BackfillPartitionedTable.new(connection: connection)
-
-          relation = ::Gitlab::Database::BackgroundMigrationJob.pending
-            .for_partitioning_migration(MIGRATION_CLASS_NAME, source_table_name)
-
-          relation.each_batch do |batch|
-            batch.each do |pending_migration_job|
-              job_arguments = JobArguments.from_array(pending_migration_job.arguments)
-              start_id = job_arguments.start_id
-              stop_id = job_arguments.stop_id
-
-              say("Backfilling data into partitioned table for ids from #{start_id} to #{stop_id}")
-              job_updated_count = backfill_table.perform(start_id, stop_id, source_table_name,
-                partitioned_table_name, source_column)
-
-              unless job_updated_count > 0
-                raise "failed to update tracking record for ids from #{start_id} to #{stop_id}"
-              end
-            end
-          end
-        end
-
         def replace_table(original_table_name, replacement_table_name, replaced_table_name, primary_key_name)
           replace_table = Gitlab::Database::Partitioning::ReplaceTable.new(connection,
               original_table_name.to_s, replacement_table_name, replaced_table_name, primary_key_name)
diff --git a/spec/lib/gitlab/database/background_migration_job_spec.rb b/spec/lib/gitlab/database/background_migration_job_spec.rb
index 1117c17c84a455a480053862f93c4af388b4ae68..6a1bedd800b87014e2a4138a2e95ff609a128090 100644
--- a/spec/lib/gitlab/database/background_migration_job_spec.rb
+++ b/spec/lib/gitlab/database/background_migration_job_spec.rb
@@ -27,26 +27,6 @@
     end
   end
 
-  describe '.for_partitioning_migration' do
-    let!(:job1) { create(:background_migration_job, arguments: [1, 100, 'other_table']) }
-    let!(:job2) { create(:background_migration_job, arguments: [1, 100, 'audit_events']) }
-    let!(:job3) { create(:background_migration_job, class_name: 'OtherJob', arguments: [1, 100, 'audit_events']) }
-
-    it 'returns jobs matching class_name and the table_name job argument' do
-      relation = described_class.for_partitioning_migration('TestJob', 'audit_events')
-
-      expect(relation.count).to eq(1)
-      expect(relation.first).to have_attributes(class_name: 'TestJob', arguments: [1, 100, 'audit_events'])
-    end
-
-    it 'normalizes class names by removing leading ::' do
-      relation = described_class.for_partitioning_migration('::TestJob', 'audit_events')
-
-      expect(relation.count).to eq(1)
-      expect(relation.first).to have_attributes(class_name: 'TestJob', arguments: [1, 100, 'audit_events'])
-    end
-  end
-
   describe '.mark_all_as_succeeded' do
     let!(:job1) { create(:background_migration_job, arguments: [1, 100]) }
     let!(:job2) { create(:background_migration_job, arguments: [1, 100]) }
diff --git a/spec/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers_spec.rb b/spec/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers_spec.rb
index d87ef7a09535dd8a26e3cb0b966baf5250b3e920..921a54bc92b5866ad99ff82ae0d8482223e498b5 100644
--- a/spec/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers_spec.rb
+++ b/spec/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers_spec.rb
@@ -123,10 +123,6 @@
     let(:old_primary_key) { 'id' }
     let(:new_primary_key) { [old_primary_key, partition_column] }
 
-    before do
-      allow(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals)
-    end
-
     context 'when the table is not allowed' do
       let(:source_table) { :this_table_is_not_allowed }
 
@@ -528,18 +524,33 @@
       end
     end
 
-    context 'when tracking records exist in the background_migration_jobs table' do
-      let(:migration_class) { 'Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable' }
-      let!(:job1) { create(:background_migration_job, class_name: migration_class, arguments: [1, 10, source_table]) }
-      let!(:job2) { create(:background_migration_job, class_name: migration_class, arguments: [11, 20, source_table]) }
-      let!(:job3) { create(:background_migration_job, class_name: migration_class, arguments: [1, 10, 'other_table']) }
+    context 'when tracking records exist in the batched_background_migrations table' do
+      let(:migration_class) { described_class::MIGRATION }
+
+      before do
+        create(
+          :batched_background_migration,
+          job_class_name: migration_class,
+          table_name: source_table,
+          column_name: :id,
+          job_arguments: [partitioned_table]
+        )
+
+        create(
+          :batched_background_migration,
+          job_class_name: migration_class,
+          table_name: 'other_table',
+          column_name: :id,
+          job_arguments: ['other_table_partitioned']
+        )
+      end
 
       it 'deletes those pertaining to the given table' do
         expect { migration.cleanup_partitioning_data_migration(source_table) }
-          .to change { ::Gitlab::Database::BackgroundMigrationJob.count }.from(3).to(1)
+          .to change { ::Gitlab::Database::BackgroundMigration::BatchedMigration.count }.from(2).to(1)
 
-        remaining_record = ::Gitlab::Database::BackgroundMigrationJob.first
-        expect(remaining_record).to have_attributes(class_name: migration_class, arguments: [1, 10, 'other_table'])
+        remaining_record = ::Gitlab::Database::BackgroundMigration::BatchedMigration.first
+        expect(remaining_record.table_name).to eq('other_table')
       end
     end
   end
@@ -601,131 +612,28 @@
       end
     end
 
-    context 'finishing pending background migration jobs' do
+    context 'finishing pending batched background migration jobs' do
       let(:source_table_double) { double('table name') }
       let(:raw_arguments) { [1, 50_000, source_table_double, partitioned_table, source_column] }
       let(:background_job) { double('background job', args: ['background jobs', raw_arguments]) }
-
-      before do
-        allow(migration).to receive(:table_exists?).with(partitioned_table).and_return(true)
-        allow(migration).to receive(:copy_missed_records)
-        allow(migration).to receive(:execute).with(/VACUUM/)
-        allow(migration).to receive(:execute).with(/^(RE)?SET/)
-      end
-
-      it 'finishes remaining jobs for the correct table' do
-        expect_next_instance_of(described_class::JobArguments) do |job_arguments|
-          expect(job_arguments).to receive(:source_table_name).and_call_original
-        end
-
-        expect(Gitlab::BackgroundMigration).to receive(:steal)
-          .with(described_class::MIGRATION_CLASS_NAME)
-          .and_yield(background_job)
-
-        expect(source_table_double).to receive(:==).with(source_table.to_s)
-
-        migration.finalize_backfilling_partitioned_table source_table
-      end
-
-      it 'requires the migration helper to execute in DML mode' do
-        expect(Gitlab::Database::QueryAnalyzers::RestrictAllowedSchemas).to receive(:require_dml_mode!)
-
-        expect(Gitlab::BackgroundMigration).to receive(:steal)
-          .with(described_class::MIGRATION_CLASS_NAME)
-          .and_yield(background_job)
-
-        migration.finalize_backfilling_partitioned_table source_table
-      end
-    end
-
-    context 'when there is missed data' do
-      let(:partitioned_model) { Class.new(ActiveRecord::Base) }
-      let(:timestamp) { Time.utc(2019, 12, 1, 12).round }
-      let!(:record1) { source_model.create!(name: 'Bob', age: 20, created_at: timestamp, updated_at: timestamp) }
-      let!(:record2) { source_model.create!(name: 'Alice', age: 30, created_at: timestamp, updated_at: timestamp) }
-      let!(:record3) { source_model.create!(name: 'Sam', age: 40, created_at: timestamp, updated_at: timestamp) }
-      let!(:record4) { source_model.create!(name: 'Sue', age: 50, created_at: timestamp, updated_at: timestamp) }
-
-      let!(:pending_job1) do
-        create(:background_migration_job,
-               class_name: described_class::MIGRATION_CLASS_NAME,
-               arguments: [record1.id, record2.id, source_table, partitioned_table, source_column])
-      end
-
-      let!(:pending_job2) do
-        create(:background_migration_job,
-               class_name: described_class::MIGRATION_CLASS_NAME,
-               arguments: [record3.id, record3.id, source_table, partitioned_table, source_column])
-      end
-
-      let!(:succeeded_job) do
-        create(:background_migration_job, :succeeded,
-               class_name: described_class::MIGRATION_CLASS_NAME,
-               arguments: [record4.id, record4.id, source_table, partitioned_table, source_column])
+      let(:bbm_arguments) do
+        {
+          job_class_name: described_class::MIGRATION,
+          table_name: source_table,
+          column_name: connection.primary_key(source_table),
+          job_arguments: [partitioned_table]
+        }
       end
 
       before do
-        partitioned_model.primary_key = :id
-        partitioned_model.table_name = partitioned_table
-
-        allow(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals)
-
-        migration.partition_table_by_date source_table, partition_column, min_date: min_date, max_date: max_date
-
-        allow(Gitlab::BackgroundMigration).to receive(:steal)
+        allow(migration).to receive(:table_exists?).with(partitioned_table).and_return(true)
         allow(migration).to receive(:execute).with(/VACUUM/)
         allow(migration).to receive(:execute).with(/^(RE)?SET/)
       end
 
-      it 'idempotently cleans up after failed background migrations' do
-        expect(partitioned_model.count).to eq(0)
-
-        partitioned_model.insert(record2.attributes, unique_by: [:id, :created_at])
-
-        expect_next_instance_of(Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable) do |backfill|
-          allow(backfill).to receive(:transaction_open?).and_return(false)
-
-          expect(backfill).to receive(:perform)
-            .with(record1.id, record2.id, source_table, partitioned_table, source_column)
-            .and_call_original
-
-          expect(backfill).to receive(:perform)
-            .with(record3.id, record3.id, source_table, partitioned_table, source_column)
-            .and_call_original
-        end
-
-        migration.finalize_backfilling_partitioned_table source_table
-
-        expect(partitioned_model.count).to eq(3)
-
-        [record1, record2, record3].each do |original|
-          copy = partitioned_model.find(original.id)
-          expect(copy.attributes).to eq(original.attributes)
-        end
-
-        expect(partitioned_model.find_by_id(record4.id)).to be_nil
-
-        [pending_job1, pending_job2].each do |job|
-          expect(job.reload).to be_succeeded
-        end
-      end
-
-      it 'raises an error if no job tracking records are marked as succeeded' do
-        expect_next_instance_of(Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable) do |backfill|
-          allow(backfill).to receive(:transaction_open?).and_return(false)
-
-          expect(backfill).to receive(:perform).and_return(0)
-        end
-
-        expect do
-          migration.finalize_backfilling_partitioned_table source_table
-        end.to raise_error(/failed to update tracking record/)
-      end
-
-      it 'vacuums the table after loading is complete' do
-        expect_next_instance_of(Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable) do |backfill|
-          allow(backfill).to receive(:perform).and_return(1)
-        end
+      it 'ensures finishing of remaining jobs and vacuums the partitioned table' do
+        expect(migration).to receive(:ensure_batched_background_migration_is_finished)
+          .with(bbm_arguments)
 
         expect(Gitlab::Database::QueryAnalyzers::RestrictAllowedSchemas).to receive(:with_suppressed).and_yield
         expect(migration).to receive(:disable_statement_timeout).and_call_original