diff --git a/changelogs/unreleased/pb-track-background-job-executions.yml b/changelogs/unreleased/pb-track-background-job-executions.yml
new file mode 100644
index 0000000000000000000000000000000000000000..bedf6317e8d24dc58d21ed97a00ddad9d88b4ad9
--- /dev/null
+++ b/changelogs/unreleased/pb-track-background-job-executions.yml
@@ -0,0 +1,5 @@
+---
+title: Create tables to track auto-batched background migrations
+merge_request: 54628
+author:
+type: added
diff --git a/db/migrate/20210128172149_create_background_migration_tracking_tables.rb b/db/migrate/20210128172149_create_background_migration_tracking_tables.rb
new file mode 100644
index 0000000000000000000000000000000000000000..767bd8737a35c638b5feb29b5aa67fe98e061222
--- /dev/null
+++ b/db/migrate/20210128172149_create_background_migration_tracking_tables.rb
@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+
+class CreateBackgroundMigrationTrackingTables < ActiveRecord::Migration[6.0]
+  include Gitlab::Database::MigrationHelpers
+
+  DOWNTIME = false
+
+  def change
+    create_table_with_constraints :batched_background_migrations do |t|
+      t.timestamps_with_timezone
+      t.bigint :min_value, null: false, default: 1
+      t.bigint :max_value, null: false
+      t.integer :batch_size, null: false
+      t.integer :sub_batch_size, null: false
+      t.integer :interval, limit: 2, null: false
+      t.integer :status, limit: 2, null: false, default: 0
+      t.text :job_class_name, null: false
+      t.text :batch_class_name, null: false,
+        default: 'Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy'
+      t.text :table_name, null: false
+      t.text :column_name, null: false
+      t.jsonb :job_arguments, null: false, default: '[]'
+
+      t.text_limit :job_class_name, 100
+      t.text_limit :batch_class_name, 100
+      t.text_limit :table_name, 63
+      t.text_limit :column_name, 63
+
+      t.check_constraint :check_positive_min_value, 'min_value > 0'
+      t.check_constraint :check_max_value_in_range, 'max_value >= min_value'
+
+      t.check_constraint :check_positive_sub_batch_size, 'sub_batch_size > 0'
+      t.check_constraint :check_batch_size_in_range, 'batch_size >= sub_batch_size'
+
+      t.index %i[job_class_name table_name column_name], name: :index_batched_migrations_on_job_table_and_column_name
+    end
+
+    create_table :batched_background_migration_jobs do |t|
+      t.timestamps_with_timezone
+      t.datetime_with_timezone :started_at
+      t.datetime_with_timezone :finished_at
+      t.references :batched_background_migration, null: false, index: false, foreign_key: { on_delete: :cascade }
+      t.bigint :min_value, null: false
+      t.bigint :max_value, null: false
+      t.integer :batch_size, null: false
+      t.integer :sub_batch_size, null: false
+      t.integer :status, limit: 2, null: false, default: 0
+      t.integer :attempts, limit: 2, null: false, default: 0
+
+      t.index [:batched_background_migration_id, :id], name: :index_batched_jobs_by_batched_migration_id_and_id
+    end
+  end
+
+  def down
+    drop_table :batched_background_migration_jobs
+
+    drop_table :batched_background_migrations
+  end
+end
diff --git a/db/schema_migrations/20210128172149 b/db/schema_migrations/20210128172149
new file mode 100644
index 0000000000000000000000000000000000000000..cc0e050be3efee8af212586698e3e2bf52064f2a
--- /dev/null
+++ b/db/schema_migrations/20210128172149
@@ -0,0 +1 @@
+1cf1305ad5eaaef51f99f057b8a2e81731d69a6d02629c0c9a7d94dfdecbea47
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index 3d0e8b903cb3b5600edf61e567bdfdebeb32fe05..8a7ddfdfa027e2bd894c35d79e6496cd22b5f880 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -9768,6 +9768,64 @@ CREATE SEQUENCE badges_id_seq
 
 ALTER SEQUENCE badges_id_seq OWNED BY badges.id;
 
