diff --git a/db/docs/batched_background_migrations/backfill_root_storage_statistics_fork_storage_sizes.yml b/db/docs/batched_background_migrations/backfill_root_storage_statistics_fork_storage_sizes.yml
new file mode 100644
index 0000000000000000000000000000000000000000..e029488f5c4b6aeea7ef858f5a46eb895522ad22
--- /dev/null
+++ b/db/docs/batched_background_migrations/backfill_root_storage_statistics_fork_storage_sizes.yml
@@ -0,0 +1,6 @@
+---
+migration_job_name: BackfillRootStorageStatisticsForkStorageSizes
+description: Backfill the public_forks_storage_size, internal_forks_storage_size, and private_forks_storage_size columns on the namespace_root_storage_statistics table
+feature_category: consumables_cost_management
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/120916
+milestone: 16.1
diff --git a/db/post_migrate/20230517163300_queue_backfill_root_storage_statistics_fork_storage_sizes.rb b/db/post_migrate/20230517163300_queue_backfill_root_storage_statistics_fork_storage_sizes.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6732e33d0a4ce30ba48893808b291e0542dbc51c
--- /dev/null
+++ b/db/post_migrate/20230517163300_queue_backfill_root_storage_statistics_fork_storage_sizes.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+class QueueBackfillRootStorageStatisticsForkStorageSizes < Gitlab::Database::Migration[2.1]
+  MIGRATION = "BackfillRootStorageStatisticsForkStorageSizes"
+  DELAY_INTERVAL = 2.minutes
+  BATCH_SIZE = 1000
+  SUB_BATCH_SIZE = 100
+
+  restrict_gitlab_migration gitlab_schema: :gitlab_main
+
+  def up
+    queue_batched_background_migration(
+      MIGRATION,
+      :namespace_root_storage_statistics,
+      :namespace_id,
+      job_interval: DELAY_INTERVAL,
+      batch_size: BATCH_SIZE,
+      sub_batch_size: SUB_BATCH_SIZE
+    )
+  end
+
+  def down
+    delete_batched_background_migration(MIGRATION, :namespace_root_storage_statistics, :namespace_id, [])
+  end
+end
diff --git a/db/schema_migrations/20230517163300 b/db/schema_migrations/20230517163300
new file mode 100644
index 0000000000000000000000000000000000000000..a2baef62fe995fa408f7644f3e4b54e362b03cbb
--- /dev/null
+++ b/db/schema_migrations/20230517163300
@@ -0,0 +1 @@
+bdda58a5015942f8fe98dedbbab66ff9e39505229da2e6d1726eb73105a89ae1
\ No newline at end of file
diff --git a/lib/gitlab/background_migration/backfill_root_storage_statistics_fork_storage_sizes.rb b/lib/gitlab/background_migration/backfill_root_storage_statistics_fork_storage_sizes.rb
new file mode 100644
index 0000000000000000000000000000000000000000..23c510720c071802fb5707c7b691da5fc468b324
--- /dev/null
+++ b/lib/gitlab/background_migration/backfill_root_storage_statistics_fork_storage_sizes.rb
@@ -0,0 +1,99 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module BackgroundMigration
+    # Backfill the following columns on the namespace_root_storage_statistics table:
+    #   - public_forks_storage_size
+    #   - internal_forks_storage_size
+    #   - private_forks_storage_size
+    class BackfillRootStorageStatisticsForkStorageSizes < BatchedMigrationJob
+      operation_name :backfill_root_storage_statistics_fork_sizes
+      feature_category :consumables_cost_management
+
+      VISIBILITY_LEVELS_TO_STORAGE_SIZE_COLUMNS = {
+        0 => :private_forks_storage_size,
+        10 => :internal_forks_storage_size,
+        20 => :public_forks_storage_size
+      }.freeze
+
+      def perform
+        each_sub_batch do |sub_batch|
+          sub_batch.each do |root_storage_statistics|
+            next if has_fork_data?(root_storage_statistics)
+
+            namespace_id = root_storage_statistics.namespace_id
+
+            namespace_type = execute("SELECT type FROM namespaces WHERE id = #{namespace_id}").first&.fetch('type')
+
+            next if namespace_type.nil?
+
+            sql = if user_namespace?(namespace_type)
+                    user_namespace_sql(namespace_id)
+                  else
+                    group_namespace_sql(namespace_id)
+                  end
+
+            stats = execute(sql)
+              .map { |h| { h['projects_visibility_level'] => h['sum_project_statistics_storage_size'] } }
+              .reduce({}) { |memo, h| memo.merge(h) }
+              .transform_keys { |k| VISIBILITY_LEVELS_TO_STORAGE_SIZE_COLUMNS[k] }
+
+            root_storage_statistics.update!(stats)
+          end
+        end
+      end
+
+      def has_fork_data?(root_storage_statistics)
+        root_storage_statistics.public_forks_storage_size != 0 ||
+          root_storage_statistics.internal_forks_storage_size != 0 ||
+          root_storage_statistics.private_forks_storage_size != 0
+      end
+
+      def user_namespace?(type)
+        type.nil? || type == 'User' || !(type == 'Group' || type == 'Project')
+      end
+
+      def execute(sql)
+        ::ApplicationRecord.connection.execute(sql)
+      end
+
+      def user_namespace_sql(namespace_id)
+        <<~SQL
+          SELECT
+            SUM("project_statistics"."storage_size") AS sum_project_statistics_storage_size,
+            "projects"."visibility_level" AS projects_visibility_level
+          FROM
+            "projects"
+            INNER JOIN "project_statistics" ON "project_statistics"."project_id" = "projects"."id"
+            INNER JOIN "fork_network_members" ON "fork_network_members"."project_id" = "projects"."id"
+            INNER JOIN "fork_networks" ON "fork_networks"."id" = "fork_network_members"."fork_network_id"
+          WHERE
+            "projects"."namespace_id" = #{namespace_id}
+            AND (fork_networks.root_project_id != projects.id)
+          GROUP BY "projects"."visibility_level"
+        SQL
+      end
+
+      def group_namespace_sql(namespace_id)
+        <<~SQL
+          SELECT
+            SUM("project_statistics"."storage_size") AS sum_project_statistics_storage_size,
+            "projects"."visibility_level" AS projects_visibility_level
+          FROM
+            "projects"
+            INNER JOIN "project_statistics" ON "project_statistics"."project_id" = "projects"."id"
+            INNER JOIN "fork_network_members" ON "fork_network_members"."project_id" = "projects"."id"
+            INNER JOIN "fork_networks" ON "fork_networks"."id" = "fork_network_members"."fork_network_id"
+          WHERE
+            "projects"."namespace_id" IN (
+              SELECT namespaces.traversal_ids[array_length(namespaces.traversal_ids, 1)] AS id
+              FROM "namespaces"
+              WHERE "namespaces"."type" = 'Group' AND (traversal_ids @> ('{#{namespace_id}}'))
+            )
+            AND (fork_networks.root_project_id != projects.id)
+          GROUP BY "projects"."visibility_level"
+        SQL
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/background_migration/backfill_root_storage_statistics_fork_storage_sizes_spec.rb b/spec/lib/gitlab/background_migration/backfill_root_storage_statistics_fork_storage_sizes_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..a464f89ee69e465004872870d6c658065a5312f5
--- /dev/null
+++ b/spec/lib/gitlab/background_migration/backfill_root_storage_statistics_fork_storage_sizes_spec.rb
@@ -0,0 +1,302 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::BackgroundMigration::BackfillRootStorageStatisticsForkStorageSizes, schema: 20230517163300, feature_category: :consumables_cost_management do # rubocop:disable Layout/LineLength
+  describe '#perform' do
+    let(:namespaces_table) { table(:namespaces) }
+    let(:root_storage_statistics_table) { table(:namespace_root_storage_statistics) }
+    let(:projects_table) { table(:projects) }
+    let(:project_statistics_table) { table(:project_statistics) }
+    let(:fork_networks_table) { table(:fork_networks) }
+    let(:fork_network_members_table) { table(:fork_network_members) }
+
+    it 'updates the public_forks_storage_size' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, storage_size: 100)
+
+      migrate
+
+      expect(root_storage_statistics.reload.public_forks_storage_size).to eq(100)
+    end
+
+    it 'totals the size of public forks in the namespace' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, name: 'my fork', storage_size: 100)
+      create_fork!(project, name: 'my other fork', storage_size: 100)
+
+      migrate
+
+      expect(root_storage_statistics.reload.public_forks_storage_size).to eq(200)
+    end
+
+    it 'updates the internal_forks_storage_size' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+      create_fork!(project, storage_size: 250)
+
+      migrate
+
+      expect(root_storage_statistics.reload.internal_forks_storage_size).to eq(250)
+    end
+
+    it 'totals the size of internal forks in the namespace' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+      create_fork!(project, name: 'my fork', storage_size: 300)
+      create_fork!(project, name: 'my other fork', storage_size: 300)
+
+      migrate
+
+      expect(root_storage_statistics.reload.internal_forks_storage_size).to eq(600)
+    end
+
+    it 'updates the private_forks_storage_size' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+      create_fork!(project, storage_size: 50)
+
+      migrate
+
+      expect(root_storage_statistics.reload.private_forks_storage_size).to eq(50)
+    end
+
+    it 'totals the size of private forks in the namespace' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+      create_fork!(project, name: 'my fork', storage_size: 350)
+      create_fork!(project, name: 'my other fork', storage_size: 400)
+
+      migrate
+
+      expect(root_storage_statistics.reload.private_forks_storage_size).to eq(750)
+    end
+
+    it 'counts only the size of forks' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, storage_size: 100,
+        visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, name: 'my public fork', storage_size: 150,
+        visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, name: 'my internal fork', storage_size: 250,
+        visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+      create_fork!(project, name: 'my private fork', storage_size: 350,
+        visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.public_forks_storage_size).to eq(150)
+      expect(root_storage_statistics.internal_forks_storage_size).to eq(250)
+      expect(root_storage_statistics.private_forks_storage_size).to eq(350)
+    end
+
+    it 'sums forks for multiple namespaces' do
+      namespace_a, root_storage_statistics_a = create_namespace!
+      namespace_b, root_storage_statistics_b = create_namespace!
+      project = create_project!(namespace: namespace_a)
+      create_fork!(project, namespace: namespace_a, storage_size: 100)
+      create_fork!(project, namespace: namespace_b, storage_size: 200)
+
+      migrate
+
+      expect(root_storage_statistics_a.reload.private_forks_storage_size).to eq(100)
+      expect(root_storage_statistics_b.reload.private_forks_storage_size).to eq(200)
+    end
+
+    it 'counts the size of forks in subgroups' do
+      group, root_storage_statistics = create_group!
+      subgroup = create_group!(parent: group)
+      project = create_project!(namespace: group, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, namespace: subgroup, name: 'my fork A',
+        storage_size: 123, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, namespace: subgroup, name: 'my fork B',
+        storage_size: 456, visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+      create_fork!(project, namespace: subgroup, name: 'my fork C',
+        storage_size: 789, visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.public_forks_storage_size).to eq(123)
+      expect(root_storage_statistics.internal_forks_storage_size).to eq(456)
+      expect(root_storage_statistics.private_forks_storage_size).to eq(789)
+    end
+
+    it 'counts the size of forks in more nested subgroups' do
+      root, root_storage_statistics = create_group!
+      child = create_group!(parent: root)
+      grand_child = create_group!(parent: child)
+      great_grand_child = create_group!(parent: grand_child)
+      project = create_project!(namespace: root, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, namespace: grand_child, name: 'my fork A',
+        storage_size: 200, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, namespace: great_grand_child, name: 'my fork B',
+        storage_size: 300, visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+      create_fork!(project, namespace: great_grand_child, name: 'my fork C',
+        storage_size: 400, visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.public_forks_storage_size).to eq(200)
+      expect(root_storage_statistics.internal_forks_storage_size).to eq(300)
+      expect(root_storage_statistics.private_forks_storage_size).to eq(400)
+    end
+
+    it 'counts forks of forks' do
+      group, root_storage_statistics = create_group!
+      other_group, other_root_storage_statistics = create_group!
+      project = create_project!(namespace: group)
+      fork_a = create_fork!(project, namespace: group, storage_size: 100)
+      fork_b = create_fork!(fork_a, name: 'my other fork', namespace: group, storage_size: 50)
+      create_fork!(fork_b, namespace: other_group, storage_size: 27)
+
+      migrate
+
+      expect(root_storage_statistics.reload.private_forks_storage_size).to eq(150)
+      expect(other_root_storage_statistics.reload.private_forks_storage_size).to eq(27)
+    end
+
+    it 'counts multiple forks of the same project' do
+      group, root_storage_statistics = create_group!
+      project = create_project!(namespace: group)
+      create_fork!(project, storage_size: 200)
+      create_fork!(project, name: 'my other fork', storage_size: 88)
+
+      migrate
+
+      expect(root_storage_statistics.reload.private_forks_storage_size).to eq(288)
+    end
+
+    it 'updates a namespace with no forks' do
+      namespace, root_storage_statistics = create_namespace!
+      create_project!(namespace: namespace)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.public_forks_storage_size).to eq(0)
+      expect(root_storage_statistics.internal_forks_storage_size).to eq(0)
+      expect(root_storage_statistics.private_forks_storage_size).to eq(0)
+    end
+
+    it 'skips the update if the public_forks_storage_size has already been set' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::PUBLIC)
+      create_fork!(project, storage_size: 200)
+      root_storage_statistics.update!(public_forks_storage_size: 100)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.public_forks_storage_size).to eq(100)
+    end
+
+    it 'skips the update if the internal_forks_storage_size has already been set' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::INTERNAL)
+      create_fork!(project, storage_size: 200)
+      root_storage_statistics.update!(internal_forks_storage_size: 100)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.internal_forks_storage_size).to eq(100)
+    end
+
+    it 'skips the update if the private_forks_storage_size has already been set' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace, visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+      create_fork!(project, storage_size: 200)
+      root_storage_statistics.update!(private_forks_storage_size: 100)
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.private_forks_storage_size).to eq(100)
+    end
+
+    it 'skips the update if the namespace is not found' do
+      namespace, root_storage_statistics = create_namespace!
+      project = create_project!(namespace: namespace)
+      create_fork!(project, storage_size: 100)
+      allow(::ApplicationRecord.connection).to receive(:execute)
+        .with("SELECT type FROM namespaces WHERE id = #{namespace.id}")
+        .and_return([])
+
+      migrate
+
+      root_storage_statistics.reload
+      expect(root_storage_statistics.public_forks_storage_size).to eq(0)
+      expect(root_storage_statistics.internal_forks_storage_size).to eq(0)
+      expect(root_storage_statistics.private_forks_storage_size).to eq(0)
+    end
+  end
+
+  def create_namespace!(name: 'abc', path: 'abc')
+    namespace = namespaces_table.create!(name: name, path: path)
+    namespace.update!(traversal_ids: [namespace.id])
+    root_storage_statistics = root_storage_statistics_table.create!(namespace_id: namespace.id)
+
+    [namespace, root_storage_statistics]
+  end
+
+  def create_group!(name: 'abc', path: 'abc', parent: nil)
+    parent_id = parent.try(:id)
+    group = namespaces_table.create!(name: name, path: path, type: 'Group', parent_id: parent_id)
+
+    if parent_id
+      parent_traversal_ids = namespaces_table.find(parent_id).traversal_ids
+      group.update!(traversal_ids: parent_traversal_ids + [group.id])
+      group
+    else
+      group.update!(traversal_ids: [group.id])
+      root_storage_statistics = root_storage_statistics_table.create!(namespace_id: group.id)
+      [group, root_storage_statistics]
+    end
+  end
+
+  def create_project!(
+    namespace:, storage_size: 100, name: 'my project',
+    visibility_level: Gitlab::VisibilityLevel::PRIVATE)
+    project_namespace = namespaces_table.create!(name: name, path: name)
+    project = projects_table.create!(name: name, namespace_id: namespace.id, project_namespace_id: project_namespace.id,
+      visibility_level: visibility_level)
+    project_statistics_table.create!(project_id: project.id, namespace_id: project.namespace_id,
+      storage_size: storage_size)
+
+    project
+  end
+
+  def create_fork!(project, storage_size:, name: 'my fork', visibility_level: nil, namespace: nil)
+    fork_namespace = namespace || namespaces_table.find(project.namespace_id)
+    fork_visibility_level = visibility_level || project.visibility_level
+
+    project_fork = create_project!(name: name, namespace: fork_namespace,
+      visibility_level: fork_visibility_level, storage_size: storage_size)
+
+    fork_network_id = if membership = fork_network_members_table.find_by(project_id: project.id)
+                        membership.fork_network_id
+                      else
+                        fork_network = fork_networks_table.create!(root_project_id: project.id)
+                        fork_network_members_table.create!(fork_network_id: fork_network.id, project_id: project.id)
+                        fork_network.id
+                      end
+
+    fork_network_members_table.create!(fork_network_id: fork_network_id, project_id: project_fork.id,
+      forked_from_project_id: project.id)
+
+    project_fork
+  end
+
+  def migrate
+    described_class.new(start_id: 1, end_id: root_storage_statistics_table.last.id,
+      batch_table: 'namespace_root_storage_statistics',
+      batch_column: 'namespace_id',
+      sub_batch_size: 100, pause_ms: 0,
+      connection: ApplicationRecord.connection).perform
+  end
+end
diff --git a/spec/migrations/20230517163300_queue_backfill_root_storage_statistics_fork_storage_sizes_spec.rb b/spec/migrations/20230517163300_queue_backfill_root_storage_statistics_fork_storage_sizes_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..f7052020005d6886c600fdcc958e3ab9a29eb42a
--- /dev/null
+++ b/spec/migrations/20230517163300_queue_backfill_root_storage_statistics_fork_storage_sizes_spec.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+require_migration!
+
+RSpec.describe QueueBackfillRootStorageStatisticsForkStorageSizes, feature_category: :consumables_cost_management do
+  let!(:batched_migration) { described_class::MIGRATION }
+
+  it 'schedules a new batched migration' do
+    reversible_migration do |migration|
+      migration.before -> {
+        expect(batched_migration).not_to have_scheduled_batched_migration
+      }
+
+      migration.after -> {
+        expect(batched_migration).to have_scheduled_batched_migration(
+          table_name: :namespace_root_storage_statistics,
+          column_name: :namespace_id,
+          interval: described_class::DELAY_INTERVAL,
+          batch_size: described_class::BATCH_SIZE,
+          sub_batch_size: described_class::SUB_BATCH_SIZE
+        )
+      }
+    end
+  end
+end