From 8a259407c6d341b98872cad48e5c01a5a145d354 Mon Sep 17 00:00:00 2001
From: Pam Artiaga <partiaga@gitlab.com>
Date: Mon, 8 Apr 2024 16:38:25 +0000
Subject: [PATCH] Re-spawn the AssignResource worker if busy

This is introduced behind a feature flag: `respawn_assign_resource_worker`
---
 app/models/ci/resource_group.rb               |  4 +
 ...gn_resource_from_resource_group_service.rb | 29 ++++++-
 .../respawn_assign_resource_worker.yml        |  9 +++
 .../projects/pipelines/pipeline_spec.rb       |  6 ++
 spec/models/ci/resource_group_spec.rb         | 76 +++++++++++--------
 .../cross_project_pipeline_spec.rb            |  6 ++
 .../parent_child_pipeline_spec.rb             |  6 ++
 ...source_from_resource_group_service_spec.rb | 48 ++++++++++++
 8 files changed, 151 insertions(+), 33 deletions(-)
 create mode 100644 config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml

diff --git a/app/models/ci/resource_group.rb b/app/models/ci/resource_group.rb
index 48f321a236d33..8d4d6e028e35a 100644
--- a/app/models/ci/resource_group.rb
+++ b/app/models/ci/resource_group.rb
@@ -58,6 +58,10 @@ def upcoming_processables
       end
     end
 
+    def waiting_processables
+      processables.waiting_for_resource
+    end
+
     def current_processable
       Ci::Processable.find_by('(id, partition_id) IN (?)', resources.select('build_id, partition_id'))
     end
diff --git a/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
index d7078200c1455..3b53087c49c04 100644
--- a/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
+++ b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
@@ -3,12 +3,37 @@
 module Ci
   module ResourceGroups
     class AssignResourceFromResourceGroupService < ::BaseService
-      # rubocop: disable CodeReuse/ActiveRecord
+      RESPAWN_WAIT_TIME = 1.minute
+
       def execute(resource_group)
         release_resource_from_stale_jobs(resource_group)
 
         free_resources = resource_group.resources.free.count
 
+        if free_resources == 0
+          if resource_group.waiting_processables.any?
+            # If no resource in the group is free (still 'tied up' in other processables)
+            #   and there are processables waiting for a resource,
+            # kick off the worker again for the current resource group.
+            respawn_assign_resource_worker(resource_group)
+          end
+
+          return
+        end
+
+        enqueue_upcoming_processables(free_resources, resource_group)
+      end
+
+      private
+
+      def respawn_assign_resource_worker(resource_group)
+        return if Feature.disabled?(:respawn_assign_resource_worker, project, type: :gitlab_com_derisk)
+
+        Ci::ResourceGroups::AssignResourceFromResourceGroupWorker.perform_in(RESPAWN_WAIT_TIME, resource_group.id)
+      end
+
+      # rubocop: disable CodeReuse/ActiveRecord
+      def enqueue_upcoming_processables(free_resources, resource_group)
         resource_group.upcoming_processables.take(free_resources).each do |upcoming|
           Gitlab::OptimisticLocking.retry_lock(upcoming, name: 'enqueue_waiting_for_resource') do |processable|
             processable.enqueue_waiting_for_resource
@@ -17,8 +42,6 @@ def execute(resource_group)
       end
       # rubocop: enable CodeReuse/ActiveRecord
 
-      private
-
       def release_resource_from_stale_jobs(resource_group)
         resource_group.resources.stale_processables.find_each do |processable|
           resource_group.release_resource_from(processable)
diff --git a/config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml b/config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml
new file mode 100644
index 0000000000000..63a0e4e238b51
--- /dev/null
+++ b/config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml
@@ -0,0 +1,9 @@
+---
+name: respawn_assign_resource_worker
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/436988
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/147313
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/450793
+milestone: '16.11'
+group: group::environments
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/spec/features/projects/pipelines/pipeline_spec.rb b/spec/features/projects/pipelines/pipeline_spec.rb
index ec7777e958f84..5eee5a008404d 100644
--- a/spec/features/projects/pipelines/pipeline_spec.rb
+++ b/spec/features/projects/pipelines/pipeline_spec.rb
@@ -904,6 +904,12 @@
   end
 
   context 'when build requires resource', :sidekiq_inline do
