diff --git a/app/models/ci/resource_group.rb b/app/models/ci/resource_group.rb
index 48f321a236d339f4e4525988bb49475427ea8010..8d4d6e028e35aafe59d931c257ae58dee01d0433 100644
--- a/app/models/ci/resource_group.rb
+++ b/app/models/ci/resource_group.rb
@@ -58,6 +58,10 @@ def upcoming_processables
       end
     end
 
+    def waiting_processables
+      processables.waiting_for_resource
+    end
+
     def current_processable
       Ci::Processable.find_by('(id, partition_id) IN (?)', resources.select('build_id, partition_id'))
     end
diff --git a/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
index d7078200c145573ab3db479022716dc43f8dead7..3b53087c49c049c7625fca52c116086877453726 100644
--- a/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
+++ b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
@@ -3,12 +3,37 @@
 module Ci
   module ResourceGroups
     class AssignResourceFromResourceGroupService < ::BaseService
-      # rubocop: disable CodeReuse/ActiveRecord
+      RESPAWN_WAIT_TIME = 1.minute
+
       def execute(resource_group)
         release_resource_from_stale_jobs(resource_group)
 
         free_resources = resource_group.resources.free.count
 
+        if free_resources == 0
+          if resource_group.waiting_processables.any?
+            # if the resource group is still 'tied up' in other processables,
+            #   and there are more upcoming processables
+            # kick off the worker again for the current resource group
+            respawn_assign_resource_worker(resource_group)
+          end
+
+          return
+        end
+
+        enqueue_upcoming_processables(free_resources, resource_group)
+      end
+
+      private
+
+      def respawn_assign_resource_worker(resource_group)
+        return if Feature.disabled?(:respawn_assign_resource_worker, project, type: :gitlab_com_derisk)
+
+        Ci::ResourceGroups::AssignResourceFromResourceGroupWorker.perform_in(RESPAWN_WAIT_TIME, resource_group.id)
+      end
+
+      # rubocop: disable CodeReuse/ActiveRecord
+      def enqueue_upcoming_processables(free_resources, resource_group)
         resource_group.upcoming_processables.take(free_resources).each do |upcoming|
           Gitlab::OptimisticLocking.retry_lock(upcoming, name: 'enqueue_waiting_for_resource') do |processable|
             processable.enqueue_waiting_for_resource
@@ -17,8 +42,6 @@ def execute(resource_group)
       end
       # rubocop: enable CodeReuse/ActiveRecord
 
-      private
-
       def release_resource_from_stale_jobs(resource_group)
         resource_group.resources.stale_processables.find_each do |processable|
           resource_group.release_resource_from(processable)
diff --git a/config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml b/config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml
new file mode 100644
index 0000000000000000000000000000000000000000..63a0e4e238b517c470edb7b4047baa76779daf7c
--- /dev/null
+++ b/config/feature_flags/gitlab_com_derisk/respawn_assign_resource_worker.yml
@@ -0,0 +1,9 @@
+---
+name: respawn_assign_resource_worker
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/436988
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/147313
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/450793
+milestone: '16.11'
+group: group::environments
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/spec/features/projects/pipelines/pipeline_spec.rb b/spec/features/projects/pipelines/pipeline_spec.rb
index ec7777e958f845ce035230da2635cd321e08bd8f..5eee5a008404d1b5897087d675bfb498363fe43a 100644
--- a/spec/features/projects/pipelines/pipeline_spec.rb
+++ b/spec/features/projects/pipelines/pipeline_spec.rb
@@ -904,6 +904,12 @@
   end
 
   context 'when build requires resource', :sidekiq_inline do
+    before do
+      allow_next_instance_of(Ci::ResourceGroups::AssignResourceFromResourceGroupService) do |resource_service|
+        allow(resource_service).to receive(:respawn_assign_resource_worker)
+      end
+    end
+
     let_it_be(:project) { create(:project, :repository) }
 
     let(:pipeline) { create(:ci_pipeline, project: project) }
