From 9f960c976ffefa38ae025b5531f24886517cfda6 Mon Sep 17 00:00:00 2001
From: Gregory Havenga <11164960-ghavenga@users.noreply.gitlab.com>
Date: Fri, 9 Feb 2024 16:37:04 +0000
Subject: [PATCH] Track traversal sync events, update vulnerability reads
 accordingly

Changelog: added
EE: true
---
 config/sidekiq_queues.yml                     |  2 +
 .../services/ee/projects/transfer_service.rb  |  1 +
 ...pace_ids_of_vulnerability_reads_service.rb |  9 +-
 ee/app/workers/all_queues.yml                 |  9 ++
 .../process_transfer_events_worker.rb         | 38 ++++++++
 ...ate_vuln_reads_traversal_ids_via_event.yml |  9 ++
 ee/lib/ee/gitlab/event_store.rb               | 12 +++
 ee/spec/factories/projects.rb                 |  2 +
 .../projects/transfer_service_spec.rb         | 48 +++++++--
 ...ids_of_vulnerability_reads_service_spec.rb |  8 +-
 .../process_transfer_events_worker_spec.rb    | 97 +++++++++++++++++++
 11 files changed, 220 insertions(+), 15 deletions(-)
 create mode 100644 ee/app/workers/vulnerabilities/process_transfer_events_worker.rb
 create mode 100644 ee/config/feature_flags/gitlab_com_derisk/update_vuln_reads_traversal_ids_via_event.yml
 create mode 100644 ee/spec/workers/vulnerabilities/process_transfer_events_worker_spec.rb

diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index a7182e334180c..ddb3830d37bbb 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -753,6 +753,8 @@
   - 1
 - - vulnerabilities_mark_dropped_as_resolved
   - 1
+- - vulnerabilities_process_transfer_events
+  - 1
 - - vulnerabilities_remove_all_vulnerabilities
   - 1
 - - vulnerabilities_statistics_adjustment
diff --git a/ee/app/services/ee/projects/transfer_service.rb b/ee/app/services/ee/projects/transfer_service.rb
index f36677dbf3997..539712e34f289 100644
--- a/ee/app/services/ee/projects/transfer_service.rb
+++ b/ee/app/services/ee/projects/transfer_service.rb
@@ -67,6 +67,7 @@ def delete_compliance_framework_setting
       end
 
       def schedule_vulnerability_reads_update
+        return if ::Feature.enabled?(:update_vuln_reads_traversal_ids_via_event, project, type: :gitlab_com_derisk)
         return unless project.project_setting&.has_vulnerabilities?
 
         Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker.perform_async(project.id)
diff --git a/ee/app/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service.rb b/ee/app/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service.rb
index 99baee1afbf6c..75c9c8f521738 100644
--- a/ee/app/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service.rb
+++ b/ee/app/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service.rb
@@ -5,7 +5,6 @@ class UpdateNamespaceIdsOfVulnerabilityReadsService
     include Gitlab::ExclusiveLeaseHelpers
 
     BATCH_SIZE = 100
-    LEASE_KEY = 'update_vulnerability_reads_namespace_id'
     LEASE_TTL = 5.minutes
     LEASE_TRY_AFTER = 3.seconds
 
@@ -20,20 +19,24 @@ def initialize(project_id)
     def execute
       return unless project
 
-      in_lock(LEASE_KEY, ttl: LEASE_TTL, sleep_sec: LEASE_TRY_AFTER) { update_vulnerability_reads }
+      in_lock(lease_key, ttl: LEASE_TTL, sleep_sec: LEASE_TRY_AFTER) { update_vulnerability_reads }
     end
 
     private
 
     attr_reader :project_id
 
+    def lease_key
+      "update_vulnerability_reads_namespace_id_and_traversal_ids:projects:#{project_id}"
+    end
+
     def project
       @project ||= Project.find_by_id(project_id)
     end
 
     def update_vulnerability_reads
       project.vulnerability_reads.each_batch(of: BATCH_SIZE) do |batch|
