diff --git a/app/services/click_house/rebuild_materialized_view_service.rb b/app/services/click_house/rebuild_materialized_view_service.rb
index 9072f4bfd5fb6b79620a1a4b828549a70a4d8147..4fe87ee1b86296c3cddb4d9fabccd2aaec467e8c 100644
--- a/app/services/click_house/rebuild_materialized_view_service.rb
+++ b/app/services/click_house/rebuild_materialized_view_service.rb
@@ -10,29 +10,34 @@ class RebuildMaterializedViewService
       table_schema = {database_name:String}
     SQL
 
-    def initialize(connection:, state: {})
+    def initialize(connection:, runtime_limiter: Gitlab::Metrics::RuntimeLimiter.new, state: {})
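+      # runtime_limiter bounds how long a single backfill run may take.
+      # state[:next_value] is an optional cursor from a previously interrupted
+      # run; when present, the backfill resumes from it instead of starting over.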
       @connection = connection
 
+      @runtime_limiter = runtime_limiter
       @view_name = state.fetch(:view_name)
       @tmp_view_name = state.fetch(:tmp_view_name)
       @view_table_name = state.fetch(:view_table_name)
       @tmp_view_table_name = state.fetch(:tmp_view_table_name)
       @source_table_name = state.fetch(:source_table_name)
+      @next_value = state[:next_value]
     end
 
     def execute
       create_tmp_materialized_view_table
       create_tmp_materialized_view
 
-      backfill_data
-
-      rename_table
-      drop_tmp_tables
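+      # Swap in the rebuilt table only when the backfill finished within the
+      # runtime budget; an interrupted run keeps the tmp tables so the next
+      # invocation can continue where it left off.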
+      backfill_data.tap do |service_response|
+        if service_response.payload[:status] == :finished
+          rename_table
+          drop_tmp_tables if Feature.enabled?(:rebuild_mv_drop_old_tables, type: :gitlab_com_derisk)
+        end
+      end
     end
 
     private
 
-    attr_reader :connection, :view_name, :tmp_view_name, :view_table_name, :tmp_view_table_name, :source_table_name
+    attr_reader :connection, :view_name, :tmp_view_name, :view_table_name, :tmp_view_table_name, :source_table_name,
+      :next_value, :runtime_limiter
 
     def create_tmp_materialized_view_table
       # Create a tmp table from the existing table, use IF NOT EXISTS to avoid failure when the table exists.
@@ -64,7 +69,8 @@ def backfill_data
       })
       view_query = connection.select(query).first['view_definition']
 
-      iterator.each_batch(column: :id, of: INSERT_BATCH_SIZE) do |scope|
+      payload = { status: :finished }
+      iterator.each_batch(column: :id, of: INSERT_BATCH_SIZE) do |scope, _min, max|
         # Use the materialized view query to backfill the new temporary table.
         # The materialized view query selects from the source table, example: FROM events.
         # Replace the FROM part and select data from a batched subquery.
@@ -76,7 +82,14 @@ def backfill_data
 
         # Insert the batch
         connection.execute("INSERT INTO #{quote(tmp_view_table_name)} #{query}")
+
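+        # Stop once the runtime budget is spent and report the cursor
+        # (max + 1) so the caller can resume with the next batch.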
+        if runtime_limiter.over_time?
+          payload.merge!(status: :over_time, next_value: max + 1)
+          break
+        end
       end
+
+      ServiceResponse.success(payload: payload)
     end
 
     def rename_table
@@ -103,7 +116,8 @@ def quote(table)
 
     def iterator
       builder = ClickHouse::QueryBuilder.new(source_table_name)
-      ClickHouse::Iterator.new(query_builder: builder, connection: connection)
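+      # Resume from the persisted cursor when present; the :order_limit
+      # strategy changes how the iterator determines the batch bounds.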
+      ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: next_value,
+        min_max_strategy: :order_limit)
     end
   end
 end
diff --git a/app/workers/click_house/rebuild_materialized_view_cron_worker.rb b/app/workers/click_house/rebuild_materialized_view_cron_worker.rb
index 2b7513803e8f302dd233e1c1ac94c91daf0c29f0..3064a8f5b1228ac8502186587142f2ab96d31748 100644
--- a/app/workers/click_house/rebuild_materialized_view_cron_worker.rb
+++ b/app/workers/click_house/rebuild_materialized_view_cron_worker.rb
@@ -12,21 +12,75 @@ class RebuildMaterializedViewCronWorker
     worker_has_external_dependencies! # the worker interacts with a ClickHouse database
     feature_category :value_stream_management
 
-    MATERIALIZED_VIEWS = [
-      {
-        view_name: 'contributions_mv',
-        view_table_name: 'contributions',
-        tmp_view_name: 'tmp_contributions_mv',
-        tmp_view_table_name: 'tmp_contributions',
-        source_table_name: 'events'
-      }.freeze
-    ].freeze
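+    # MAX_TTL bounds the exclusive lease, MAX_RUNTIME bounds a single rebuild
+    # run, and REBUILDING_SCHEDULE controls how often the view is rebuilt.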
+    MAX_TTL = 5.minutes
+    MAX_RUNTIME = 4.minutes
+    REBUILDING_SCHEDULE = 1.week
+    MATERIALIZED_VIEW = {
+      view_name: 'contributions_mv',
+      view_table_name: 'contributions',
+      tmp_view_name: 'tmp_contributions_mv',
+      tmp_view_table_name: 'tmp_contributions',
+      source_table_name: 'events'
+    }.freeze
+
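+    # Redis key holding the rebuild state (cursor and timestamps) between runs.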
+    def self.redis_key
+      "rebuild_click_house_materialized_view:#{MATERIALIZED_VIEW[:view_name]}"
+    end
 
     def perform
-      connection = ClickHouse::Connection.new(:main)
-      ClickHouse::RebuildMaterializedViewService
-        .new(connection: connection, state: MATERIALIZED_VIEWS.first)
-        .execute
+      return if Feature.disabled?(:rebuild_contributions_mv, type: :gitlab_com_derisk)
+
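+      # Take an exclusive lease so overlapping cron invocations cannot rebuild
+      # the same view concurrently.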
+      in_lock("#{self.class}:#{MATERIALIZED_VIEW[:view_name]}", ttl: MAX_TTL, retries: 0) do
+        state = build_state
+
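+        # Nothing to do if the last rebuild finished within the scheduling window.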
+        if state[:finished_at] && DateTime.parse(Gitlab::Json.parse(state[:finished_at])) > REBUILDING_SCHEDULE.ago
+          break
+        end
+
+        service_response = ClickHouse::RebuildMaterializedViewService
+          .new(
+            connection: ClickHouse::Connection.new(:main),
+            runtime_limiter: Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME),
+            state: state)
+          .execute
+
+        payload = service_response.payload
+        current_time = Time.current.to_json
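+        # An interrupted run persists the cursor so the next invocation resumes;
+        # a finished run clears it and records finished_at.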
+        if payload[:status] == :over_time
+          state.merge!(
+            next_value: payload[:next_value],
+            last_update_at: current_time,
+            finished_at: nil
+          )
+        else
+          state.merge!(
+            next_value: nil,
+            last_update_at: current_time,
+            finished_at: current_time,
+            started_at: nil
+          )
+        end
+
+        Gitlab::Redis::SharedState.with do |redis|
+          redis.set(self.class.redis_key, Gitlab::Json.dump(state))
+        end
+
+        log_extra_metadata_on_done(:state, state)
+      end
+    end
+
+    private
+
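+    # Loads any persisted state from Redis and overlays the static view names
+    # plus a fresh started_at timestamp.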
+    def build_state
+      Gitlab::Redis::SharedState.with do |redis|
+        raw = redis.get(self.class.redis_key)
+        state = raw.present? ? Gitlab::Json.parse(raw) : {}
+        state.merge(initial_state).symbolize_keys
+      end
+    end
+
+    def initial_state
+      MATERIALIZED_VIEW.merge(started_at: Time.current)
     end
   end
 end
diff --git a/config/feature_flags/gitlab_com_derisk/rebuild_contributions_mv.yml b/config/feature_flags/gitlab_com_derisk/rebuild_contributions_mv.yml
new file mode 100644
index 0000000000000000000000000000000000000000..2d6306be320c63227d4132e6d1279794dc5e4576
--- /dev/null
+++ b/config/feature_flags/gitlab_com_derisk/rebuild_contributions_mv.yml
@@ -0,0 +1,9 @@
+---
+name: rebuild_contributions_mv
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431453
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/144478
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/441620
+milestone: '16.10'
+group: group::optimize
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/config/feature_flags/gitlab_com_derisk/rebuild_mv_drop_old_tables.yml b/config/feature_flags/gitlab_com_derisk/rebuild_mv_drop_old_tables.yml
new file mode 100644
index 0000000000000000000000000000000000000000..537f00bc614ac290339bbc68a49c5c1c9ad25279
--- /dev/null
+++ b/config/feature_flags/gitlab_com_derisk/rebuild_mv_drop_old_tables.yml
@@ -0,0 +1,9 @@
+---
+name: rebuild_mv_drop_old_tables
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431453
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/144478
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/441584
+milestone: '16.10'
+group: group::optimize
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 206b0525610873d84ae8a6497e80749f20647484..fb7660cde24958ae641acc912a4ec14ed57ecae2 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -918,6 +918,9 @@
     Settings.cron_jobs['click_house_audit_events_sync_worker'] ||= {}
     Settings.cron_jobs['click_house_audit_events_sync_worker']['cron'] ||= "*/3 * * * *"
     Settings.cron_jobs['click_house_audit_events_sync_worker']['job_class'] = 'ClickHouse::AuditEventsSyncWorker'
+    Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker'] ||= {}
+    Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker']['cron'] ||= "*/10 * * * *"
+    Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker']['job_class'] = 'ClickHouse::RebuildMaterializedViewCronWorker'
     Settings.cron_jobs['vertex_ai_refresh_access_token_worker'] ||= {}
     Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['cron'] ||= '*/50 * * * *'
     Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['job_class'] = 'Llm::VertexAiAccessTokenRefreshWorker'
diff --git a/lib/click_house/iterator.rb b/lib/click_house/iterator.rb
index cd247be2e530c590b155b7c9cec0051f11b0bf0c..9e301cd909514b978396a738a64aaca6240a4e1d 100644
--- a/lib/click_house/iterator.rb
+++ b/lib/click_house/iterator.rb
@@ -31,7 +31,7 @@ def initialize(query_builder:, connection:, min_value: nil, min_max_strategy: :m
 
     def each_batch(column: :id, of: 10_000)
       min, max = min_max(column)
-      return if min.nil? || max == 0
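+      # max may be nil (for example when the :order_limit strategy finds no
+      # rows), so guard against it in addition to the existing 0 check.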
+      return if min.nil? || max.nil? || max == 0
 
       loop do
         break if min > max
diff --git a/spec/services/click_house/rebuild_materialized_view_service_spec.rb b/spec/services/click_house/rebuild_materialized_view_service_spec.rb
index 895cd965041f69b73827f32abe7eec90e2388917..33a2568902b5b4b1cbfbd075d0b050dbcc382324 100644
--- a/spec/services/click_house/rebuild_materialized_view_service_spec.rb
+++ b/spec/services/click_house/rebuild_materialized_view_service_spec.rb
@@ -10,19 +10,30 @@
   let_it_be(:event3) { create(:closed_issue_event) }
 
   let(:connection) { ClickHouse::Connection.new(:main) }
+  let(:runtime_limiter) { Gitlab::Metrics::RuntimeLimiter.new }
 
-  before do
-    insert_events_into_click_house
-  end
-
-  def invoke_service
-    described_class.new(connection: connection, state: {
+  let(:state) do
+    {
       view_name: 'contributions_mv',
       view_table_name: 'contributions',
       tmp_view_name: 'tmp_contributions_mv',
       tmp_view_table_name: 'tmp_contributions',
       source_table_name: 'events'
-    }).execute
+    }
+  end
+
+  subject(:service_response) { run_service }
+
+  def run_service(new_state = state)
+    described_class.new(
+      connection: connection,
+      runtime_limiter: runtime_limiter,
+      state: new_state
+    ).execute
+  end
+
+  before do
+    insert_events_into_click_house
   end
 
   it 're-creates the materialized view with correct data from the source table' do
@@ -35,14 +46,16 @@ def invoke_service
     expect(ids).to eq([event1.id])
 
     # Rebuild the MV so we get the inconsistency corrected
-    invoke_service
+    expect(service_response).to be_success
+    payload = service_response.payload
+    expect(payload[:status]).to eq(:finished)
 
     ids = connection.select('SELECT id FROM contributions FINAL').pluck('id')
     expect(ids).to match_array([event1.id, event2.id, event3.id])
   end
 
   it 'does not leave temporary tables around' do
-    invoke_service
+    expect(service_response).to be_success
 
     view_query = <<~SQL
       SELECT view_definition FROM information_schema.views
@@ -59,4 +72,59 @@ def invoke_service
     expect(connection.select(view_query)).to be_empty
     expect(connection.select(table_query)).to be_empty
   end
+
+  context 'when the rebuild_mv_drop_old_tables FF is off' do
+    it 'preserves the old tables' do
+      stub_feature_flags(rebuild_mv_drop_old_tables: false)
+      expect(service_response).to be_success
+
+      view_query = <<~SQL
+        SELECT view_definition FROM information_schema.views
+        WHERE table_name = 'tmp_contributions_mv' AND
+        table_schema = '#{connection.database_name}'
+      SQL
+
+      table_query = <<~SQL
+        SELECT table_name FROM information_schema.tables
+        WHERE table_name = 'tmp_contributions' AND
+        table_schema = '#{connection.database_name}'
+      SQL
+
+      expect(connection.select(view_query)).not_to be_empty
+      expect(connection.select(table_query)).not_to be_empty
+    end
+  end
+
+  context 'when the processing is stopped due to over time' do
+    before do
+      stub_const("#{described_class}::INSERT_BATCH_SIZE", 1)
+    end
+
+    it 'returns over_time status and the cursor' do
+      allow(runtime_limiter).to receive(:over_time?).and_return(true)
+      expect(service_response).to be_success
+
+      payload = service_response.payload
+      expect(payload[:status]).to eq(:over_time)
+      expect(payload[:next_value]).to eq(event1.id + 1)
+    end
+
+    context 'when the service is invoked repeatedly' do
+      it 'finishes the processing' do
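+        # With INSERT_BATCH_SIZE stubbed to 1 and three events, each over-time
+        # run processes a single batch; the final run finds nothing left and
+        # reports :finished.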
+        allow(runtime_limiter).to receive(:over_time?).and_return(true)
+
+        service_response = run_service
+        expect(service_response.payload[:status]).to eq(:over_time)
+
+        service_response = run_service(state.merge(next_value: service_response.payload[:next_value]))
+        expect(service_response.payload[:status]).to eq(:over_time)
+
+        service_response = run_service(state.merge(next_value: service_response.payload[:next_value]))
+        expect(service_response.payload[:status]).to eq(:over_time)
+
+        service_response = run_service(state.merge(next_value: service_response.payload[:next_value]))
+        expect(service_response.payload[:status]).to eq(:finished)
+      end
+    end
+  end
 end
diff --git a/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb b/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb
index 93631e9d4d53eb620a29b6cf0d1dac9721c3c316..1f123e3fc5ffabf2eb09421e655adfe8d7ff9c5a 100644
--- a/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb
+++ b/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb
@@ -2,12 +2,81 @@
 
 require 'spec_helper'
 
-RSpec.describe ClickHouse::RebuildMaterializedViewCronWorker, feature_category: :database do
-  it 'invokes the RebuildMaterializedViewService' do
-    allow_next_instance_of(ClickHouse::RebuildMaterializedViewService) do |instance|
-      allow(instance).to receive(:execute)
+RSpec.describe ClickHouse::RebuildMaterializedViewCronWorker, :clean_gitlab_redis_shared_state, :freeze_time, feature_category: :database do
+  def run_job
+    described_class.new.perform
+  end
+
+  context 'when the previous run finished recently' do
+    before do
+      Gitlab::Redis::SharedState.with do |redis|
+        state = { finished_at: 1.day.ago.to_json }
+        redis.set(described_class.redis_key, Gitlab::Json.dump(state))
+      end
     end
 
-    described_class.new.perform
+    it 'does not invoke the service' do
+      expect(ClickHouse::RebuildMaterializedViewService).not_to receive(:new)
+
+      run_job
+    end
+  end
+
+  context 'when the rebuild_contributions_mv feature flag is disabled' do
+    it 'does not invoke the service' do
+      stub_feature_flags(rebuild_contributions_mv: false)
+
+      expect(ClickHouse::RebuildMaterializedViewService).not_to receive(:new)
+
+      run_job
+    end
+  end
+
+  context 'when the service is finished', :click_house do
+    it 'persists the finished_at timestamp' do
+      run_job
+
+      Gitlab::Redis::SharedState.with do |redis|
+        data = Gitlab::Json.parse(redis.get(described_class.redis_key))
+        expect(DateTime.parse(data['finished_at'])).to eq(Time.current)
+      end
+    end
+  end
+
+  context 'when the service is interrupted' do
+    it 'persists the next value to continue the processing from' do
+      allow_next_instance_of(ClickHouse::RebuildMaterializedViewService) do |instance|
+        allow(instance).to receive(:execute)
+          .and_return(ServiceResponse.success(payload: { status: :over_time, next_value: 100 }))
+      end
+
+      run_job
+
+      Gitlab::Redis::SharedState.with do |redis|
+        data = Gitlab::Json.parse(redis.get(described_class.redis_key))
+        expect(data['finished_at']).to eq(nil)
+        expect(data['next_value']).to eq(100)
+      end
+    end
+  end
+
+  context 'when the previous run was interrupted' do
+    before do
+      Gitlab::Redis::SharedState.with do |redis|
+        state = { started_at: 1.day.ago.to_json, next_value: 200 }
+        redis.set(described_class.redis_key, Gitlab::Json.dump(state))
+      end
+    end
+
+    it 'continues from the previously persisted next_value' do
+      service = instance_double('ClickHouse::RebuildMaterializedViewService',
+        execute: ServiceResponse.success(payload: { status: :finished }))
+
+      expect(ClickHouse::RebuildMaterializedViewService).to receive(:new) do |args|
+        expect(args[:state][:next_value]).to eq(200)
+      end.and_return(service)
+
+      run_job
+    end
   end
 end