diff --git a/spec/models/ci/resource_group_spec.rb b/spec/models/ci/resource_group_spec.rb
index e2aaeb2a18e2c473e4f3f5217150d7d9b0a93378..ad55e48868830ce292e0ce419065aa529a951423 100644
--- a/spec/models/ci/resource_group_spec.rb
+++ b/spec/models/ci/resource_group_spec.rb
@@ -109,9 +109,7 @@
     end
   end
 
-  describe '#upcoming_processables' do
-    subject { resource_group.upcoming_processables }
-
+  describe 'processables scope' do
     let_it_be(:project) { create(:project, :repository, group: group) }
     let_it_be(:pipeline_1) { create(:ci_pipeline, project: project) }
     let_it_be(:pipeline_2) { create(:ci_pipeline, project: project) }
@@ -123,45 +121,63 @@
       let!("build_2_#{status}") { create(:ci_build, pipeline: pipeline_2, status: status, resource_group: resource_group) }
     end
 
-    context 'when process mode is unordered' do
-      let(:process_mode) { :unordered }
+    describe '#upcoming_processables' do
+      subject { resource_group.upcoming_processables }
+
+      context 'when process mode is unordered' do
+        let(:process_mode) { :unordered }
 
-      it 'returns correct jobs in an indeterministic order' do
-        expect(subject).to contain_exactly(build_1_waiting_for_resource, build_2_waiting_for_resource)
+        it 'returns correct jobs in an indeterministic order' do
+          expect(subject).to contain_exactly(build_1_waiting_for_resource, build_2_waiting_for_resource)
+        end
       end
-    end
 
-    context 'when process mode is oldest_first' do
-      let(:process_mode) { :oldest_first }
+      context 'when process mode is oldest_first' do
+        let(:process_mode) { :oldest_first }
 
-      it 'returns correct jobs in a specific order' do
-        expect(subject[0]).to eq(build_1_waiting_for_resource)
-        expect(subject[1..2]).to contain_exactly(build_1_created, build_1_scheduled)
-        expect(subject[3]).to eq(build_2_waiting_for_resource)
-        expect(subject[4..5]).to contain_exactly(build_2_created, build_2_scheduled)
+        it 'returns correct jobs in a specific order' do
+          expect(subject[0]).to eq(build_1_waiting_for_resource)
+          expect(subject[1..2]).to contain_exactly(build_1_created, build_1_scheduled)
+          expect(subject[3]).to eq(build_2_waiting_for_resource)
+          expect(subject[4..5]).to contain_exactly(build_2_created, build_2_scheduled)
+        end
       end
-    end
 
-    context 'when process mode is newest_first' do
-      let(:process_mode) { :newest_first }
+      context 'when process mode is newest_first' do
+        let(:process_mode) { :newest_first }
 
-      it 'returns correct jobs in a specific order' do
-        expect(subject[0]).to eq(build_2_waiting_for_resource)
-        expect(subject[1..2]).to contain_exactly(build_2_created, build_2_scheduled)
-        expect(subject[3]).to eq(build_1_waiting_for_resource)
-        expect(subject[4..5]).to contain_exactly(build_1_created, build_1_scheduled)
+        it 'returns correct jobs in a specific order' do
+          expect(subject[0]).to eq(build_2_waiting_for_resource)
+          expect(subject[1..2]).to contain_exactly(build_2_created, build_2_scheduled)
+          expect(subject[3]).to eq(build_1_waiting_for_resource)
+          expect(subject[4..5]).to contain_exactly(build_1_created, build_1_scheduled)
+        end
       end
-    end
 
-    context 'when process mode is unknown' do
-      let(:process_mode) { :unordered }
+      context 'when process mode is unknown' do
+        let(:process_mode) { :unordered }
 
-      before do
-        resource_group.update_column(:process_mode, 3)
+        before do
+          resource_group.update_column(:process_mode, 3)
+        end
+
+        it 'returns empty' do
+          is_expected.to be_empty
+        end
       end