-        batch.update_all(namespace_id: project.namespace_id)
+        batch.update_all(namespace_id: project.namespace_id, traversal_ids: project.namespace.traversal_ids)
       end
     end
   end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index 18f6fff4d67a5..074872f5a3258 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -2100,6 +2100,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: vulnerabilities_process_transfer_events
+  :worker_name: Vulnerabilities::ProcessTransferEventsWorker
+  :feature_category: :vulnerability_management
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: vulnerabilities_remove_all_vulnerabilities
   :worker_name: Vulnerabilities::RemoveAllVulnerabilitiesWorker
   :feature_category: :vulnerability_management
diff --git a/ee/app/workers/vulnerabilities/process_transfer_events_worker.rb b/ee/app/workers/vulnerabilities/process_transfer_events_worker.rb
new file mode 100644
index 0000000000000..c8d6bb80924ac
--- /dev/null
+++ b/ee/app/workers/vulnerabilities/process_transfer_events_worker.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+module Vulnerabilities
+  class ProcessTransferEventsWorker
+    include Gitlab::EventStore::Subscriber
+
+    idempotent!
+    deduplicate :until_executing, including_scheduled: true
+    data_consistency :always
+
+    feature_category :vulnerability_management
+
+    def handle_event(event)
+      bulk_arguments = ProjectSetting
+        .for_projects(project_ids(event))
+        .has_vulnerabilities
+        .pluck_primary_key
+        .zip
+
+      Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker.perform_bulk(bulk_arguments)
+    end
+
+    private
+
+    def project_ids(event)
+      case event
+      when ::Projects::ProjectTransferedEvent
+        [event.data[:project_id]]
+      when ::Groups::GroupTransferedEvent
+        group = Group.find_by_id(event.data[:group_id])
+
+        return [] unless group
+
+        group.all_project_ids
+      end
+    end
+  end
+end
diff --git a/ee/config/feature_flags/gitlab_com_derisk/update_vuln_reads_traversal_ids_via_event.yml b/ee/config/feature_flags/gitlab_com_derisk/update_vuln_reads_traversal_ids_via_event.yml
new file mode 100644
index 0000000000000..7269964d56a58
--- /dev/null
+++ b/ee/config/feature_flags/gitlab_com_derisk/update_vuln_reads_traversal_ids_via_event.yml
@@ -0,0 +1,9 @@
+---
+name: update_vuln_reads_traversal_ids_via_event
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/437669
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/143537
+rollout_issue_url: https://gitlab.com/gitlab-com/gl-infra/production/-/issues/17555
+milestone: '16.9'
+group: group::threat insights
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/ee/lib/ee/gitlab/event_store.rb b/ee/lib/ee/gitlab/event_store.rb
index 3324ac09c32d2..114f18c654241 100644
--- a/ee/lib/ee/gitlab/event_store.rb
+++ b/ee/lib/ee/gitlab/event_store.rb
@@ -49,6 +49,18 @@ def configure!(store)
             to: ::ProjectAuthorizations::AuthorizationsAddedEvent
           store.subscribe ::Security::RefreshComplianceFrameworkSecurityPoliciesWorker,
             to: ::Projects::ComplianceFrameworkChangedEvent
+          store.subscribe ::Vulnerabilities::ProcessTransferEventsWorker,
+            to: ::Projects::ProjectTransferedEvent,
+            if: ->(event) { # symbol key, matching ProcessTransferEventsWorker#project_ids
+                  ::Feature.enabled?(:update_vuln_reads_traversal_ids_via_event,
+                    ::Project.find_by_id(event.data[:project_id]), type: :gitlab_com_derisk)
+                }
+          store.subscribe ::Vulnerabilities::ProcessTransferEventsWorker,
+            to: ::Groups::GroupTransferedEvent,
+            if: ->(event) { # symbol key, matching ProcessTransferEventsWorker#project_ids
+                  ::Feature.enabled?(:update_vuln_reads_traversal_ids_via_event,
+                    ::Group.find_by_id(event.data[:group_id]), type: :gitlab_com_derisk)
+                }
         end
       end
     end
