diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index 2233e6ba322814b5306b566299b758271395b716..c30735df170cc4a84b0d09877e381f6afd05a751 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -717,6 +717,8 @@
   - 1
 - - security_refresh_project_policies
   - 1
+- - security_scan_execution_policies_create_pipeline
+  - 1
 - - security_scan_execution_policies_rule_schedule
   - 1
 - - security_scan_result_policies_add_approvers_to_rules
diff --git a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
index 0953daed126cea107cbbae560deb3bfed57879a1..0708a4df956741af436b7bdae3603145236a7458 100644
--- a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
+++ b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
@@ -14,7 +14,15 @@ def execute(schedule)
 
         branches = branches_for(rule)
         actions = actions_for(schedule)
-        schedule_errors = schedule_scan(actions, branches).select { |service_result| service_result[:status] == :error }
+
+        if Feature.enabled?(:scan_execution_pipeline_worker, project)
+          schedule_scans_using_a_worker(branches, schedule) unless actions.blank?
+          schedule_errors = []
+        else
+          schedule_errors = schedule_scan(actions, branches).select do |service_result|
+            service_result[:status] == :error
+          end
+        end
 
         return ServiceResponse.success if schedule_errors.blank?
 
@@ -62,6 +70,15 @@ def schedule_scan(actions, branches)
             .execute
         end
       end
+
+      def schedule_scans_using_a_worker(branches, schedule)
+        branches.map do |branch|
+          ::Security::ScanExecutionPolicies::CreatePipelineWorker.perform_async(project.id,
+            current_user.id,
+            schedule.id,
+            branch)
+        end
+      end
     end
   end
 end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index 58cb2426768587f0fa42559c897c945ab815864c..a1d298febfe945871e602b8cfbafbd11f45596ed 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -2082,6 +2082,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: security_scan_execution_policies_create_pipeline
+  :worker_name: Security::ScanExecutionPolicies::CreatePipelineWorker
+  :feature_category: :security_policy_management
+  :has_external_dependencies: false
+  :urgency: :throttled
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: false
+  :tags: []
 - :name: security_scan_execution_policies_rule_schedule
   :worker_name: Security::ScanExecutionPolicies::RuleScheduleWorker
   :feature_category: :security_policy_management
diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..92df0eae842308ad4d6ad205db81d589d6c9905e
--- /dev/null
+++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
@@ -0,0 +1,56 @@
+# frozen_string_literal: true
+
+module Security
+  module ScanExecutionPolicies
+    class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines
+      include ApplicationWorker
+
+      feature_category :security_policy_management
+      deduplicate :until_executing
+      urgency :throttled
+      data_consistency :delayed
+
+      concurrency_limit -> { 50 if Feature.enabled?(:scan_execution_pipeline_worker) }
+
+      def perform(project_id, current_user_id, schedule_id, branch)
+        project = Project.find_by_id(project_id)
+        return unless project
+
+        current_user = User.find_by_id(current_user_id)
+        return unless current_user
+
+        schedule = Security::OrchestrationPolicyRuleSchedule.find_by_id(schedule_id)
+        return unless schedule
+
+        actions = actions_for(schedule)
+
+        service_result = ::Security::SecurityOrchestrationPolicies::CreatePipelineService
+          .new(project: project, current_user: current_user, params: { actions: actions, branch: branch })
+          .execute
+
+        return unless service_result[:status] == :error
+
+        log_error(current_user, schedule, service_result[:message])
+      end
+
+      private
+
+      def actions_for(schedule)
+        policy = schedule.policy
+        return [] if policy.blank?
+
+        policy[:actions]
+      end
+
+      def log_error(current_user, schedule, message)
+        ::Gitlab::AppJsonLogger.warn(
+          build_structured_payload(
+            security_orchestration_policy_configuration_id: schedule&.security_orchestration_policy_configuration&.id,
+            user_id: current_user.id,
+            message: message
+          )
+        )
+      end
+    end
+  end
+end
diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker.yml
new file mode 100644
index 0000000000000000000000000000000000000000..7ba30b65ed2b7d83222f017cb55d8646d5f3dfc0
--- /dev/null
+++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker.yml
@@ -0,0 +1,8 @@
+---
+name: scan_execution_pipeline_worker
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/147691
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/451890
+milestone: '16.11'
+type: gitlab_com_derisk
+group: group::security policies
+default_enabled: false
diff --git a/ee/spec/services/security/security_orchestration_policies/rule_schedule_service_spec.rb b/ee/spec/services/security/security_orchestration_policies/rule_schedule_service_spec.rb
index c11bde0da0da4c86f7198c2ec2c9f9e96aee23bf..bbcb3c3e45e6fc979938aff17a082b93abbe8050 100644
--- a/ee/spec/services/security/security_orchestration_policies/rule_schedule_service_spec.rb
+++ b/ee/spec/services/security/security_orchestration_policies/rule_schedule_service_spec.rb
@@ -15,6 +15,7 @@
     let(:rule) { { type: 'schedule', branches: branches, cadence: '*/20 * * * *' } }
     let(:other_schedule_rule) { { type: 'schedule', branches: ['main'], cadence: '0 10 * * *' } }
     let(:branches) { %w[master production non-existing-branch] }
+    let(:existing_branches) { %w[master production] }
 
     subject(:service) { described_class.new(project: project, current_user: current_user) }
 