+    end
+
+    describe '#waiting_processables' do
+      subject { resource_group.waiting_processables }
+
+      where(:mode) { [:unordered, :oldest_first, :newest_first] }
+
+      with_them do
+        let(:process_mode) { mode }
 
-      it 'returns empty' do
-        is_expected.to be_empty
+        it 'returns waiting_for_resource jobs in an indeterministic order' do
+          expect(subject).to contain_exactly(build_1_waiting_for_resource, build_2_waiting_for_resource)
+        end
       end
     end
   end
diff --git a/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb b/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb
index 07bc3aa28cf8fd280aa1623e3843385f44173917..7e66a851fab5453790d4ff5a914bb96df9921e39 100644
--- a/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb
+++ b/spec/services/ci/create_pipeline_service/cross_project_pipeline_spec.rb
@@ -71,6 +71,12 @@
     end
 
     context 'when sidekiq processes the job', :sidekiq_inline do
+      before do
+        allow_next_instance_of(Ci::ResourceGroups::AssignResourceFromResourceGroupService) do |resource_service|
+          allow(resource_service).to receive(:respawn_assign_resource_worker)
+        end
+      end
+
       it 'transitions to pending status and triggers a downstream pipeline' do
         pipeline = create_pipeline!
 
diff --git a/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb b/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb
index 65180ac055f5b07417658d787589b05e5b147f37..4cc738539f328912b7c98097b1bf1e5e61b16d82 100644
--- a/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb
+++ b/spec/services/ci/create_pipeline_service/parent_child_pipeline_spec.rb
@@ -131,6 +131,12 @@
       end
 
       context 'when sidekiq processes the job', :sidekiq_inline do
+        before do
+          allow_next_instance_of(Ci::ResourceGroups::AssignResourceFromResourceGroupService) do |resource_service|
+            allow(resource_service).to receive(:respawn_assign_resource_worker)
+          end
+        end
+
         it 'transitions to pending status and triggers a downstream pipeline' do
           pipeline = create_pipeline!
 
diff --git a/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb b/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb
index ea15e3ea2c0c932ca36da710fb1403d05ccda181..85c011a07f3f6d583c7e5bbf5f1b3c4b3c740e07 100644
--- a/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb
+++ b/spec/services/ci/resource_groups/assign_resource_from_resource_group_service_spec.rb
@@ -10,6 +10,10 @@
 
   let(:service) { described_class.new(project, user) }
 
+  before do
+    allow(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).to receive(:perform_in)
+  end
+
   describe '#execute' do
     subject { service.execute(resource_group) }
 
@@ -166,6 +170,50 @@
         expect(build.reload).to be_waiting_for_resource
       end
 
+      it 're-spawns the worker for assigning a resource' do
+        expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).to receive(:perform_in).with(1.minute, resource_group.id)
+
+        subject
+      end
+
+      context 'when there are no upcoming processables' do
+        before do
+          build.update!(status: :success)
+        end
+
+        it 'does not re-spawn the worker for assigning a resource' do
+          expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).not_to receive(:perform_in)
+
+          subject
+        end
+      end
+
+      context 'when there are no waiting processables and process_mode is ordered' do
+        let(:resource_group) { create(:ci_resource_group, process_mode: :oldest_first, project: project) }
+
+        before do
+          build.update!(status: :created)
+        end
+
+        it 'does not re-spawn the worker for assigning a resource' do
+          expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).not_to receive(:perform_in)
+
+          subject
+        end
+      end
+
+      context 'when :respawn_assign_resource_worker FF is disabled' do
+        before do
+          stub_feature_flags(respawn_assign_resource_worker: false)
+        end
+
+        it 'does not re-spawn the worker for assigning a resource' do
+          expect(Ci::ResourceGroups::AssignResourceFromResourceGroupWorker).not_to receive(:perform_in)
+
+          subject
+        end
+      end
+
       context 'when there is a stale build assigned to a resource' do
         before do
           other_build.doom!