From 0a76bea2167cf0ba47998ff442ed1f29a98205c6 Mon Sep 17 00:00:00 2001
From: George Koltsov <gkoltsov@gitlab.com>
Date: Tue, 14 Nov 2023 16:18:46 +0000
Subject: [PATCH] Add bulk_import_entity_id to Sidekiq job start/done logs
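
Wrap the scheduling of downstream bulk import workers in
Gitlab::ApplicationContext.with_context so that bulk_import_entity_id
is stored with each Sidekiq job and appears in its start/done log
entries. Register the attribute as a known ApplicationContext key,
document it for the AdminSidekiqQueuesDeleteJobs GraphQL mutation, and
cover the new key with a spec.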

---
 app/services/bulk_imports/process_service.rb  |  4 +++-
 app/workers/bulk_imports/entity_worker.rb     | 16 ++++++++++------
 .../bulk_imports/export_request_worker.rb     |  4 +++-
 .../finish_batched_pipeline_worker.rb         |  4 +++-
 .../bulk_imports/pipeline_batch_worker.rb     | 19 ++++++++++++++-----
 app/workers/bulk_imports/pipeline_worker.rb   | 18 +++++++++++-------
 doc/api/graphql/reference/index.md            |  1 +
 lib/gitlab/application_context.rb             | 10 ++++++++--
 spec/lib/gitlab/application_context_spec.rb   |  8 ++++++++
 9 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/app/services/bulk_imports/process_service.rb b/app/services/bulk_imports/process_service.rb
index 7a6a883f1a997..2d9a0d6a6c965 100644
--- a/app/services/bulk_imports/process_service.rb
+++ b/app/services/bulk_imports/process_service.rb
@@ -32,7 +32,9 @@ def process_bulk_import
 
         entity.start!
 
-        BulkImports::ExportRequestWorker.perform_async(entity.id)
+        Gitlab::ApplicationContext.with_context(bulk_import_entity_id: entity.id) do
+          BulkImports::ExportRequestWorker.perform_async(entity.id)
+        end
       end
     end
 
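For reviewers unfamiliar with the pattern: `Gitlab::ApplicationContext.with_context` activates the given attributes on the current `Labkit::Context`, and the Sidekiq client middleware copies the active context into the job payload, which is what makes the ID show up in that job's start/done log lines. A minimal sketch of the call pattern (`SomeWorker` is an illustrative stand-in, not a class in this patch):

```ruby
# Illustrative sketch: scope an enqueue to a bulk import entity so the
# attribute travels with the job.
Gitlab::ApplicationContext.with_context(bulk_import_entity_id: entity.id) do
  # The Sidekiq client middleware snapshots the active context into the
  # job hash, so the server side restores it before `perform` runs and
  # the structured logger emits it as `meta.bulk_import_entity_id`.
  SomeWorker.perform_async(entity.id)
end
```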
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index e510a8c0d06f0..caee292a50497 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -49,7 +49,9 @@ def perform_failure(exception, entity_id)
     attr_reader :entity
 
     def re_enqueue
-      BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
+      with_context(bulk_import_entity_id: entity.id) do
+        BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
+      end
     end
 
     def running_tracker
@@ -66,11 +68,13 @@ def start_next_stage
       next_pipeline_trackers.each_with_index do |pipeline_tracker, index|
         log_info(message: 'Stage starting', entity_stage: pipeline_tracker.stage) if index == 0
 
-        BulkImports::PipelineWorker.perform_async(
-          pipeline_tracker.id,
-          pipeline_tracker.stage,
-          entity.id
-        )
+        with_context(bulk_import_entity_id: entity.id) do
+          BulkImports::PipelineWorker.perform_async(
+            pipeline_tracker.id,
+            pipeline_tracker.stage,
+            entity.id
+          )
+        end
       end
     end
 
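Note that inside the workers the bare `with_context` helper is used rather than the class method shown in the service above; it comes from the shared worker context concern and, roughly speaking, builds and activates a `Gitlab::ApplicationContext` around the block. A hedged sketch of the equivalence, not the exact upstream implementation:

```ruby
# Rough equivalence only: what the worker-side helper boils down to.
def with_context(attributes, &block)
  Gitlab::ApplicationContext.new(**attributes).use(&block)
end
```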
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index f7456ddccb1fa..54815c05c67d6 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -20,7 +20,9 @@ def perform(entity_id)
       set_source_xid
       request_export
 
-      BulkImports::EntityWorker.perform_async(entity_id)
+      with_context(bulk_import_entity_id: entity_id) do
+        BulkImports::EntityWorker.perform_async(entity_id)
+      end
     end
 
     def perform_failure(exception, entity_id)
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 40d26e14dc1f6..73ae2188ac4d7 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -38,7 +38,9 @@ def perform(pipeline_tracker_id)
     attr_reader :tracker
 
     def re_enqueue
-      self.class.perform_in(REQUEUE_DELAY, tracker.id)
+      with_context(bulk_import_entity_id: tracker.entity.id) do
+        self.class.perform_in(REQUEUE_DELAY, tracker.id)
+      end
     end
 
     def import_in_progress?
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
index 1485275e6162c..957669d4a66d8 100644
--- a/app/workers/bulk_imports/pipeline_batch_worker.rb
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -42,6 +42,7 @@ def perform(batch_id)
       @batch = ::BulkImports::BatchTracker.find(batch_id)
 
       @tracker = @batch.tracker
+      @entity = @tracker.entity
       @pending_retry = false
 
       return unless process_batch?
@@ -50,7 +51,11 @@ def perform(batch_id)
 
       try_obtain_lease { run }
     ensure
-      ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry
+      unless pending_retry
+        with_context(bulk_import_entity_id: entity.id) do
+          ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+        end
+      end
     end
 
     def perform_failure(batch_id, exception)
@@ -62,7 +67,7 @@ def perform_failure(batch_id, exception)
 
     private
 
-    attr_reader :batch, :tracker, :pending_retry
+    attr_reader :batch, :tracker, :pending_retry, :entity
 
     def run
       return batch.skip! if tracker.failed? || tracker.finished?
@@ -83,7 +88,7 @@ def fail_batch(exception)
       Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed'))
 
       BulkImports::Failure.create(
-        bulk_import_entity_id: batch.tracker.entity.id,
+        bulk_import_entity_id: tracker.entity.id,
         pipeline_class: tracker.pipeline_name,
         pipeline_step: 'pipeline_batch_worker_run',
         exception_class: exception.class.to_s,
@@ -91,7 +96,9 @@ def fail_batch(exception)
         correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
       )
 
-      ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+      with_context(bulk_import_entity_id: tracker.entity.id) do
+        ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+      end
     end
 
     def context
@@ -115,7 +122,9 @@ def lease_key
     def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
       log_extra_metadata_on_done(:re_enqueue, true)
 
-      self.class.perform_in(delay, batch.id)
+      with_context(bulk_import_entity_id: entity.id) do
+        self.class.perform_in(delay, batch.id)
+      end
     end
 
     def process_batch?
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 2c1d28b33c521..4bf6da73fdf65 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -124,12 +124,14 @@ def logger
     def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
       log_extra_metadata_on_done(:re_enqueue, true)
 
-      self.class.perform_in(
-        delay,
-        pipeline_tracker.id,
-        pipeline_tracker.stage,
-        entity.id
-      )
+      with_context(bulk_import_entity_id: entity.id) do
+        self.class.perform_in(
+          delay,
+          pipeline_tracker.id,
+          pipeline_tracker.stage,
+          entity.id
+        )
+      end
     end
 
     def context
@@ -218,7 +220,9 @@ def enqueue_batches
       1.upto(export_status.batches_count) do |batch_number|
         batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
 
-        ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+        with_context(bulk_import_entity_id: entity.id) do
+          ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+        end
       end
     end
   end
diff --git a/doc/api/graphql/reference/index.md b/doc/api/graphql/reference/index.md
index 553b1e7b6ab52..dfebf2aad020a 100644
--- a/doc/api/graphql/reference/index.md
+++ b/doc/api/graphql/reference/index.md
@@ -1212,6 +1212,7 @@ Input type: `AdminSidekiqQueuesDeleteJobsInput`
 | <a id="mutationadminsidekiqqueuesdeletejobsartifactusedcdn"></a>`artifactUsedCdn` | [`String`](#string) | Delete jobs matching artifact_used_cdn in the context metadata. |
 | <a id="mutationadminsidekiqqueuesdeletejobsartifactsdependenciescount"></a>`artifactsDependenciesCount` | [`String`](#string) | Delete jobs matching artifacts_dependencies_count in the context metadata. |
 | <a id="mutationadminsidekiqqueuesdeletejobsartifactsdependenciessize"></a>`artifactsDependenciesSize` | [`String`](#string) | Delete jobs matching artifacts_dependencies_size in the context metadata. |
+| <a id="mutationadminsidekiqqueuesdeletejobsbulkimportentityid"></a>`bulkImportEntityId` | [`String`](#string) | Delete jobs matching bulk_import_entity_id in the context metadata. |
 | <a id="mutationadminsidekiqqueuesdeletejobscallerid"></a>`callerId` | [`String`](#string) | Delete jobs matching caller_id in the context metadata. |
 | <a id="mutationadminsidekiqqueuesdeletejobsclientid"></a>`clientId` | [`String`](#string) | Delete jobs matching client_id in the context metadata. |
 | <a id="mutationadminsidekiqqueuesdeletejobsclientmutationid"></a>`clientMutationId` | [`String`](#string) | A unique identifier for the client performing the mutation. |
diff --git a/lib/gitlab/application_context.rb b/lib/gitlab/application_context.rb
index 67fc2ae2fcc0d..e46bbc2cfda92 100644
--- a/lib/gitlab/application_context.rb
+++ b/lib/gitlab/application_context.rb
@@ -26,7 +26,8 @@ class ApplicationContext
       :artifacts_dependencies_size,
       :artifacts_dependencies_count,
       :root_caller_id,
-      :merge_action_status
+      :merge_action_status,
+      :bulk_import_entity_id
     ].freeze
     private_constant :KNOWN_KEYS
 
@@ -45,7 +46,8 @@ class ApplicationContext
       Attribute.new(:artifacts_dependencies_size, Integer),
       Attribute.new(:artifacts_dependencies_count, Integer),
       Attribute.new(:root_caller_id, String),
-      Attribute.new(:merge_action_status, String)
+      Attribute.new(:merge_action_status, String),
+      Attribute.new(:bulk_import_entity_id, Integer)
     ].freeze
     private_constant :APPLICATION_ATTRIBUTES
 
@@ -95,6 +97,7 @@ def initialize(**args)
 
     # rubocop: disable Metrics/CyclomaticComplexity
     # rubocop: disable Metrics/PerceivedComplexity
+    # rubocop: disable Metrics/AbcSize
     def to_lazy_hash
       {}.tap do |hash|
         assign_hash_if_value(hash, :caller_id)
@@ -106,6 +109,7 @@ def to_lazy_hash
         assign_hash_if_value(hash, :artifacts_dependencies_size)
         assign_hash_if_value(hash, :artifacts_dependencies_count)
         assign_hash_if_value(hash, :merge_action_status)
+        assign_hash_if_value(hash, :bulk_import_entity_id)
 
         hash[:user] = -> { username } if include_user?
         hash[:user_id] = -> { user_id } if include_user?
@@ -115,10 +119,12 @@ def to_lazy_hash
         hash[:pipeline_id] = -> { job&.pipeline_id } if set_values.include?(:job)
         hash[:job_id] = -> { job&.id } if set_values.include?(:job)
         hash[:artifact_size] = -> { artifact&.size } if set_values.include?(:artifact)
+        hash[:bulk_import_entity_id] = -> { bulk_import_entity_id } if set_values.include?(:bulk_import_entity_id)
       end
     end
     # rubocop: enable Metrics/CyclomaticComplexity
     # rubocop: enable Metrics/PerceivedComplexity
+    # rubocop: enable Metrics/AbcSize
 
     def use
       Labkit::Context.with_context(to_lazy_hash) { yield }
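For a sense of what the new context key looks like once activated, a minimal sketch using `Gitlab::ApplicationContext#use`. The `meta.` prefix reflects how context keys are namespaced in the logs; treat the exact inspection call as an assumption:

```ruby
# Minimal sketch: activate a context and observe the stored attribute.
# Assumes Gitlab::ApplicationContext.current exposes the Labkit context
# as a hash, as it does elsewhere in the codebase.
Gitlab::ApplicationContext.new(bulk_import_entity_id: 42).use do
  Gitlab::ApplicationContext.current['meta.bulk_import_entity_id'] # => 42
end
```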
diff --git a/spec/lib/gitlab/application_context_spec.rb b/spec/lib/gitlab/application_context_spec.rb
index 20c1536b9e6fd..99f932975d079 100644
--- a/spec/lib/gitlab/application_context_spec.rb
+++ b/spec/lib/gitlab/application_context_spec.rb
@@ -210,6 +210,14 @@ def result(context)
         expect(result(context)).to include(job_id: job.id, project: project.full_path, pipeline_id: job.pipeline_id)
       end
     end
+
+    context 'when using bulk import context' do
+      it 'sets expected bulk_import_entity_id value' do
+        context = described_class.new(bulk_import_entity_id: 1)
+
+        expect(result(context)).to include(bulk_import_entity_id: 1)
+      end
+    end
   end
 
   describe '#use' do
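The `result` helper used in the new example is defined earlier in the spec file; presumably it resolves the lazy values so they can be matched directly, along the lines of:

```ruby
# Hedged reconstruction of the spec helper: call any lambdas in the
# lazy hash so matchers can compare plain values.
def result(context)
  context.to_lazy_hash.transform_values { |v| v.respond_to?(:call) ? v.call : v }
end
```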
-- 
GitLab