+    before do
+      allow_next_instance_of(Ci::ResourceGroups::AssignResourceFromResourceGroupService) do |resource_service|
+        allow(resource_service).to receive(:respawn_assign_resource_worker)
+      end
+    end
+
     let_it_be(:project) { create(:project, :repository) }
 
     let(:pipeline) { create(:ci_pipeline, project: project) }
diff --git a/spec/models/ci/resource_group_spec.rb b/spec/models/ci/resource_group_spec.rb
index e2aaeb2a18e2c..ad55e48868830 100644
--- a/spec/models/ci/resource_group_spec.rb
+++ b/spec/models/ci/resource_group_spec.rb
@@ -109,9 +109,7 @@
     end
   end
 
-  describe '#upcoming_processables' do
-    subject { resource_group.upcoming_processables }
-
+  describe 'processables scope' do
     let_it_be(:project) { create(:project, :repository, group: group) }
     let_it_be(:pipeline_1) { create(:ci_pipeline, project: project) }
     let_it_be(:pipeline_2) { create(:ci_pipeline, project: project) }
@@ -123,45 +121,63 @@
       let!("build_2_#{status}") { create(:ci_build, pipeline: pipeline_2, status: status, resource_group: resource_group) }
     end
 
-    context 'when process mode is unordered' do
-      let(:process_mode) { :unordered }
+    describe '#upcoming_processables' do
+      subject { resource_group.upcoming_processables }
+
+      context 'when process mode is unordered' do
+        let(:process_mode) { :unordered }
 
-      it 'returns correct jobs in an indeterministic order' do
-        expect(subject).to contain_exactly(build_1_waiting_for_resource, build_2_waiting_for_resource)
+        it 'returns correct jobs in an indeterministic order' do
+          expect(subject).to contain_exactly(build_1_waiting_for_resource, build_2_waiting_for_resource)
+        end
       end
-    end
 
-    context 'when process mode is oldest_first' do
-      let(:process_mode) { :oldest_first }
+      context 'when process mode is oldest_first' do
+        let(:process_mode) { :oldest_first }
 
-      it 'returns correct jobs in a specific order' do
-        expect(subject[0]).to eq(build_1_waiting_for_resource)
-        expect(subject[1..2]).to contain_exactly(build_1_created, build_1_scheduled)
-        expect(subject[3]).to eq(build_2_waiting_for_resource)
-        expect(subject[4..5]).to contain_exactly(build_2_created, build_2_scheduled)
+        it 'returns correct jobs in a specific order' do
+          expect(subject[0]).to eq(build_1_waiting_for_resource)
+          expect(subject[1..2]).to contain_exactly(build_1_created, build_1_scheduled)
+          expect(subject[3]).to eq(build_2_waiting_for_resource)
+          expect(subject[4..5]).to contain_exactly(build_2_created, build_2_scheduled)
+        end
       end
-    end
 
-    context 'when process mode is newest_first' do
-      let(:process_mode) { :newest_first }
+      context 'when process mode is newest_first' do
+        let(:process_mode) { :newest_first }
 
-      it 'returns correct jobs in a specific order' do
-        expect(subject[0]).to eq(build_2_waiting_for_resource)
-        expect(subject[1..2]).to contain_exactly(build_2_created, build_2_scheduled)
-        expect(subject[3]).to eq(build_1_waiting_for_resource)
-        expect(subject[4..5]).to contain_exactly(build_1_created, build_1_scheduled)
+        it 'returns correct jobs in a specific order' do
+          expect(subject[0]).to eq(build_2_waiting_for_resource)
+          expect(subject[1..2]).to contain_exactly(build_2_created, build_2_scheduled)
+          expect(subject[3]).to eq(build_1_waiting_for_resource)
+          expect(subject[4..5]).to contain_exactly(build_1_created, build_1_scheduled)
+        end
       end