diff --git a/ee/spec/factories/projects.rb b/ee/spec/factories/projects.rb
index f85412aa038c8..5a3310b537fc9 100644
--- a/ee/spec/factories/projects.rb
+++ b/ee/spec/factories/projects.rb
@@ -30,12 +30,14 @@
     trait :with_vulnerability do
       after(:create) do |project|
         create(:vulnerability, :detected, project: project)
+        project.project_setting.update!(has_vulnerabilities: true)
       end
     end
 
     trait :with_vulnerabilities do
       after(:create) do |project|
         create_list(:vulnerability, 2, :with_finding, :detected, project: project)
+        project.project_setting.update!(has_vulnerabilities: true)
       end
     end
 
diff --git a/ee/spec/services/projects/transfer_service_spec.rb b/ee/spec/services/projects/transfer_service_spec.rb
index d85a3dfa90e0f..9cbc63f01ff2a 100644
--- a/ee/spec/services/projects/transfer_service_spec.rb
+++ b/ee/spec/services/projects/transfer_service_spec.rb
@@ -70,23 +70,51 @@ def operation
       allow(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).to receive(:perform_async)
     end
 
-    context 'when the project does not have vulnerabilities' do
-      it 'does not schedule the update job' do
-        subject.execute(group)
+    context 'when update_vuln_reads_traversal_ids_via_event is disabled' do
+      before do
+        stub_feature_flags(update_vuln_reads_traversal_ids_via_event: false)
+      end
+
+      context 'when the project does not have vulnerabilities' do
+        it 'does not schedule the update job' do
+          subject.execute(group)
+
+          expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).not_to have_received(:perform_async)
+        end
+      end
+
+      context 'when the project has vulnerabilities' do
+        before do
+          create(:project_setting, project: project, has_vulnerabilities: true)
+        end
+
+        it 'schedules the update job' do
+          subject.execute(group)
 
-        expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).not_to have_received(:perform_async)
+          expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).to have_received(:perform_async).with(project.id)
+        end
       end
     end
 
-    context 'when the project has vulnerabilities' do
-      before do
-        create(:project_setting, project: project, has_vulnerabilities: true)
+    context 'when update_vuln_reads_traversal_ids_via_event is enabled' do
+      context 'when the project does not have vulnerabilities' do
+        it 'does not schedule the update job' do
+          subject.execute(group)
+
+          expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).not_to have_received(:perform_async)
+        end
       end
 
-      it 'schedules the update job' do
-        subject.execute(group)
+      context 'when the project has vulnerabilities' do
+        before do
+          create(:project_setting, project: project, has_vulnerabilities: true)
+        end
 
-        expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).to have_received(:perform_async).with(project.id)
+        it 'does not schedule the update job' do
+          subject.execute(group)
+
+          expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).not_to have_received(:perform_async)
+        end
       end
     end
   end
diff --git a/ee/spec/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service_spec.rb b/ee/spec/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service_spec.rb
index f2eeb1c5d18df..3b8ca99664ecb 100644
--- a/ee/spec/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service_spec.rb
+++ b/ee/spec/services/vulnerabilities/update_namespace_ids_of_vulnerability_reads_service_spec.rb
@@ -35,17 +35,21 @@
 
       before do
         vulnerability_read.update_column(:namespace_id, old_namespace.id)
+        vulnerability_read.update_column(:traversal_ids, old_namespace.traversal_ids)
       end
 
-      it 'changes the `namespace_id` of vulnerability read record' do
+      it 'changes the `namespace_id` and `traversal_ids` of vulnerability read record' do
         expect { update_namespace_ids }
           .to change { vulnerability_read.reload.namespace_id }.from(old_namespace.id).to(project.namespace_id)