@@ -25,6 +26,7 @@
     end
 
     before do
+      stub_feature_flags(scan_execution_pipeline_worker: false)
       stub_licensed_features(security_on_demand_scans: true)
 
       project.repository.create_branch('production', project.default_branch)
@@ -41,6 +43,39 @@
       expect(service_result.success?).to be(true)
     end
 
+    shared_examples 'with scan' do |scan_type|
+      context "when scan is #{scan_type}" do
+        context 'when the feature flag scan_execution_pipeline_worker is enabled' do
+          before do
+            stub_feature_flags(scan_execution_pipeline_worker: true)
+          end
+
+          it 'enqueues Security::SyncScanPoliciesWorker for each branch' do
+            existing_branches.each do |branch|
+              expect(::Security::ScanExecutionPolicies::CreatePipelineWorker).to(
+                receive(:perform_async)
+                  .with(project.id, current_user.id, schedule.id, branch)
+                  .and_call_original
+              )
+            end
+
+            service.execute(schedule)
+          end
+
+          it 'does not invokes Security::SecurityOrchestrationPolicies::CreatePipelineService' do
+            existing_branches.each do |branch|
+              expect(::Security::SecurityOrchestrationPolicies::CreatePipelineService).not_to(
+                receive(:new)
+                  .with(project: project, current_user: current_user,
+                    params: { actions: [{ scan: 'scan_type' }], branch: branch }))
+            end
+
+            service.execute(schedule)
+          end
+        end
+      end
+    end
+
     context 'when scan type is dast' do
       before do
         policy[:actions] = [{ scan: 'dast' }]
@@ -59,6 +94,8 @@
 
         service.execute(schedule)
       end
+
+      it_behaves_like 'with scan', 'dast'
     end
 
     context 'when scan type is secret_detection' do
@@ -79,6 +116,8 @@
 
         service.execute(schedule)
       end
+
+      it_behaves_like 'with scan', 'secret_detection'
     end
 
     context 'when scan type is container_scanning' do
@@ -100,6 +139,8 @@
 
           service.execute(schedule)
         end
+
+        it_behaves_like 'with scan', 'container_scanning'
       end
 
       context 'when agents are defined in the rule' do
@@ -131,6 +172,8 @@
 
         service.execute(schedule)
       end
+
+      it_behaves_like 'with scan', 'sast'
     end
 
     context 'when policy actions exists and there are multiple matching branches' do
diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..9e9e2d4995ce54303b56150fd59c9783a6afaa03
--- /dev/null
+++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
@@ -0,0 +1,83 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Security::ScanExecutionPolicies::CreatePipelineWorker, feature_category: :security_policy_management do
+  let_it_be(:project) { create(:project) }
+  let_it_be(:current_user) { create(:user) }
+  let_it_be(:security_orchestration_policy_configuration) do
+    create(:security_orchestration_policy_configuration, project: project)
+  end
+
+  let_it_be(:schedule) do
+    create(:security_orchestration_policy_rule_schedule,
+      security_orchestration_policy_configuration: security_orchestration_policy_configuration)
+  end
+
+  let(:project_id) { project.id }
+  let(:current_user_id) { current_user.id }
+  let(:branch) { 'production' }
+  let(:actions) { [{ scan: 'dast' }] }
+  let(:params) { { actions: actions, branch: branch } }
+  let(:schedule_id) { schedule.id }
+  let(:policy) { build(:scan_execution_policy, enabled: true, actions: [{ scan: 'dast' }]) }
+
+  shared_examples_for 'does not call RuleScheduleService' do
+    it do
+      expect(Security::SecurityOrchestrationPolicies::RuleScheduleService).not_to receive(:new)
+
+      run_worker
+    end
+  end
+
+  describe '#perform' do
+    before do
+      allow_next_found_instance_of(Security::OrchestrationPolicyConfiguration) do |instance|
+        allow(instance).to receive(:active_scan_execution_policies).and_return([policy])
+      end
+    end
+
+    subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch) }
+
+    context 'when project is not found' do
+      let(:project_id) { non_existing_record_id }
+
+      it_behaves_like 'does not call RuleScheduleService'
+    end
+
+    context 'when user is not found' do
+      let(:current_user_id) { non_existing_record_id }
+
+      it_behaves_like 'does not call RuleScheduleService'
+    end
+
+    context 'when the user and project exists' do
+      it 'delegates the pipeline creation to Security::SecurityOrchestrationPolicies::CreatePipelineService' do
+        expect(::Security::SecurityOrchestrationPolicies::CreatePipelineService).to(
+          receive(:new)
+            .with(project: project, current_user: current_user, params: params)
+            .and_call_original)
+
+        run_worker
+      end
+
+      context 'when create pipeline service returns errors' do
+        before do
+          allow_next_instance_of(::Security::SecurityOrchestrationPolicies::CreatePipelineService) do |service|
+            allow(service).to receive(:execute).and_return(ServiceResponse.error(message: 'message'))
+          end
+        end
+
+        it 'logs the error' do
+          expect(::Gitlab::AppJsonLogger).to receive(:warn).with({
+            'class' => 'Security::ScanExecutionPolicies::CreatePipelineWorker',
+            'security_orchestration_policy_configuration_id' => security_orchestration_policy_configuration.id,
+            'user_id' => current_user.id,
+            'message' => 'message'
+          })
+          run_worker
+        end
+      end
+    end
+  end
+end