diff --git a/ee/app/services/search/zoekt/indexing_task_service.rb b/ee/app/services/search/zoekt/indexing_task_service.rb
index 080b9e19dcb4f440abe21a60539b3c5c5c73bc2a..f6080c11e0393157e2f0886a1284ce5a69bf3f85 100644
--- a/ee/app/services/search/zoekt/indexing_task_service.rb
+++ b/ee/app/services/search/zoekt/indexing_task_service.rb
@@ -3,7 +3,11 @@
 module Search
   module Zoekt
     class IndexingTaskService
+      include ::Gitlab::Utils::StrongMemoize
+      include Gitlab::Loggable
+
       REINDEXING_CHANCE_PERCENTAGE = 0.5
+      WATERMARK_RESCHEDULE_INTERVAL = 30.minutes
 
       def self.execute(...)
         new(...).execute
@@ -24,6 +28,25 @@ def execute
 
         current_task_type = random_force_reindexing? ? :force_index_repo : task_type
         Router.fetch_indices_for_indexing(project_id, root_namespace_id: root_namespace_id).find_each do |idx|
+          # Note: we skip indexing tasks depending on storage watermark levels.
+          #
+          # If the low watermark is exceeded, we don't allow any new initial indexing tasks through,
+          # but we permit incremental indexing or force reindexing for existing repos.
+          #
+          # If the high watermark is exceeded, we don't allow any indexing tasks at all anymore.
+          if idx.high_watermark_exceeded? || (idx.low_watermark_exceeded? && initial_indexing?)
+            IndexingTaskWorker.perform_in(WATERMARK_RESCHEDULE_INTERVAL, project_id, task_type, { index_id: idx.id })
+            logger.info(
+              build_structured_payload(
+                indexing_task_type: task_type,
+                message: 'Indexing rescheduled due to storage watermark',
+                index_id: idx.id,
+                index_state: idx.state
+              )
+            )
+            next
+          end
+
           perform_at = Time.current
           perform_at += delay if delay
           ApplicationRecord.transaction do
@@ -34,10 +57,23 @@ def execute
         end
       end
 
+      def initial_indexing?
+        repo = Repository.find_by_project_identifier(project_id)
+
+        return true if repo.nil?
+        return true if repo.ready? && random_force_reindexing?
+
+        repo.pending? || repo.initializing? || repo.failed?
+      end
+
       private
 
       attr_reader :project_id, :project, :node_id, :root_namespace_id, :force, :task_type, :delay
 
+      def logger
+        @logger ||= ::Search::Zoekt::Logger.build
+      end
+
       def preflight_check?
         return true if task_type == :delete_repo
         return false unless project
@@ -48,10 +84,13 @@ def preflight_check?
 
       def random_force_reindexing?
         return true if task_type == :force_index_repo
-        return false unless task_type == :index_repo
-        return false if Feature.disabled?(:zoekt_random_force_reindexing, project, type: :ops)
 
-        rand * 100 <= REINDEXING_CHANCE_PERCENTAGE
+        eligible_for_force_reindexing? && (rand * 100 <= REINDEXING_CHANCE_PERCENTAGE)
+      end
+      strong_memoize_attr :random_force_reindexing?
+
+      def eligible_for_force_reindexing?
+        task_type == :index_repo && Feature.enabled?(:zoekt_random_force_reindexing, project, type: :ops)
       end
     end
   end
diff --git a/ee/spec/services/search/zoekt/indexing_task_service_spec.rb b/ee/spec/services/search/zoekt/indexing_task_service_spec.rb
index a7945873acf350e376af55f1a165455f880d174f..3c8e7ad3ee00119f5087a445ab793b2c5553a02d 100644
--- a/ee/spec/services/search/zoekt/indexing_task_service_spec.rb
+++ b/ee/spec/services/search/zoekt/indexing_task_service_spec.rb
@@ -20,6 +20,120 @@
   end
 
   describe '#execute' do