-    end
 
-    context 'when process mode is unknown' do
-      let(:process_mode) { :unordered }
+      context 'when process mode is unknown' do
+        let(:process_mode) { :unordered }
 
-      before do
-        resource_group.update_column(:process_mode, 3)
+        before do
+          resource_group.update_column(:process_mode, 3)
+        end
+
+        it 'returns empty' do
+          is_expected.to be_empty
+        end
       end
+    end
+
+    describe '#waiting_processables' do
+      subject { resource_group.waiting_processables }
+
+      where(:mode) { [:unordered, :oldest_first, :newest_first] }
+
+      with_them do
+        let(:process_mode) { mode }
 
-      it 'returns empty' do
-        is_expected.to be_empty
+        it 'returns waiting_for_resource jobs in an indeterministic order' do
+          expect(subject).to contain_exactly(build_1_waiting_for_resource, build_2_waiting_for_resource)
+        end
       end
     end
   end
diff --git a/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb b/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb
index 07bc3aa28cf8f..7e66a851fab54 100644
--- a/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb
+++ b/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb
@@ -71,6 +71,12 @@
     end
 
     context 'when sidekiq processes the job', :sidekiq_inline do
+      before do
+        allow_next_instance_of(Ci::ResourceGroups::AssignResourceFromResourceGroupService) do |resource_service|
+          allow(resource_service).to receive(:respawn_assign_resource_worker)
+        end
+      end
+
       it 'transitions to pending status and triggers a downstream pipeline' do
         pipeline = create_pipeline!
 
diff --git a/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb b/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb
index 65180ac055f5b..4cc738539f328 100644
--- a/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb
+++ b/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb
@@ -131,6 +131,12 @@
       end
 
       context 'when sidekiq processes the job', :sidekiq_inline do
+        before do
+          allow_next_instance_of(Ci::ResourceGroups::AssignResourceFromResourceGroupService) do |resource_service|
+            allow(resource_service).to receive(:respawn_assign_resource_worker)
+          end
+        end
+
         it 'transitions to pending status and triggers a downstream pipeline' do
           pipeline = create_pipeline!
 
diff --git a/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb b/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb
index ea15e3ea2c0c9..85c011a07f3f6 100644
--- a/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb
+++ b/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb
@@ -10,6 +10,10 @@
 
   let(:service) { described_class.new(project, user) }
 
+  before do
+    allow(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).to receive(:perform_in)
+  end
+
   describe '#execute' do
     subject { service.execute(resource_group) }
 
@@ -166,6 +170,50 @@
         expect(build.reload).to be_waiting_for_resource
       end
 
+      it 're-spawns the worker for assigning a resource' do
+        expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).to receive(:perform_in).with(1.minute, resource_group.id)
+
+        subject
+      end
+
+      context 'when there are no upcoming processables' do
+        before do
+          build.update!(status: :success)
+        end
+
+        it 'does not re-spawn the worker for assigning a resource' do
+          expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).not_to receive(:perform_in)
+
+          subject
+        end
+      end
+
+      context 'when there are no waiting processables and process_mode is ordered' do
+        let(:resource_group) { create(:ci_resource_group, process_mode: :oldest_first, project: project) }
+
+        before do
+          build.update!(status: :created)
+        end
+
+        it 'does not re-spawn the worker for assigning a resource' do
+          expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).not_to receive(:perform_in)
+
+          subject
+        end
+      end
+
+      context 'when :respawn_assign_resource_worker FF is disabled' do
+        before do
+          stub_feature_flags(respawn_assign_resource_worker: false)
+        end
+
+        it 'does not re-spawn the worker for assigning a resource' do
+          expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).not_to receive(:perform_in)
+
+          subject
+        end
+      end
+
       context 'when there is a stale build assigned to a resource' do
         before do
           other_build.doom!
-- 
GitLab