+        .and change {
+               vulnerability_read.reload.traversal_ids
+             }.from(old_namespace.traversal_ids).to(project.namespace.traversal_ids)
       end
 
       describe 'parallel execution' do
         include ExclusiveLeaseHelpers
 
-        let(:lease_key) { 'update_vulnerability_reads_namespace_id' }
+        let(:lease_key) { "update_vulnerability_reads_namespace_id_and_traversal_ids:projects:#{project_id}" }
         let(:lease_ttl) { 5.minutes }
 
         before do
diff --git a/ee/spec/workers/vulnerabilities/process_transfer_events_worker_spec.rb b/ee/spec/workers/vulnerabilities/process_transfer_events_worker_spec.rb
new file mode 100644
index 0000000000000..ac73f05735490
--- /dev/null
+++ b/ee/spec/workers/vulnerabilities/process_transfer_events_worker_spec.rb
@@ -0,0 +1,97 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Vulnerabilities::ProcessTransferEventsWorker, feature_category: :vulnerability_management, type: :job do
+  let_it_be(:old_group) { create(:group) }
+  let_it_be(:group) { create(:group) }
+  let_it_be(:project) { create(:project, :with_vulnerability, group: group) }
+  let_it_be(:other_project) { create(:project, :with_vulnerability, group: group) }
+  let_it_be(:project_without_vulnerabilities) { create(:project, group: group) }
+
+  let(:project_event) do
+    ::Projects::ProjectTransferedEvent.new(data: {
+      project_id: project.id,
+      old_namespace_id: old_group.id,
+      old_root_namespace_id: old_group.id,
+      new_namespace_id: group.id,
+      new_root_namespace_id: group.id
+    })
+  end
+
+  let(:group_event) do
+    ::Groups::GroupTransferedEvent.new(data: {
+      group_id: group.id,
+      old_root_namespace_id: old_group.id,
+      new_root_namespace_id: group.id
+    })
+  end
+
+  it_behaves_like 'worker with data consistency', described_class, data_consistency: :always
+
+  subject(:use_event) { consume_event(subscriber: described_class, event: event) }
+
+  context 'when the associated project has vulnerabilities' do
+    before do
+      project.project_setting.update!(has_vulnerabilities: true)
+    end
+
+    context 'when a project transfered event is published', :sidekiq_inline do
+      let(:event) { project_event }
+
+      context 'when update_vuln_reads_traversal_ids_via_event is disabled' do
+        before do
+          stub_feature_flags(update_vuln_reads_traversal_ids_via_event: false)
+        end
+
+        it_behaves_like 'ignores the published event'
+      end
+
+      it_behaves_like 'subscribes to event'
+
+      it 'enqueues a vulnerability reads namespace id update job for the project id' do
+        expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).to receive(:perform_bulk).with(
+          [[project.id]]
+        )
+
+        use_event
+      end
+    end
+
+    context 'when a group transfered event is published', :sidekiq_inline do
+      let(:event) { group_event }
+
+      context 'when update_vuln_reads_traversal_ids_via_event is disabled' do
+        before do
+          stub_feature_flags(update_vuln_reads_traversal_ids_via_event: false)
+        end
+
+        it_behaves_like 'ignores the published event'
+      end
+
+      it_behaves_like 'subscribes to event'
+
+      it 'enqueues a vulnerability reads namespace id update job for each project id belonging to the namespace id' do
+        expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).to receive(:perform_bulk).with(
+          match_array([[project.id], [other_project.id]])
+        )
+
+        use_event
+      end
+    end
+  end
+
+  context 'when the associated project does not have vulnerabilities' do
+    let(:project) { project_without_vulnerabilities }
+
+    context 'when a project transfered event is published', :sidekiq_inline do
+      let(:event) { project_event }
+
+      it 'does not enqueue any vulnerability reads namespace id update jobs' do
+        expect(Vulnerabilities::UpdateNamespaceIdsOfVulnerabilityReadsWorker).to receive(:perform_bulk).with([])
+
+        use_event
+      end
+    end
+  end
+end
-- 
GitLab