+CREATE TABLE batched_background_migration_jobs (
+    id bigint NOT NULL,
+    created_at timestamp with time zone NOT NULL,
+    updated_at timestamp with time zone NOT NULL,
+    started_at timestamp with time zone,
+    finished_at timestamp with time zone,
+    batched_background_migration_id bigint NOT NULL,
+    min_value bigint NOT NULL,
+    max_value bigint NOT NULL,
+    batch_size integer NOT NULL,
+    sub_batch_size integer NOT NULL,
+    status smallint DEFAULT 0 NOT NULL,
+    attempts smallint DEFAULT 0 NOT NULL
+);
+
+CREATE SEQUENCE batched_background_migration_jobs_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+ALTER SEQUENCE batched_background_migration_jobs_id_seq OWNED BY batched_background_migration_jobs.id;
+
+CREATE TABLE batched_background_migrations (
+    id bigint NOT NULL,
+    created_at timestamp with time zone NOT NULL,
+    updated_at timestamp with time zone NOT NULL,
+    min_value bigint DEFAULT 1 NOT NULL,
+    max_value bigint NOT NULL,
+    batch_size integer NOT NULL,
+    sub_batch_size integer NOT NULL,
+    "interval" smallint NOT NULL,
+    status smallint DEFAULT 0 NOT NULL,
+    job_class_name text NOT NULL,
+    batch_class_name text DEFAULT 'Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy'::text NOT NULL,
+    table_name text NOT NULL,
+    column_name text NOT NULL,
+    job_arguments jsonb DEFAULT '"[]"'::jsonb NOT NULL,
+    CONSTRAINT check_5bb0382d6f CHECK ((char_length(column_name) <= 63)),
+    CONSTRAINT check_6b6a06254a CHECK ((char_length(table_name) <= 63)),
+    CONSTRAINT check_batch_size_in_range CHECK ((batch_size >= sub_batch_size)),
+    CONSTRAINT check_e6c75b1e29 CHECK ((char_length(job_class_name) <= 100)),
+    CONSTRAINT check_fe10674721 CHECK ((char_length(batch_class_name) <= 100)),
+    CONSTRAINT check_max_value_in_range CHECK ((max_value >= min_value)),
+    CONSTRAINT check_positive_min_value CHECK ((min_value > 0)),
+    CONSTRAINT check_positive_sub_batch_size CHECK ((sub_batch_size > 0))
+);
+
+CREATE SEQUENCE batched_background_migrations_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+ALTER SEQUENCE batched_background_migrations_id_seq OWNED BY batched_background_migrations.id;
+
 CREATE TABLE board_assignees (
     id integer NOT NULL,
     board_id integer NOT NULL,
@@ -18812,6 +18870,10 @@ ALTER TABLE ONLY background_migration_jobs ALTER COLUMN id SET DEFAULT nextval('
 
 ALTER TABLE ONLY badges ALTER COLUMN id SET DEFAULT nextval('badges_id_seq'::regclass);
 
+ALTER TABLE ONLY batched_background_migration_jobs ALTER COLUMN id SET DEFAULT nextval('batched_background_migration_jobs_id_seq'::regclass);
+
+ALTER TABLE ONLY batched_background_migrations ALTER COLUMN id SET DEFAULT nextval('batched_background_migrations_id_seq'::regclass);
+
 ALTER TABLE ONLY board_assignees ALTER COLUMN id SET DEFAULT nextval('board_assignees_id_seq'::regclass);
 
 ALTER TABLE ONLY board_group_recent_visits ALTER COLUMN id SET DEFAULT nextval('board_group_recent_visits_id_seq'::regclass);
@@ -19893,6 +19955,12 @@ ALTER TABLE ONLY background_migration_jobs
 ALTER TABLE ONLY badges
     ADD CONSTRAINT badges_pkey PRIMARY KEY (id);
 
+ALTER TABLE ONLY batched_background_migration_jobs
+    ADD CONSTRAINT batched_background_migration_jobs_pkey PRIMARY KEY (id);
+
+ALTER TABLE ONLY batched_background_migrations
+    ADD CONSTRAINT batched_background_migrations_pkey PRIMARY KEY (id);
+
 ALTER TABLE ONLY board_assignees
     ADD CONSTRAINT board_assignees_pkey PRIMARY KEY (id);
 
@@ -21641,6 +21709,10 @@ CREATE INDEX index_badges_on_group_id ON badges USING btree (group_id);
 
 CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id);
 
+CREATE INDEX index_batched_jobs_by_batched_migration_id_and_id ON batched_background_migration_jobs USING btree (batched_background_migration_id, id);
+
+CREATE INDEX index_batched_migrations_on_job_table_and_column_name ON batched_background_migrations USING btree (job_class_name, table_name, column_name);
+
 CREATE INDEX index_board_assignees_on_assignee_id ON board_assignees USING btree (assignee_id);
 
 CREATE UNIQUE INDEX index_board_assignees_on_board_id_and_assignee_id ON board_assignees USING btree (board_id, assignee_id);
@@ -25383,6 +25455,9 @@ ALTER TABLE ONLY ci_resources
 ALTER TABLE ONLY clusters_applications_fluentd
     ADD CONSTRAINT fk_rails_4319b1dcd2 FOREIGN KEY (cluster_id) REFERENCES clusters(id) ON DELETE CASCADE;
 
+ALTER TABLE ONLY batched_background_migration_jobs
+    ADD CONSTRAINT fk_rails_432153b86d FOREIGN KEY (batched_background_migration_id) REFERENCES batched_background_migrations(id) ON DELETE CASCADE;
+
 ALTER TABLE ONLY operations_strategies_user_lists
     ADD CONSTRAINT fk_rails_43241e8d29 FOREIGN KEY (strategy_id) REFERENCES operations_strategies(id) ON DELETE CASCADE;
 
diff --git a/lib/gitlab/background_migration/copy_column_using_background_migration_job.rb b/lib/gitlab/background_migration/copy_column_using_background_migration_job.rb
index 16c0de39a3b47b2a89c1f51c97c64eb52864aaea..60682bd2ec11ba179a2d3e9280a665c486124ca8 100644
--- a/lib/gitlab/background_migration/copy_column_using_background_migration_job.rb
+++ b/lib/gitlab/background_migration/copy_column_using_background_migration_job.rb
@@ -2,13 +2,11 @@
 
 module Gitlab
   module BackgroundMigration
-    # Background migration that extends CopyColumn to update the value of a
+    # Background migration that updates the value of a
     # column using the value of another column in the same table.
     #
     # - The {start_id, end_id} arguments are at the start so that it can be used
-    #   with `queue_background_migration_jobs_by_range_at_intervals`
-    # - Provides support for background job tracking through the use of
-    #   Gitlab::Database::BackgroundMigrationJob
+    #   with `queue_batched_background_migration`
     # - Uses sub-batching so that we can keep each update's execution time at
     #   low 100s ms, while being able to update more records per 2 minutes
     #   that we allow background migration jobs to be scheduled one after the other
@@ -22,28 +20,24 @@ class CopyColumnUsingBackgroundMigrationJob
 
       # start_id - The start ID of the range of rows to update.
       # end_id - The end ID of the range of rows to update.
-      # table - The name of the table that contains the columns.
-      # primary_key - The primary key column of the table.
-      # copy_from - The column containing the data to copy.
-      # copy_to - The column to copy the data to.
+      # batch_table - The name of the table that contains the columns.
+      # batch_column - The name of the column we use to batch over the table.
       # sub_batch_size - We don't want updates to take more than ~100ms
       #                  This allows us to run multiple smaller batches during
       #                  the minimum 2.minute interval that we can schedule jobs
-      def perform(start_id, end_id, table, primary_key, copy_from, copy_to, sub_batch_size)
+      # copy_from - The column containing the data to copy.
+      # copy_to - The column to copy the data to.
+      def perform(start_id, end_id, batch_table, batch_column, sub_batch_size, copy_from, copy_to)
         quoted_copy_from = connection.quote_column_name(copy_from)
         quoted_copy_to = connection.quote_column_name(copy_to)
 
-        parent_batch_relation = relation_scoped_to_range(table, primary_key, start_id, end_id)
+        parent_batch_relation = relation_scoped_to_range(batch_table, batch_column, start_id, end_id)
 
-        parent_batch_relation.each_batch(column: primary_key, of: sub_batch_size) do |sub_batch|
+        parent_batch_relation.each_batch(column: batch_column, of: sub_batch_size) do |sub_batch|
           sub_batch.update_all("#{quoted_copy_to}=#{quoted_copy_from}")
 
           sleep(PAUSE_SECONDS)
         end
-
-        # We have to add all arguments when marking a job as succeeded as they
-        #  are all used to track the job by `queue_background_migration_jobs_by_range_at_intervals`
-        mark_job_as_succeeded(start_id, end_id, table, primary_key, copy_from, copy_to, sub_batch_size)
       end
 
       private
@@ -52,10 +46,6 @@ def connection
         ActiveRecord::Base.connection
       end
 
-      def mark_job_as_succeeded(*arguments)
-        Gitlab::Database::BackgroundMigrationJob.mark_all_as_succeeded(self.class.name, arguments)
-      end
-
       def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
         define_batchable_model(source_table).where(source_key_column => start_id..stop_id)
       end
diff --git a/lib/gitlab/database/background_migration/batched_job.rb b/lib/gitlab/database/background_migration/batched_job.rb
new file mode 100644
index 0000000000000000000000000000000000000000..3b624df2bfdd0d37ea2fed28d60fa8f18083df56
--- /dev/null
+++ b/lib/gitlab/database/background_migration/batched_job.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Database
+    module BackgroundMigration
+      class BatchedJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
+        self.table_name = :batched_background_migration_jobs
+
+        belongs_to :batched_migration, foreign_key: :batched_background_migration_id
+
+        enum status: {
+          pending: 0,
+          running: 1,
+          failed: 2,
+          succeeded: 3
+        }
+
+        delegate :aborted?, :job_class, :table_name, :column_name, :job_arguments,
+          to: :batched_migration, prefix: :migration
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/database/background_migration/batched_migration.rb b/lib/gitlab/database/background_migration/batched_migration.rb
new file mode 100644
index 0000000000000000000000000000000000000000..316a6dafeea8c3af1497f9cc210c2de077ccfc8d
--- /dev/null
+++ b/lib/gitlab/database/background_migration/batched_migration.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Database
+    module BackgroundMigration
+      class BatchedMigration < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
+        self.table_name = :batched_background_migrations
+
+        has_many :batched_jobs, foreign_key: :batched_background_migration_id
+        has_one :last_job, -> { order(id: :desc) },
+          class_name: 'Gitlab::Database::BackgroundMigration::BatchedJob',
+          foreign_key: :batched_background_migration_id
+
+        scope :queue_order, -> { order(id: :asc) }
+
+        enum status: {
+          paused: 0,
+          active: 1,
+          aborted: 2,
+          finished: 3
+        }
+
+        def self.remove_toplevel_prefix(name)
+          name&.sub(/\A::/, '')
+        end
+
+        def interval_elapsed?
+          last_job.nil? || last_job.created_at <= Time.current - interval
+        end
+
+        def create_batched_job!(min, max)
+          batched_jobs.create!(min_value: min, max_value: max, batch_size: batch_size, sub_batch_size: sub_batch_size)
+        end
+
+        def next_min_value
+          last_job&.max_value&.next || min_value
+        end
+
+        def job_class
+          job_class_name.constantize
+        end
+
+        def batch_class
+          batch_class_name.constantize
+        end
+
+        def job_class_name=(class_name)
+          write_attribute(:job_class_name, self.class.remove_toplevel_prefix(class_name))
+        end
+
+        def batch_class_name=(class_name)
+          write_attribute(:batch_class_name, self.class.remove_toplevel_prefix(class_name))
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/database/background_migration/batched_migration_wrapper.rb b/lib/gitlab/database/background_migration/batched_migration_wrapper.rb
new file mode 100644
index 0000000000000000000000000000000000000000..299bd99219730c3ed6b0dd9f9fb2ad24768d462b
--- /dev/null
+++ b/lib/gitlab/database/background_migration/batched_migration_wrapper.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Database
+    module BackgroundMigration
+      class BatchedMigrationWrapper
+        def perform(batch_tracking_record)
+          start_tracking_execution(batch_tracking_record)
+
+          execute_batch(batch_tracking_record)
+
+          batch_tracking_record.status = :succeeded
+        rescue => e
+          batch_tracking_record.status = :failed
+
+          raise e
+        ensure
+          finish_tracking_execution(batch_tracking_record)
+        end
+
+        private
+
+        def start_tracking_execution(tracking_record)
+          tracking_record.update!(attempts: tracking_record.attempts + 1, status: :running, started_at: Time.current)
+        end
+
+        def execute_batch(tracking_record)
+          job_instance = tracking_record.migration_job_class.new
+
+          job_instance.perform(
+            tracking_record.min_value,
+            tracking_record.max_value,
+            tracking_record.migration_table_name,
+            tracking_record.migration_column_name,
+            tracking_record.sub_batch_size,
+            *tracking_record.migration_job_arguments)
+        end
+
+        def finish_tracking_execution(tracking_record)
+          tracking_record.finished_at = Time.current
+          tracking_record.save!
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/database/background_migration/primary_key_batching_strategy.rb b/lib/gitlab/database/background_migration/primary_key_batching_strategy.rb
new file mode 100644
index 0000000000000000000000000000000000000000..46af1371dad2ad09b3ca4d8636ad12b37c28ab9a
--- /dev/null
+++ b/lib/gitlab/database/background_migration/primary_key_batching_strategy.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Database
+    module BackgroundMigration
+      class PrimaryKeyBatchingStrategy
+        include Gitlab::Database::DynamicModelHelpers
+
+        def next_batch(table_name, column_name, batch_min_value:, batch_size:)
+          model_class = define_batchable_model(table_name)
+
+          quoted_column_name = model_class.connection.quote_column_name(column_name)
+          relation = model_class.where("#{quoted_column_name} >= ?", batch_min_value)
+          next_batch_bounds = nil
+
+          relation.each_batch(of: batch_size, column: column_name) do |batch| # rubocop:disable Lint/UnreachableLoop
+            next_batch_bounds = batch.pluck(Arel.sql("MIN(#{quoted_column_name}), MAX(#{quoted_column_name})")).first
+
+            break
+          end
+
+          next_batch_bounds
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/database/background_migration/scheduler.rb b/lib/gitlab/database/background_migration/scheduler.rb
new file mode 100644
index 0000000000000000000000000000000000000000..5f8a5ec06a58ccd761c0a0b030e9effaae4e6fcd
--- /dev/null
+++ b/lib/gitlab/database/background_migration/scheduler.rb
@@ -0,0 +1,60 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Database
+    module BackgroundMigration
+      class Scheduler
+        def perform(migration_wrapper: BatchedMigrationWrapper.new)
+          active_migration = BatchedMigration.active.queue_order.first
+
+          return unless active_migration&.interval_elapsed?
+
+          if next_batched_job = create_next_batched_job!(active_migration)
+            migration_wrapper.perform(next_batched_job)
+          else
+            finish_active_migration(active_migration)
+          end
+        end
+
+        private
+
+        def create_next_batched_job!(active_migration)
+          next_batch_range = find_next_batch_range(active_migration)
+
+          return if next_batch_range.nil?
+
+          active_migration.create_batched_job!(next_batch_range.min, next_batch_range.max)
+        end
+
+        def find_next_batch_range(active_migration)
+          batching_strategy = active_migration.batch_class.new
+          batch_min_value = active_migration.next_min_value
+
+          next_batch_bounds = batching_strategy.next_batch(
+            active_migration.table_name,
+            active_migration.column_name,
+            batch_min_value: batch_min_value,
+            batch_size: active_migration.batch_size)
+
+          return if next_batch_bounds.nil?
+
+          clamped_batch_range(active_migration, next_batch_bounds)
+        end
+
+        def clamped_batch_range(active_migration, next_bounds)
+          min_value, max_value = next_bounds
+
+          return if min_value > active_migration.max_value
+
+          max_value = max_value.clamp(min_value, active_migration.max_value)
+
+          (min_value..max_value)
+        end
+
+        def finish_active_migration(active_migration)
+          active_migration.finished!
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/database/migration_helpers.rb b/lib/gitlab/database/migration_helpers.rb
index 6b169a504f37d4a07757f81df44c019f780f2156..e8ed3bb1258afa8138dbb1b90a4607786da0691e 100644
--- a/lib/gitlab/database/migration_helpers.rb
+++ b/lib/gitlab/database/migration_helpers.rb
@@ -1015,7 +1015,7 @@ def initialize_conversion_of_integer_to_bigint(
           'CopyColumnUsingBackgroundMigrationJob',
           interval,
           batch_size: batch_size,
-          other_job_arguments: [table, primary_key, column, tmp_column, sub_batch_size],
+          other_job_arguments: [table, primary_key, sub_batch_size, column, tmp_column],
           track_jobs: true,
           primary_column_name: primary_key
         )
diff --git a/spec/factories/gitlab/database/background_migration/batched_jobs.rb b/spec/factories/gitlab/database/background_migration/batched_jobs.rb
new file mode 100644
index 0000000000000000000000000000000000000000..52bc04447da571611542aa8fafc5c9cbdbd00ddc
--- /dev/null
+++ b/spec/factories/gitlab/database/background_migration/batched_jobs.rb
@@ -0,0 +1,12 @@
+# frozen_string_literal: true
+
+FactoryBot.define do
+  factory :batched_background_migration_job, class: '::Gitlab::Database::BackgroundMigration::BatchedJob' do
+    batched_migration factory: :batched_background_migration
+
+    min_value { 1 }
+    max_value { 10 }
+    batch_size { 5 }
+    sub_batch_size { 1 }
+  end
+end
diff --git a/spec/factories/gitlab/database/background_migration/batched_migrations.rb b/spec/factories/gitlab/database/background_migration/batched_migrations.rb
new file mode 100644
index 0000000000000000000000000000000000000000..03ae2e664f005b7efeac480899f5ff5385bfda21
--- /dev/null
+++ b/spec/factories/gitlab/database/background_migration/batched_migrations.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+FactoryBot.define do
+  factory :batched_background_migration, class: '::Gitlab::Database::BackgroundMigration::BatchedMigration' do
+    max_value { 10 }
+    batch_size { 5 }
+    sub_batch_size { 1 }
+    interval { 2.minutes }
+    job_class_name { 'Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob' }
+    table_name { :events }
+    column_name { :id }
+  end
+end
diff --git a/spec/lib/gitlab/background_migration/copy_column_using_background_migration_job_spec.rb b/spec/lib/gitlab/background_migration/copy_column_using_background_migration_job_spec.rb
index 110a1ff8a08a220aeecdb2db4c87010c941dd884..7ad93c3124ace304c2e4fcbbd9a7c36231989a16 100644
--- a/spec/lib/gitlab/background_migration/copy_column_using_background_migration_job_spec.rb
+++ b/spec/lib/gitlab/background_migration/copy_column_using_background_migration_job_spec.rb
@@ -38,22 +38,9 @@
 
   describe '#perform' do
     let(:migration_class) { described_class.name }
-    let!(:job1) do
-      table(:background_migration_jobs).create!(
-        class_name: migration_class,
-        arguments: [1, 10, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size]
-      )
-    end
-
-    let!(:job2) do
-      table(:background_migration_jobs).create!(
-        class_name: migration_class,
-        arguments: [11, 20, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size]
-      )
-    end
 
     it 'copies all primary keys in range' do
-      subject.perform(12, 15, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size)
+      subject.perform(12, 15, table_name, 'id', sub_batch_size, 'id', 'id_convert_to_bigint')
 
       expect(test_table.where('id = id_convert_to_bigint').pluck(:id)).to contain_exactly(12, 15)
       expect(test_table.where(id_convert_to_bigint: 0).pluck(:id)).to contain_exactly(11, 19)
@@ -61,7 +48,7 @@
     end
 
     it 'copies all foreign keys in range' do
-      subject.perform(10, 14, table_name, 'id', 'fk', 'fk_convert_to_bigint', sub_batch_size)
+      subject.perform(10, 14, table_name, 'id', sub_batch_size, 'fk', 'fk_convert_to_bigint')
 
       expect(test_table.where('fk = fk_convert_to_bigint').pluck(:id)).to contain_exactly(11, 12)
       expect(test_table.where(fk_convert_to_bigint: 0).pluck(:id)).to contain_exactly(15, 19)
@@ -71,21 +58,11 @@
     it 'copies columns with NULLs' do
       expect(test_table.where("name_convert_to_text = 'no name'").count).to eq(4)
 
-      subject.perform(10, 20, table_name, 'id', 'name', 'name_convert_to_text', sub_batch_size)
+      subject.perform(10, 20, table_name, 'id', sub_batch_size, 'name', 'name_convert_to_text')
 
       expect(test_table.where('name = name_convert_to_text').pluck(:id)).to contain_exactly(11, 12, 19)
       expect(test_table.where('name is NULL and name_convert_to_text is NULL').pluck(:id)).to contain_exactly(15)
       expect(test_table.where("name_convert_to_text = 'no name'").count).to eq(0)
     end
-
-    it 'tracks completion with BackgroundMigrationJob' do
-      expect do
-        subject.perform(11, 20, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size)
-      end.to change { Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1)
-
-      expect(job1.reload.status).to eq(0)
-      expect(job2.reload.status).to eq(1)
-      expect(test_table.where('id = id_convert_to_bigint').count).to eq(4)
-    end
   end
 end
diff --git a/spec/lib/gitlab/database/background_migration/batched_job_spec.rb b/spec/lib/gitlab/database/background_migration/batched_job_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..1020aafcf085797fa7fac0478ea8c64bf5eda708
--- /dev/null
+++ b/spec/lib/gitlab/database/background_migration/batched_job_spec.rb
@@ -0,0 +1,50 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Database::BackgroundMigration::BatchedJob, type: :model do
+  it_behaves_like 'having unique enum values'
+
+  describe 'associations' do
+    it { is_expected.to belong_to(:batched_migration).with_foreign_key(:batched_background_migration_id) }
+  end
+
+  describe 'delegated batched_migration attributes' do
+    let(:batched_job) { build(:batched_background_migration_job) }
+    let(:batched_migration) { batched_job.batched_migration }
+
+    describe '#migration_aborted?' do
+      before do
+        batched_migration.status = :aborted
+      end
+
+      it 'returns the migration aborted?' do
+        expect(batched_job.migration_aborted?).to eq(batched_migration.aborted?)
+      end
+    end
+
+    describe '#migration_job_class' do
+      it 'returns the migration job_class' do
+        expect(batched_job.migration_job_class).to eq(batched_migration.job_class)
+      end
+    end
+
+    describe '#migration_table_name' do
+      it 'returns the migration table_name' do
+        expect(batched_job.migration_table_name).to eq(batched_migration.table_name)
+      end
+    end
+
+    describe '#migration_column_name' do
+      it 'returns the migration column_name' do
+        expect(batched_job.migration_column_name).to eq(batched_migration.column_name)
+      end
+    end
+
+    describe '#migration_job_arguments' do
+      it 'returns the migration job_arguments' do
+        expect(batched_job.migration_job_arguments).to eq(batched_migration.job_arguments)
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/database/background_migration/batched_migration_spec.rb b/spec/lib/gitlab/database/background_migration/batched_migration_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..df0abaae7218c4b440c10f9fb1106ecc343ad70e
--- /dev/null
+++ b/spec/lib/gitlab/database/background_migration/batched_migration_spec.rb
@@ -0,0 +1,160 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigration, type: :model do
+  it_behaves_like 'having unique enum values'
+
+  describe 'associations' do
+    it { is_expected.to have_many(:batched_jobs).with_foreign_key(:batched_background_migration_id) }
+
+    describe '#last_job' do
+      let!(:batched_migration) { create(:batched_background_migration) }
+      let!(:batched_job1) { create(:batched_background_migration_job, batched_migration: batched_migration) }
+      let!(:batched_job2) { create(:batched_background_migration_job, batched_migration: batched_migration) }
+
+      it 'returns the most recent (in order of id) batched job' do
+        expect(batched_migration.last_job).to eq(batched_job2)
+      end
+    end
+  end
+
+  describe '.queue_order' do
+    let!(:migration1) { create(:batched_background_migration) }
+    let!(:migration2) { create(:batched_background_migration) }
+    let!(:migration3) { create(:batched_background_migration) }
+
+    it 'returns batched migrations ordered by their id' do
+      expect(described_class.queue_order.all).to eq([migration1, migration2, migration3])
+    end
+  end
+
+  describe '#interval_elapsed?' do
+    context 'when the migration has no last_job' do
+      let(:batched_migration) { build(:batched_background_migration) }
+
+      it 'returns true' do
+        expect(batched_migration.interval_elapsed?).to eq(true)
+      end
+    end
+
+    context 'when the migration has a last_job' do
+      let(:interval) { 2.minutes }
+      let(:batched_migration) { create(:batched_background_migration, interval: interval) }
+
+      context 'when the last_job is less than an interval old' do
+        it 'returns false' do
+          freeze_time do
+            create(:batched_background_migration_job,
+              batched_migration: batched_migration,
+              created_at: Time.current - 1.minute)
+
+            expect(batched_migration.interval_elapsed?).to eq(false)
+          end
+        end
+      end
+
+      context 'when the last_job is exactly an interval old' do
+        it 'returns true' do
+          freeze_time do
+            create(:batched_background_migration_job,
+              batched_migration: batched_migration,
+              created_at: Time.current - 2.minutes)
+
+            expect(batched_migration.interval_elapsed?).to eq(true)
+          end
+        end
+      end
+
+      context 'when the last_job is more than an interval old' do
+        it 'returns true' do
+          freeze_time do
+            create(:batched_background_migration_job,
+              batched_migration: batched_migration,
+              created_at: Time.current - 3.minutes)
+
+            expect(batched_migration.interval_elapsed?).to eq(true)
+          end
+        end
+      end
+    end
+  end
+
+  describe '#create_batched_job!' do
+    let(:batched_migration) { create(:batched_background_migration) }
+
+    it 'creates a batched_job with the correct batch configuration' do
+      batched_job = batched_migration.create_batched_job!(1, 5)
+
+      expect(batched_job).to have_attributes(
+        min_value: 1,
+        max_value: 5,
+        batch_size: batched_migration.batch_size,
+        sub_batch_size: batched_migration.sub_batch_size)
+    end
+  end
+
+  describe '#next_min_value' do
+    let!(:batched_migration) { create(:batched_background_migration) }
+
+    context 'when a previous job exists' do
+      let!(:batched_job) { create(:batched_background_migration_job, batched_migration: batched_migration) }
+
+      it 'returns the next value after the previous maximum' do
+        expect(batched_migration.next_min_value).to eq(batched_job.max_value + 1)
+      end
+    end
+
+    context 'when a previous job does not exist' do
+      it 'returns the migration minimum value' do
+        expect(batched_migration.next_min_value).to eq(batched_migration.min_value)
+      end
+    end
+  end
+
+  describe '#job_class' do
+    let(:job_class) { Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob }
+    let(:batched_migration) { build(:batched_background_migration) }
+
+    it 'returns the class of the job for the migration' do
+      expect(batched_migration.job_class).to eq(job_class)
+    end
+  end
+
+  describe '#batch_class' do
+    let(:batch_class) { Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy}
+    let(:batched_migration) { build(:batched_background_migration) }
+
+    it 'returns the class of the batch strategy for the migration' do
+      expect(batched_migration.batch_class).to eq(batch_class)
+    end
+  end
+
+  shared_examples_for 'an attr_writer that normalizes assigned class names' do |attribute_name|
+    let(:batched_migration) { build(:batched_background_migration) }
+
+    context 'when the toplevel namespace prefix exists' do
+      it 'removes the leading prefix' do
+        batched_migration.public_send(:"#{attribute_name}=", '::Foo::Bar')
+
+        expect(batched_migration[attribute_name]).to eq('Foo::Bar')
+      end
+    end
+
+    context 'when the toplevel namespace prefix does not exist' do
+      it 'does not change the given class name' do
+        batched_migration.public_send(:"#{attribute_name}=", '::Foo::Bar')
+
+        expect(batched_migration[attribute_name]).to eq('Foo::Bar')
+      end
+    end
+  end
+
+  describe '#job_class_name=' do
+    it_behaves_like 'an attr_writer that normalizes assigned class names', :job_class_name
+  end
+
+  describe '#batch_class_name=' do
+    it_behaves_like 'an attr_writer that normalizes assigned class names', :batch_class_name
+  end
+end
diff --git a/spec/lib/gitlab/database/background_migration/batched_migration_wrapper_spec.rb b/spec/lib/gitlab/database/background_migration/batched_migration_wrapper_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..17cceb35ff7745ce4f5f86ee91001395407c04ca
--- /dev/null
+++ b/spec/lib/gitlab/database/background_migration/batched_migration_wrapper_spec.rb
@@ -0,0 +1,70 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, '#perform' do
+  let(:migration_wrapper) { described_class.new }
+  let(:job_class) { Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob }
+
+  let_it_be(:active_migration) { create(:batched_background_migration, :active, job_arguments: [:id, :other_id]) }
+
+  let!(:job_record) { create(:batched_background_migration_job, batched_migration: active_migration) }
+
+  it 'runs the migration job' do
+    expect_next_instance_of(job_class) do |job_instance|
+      expect(job_instance).to receive(:perform).with(1, 10, 'events', 'id', 1, 'id', 'other_id')
+    end
+
+    migration_wrapper.perform(job_record)
+  end
+
+  it 'updates the the tracking record in the database' do
+    expect(job_record).to receive(:update!).with(hash_including(attempts: 1, status: :running)).and_call_original
+
+    freeze_time do
+      migration_wrapper.perform(job_record)
+
+      reloaded_job_record = job_record.reload
+
+      expect(reloaded_job_record).not_to be_pending
+      expect(reloaded_job_record.attempts).to eq(1)
+      expect(reloaded_job_record.started_at).to eq(Time.current)
+    end
+  end
+
+  context 'when the migration job does not raise an error' do
+    it 'marks the tracking record as succeeded' do
+      expect_next_instance_of(job_class) do |job_instance|
+        expect(job_instance).to receive(:perform).with(1, 10, 'events', 'id', 1, 'id', 'other_id')
+      end
+
+      freeze_time do
+        migration_wrapper.perform(job_record)
+
+        reloaded_job_record = job_record.reload
+
+        expect(reloaded_job_record).to be_succeeded
+        expect(reloaded_job_record.finished_at).to eq(Time.current)
+      end
+    end
+  end
+
+  context 'when the migration job raises an error' do
+    it 'marks the tracking record as failed before raising the error' do
+      expect_next_instance_of(job_class) do |job_instance|
+        expect(job_instance).to receive(:perform)
+          .with(1, 10, 'events', 'id', 1, 'id', 'other_id')
+          .and_raise(RuntimeError, 'Something broke!')
+      end
+
+      freeze_time do
+        expect { migration_wrapper.perform(job_record) }.to raise_error(RuntimeError, 'Something broke!')
+
+        reloaded_job_record = job_record.reload
+
+        expect(reloaded_job_record).to be_failed
+        expect(reloaded_job_record.finished_at).to eq(Time.current)
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/database/background_migration/primary_key_batching_strategy_spec.rb b/spec/lib/gitlab/database/background_migration/primary_key_batching_strategy_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6baf6f319cf05053f6401e05fc60ac1617b7c88a
--- /dev/null
+++ b/spec/lib/gitlab/database/background_migration/primary_key_batching_strategy_spec.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy, '#next_batch' do
+  let(:batching_strategy) { described_class.new }
+
+  let_it_be(:event1) { create(:event) }
+  let_it_be(:event2) { create(:event) }
+  let_it_be(:event3) { create(:event) }
+  let_it_be(:event4) { create(:event) }
+
+  context 'when starting on the first batch' do
+    it 'returns the bounds of the next batch' do
+      batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event1.id, batch_size: 3)
+
+      expect(batch_bounds).to eq([event1.id, event3.id])
+    end
+  end
+
+  context 'when additional batches remain' do
+    it 'returns the bounds of the next batch' do
+      batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event2.id, batch_size: 3)
+
+      expect(batch_bounds).to eq([event2.id, event4.id])
+    end
+  end
+
+  context 'when on the final batch' do
+    it 'returns the bounds of the next batch' do
+      batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event4.id, batch_size: 3)
+
+      expect(batch_bounds).to eq([event4.id, event4.id])
+    end
+  end
+
+  context 'when no additional batches remain' do
+    it 'returns nil' do
+      batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event4.id + 1, batch_size: 1)
+
+      expect(batch_bounds).to be_nil
+    end
+  end
+end
diff --git a/spec/lib/gitlab/database/background_migration/scheduler_spec.rb b/spec/lib/gitlab/database/background_migration/scheduler_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..ba745acdf8a35a21f477b4395bac3e9592135eab
--- /dev/null
+++ b/spec/lib/gitlab/database/background_migration/scheduler_spec.rb
@@ -0,0 +1,182 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Database::BackgroundMigration::Scheduler, '#perform' do
+  let(:scheduler) { described_class.new }
+
+  shared_examples_for 'it has no jobs to run' do
+    it 'does not create and run a migration job' do
+      test_wrapper = double('test wrapper')
+
+      expect(test_wrapper).not_to receive(:perform)
+
+      expect do
+        scheduler.perform(migration_wrapper: test_wrapper)
+      end.not_to change { Gitlab::Database::BackgroundMigration::BatchedJob.count }
+    end
+  end
+
+  context 'when there are no active migrations' do
+    let!(:migration) { create(:batched_background_migration, :finished) }
+
+    it_behaves_like 'it has no jobs to run'
+  end
+
+  shared_examples_for 'it has completed the migration' do
+    it 'marks the migration as finished' do
+      relation = Gitlab::Database::BackgroundMigration::BatchedMigration.finished.where(id: first_migration.id)
+
+      expect { scheduler.perform }.to change { relation.count }.by(1)
+    end
+  end
+
+  context 'when there are active migrations' do
+    let!(:first_migration) { create(:batched_background_migration, :active, batch_size: 2) }
+    let!(:last_migration) { create(:batched_background_migration, :active) }
+
+    let(:job_relation) do
+      Gitlab::Database::BackgroundMigration::BatchedJob.where(batched_background_migration_id: first_migration.id)
+    end
+
+    context 'when the migration interval has not elapsed' do
+      before do
+        expect_next_found_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigration) do |migration|
+          expect(migration).to receive(:interval_elapsed?).and_return(false)
+        end
+      end
+
+      it_behaves_like 'it has no jobs to run'
+    end
+
+    context 'when the interval has elapsed' do
+      before do
+        expect_next_found_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigration) do |migration|
+          expect(migration).to receive(:interval_elapsed?).and_return(true)
+        end
+      end
+
+      context 'when the first migration has no previous jobs' do
+        context 'when the migration has batches to process' do
+          let!(:event1) { create(:event) }
+          let!(:event2) { create(:event) }
+          let!(:event3) { create(:event) }
+
+          it 'runs the job for the first batch' do
+            first_migration.update!(min_value: event1.id, max_value: event3.id)
+
+            expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper) do |wrapper|
+              expect(wrapper).to receive(:perform).and_wrap_original do |_, job_record|
+                expect(job_record).to eq(job_relation.first)
+              end
+            end
+
+            expect { scheduler.perform }.to change { job_relation.count }.by(1)
+
+            expect(job_relation.first).to have_attributes(
+              min_value: event1.id,
+              max_value: event2.id,
+              batch_size: first_migration.batch_size,
+              sub_batch_size: first_migration.sub_batch_size)
+          end
+        end
+
+        context 'when the migration has no batches to process' do
+          it_behaves_like 'it has no jobs to run'
+          it_behaves_like 'it has completed the migration'
+        end
+      end
+
+      context 'when the first migration has previous jobs' do
+        let!(:event1) { create(:event) }
+        let!(:event2) { create(:event) }
+        let!(:event3) { create(:event) }
+
+        let!(:previous_job) do
+          create(:batched_background_migration_job,
+            batched_migration: first_migration,
+            min_value: event1.id,
+            max_value: event2.id,
+            batch_size: 2,
+            sub_batch_size: 1)
+        end
+
+        context 'when the migration is ready to process another job' do
+          it 'runs the migration job for the next batch' do
+            first_migration.update!(min_value: event1.id, max_value: event3.id)
+
+            expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper) do |wrapper|
+              expect(wrapper).to receive(:perform).and_wrap_original do |_, job_record|
+                expect(job_record).to eq(job_relation.last)
+              end
+            end
+
+            expect { scheduler.perform }.to change { job_relation.count }.by(1)
+
+            expect(job_relation.last).to have_attributes(
+              min_value: event3.id,
+              max_value: event3.id,
+              batch_size: first_migration.batch_size,
+              sub_batch_size: first_migration.sub_batch_size)
+          end
+        end
+
+        context 'when the migration has no batches remaining' do
+          let!(:final_job) do
+            create(:batched_background_migration_job,
+              batched_migration: first_migration,
+              min_value: event3.id,
+              max_value: event3.id,
+              batch_size: 2,
+              sub_batch_size: 1)
+          end
+
+          it_behaves_like 'it has no jobs to run'
+          it_behaves_like 'it has completed the migration'
+        end
+      end
+
+      context 'when the bounds of the next batch exceed the migration maximum value' do
+        let!(:events) { create_list(:event, 3) }
+        let(:event1) { events[0] }
+        let(:event2) { events[1] }
+
+        context 'when the batch maximum exceeds the migration maximum' do
+          it 'clamps the batch maximum to the migration maximum' do
+            first_migration.update!(batch_size: 5, min_value: event1.id, max_value: event2.id)
+
+            expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper) do |wrapper|
+              expect(wrapper).to receive(:perform)
+            end
+
+            expect { scheduler.perform }.to change { job_relation.count }.by(1)
+
+            expect(job_relation.first).to have_attributes(
+              min_value: event1.id,
+              max_value: event2.id,
+              batch_size: first_migration.batch_size,
+              sub_batch_size: first_migration.sub_batch_size)
+          end
+        end
+
+        context 'when the batch minimum exceeds the migration maximum' do
+          let!(:previous_job) do
+            create(:batched_background_migration_job,
+              batched_migration: first_migration,
+              min_value: event1.id,
+              max_value: event2.id,
+              batch_size: 5,
+              sub_batch_size: 1)
+          end
+
+          before do
+            first_migration.update!(batch_size: 5, min_value: 1, max_value: event2.id)
+          end
+
+          it_behaves_like 'it has no jobs to run'
+          it_behaves_like 'it has completed the migration'
+        end
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/database/migration_helpers_spec.rb b/spec/lib/gitlab/database/migration_helpers_spec.rb
index 6de7fc3a50eef98e48582549483fdceda92819c6..aceb2299cc0adc2c2735ba940bb1df9cb03a2d09 100644
--- a/spec/lib/gitlab/database/migration_helpers_spec.rb
+++ b/spec/lib/gitlab/database/migration_helpers_spec.rb
@@ -1720,7 +1720,7 @@
           .with(
             2.minutes,
             'CopyColumnUsingBackgroundMigrationJob',
-            [event.id, event.id, :events, :id, :id, 'id_convert_to_bigint', 100]
+            [event.id, event.id, :events, :id, 100, :id, 'id_convert_to_bigint']
           )
 
         expect(Gitlab::BackgroundMigration)