From f5b00c6ef59b7fe9782f7ec7b6141c16a7143094 Mon Sep 17 00:00:00 2001
From: Arturo Herrero <arturo.herrero@gmail.com>
Date: Tue, 11 Mar 2025 12:42:38 +0100
Subject: [PATCH] Fix Elasticsearch reindexing count race condition

Ensure original and new index counts are captured at the same time after
indexing has paused. This prevents false positives when comparing document
counts caused by pending indexing operations.

EE: true
---
 .../search/elastic/cluster_reindexing_service.rb     | 11 ++++-------
 .../elastic/cluster_reindexing_service_spec.rb       | 12 ++++++++----
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/ee/app/services/search/elastic/cluster_reindexing_service.rb b/ee/app/services/search/elastic/cluster_reindexing_service.rb
index 496891a10875c..7fd90efaa8527 100644
--- a/ee/app/services/search/elastic/cluster_reindexing_service.rb
+++ b/ee/app/services/search/elastic/cluster_reindexing_service.rb
@@ -133,10 +133,7 @@ def load_alias_info(klass:, name_suffix:)
 
       def launch_subtasks(items_to_reindex)
         items_to_reindex.each do |item|
-          # Record documents count
-          documents_count = elastic_helper.documents_count(index_name: item[:index_name_from], refresh: true)
-          # Create all subtasks
-          subtask = current_task.subtasks.create!(item.merge(documents_count: documents_count))
+          subtask = current_task.subtasks.create!(item)
 
           number_of_shards = elastic_helper.get_settings(index_name: item[:index_name_from])['number_of_shards'].to_i
           max_slice = number_of_shards * current_task.slice_multiplier
@@ -153,10 +150,10 @@ def launch_subtasks(items_to_reindex)
 
       def save_documents_count!(refresh:)
         current_task.subtasks.each do |subtask|
-          elastic_helper.refresh_index(index_name: subtask.index_name_to) if refresh
+          documents_count = elastic_helper.documents_count(index_name: subtask.index_name_from, refresh: refresh)
+          new_documents_count = elastic_helper.documents_count(index_name: subtask.index_name_to, refresh: refresh)
 
-          new_documents_count = elastic_helper.documents_count(index_name: subtask.index_name_to)
-          subtask.update!(documents_count_target: new_documents_count)
+          subtask.update!(documents_count: documents_count, documents_count_target: new_documents_count)
         end
       end
 
diff --git a/ee/spec/services/search/elastic/cluster_reindexing_service_spec.rb b/ee/spec/services/search/elastic/cluster_reindexing_service_spec.rb
index 56a3d5ee053a1..c69f16aa34428 100644
--- a/ee/spec/services/search/elastic/cluster_reindexing_service_spec.rb
+++ b/ee/spec/services/search/elastic/cluster_reindexing_service_spec.rb
@@ -218,15 +218,17 @@
           'response' => { 'total' => 20, 'created' => 20, 'updated' => 0, 'deleted' => 0 }
         }
       )
-      allow(helper).to receive(:refresh_index).and_return(true)
       allow(helper).to receive(:reindex).and_return('task_1', 'task_2', 'task_3', 'task_4', 'task_5', 'task_6')
     end
 
     context 'when errors are raised' do
       context 'when documents count does not match' do
         before do
-          allow(helper).to receive(:documents_count)
-            .with(index_name: anything).and_return(subtask.reload.documents_count * 2)
+          allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_from, refresh: anything)
+            .and_return(subtask.reload.documents_count)
+          allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to, refresh: anything)
+            .and_return(subtask.reload.documents_count * 2)
+          allow(helper).to receive(:get_settings).with(index_name: subtask.index_name_from)
         end
 
         it 'changes task state to failure' do
@@ -398,7 +400,9 @@
 
       with_them do
         before do
-          allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to)
+          allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_from, refresh: anything)
+            .and_return(subtask.reload.documents_count)
+          allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to, refresh: anything)
             .and_return(subtask.reload.documents_count)
           allow(helper).to receive(:get_settings).with(index_name: subtask.index_name_from)
             .and_return(current_settings.with_indifferent_access)
-- 
GitLab