+    context 'when a watermark is exceeded' do
+      let(:service) { described_class.new(project.id, task_type) }
+      let(:task_type) { :index_repo }
+
+      before do
+        allow(Search::Zoekt::Router).to receive(:fetch_indices_for_indexing)
+          .with(project.id, root_namespace_id: zoekt_enabled_namespace.root_namespace_id)
+          .and_return(zoekt_index)
+
+        allow(zoekt_index).to receive(:find_each).and_yield(zoekt_index)
+      end
+
+      context 'on low watermark' do
+        before do
+          allow(zoekt_index).to receive(:low_watermark_exceeded?).and_return(true)
+        end
+
+        context 'with initial indexing' do
+          it 'does not create Search::Zoekt::Task record for initial indexing' do
+            expect { service.execute }.not_to change { Search::Zoekt::Task.count }
+          end
+
+          it 'reschedules the indexing task worker' do
+            expect(Search::Zoekt::IndexingTaskWorker).to receive(:perform_in).with(
+              30.minutes, project.id, task_type, { index_id: zoekt_index.id }
+            )
+
+            service.execute
+          end
+        end
+
+        context 'with force reindexing' do
+          let(:task_type) { :force_index_repo }
+
+          context 'when a repo does not exist' do
+            it 'does not create Search::Zoekt::Task record for initial indexing' do
+              expect(service.initial_indexing?).to eq(true)
+              expect { service.execute }.not_to change { Search::Zoekt::Task.count }
+            end
+
+            it 'reschedules the indexing task worker' do
+              expect(Search::Zoekt::IndexingTaskWorker).to receive(:perform_in).with(
+                30.minutes, project.id, task_type, { index_id: zoekt_index.id }
+              )
+
+              service.execute
+            end
+          end
+
+          context 'when a repo already exists' do
+            let_it_be(:repo_state) { ::Search::Zoekt::Repository.states.fetch(:pending) }
+            let_it_be(:zoekt_repo) do
+              create(:zoekt_repository, project: project, zoekt_index: zoekt_index, state: repo_state)
+            end
+
+            context 'and is ready' do
+              let_it_be(:repo_state) { ::Search::Zoekt::Repository.states.fetch(:ready) }
+
+              it 'does not create Search::Zoekt::Task record for initial indexing' do
+                expect(service.initial_indexing?).to eq(true)
+                expect { service.execute }.not_to change { Search::Zoekt::Task.count }
+              end
+
+              it 'reschedules the indexing task worker' do
+                expect(Search::Zoekt::IndexingTaskWorker).to receive(:perform_in).with(
+                  30.minutes, project.id, task_type, { index_id: zoekt_index.id }
+                )
+
+                service.execute
+              end
+            end
+
+            context 'and is not ready' do
+              let_it_be(:repo_state) { ::Search::Zoekt::Repository.states.fetch(:orphaned) }
+
+              it 'does not create Search::Zoekt::Task record for initial indexing' do
+                expect(service.initial_indexing?).to eq(true)
+                expect { service.execute }.not_to change { Search::Zoekt::Task.count }
+              end
+
+              it 'reschedules the indexing task worker' do
+                expect(Search::Zoekt::IndexingTaskWorker).to receive(:perform_in).with(
+                  30.minutes, project.id, task_type, { index_id: zoekt_index.id }
+                )
+
+                service.execute
+              end
+            end
+          end
+        end
+
+        context 'with incremental indexing' do
+          before do
+            create(:zoekt_repository, project: project, zoekt_index: zoekt_index, state: :ready)
+          end
+
+          it 'allows incremental indexing' do
+            expect(service.initial_indexing?).to eq(false)
+            expect { service.execute }.to change { Search::Zoekt::Task.count }.by(1)
+          end
+        end
+      end
+
+      context 'on high watermark' do
+        before do
+          allow(zoekt_index).to receive(:high_watermark_exceeded?).and_return(true)
+        end
+
+        it 'does not create Search::Zoekt::Task record' do
+          expect { service.execute }.not_to change { Search::Zoekt::Task.count }
+        end
+      end
+    end
+
     context 'when task_type is delete_repo' do
       let(:service) { described_class.new(project.id, :delete_repo) }