diff --git a/app/workers/concurrency_limit/resume_worker.rb b/app/workers/concurrency_limit/resume_worker.rb
index 7174f80405e06dfc82124637bfb4ca77d8913603..10547e0c165a75dc5c9567ac65914c1d42fd2e4d 100644
--- a/app/workers/concurrency_limit/resume_worker.rb
+++ b/app/workers/concurrency_limit/resume_worker.rb
@@ -7,7 +7,7 @@ class ResumeWorker
     include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- There is no onward scheduling and this cron handles work from across the
     # application, so there's no useful context to add.
 
-    DEFAULT_LIMIT = 1_000
+    BATCH_SIZE = 1_000
     RESCHEDULE_DELAY = 1.second
 
     data_consistency :sticky
@@ -18,24 +18,26 @@ def perform
       reschedule_job = false
 
       workers.each do |worker|
-        limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)&.call
+        limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
         queue_size = queue_size(worker)
         report_prometheus_metrics(worker, queue_size, limit)
 
         next unless queue_size > 0
+        next if limit < 0 # do not re-queue jobs if circuit-broken
 
         reschedule_job = true
 
-        processing_limit = if limit
-                             current = current_concurrency(worker: worker)
+        current = current_concurrency(worker: worker)
+        processing_limit = if limit > 0
                              limit - current
                            else
-                             DEFAULT_LIMIT
+                             BATCH_SIZE
                            end
 
         next unless processing_limit > 0
 
         resume_processing!(worker, limit: processing_limit)
+        cleanup_stale_trackers(worker)
       end
 
       self.class.perform_in(RESCHEDULE_DELAY) if reschedule_job
@@ -44,6 +46,10 @@ def perform
     private
 
     def current_concurrency(worker:)
+      if ::Feature.enabled?(:sidekiq_concurrency_limit_optimized_count, Feature.current_request)
+        return concurrent_worker_count(worker)
+      end
+
       @current_concurrency ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency.workers(
         skip_cache: true
       )
@@ -51,10 +57,18 @@ def current_concurrency(worker:)
       @current_concurrency[worker.name].to_i
     end
 
+    def concurrent_worker_count(worker)
+      Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.concurrent_worker_count(worker.name)
+    end
+
     def queue_size(worker)
       Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.queue_size(worker.name)
     end
 
+    def cleanup_stale_trackers(worker)
+      Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.cleanup_stale_trackers(worker.name)
+    end
+
     def resume_processing!(worker, limit:)
       Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.resume_processing!(worker.name, limit: limit)
     end
@@ -73,7 +87,7 @@ def report_prometheus_metrics(worker, queue_size, limit)
       limit_metric = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_max_concurrent_jobs,
         'Max number of concurrent running jobs.',
         {})
-      limit_metric.set({ worker: worker.name }, limit || DEFAULT_LIMIT)
+      limit_metric.set({ worker: worker.name }, limit || BATCH_SIZE)
     end
   end
 end
diff --git a/config/feature_flags/gitlab_com_derisk/sidekiq_concurrency_limit_optimized_count.yml b/config/feature_flags/gitlab_com_derisk/sidekiq_concurrency_limit_optimized_count.yml
new file mode 100644
index 0000000000000000000000000000000000000000..1d2d291c3c0968f0f40a4942c52e32fb4f6fecbd
--- /dev/null
+++ b/config/feature_flags/gitlab_com_derisk/sidekiq_concurrency_limit_optimized_count.yml
@@ -0,0 +1,9 @@
+---
+name: sidekiq_concurrency_limit_optimized_count
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/490936
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/169561
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/496335
+milestone: '17.6'
+group: group::scalability
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/ee/spec/workers/concurrency_limit/resume_worker_spec.rb b/ee/spec/workers/concurrency_limit/resume_worker_spec.rb
index c7c96cacde3b9b36e89bc3f52975372eb82867e3..5bd0f950f011861948efeb40bfa4425f992af5d1 100644
--- a/ee/spec/workers/concurrency_limit/resume_worker_spec.rb
+++ b/ee/spec/workers/concurrency_limit/resume_worker_spec.rb
@@ -12,28 +12,16 @@
       allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:resume_processing!)
     end
 
-    context 'when there are no jobs in the queue' do
-      before do
-        allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:has_jobs_in_queue?)
-          .and_return(0)
-      end
-
-      it 'does nothing' do
-        expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
-          .not_to receive(:resume_processing!)
-
-        worker.perform
-      end
-
-      it 'reports prometheus metrics' do
-        stub_application_setting(elasticsearch_max_code_indexing_concurrency: 30)
+    shared_examples 'report prometheus metrics' do |limit = described_class::BATCH_SIZE, queue_size = 100|
+      it do
         queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
         expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
-          .with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
-          .and_return(queue_size_gauge_double)
+                                                  .with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
+                                                  .and_return(queue_size_gauge_double)
 
-        allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, 0)
-        expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 0)
+        allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything)
+        expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name },
+          queue_size)
 
         limit_gauge_double = instance_double(Prometheus::Client::Gauge)
         expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
@@ -41,104 +29,123 @@
                                                   .and_return(limit_gauge_double)
 
         allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
-        expect(limit_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 30)
+        expect(limit_gauge_double).to receive(:set)
+          .with({ worker: worker_with_concurrency_limit.name }, limit)
+
+        worker.perform
+      end
+    end
+
+    shared_examples 'no jobs in the queue' do
+      before do
+        allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
+          .and_return(10)
+        allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
+          .and_return(0)
+      end
+
+      it 'does nothing' do
+        expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
+          .not_to receive(:resume_processing!)
 
         worker.perform
       end
+
+      it_behaves_like 'report prometheus metrics', 10, 0
     end
 
-    context 'when there are jobs in the queue' do
+    shared_examples 'jobs in the queue' do
       before do
         allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
-          .and_return(100)
+          .and_return(0)
+        allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
+          .with(worker_with_concurrency_limit.name).and_return(100)
+        stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
       end
 
       it 'resumes processing' do
         stub_application_setting(elasticsearch_max_code_indexing_concurrency: 35)
         expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
          .to receive(:resume_processing!)
-         .with(worker_with_concurrency_limit.name, limit: 35)
+         .with(worker_with_concurrency_limit.name, limit: 35 - concurrent_workers)
 
         worker.perform
       end
 
       it 'resumes processing if there are other jobs' do
         stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
-        allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
-          .and_return(worker_with_concurrency_limit.name => 15)
         expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
          .to receive(:resume_processing!)
-         .with(worker_with_concurrency_limit.name, limit: 45)
+         .with(worker_with_concurrency_limit.name, limit: 60 - concurrent_workers)
 
         worker.perform
       end
 
-      it 'reports prometheus metrics' do
-        stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
-        allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
-          .and_return(worker_with_concurrency_limit.name => 15)
-
-        queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
-        expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
-                                                  .with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
-                                                  .and_return(queue_size_gauge_double)
+      it_behaves_like 'report prometheus metrics', 60
 
-        allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything)
-        expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 100)
+      context 'when limit is negative' do
+        before do
+          allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for).and_return(0)
+          allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
+            .with(worker: worker_with_concurrency_limit)
+            .and_return(-1)
+        end
 
-        limit_gauge_double = instance_double(Prometheus::Client::Gauge)
-        expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
-                                                  .with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {})
-                                                  .and_return(limit_gauge_double)
+        it 'does not schedule any workers' do
+          expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
+            .not_to receive(:resume_processing!)
+          expect(described_class).not_to receive(:perform_in)
 
-        allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
-        expect(limit_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 60)
+          worker.perform
+        end
 
-        worker.perform
+        it_behaves_like 'report prometheus metrics', -1
       end
 
       context 'when limit is not set' do
         before do
-          allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
-          nil_proc = -> { nil }
+          allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for).and_return(0)
           allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
             .with(worker: worker_with_concurrency_limit)
-            .and_return(nil_proc)
+            .and_return(0)
         end
 
-        it 'resumes processing using the DEFAULT_LIMIT' do
+        it 'resumes processing using the BATCH_SIZE' do
           expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
             .to receive(:resume_processing!)
-              .with(worker_with_concurrency_limit.name, limit: described_class::DEFAULT_LIMIT)
+              .with(worker_with_concurrency_limit.name, limit: described_class::BATCH_SIZE)
           expect(described_class).to receive(:perform_in)
 
           worker.perform
         end
 
-        it 'reports limit as DEFAULT_LIMIT' do
-          allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
-            .and_return(worker_with_concurrency_limit.name => 15)
+        it_behaves_like 'report prometheus metrics', 0
+      end
+    end
 
-          queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
-          expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
-                                                    .with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
-                                                    .and_return(queue_size_gauge_double)
+    context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do
+      let(:concurrent_workers) { 6 }
 
-          allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything)
-          expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 100)
+      before do
+        stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false)
+        allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
+          .and_return(worker_with_concurrency_limit.name => concurrent_workers)
+      end
 
-          limit_gauge_double = instance_double(Prometheus::Client::Gauge)
-          expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
-                                                    .with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {})
-                                                    .and_return(limit_gauge_double)
+      it_behaves_like 'no jobs in the queue'
+      it_behaves_like 'jobs in the queue'
+    end
 
-          allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
-          expect(limit_gauge_double).to receive(:set)
-            .with({ worker: worker_with_concurrency_limit.name }, described_class::DEFAULT_LIMIT)
+    context 'when sidekiq_concurrency_limit_optimized_count feature flag is enabled' do
+      let(:concurrent_workers) { 5 }
 
-          worker.perform
-        end
+      before do
+        allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
+          .to receive(:concurrent_worker_count).and_return(concurrent_workers)
       end
+
+      it_behaves_like 'no jobs in the queue'
+      it_behaves_like 'jobs in the queue'
     end
   end
 end
diff --git a/ee/spec/workers/elastic_commit_indexer_worker_spec.rb b/ee/spec/workers/elastic_commit_indexer_worker_spec.rb
index b8107f41653ba90ab5b91e15f05ad22b0efbb5a9..d46b5a068f62595fdae3fd1d663627707a5ade9f 100644
--- a/ee/spec/workers/elastic_commit_indexer_worker_spec.rb
+++ b/ee/spec/workers/elastic_commit_indexer_worker_spec.rb
@@ -256,7 +256,7 @@
   it 'registers worker to limit concurrency' do
     stub_application_setting(elasticsearch_max_code_indexing_concurrency: 35)
 
-    max_jobs = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: described_class).call
+    max_jobs = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: described_class)
     expect(max_jobs).to eq(35)
   end
 end
diff --git a/lib/gitlab/sidekiq_logging/structured_logger.rb b/lib/gitlab/sidekiq_logging/structured_logger.rb
index 05b26db6cf233fadeac2c2873d91d0a88b014e9d..67b757d0f8291aef2271e449b1ad395a9ebbceee 100644
--- a/lib/gitlab/sidekiq_logging/structured_logger.rb
+++ b/lib/gitlab/sidekiq_logging/structured_logger.rb
@@ -110,9 +110,7 @@ def log_job_done(job, started_time, payload, job_exception = nil)
       end
 
       def add_thread_identity(payload)
-        # Similar to what Sidekiq does ith it's out-of-the-box logger:
-        # https://github.com/sidekiq/sidekiq/blob/2451d70080db95cb5f69effcbd74381cf3b3f727/lib/sidekiq/logger.rb#L80
-        payload['sidekiq_tid'] = (Thread.current.object_id ^ ::Process.pid).to_s(36)
+        payload['sidekiq_tid'] = Gitlab::SidekiqProcess.tid
         payload['sidekiq_thread_name'] = Thread.current.name if Thread.current.name
       end
 
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb
index 8580ad755f6b1314cf5945ca68abd5af646a6179..e04cbbcccfa003e8b9e93b3db5e402c2cdb98a13 100644
--- a/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb
+++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb
@@ -4,11 +4,17 @@ module Gitlab
   module SidekiqMiddleware
     module ConcurrencyLimit
       class ConcurrencyLimitService
-        # Class for managing queues for deferred workers
+        REDIS_KEY_PREFIX = 'sidekiq:concurrency_limit'
+
+        delegate :add_to_queue!, :queue_size, :has_jobs_in_queue?, :resume_processing!, to: :@queue_manager
+
+        delegate :track_execution_start, :track_execution_end, :cleanup_stale_trackers,
+          :concurrent_worker_count, to: :@worker_execution_tracker
 
         def initialize(worker_name)
           @worker_name = worker_name
-          @redis_key = "sidekiq:concurrency_limit:throttled_jobs:{#{worker_name.underscore}}"
+          @queue_manager = QueueManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
+          @worker_execution_tracker = WorkerExecutionTracker.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
         end
 
         class << self
@@ -27,84 +33,22 @@ def resume_processing!(worker_name, limit:)
           def queue_size(worker_name)
             new(worker_name).queue_size
           end
-        end
 
-        def add_to_queue!(args, context)
-          with_redis do |redis|
-            redis.rpush(redis_key, serialize(args, context))
+          def cleanup_stale_trackers(worker_name)
+            new(worker_name).cleanup_stale_trackers
           end
 
-          deferred_job_counter.increment({ worker: worker_name })
-        end
-
-        def queue_size
-          with_redis { |redis| redis.llen(redis_key) }
-        end
-
-        def has_jobs_in_queue?
-          queue_size != 0
-        end
-
-        def resume_processing!(limit:)
-          with_redis do |redis|
-            jobs = next_batch_from_queue(redis, limit: limit)
-            break if jobs.empty?
-
-            jobs.each { |j| send_to_processing_queue(deserialize(j)) }
-
-            remove_processed_jobs(redis, limit: jobs.length)
-
-            jobs.length
+          def track_execution_start(worker_name)
+            new(worker_name).track_execution_start
           end
-        end
-
-        private
-
-        attr_reader :worker_name, :redis_key
-
-        def with_redis(&blk)
-          Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord -- Not active record
-        end
-
-        def serialize(args, context)
-          {
-            args: args,
-            context: context
-          }.to_json
-        end
-
-        def deserialize(json)
-          Gitlab::Json.parse(json)
-        end
-
-        def send_to_processing_queue(job)
-          context = job['context'] || {}
-
-          Gitlab::ApplicationContext.with_raw_context(context) do
-            args = job['args']
-
-            Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.resumed_log(worker_name, args)
 
-            worker_klass = worker_name.safe_constantize
-            next if worker_klass.nil?
-
-            worker_klass.concurrency_limit_resume.perform_async(*args)
+          def track_execution_end(worker_name)
+            new(worker_name).track_execution_end
           end
-        end
-
-        def next_batch_from_queue(redis, limit:)
-          return [] unless limit > 0
-
-          redis.lrange(redis_key, 0, limit - 1)
-        end
-
-        def remove_processed_jobs(redis, limit:)
-          redis.ltrim(redis_key, limit, -1)
-        end
 
-        def deferred_job_counter
-          @deferred_job_count ||= ::Gitlab::Metrics.counter(:sidekiq_concurrency_limit_deferred_jobs_total,
-            'Count of jobs deferred by the concurrency limit middleware.')
+          def concurrent_worker_count(worker_name)
+            new(worker_name).concurrent_worker_count
+          end
         end
       end
     end
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb
index 9877f7f5ee85363f05b554e1ba3af0de6f61c807..1396b1a6bc35c21504e4a87d547d31f76b8f6297 100644
--- a/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb
+++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb
@@ -7,6 +7,9 @@ class Middleware
         def initialize(worker, job)
           @worker = worker
           @job = job
+
+          worker_class = worker.is_a?(Class) ? worker : worker.class
+          @worker_class = worker_class.name
         end
 
         # This will continue the middleware chain if the job should be scheduled
@@ -29,17 +32,21 @@ def perform
             return
           end
 
+          track_execution_start
+
           yield
+        ensure
+          track_execution_end
         end
 
         private
 
-        attr_reader :job, :worker
+        attr_reader :job, :worker, :worker_class
 
         def should_defer_schedule?
           return false if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
           return false if resumed?
-          return false unless ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
+          return false if worker_limit == 0
 
           has_jobs_in_queue?
         end
@@ -57,13 +64,30 @@ def concurrency_service
           ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService
         end
 
+        def track_execution_start
+          return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
+          return unless worker_limit > 0
+
+          concurrency_service.track_execution_start(worker_class)
+        end
+
+        def track_execution_end
+          return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
+          return unless worker_limit > 0
+
+          concurrency_service.track_execution_end(worker_class)
+        end
+
+        def worker_limit
+          @worker_limit ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
+        end
+
         def resumed?
           job['concurrency_limit_resume'] == true
         end
 
         def has_jobs_in_queue?
-          worker_class = worker.is_a?(Class) ? worker : worker.class
-          concurrency_service.has_jobs_in_queue?(worker_class.name)
+          concurrency_service.has_jobs_in_queue?(worker_class)
         end
 
         def defer_job!
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb
new file mode 100644
index 0000000000000000000000000000000000000000..dc0340d1f87e208c99792bb41015d0686dfdea26
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb
@@ -0,0 +1,86 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqMiddleware
+    module ConcurrencyLimit
+      class QueueManager
+        attr_reader :redis_key
+
+        def initialize(worker_name:, prefix:)
+          @worker_name = worker_name
+          @redis_key = "#{prefix}:throttled_jobs:{#{worker_name.underscore}}"
+        end
+
+        def add_to_queue!(args, context)
+          with_redis do |redis|
+            redis.rpush(@redis_key, serialize(args, context))
+          end
+
+          deferred_job_counter.increment({ worker: @worker_name })
+        end
+
+        def queue_size
+          with_redis { |redis| redis.llen(@redis_key) }
+        end
+
+        def has_jobs_in_queue?
+          queue_size != 0
+        end
+
+        def resume_processing!(limit:)
+          with_redis do |redis|
+            jobs = next_batch_from_queue(redis, limit: limit)
+            break if jobs.empty?
+
+            jobs.each { |job| send_to_processing_queue(deserialize(job)) }
+            remove_processed_jobs(redis, limit: jobs.length)
+
+            jobs.length
+          end
+        end
+
+        private
+
+        def with_redis(&)
+          Gitlab::Redis::SharedState.with(&) # rubocop:disable CodeReuse/ActiveRecord -- Not active record
+        end
+
+        def serialize(args, context)
+          { args: args, context: context }.to_json
+        end
+
+        def deserialize(json)
+          Gitlab::Json.parse(json)
+        end
+
+        def send_to_processing_queue(job)
+          context = job['context'] || {}
+
+          Gitlab::ApplicationContext.with_raw_context(context) do
+            args = job['args']
+            Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.resumed_log(@worker_name, args)
+            worker_klass = @worker_name.safe_constantize
+            next if worker_klass.nil?
+
+            worker_klass.concurrency_limit_resume.perform_async(*args)
+          end
+        end
+
+        def next_batch_from_queue(redis, limit:)
+          return [] unless limit > 0
+
+          redis.lrange(@redis_key, 0, limit - 1)
+        end
+
+        def remove_processed_jobs(redis, limit:)
+          redis.ltrim(@redis_key, limit, -1)
+        end
+
+        def deferred_job_counter
+          @deferred_job_counter ||= ::Gitlab::Metrics.counter(:sidekiq_concurrency_limit_deferred_jobs_total,
+            'Count of jobs deferred by the concurrency limit middleware.')
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/worker_execution_tracker.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/worker_execution_tracker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..ec2ea385c25043d10b526e2f4878625e6767f9a3
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/worker_execution_tracker.rb
@@ -0,0 +1,104 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqMiddleware
+    module ConcurrencyLimit
+      class WorkerExecutionTracker
+        TRACKING_KEY_TTL = 600.seconds
+
+        def initialize(worker_name:, prefix:)
+          @worker_name = worker_name
+          @prefix = prefix
+        end
+
+        def track_execution_start
+          return if sidekiq_pid.nil?
+
+          process_thread_id = process_thread_id_key(sidekiq_pid, sidekiq_tid)
+          with_redis do |r|
+            r.hset(worker_executing_hash_key, process_thread_id, Time.now.utc.tv_sec)
+          end
+        end
+
+        def track_execution_end
+          return if sidekiq_pid.nil?
+
+          process_thread_id = process_thread_id_key(sidekiq_pid, sidekiq_tid)
+          with_redis do |r|
+            r.hdel(worker_executing_hash_key, process_thread_id)
+          end
+        end
+
+        def cleanup_stale_trackers
+          executing_threads_hash = with_redis { |r| r.hgetall(worker_executing_hash_key) }
+          return if executing_threads_hash.empty?
+
+          dangling = executing_threads_hash.filter { |k, v| !still_executing?(k, v) }
+          return if dangling.empty?
+
+          with_redis do |r|
+            r.hdel(worker_executing_hash_key, dangling)
+          end
+        end
+
+        def concurrent_worker_count
+          with_redis { |r| r.hlen(worker_executing_hash_key).to_i }
+        end
+
+        private
+
+        attr_reader :worker_name
+
+        def with_redis(&)
+          Redis::QueuesMetadata.with(&) # rubocop:disable CodeReuse/ActiveRecord -- Not active record
+        end
+
+        def worker_executing_hash_key
+          "#{@prefix}:{#{worker_name.underscore}}:executing"
+        end
+
+        def process_thread_id_key(pid, tid)
+          "#{pid}:tid:#{tid}"
+        end
+
+        def sidekiq_pid
+          Gitlab::SidekiqProcess.pid
+        end
+
+        def sidekiq_tid
+          Gitlab::SidekiqProcess.tid
+        end
+
+        def still_executing?(ptid, started_at)
+          return true unless started_at.to_i < TRACKING_KEY_TTL.ago.utc.to_i
+
+          pid, tid = ptid.split(":tid:")
+          return false unless pid && tid
+
+          job_hash = fetch_sidekiq_process_work_hash(pid, tid)
+          return false if job_hash.empty?
+
+          job_hash['class'] == worker_name
+        end
+
+        def fetch_sidekiq_process_work_hash(pid, tid)
+          job_hash = {}
+          Gitlab::SidekiqSharding::Router.route(worker_name.safe_constantize) do
+            hash = Sidekiq.redis { |r| r.hget("#{pid}:work", tid) } # rubocop:disable Cop/SidekiqRedisCall -- checking process work hash
+            next if hash.nil?
+
+            # There are 2 layers of JSON encoding
+            # 1. when a job is pushed into the queue -- https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/client.rb#L261
+            # 2. When the workstate is written into the pid:work hash -- https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/launcher.rb#L148
+            job_hash = ::Gitlab::Json.parse(::Gitlab::Json.parse(hash)&.dig('payload'))
+          end
+
+          job_hash
+        rescue JSON::ParserError => e
+          Gitlab::ErrorTracking.track_exception(e, worker_class: worker_name)
+          {}
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb
index a18c08a0e4aa57eec0c8e856fec74efc13deff55..f133751520197774a901b7dd535043276e6ac3ca 100644
--- a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb
+++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency.rb
@@ -15,6 +15,11 @@ def current_for(worker:, skip_cache: false)
             worker_class = worker.is_a?(Class) ? worker : worker.class
             worker_name = worker_class.name
 
+            if ::Feature.enabled?(:sidekiq_concurrency_limit_optimized_count, Feature.current_request,
+              type: :gitlab_com_derisk)
+              return ConcurrencyLimitService.concurrent_worker_count(worker_name)
+            end
+
             workers(skip_cache: skip_cache)[worker_name].to_i
           end
 
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb
index 3eed4d172a4fac2a9fa855c25c936f1ba3032ec4..3524d1ce8a6e35c2bb174aa94c99b4fa3147f938 100644
--- a/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb
+++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map.rb
@@ -12,18 +12,20 @@ def set_limit_for(worker:, max_jobs:)
             @data[worker] = max_jobs
           end
 
+          # Returns an integer value where:
+          # - positive value is returned to enforce a valid concurrency limit
+          # - 0 value is returned for workers without concurrency limits
+          # - negative value is returned for paused workers
           def limit_for(worker:)
-            return unless data
-            return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
+            return 0 unless data
+            return 0 if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
 
             worker_class = worker.is_a?(Class) ? worker : worker.class
-            data[worker_class]
+            data[worker_class]&.call.to_i
           end
 
           def over_the_limit?(worker:)
-            limit_proc = limit_for(worker: worker)
-
-            limit = limit_proc&.call.to_i
+            limit = limit_for(worker: worker)
             return false if limit == 0
             return true if limit < 0
 
diff --git a/lib/gitlab/sidekiq_process.rb b/lib/gitlab/sidekiq_process.rb
new file mode 100644
index 0000000000000000000000000000000000000000..f34dd17246790aa81ab1d55c48af84f32ed302ad
--- /dev/null
+++ b/lib/gitlab/sidekiq_process.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqProcess
+    class << self
+      def pid
+        # The sidekiq thread-local capsule is set in the Processor.
+        # https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/processor.rb#L70
+        Thread.current[:sidekiq_capsule]&.identity
+      end
+
+      def tid
+        Thread.current[:sidekiq_capsule]&.tid
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/sidekiq_logging/structured_logger_spec.rb b/spec/lib/gitlab/sidekiq_logging/structured_logger_spec.rb
index 43f8f3c396bf0867df99fd30c4031c6c4fa432a1..986b4f94dfa13c0526226dc99b8b0c668844de5b 100644
--- a/spec/lib/gitlab/sidekiq_logging/structured_logger_spec.rb
+++ b/spec/lib/gitlab/sidekiq_logging/structured_logger_spec.rb
@@ -7,7 +7,14 @@
     # We disable a memory instrumentation feature
     # as this requires a special patched Ruby
     allow(Gitlab::Memory::Instrumentation).to receive(:available?) { false }
+
+    # We disable Thread.current.name because state could leak in from other specs.
     allow(Thread.current).to receive(:name).and_return(nil)
+    Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
+  end
+
+  after do
+    Thread.current[:sidekiq_capsule] = nil
   end
 
   describe '#call', :request_store do
diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service_spec.rb
index ebd905ed50789f62efce62c264a95d48caa4e686..766c09b5ca1f44224ba339181877b13be2163b3c 100644
--- a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service_spec.rb
@@ -2,7 +2,8 @@
 
 require 'spec_helper'
 
-RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService, :clean_gitlab_redis_shared_state, feature_category: :global_search do
+RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService,
+  :clean_gitlab_redis_shared_state, :clean_gitlab_redis_queues_metadata, feature_category: :global_search do
   let(:worker_class) do
     Class.new do
       def self.name
@@ -90,78 +91,51 @@ def self.name
     end
   end
 
-  describe '#add_to_queue!' do
-    subject(:add_to_queue!) { service.add_to_queue!(worker_args, worker_context) }
+  describe '.track_execution_start' do
+    subject(:track_execution_start) { described_class.track_execution_start(worker_class_name) }
 
-    it 'adds a job to the set' do
-      expect { add_to_queue! }
-        .to change { service.queue_size }
-        .from(0).to(1)
-    end
+    it 'calls an instance method' do
+      expect_next_instance_of(described_class) do |instance|
+        expect(instance).to receive(:track_execution_start)
+      end
 
-    it 'adds only one unique job to the set' do
-      expect do
-        2.times { add_to_queue! }
-      end.to change { service.queue_size }.from(0).to(1)
+      track_execution_start
     end
+  end
 
-    it 'stores context information' do
-      add_to_queue!
-
-      service.send(:with_redis) do |r|
-        set_key = service.send(:redis_key)
-        stored_job = service.send(:deserialize, r.lrange(set_key, 0, -1).first)
+  describe '.track_execution_end' do
+    subject(:track_execution_end) { described_class.track_execution_end(worker_class_name) }
 
-        expect(stored_job['context']).to eq(stored_context)
+    it 'calls an instance method' do
+      expect_next_instance_of(described_class) do |instance|
+        expect(instance).to receive(:track_execution_end)
       end
-    end
-  end
 
-  describe '#has_jobs_in_queue?' do
-    it 'uses queue_size' do
-      expect { service.add_to_queue!(worker_args, worker_context) }
-        .to change { service.has_jobs_in_queue? }
-        .from(false).to(true)
+      track_execution_end
     end
   end
 
-  describe '#resume_processing!' do
-    let(:jobs) { [[1], [2], [3]] }
-    let(:setter) { instance_double('Sidekiq::Job::Setter') }
+  describe '.concurrent_worker_count' do
+    subject(:concurrent_worker_count) { described_class.concurrent_worker_count(worker_class_name) }
 
-    it 'puts jobs back into the queue and respects order' do
-      jobs.each do |j|
-        service.add_to_queue!(j, worker_context)
+    it 'calls an instance method' do
+      expect_next_instance_of(described_class) do |instance|
+        expect(instance).to receive(:concurrent_worker_count)
       end
 
-      expect(worker_class).to receive(:concurrency_limit_resume).twice.and_return(setter)
-      expect(setter).to receive(:perform_async).with(1).ordered
-      expect(setter).to receive(:perform_async).with(2).ordered
-      expect(setter).not_to receive(:perform_async).with(3).ordered
-
-      expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
-        .to receive(:resumed_log)
-        .with(worker_class_name, [1])
-      expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
-        .to receive(:resumed_log)
-        .with(worker_class_name, [2])
-
-      service.resume_processing!(limit: 2)
+      concurrent_worker_count
     end
+  end
 
-    it 'drops a set after execution' do
-      jobs.each do |j|
-        service.add_to_queue!(j, worker_context)
-      end
+  describe '.cleanup_stale_trackers' do
+    subject(:cleanup_stale_trackers) { described_class.cleanup_stale_trackers(worker_class_name) }
 
-      expect(Gitlab::ApplicationContext).to receive(:with_raw_context)
-        .with(stored_context)
-        .exactly(jobs.count).times.and_call_original
-      expect(worker_class).to receive(:concurrency_limit_resume).exactly(3).times.and_return(setter)
-      expect(setter).to receive(:perform_async).exactly(jobs.count).times
+    it 'calls an instance method' do
+      expect_next_instance_of(described_class) do |instance|
+        expect(instance).to receive(:cleanup_stale_trackers)
+      end
 
-      expect { service.resume_processing!(limit: jobs.count) }
-        .to change { service.has_jobs_in_queue? }.from(true).to(false)
+      cleanup_stale_trackers
     end
   end
 
diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..fcb01dc5484bbb73910dd1931bd4a9f871709c18
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager_spec.rb
@@ -0,0 +1,113 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager,
+  :clean_gitlab_redis_shared_state, feature_category: :global_search do
+  let(:worker_class) do
+    Class.new do
+      def self.name
+        'DummyWorker'
+      end
+
+      include ApplicationWorker
+    end
+  end
+
+  let(:worker_class_name) { worker_class.name }
+
+  let(:worker_context) do
+    { 'correlation_id' => 'context_correlation_id',
+      'meta.project' => 'gitlab-org/gitlab' }
+  end
+
+  let(:stored_context) do
+    {
+      "#{Gitlab::ApplicationContext::LOG_KEY}.project" => 'gitlab-org/gitlab',
+      "correlation_id" => 'context_correlation_id'
+    }
+  end
+
+  let(:worker_args) { [1, 2] }
+
+  subject(:service) { described_class.new(worker_name: worker_class_name, prefix: 'some_prefix') }
+
+  before do
+    stub_const(worker_class_name, worker_class)
+  end
+
+  describe '#add_to_queue!' do
+    subject(:add_to_queue!) { service.add_to_queue!(worker_args, worker_context) }
+
+    it 'adds a job to the set' do
+      expect { add_to_queue! }
+        .to change { service.queue_size }
+        .from(0).to(1)
+    end
+
+    it 'adds only one unique job to the set' do
+      expect do
+        2.times { add_to_queue! }
+      end.to change { service.queue_size }.from(0).to(1)
+    end
+
+    it 'stores context information' do
+      add_to_queue!
+
+      Gitlab::Redis::SharedState.with do |r|
+        set_key = service.redis_key
+        stored_job = service.send(:deserialize, r.lrange(set_key, 0, -1).first)
+
+        expect(stored_job['context']).to eq(stored_context)
+      end
+    end
+  end
+
+  describe '#has_jobs_in_queue?' do
+    it 'uses queue_size' do
+      expect { service.add_to_queue!(worker_args, worker_context) }
+        .to change { service.has_jobs_in_queue? }
+        .from(false).to(true)
+    end
+  end
+
+  describe '#resume_processing!' do
+    let(:jobs) { [[1], [2], [3]] }
+    let(:setter) { instance_double('Sidekiq::Job::Setter') }
+
+    it 'puts jobs back into the queue and respects order' do
+      jobs.each do |j|
+        service.add_to_queue!(j, worker_context)
+      end
+
+      expect(worker_class).to receive(:concurrency_limit_resume).twice.and_return(setter)
+      expect(setter).to receive(:perform_async).with(1).ordered
+      expect(setter).to receive(:perform_async).with(2).ordered
+      expect(setter).not_to receive(:perform_async).with(3).ordered
+
+      expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
+        .to receive(:resumed_log)
+        .with(worker_class_name, [1])
+      expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
+        .to receive(:resumed_log)
+        .with(worker_class_name, [2])
+
+      service.resume_processing!(limit: 2)
+    end
+
+    it 'drops a set after execution' do
+      jobs.each do |j|
+        service.add_to_queue!(j, worker_context)
+      end
+
+      expect(Gitlab::ApplicationContext).to receive(:with_raw_context)
+        .with(stored_context)
+        .exactly(jobs.count).times.and_call_original
+      expect(worker_class).to receive(:concurrency_limit_resume).exactly(3).times.and_return(setter)
+      expect(setter).to receive(:perform_async).exactly(jobs.count).times
+
+      expect { service.resume_processing!(limit: jobs.count) }
+        .to change { service.has_jobs_in_queue? }.from(true).to(false)
+    end
+  end
+end
diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/server_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/server_spec.rb
index a48eb4836ed7622719c1c612f9351c0fcfd3ac36..82f4e4b401d3877f72ff9aa436230b21fe5df698 100644
--- a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/server_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/server_spec.rb
@@ -22,9 +22,14 @@ def self.work; end
   end
 
   before do
+    Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
     stub_const('TestConcurrencyLimitWorker', worker_class)
   end
 
+  after do
+    Thread.current[:sidekiq_capsule] = nil
+  end
+
   around do |example|
     with_sidekiq_server_middleware do |chain|
       chain.add described_class
@@ -32,6 +37,17 @@ def self.work; end
     end
   end
 
+  shared_examples 'skip execution tracking' do
+    it do
+      expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
+        .not_to receive(:track_execution_start)
+      expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
+        .not_to receive(:track_execution_end)
+
+      TestConcurrencyLimitWorker.perform_async('foo')
+    end
+  end
+
   describe '#call' do
     context 'when feature flag is disabled' do
       before do
@@ -45,6 +61,8 @@ def self.work; end
 
         TestConcurrencyLimitWorker.perform_async('foo')
       end
+
+      it_behaves_like 'skip execution tracking'
     end
 
     context 'when there are jobs in the queue' do
@@ -116,6 +134,30 @@ def self.work; end
 
           TestConcurrencyLimitWorker.perform_async('foo')
         end
+
+        it 'tracks execution concurrency' do
+          expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
+            .to receive(:track_execution_start)
+          expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:track_execution_end)
+
+          TestConcurrencyLimitWorker.perform_async('foo')
+        end
+
+        context 'when limit is set to zero' do
+          before do
+            allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for).and_return(0)
+          end
+
+          it_behaves_like 'skip execution tracking'
+        end
+
+        context 'when limit is not defined' do
+          before do
+            ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.remove_instance_variable(:@data)
+          end
+
+          it_behaves_like 'skip execution tracking'
+        end
       end
 
       context 'when over the limit' do
diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/worker_execution_tracker_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/worker_execution_tracker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..e7b4caafa5fb6df47dbeeda933580a4299a44807
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/worker_execution_tracker_spec.rb
@@ -0,0 +1,195 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkerExecutionTracker,
+  :clean_gitlab_redis_queues_metadata, feature_category: :global_search do
+  let(:worker_class) do
+    Class.new do
+      def self.name
+        'DummyWorker'
+      end
+
+      include ApplicationWorker
+    end
+  end
+
+  let(:worker_class_name) { worker_class.name }
+
+  let(:redis_key_prefix) { 'random_prefix' }
+
+  let(:sidekiq_pid) { 'proc-abc' }
+  let(:sidekiq_tid) { 'proc-abc' }
+
+  let(:tracking_hash) { "#{redis_key_prefix}:{#{worker_class_name.underscore}}:executing" }
+  let(:tracking_elem) { "#{sidekiq_pid}:tid:#{sidekiq_tid}" }
+
+  subject(:service) { described_class.new(worker_name: worker_class_name, prefix: redis_key_prefix) }
+
+  before do
+    stub_const(worker_class_name, worker_class)
+
+    Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
+    allow(Thread.current[:sidekiq_capsule]).to receive(:identity).and_return(sidekiq_pid)
+    allow(service).to receive(:sidekiq_tid).and_return(sidekiq_tid)
+  end
+
+  describe '#track_execution_start' do
+    subject(:track_execution_start) { service.track_execution_start }
+
+    it 'writes to Redis hash and string' do
+      track_execution_start
+
+      Gitlab::Redis::QueuesMetadata.with do |c|
+        expect(c.hexists(tracking_hash, tracking_elem)).to eq(true)
+      end
+    end
+
+    context 'when Thread.current[:sidekiq_capsule] is missing' do
+      before do
+        Thread.current[:sidekiq_capsule] = nil
+      end
+
+      it 'exits early without writing to redis' do
+        track_execution_start
+
+        Gitlab::Redis::QueuesMetadata.with do |c|
+          expect(c.hexists(tracking_hash, tracking_elem)).to eq(false)
+        end
+      end
+    end
+  end
+
+  describe '#track_execution_end' do
+    subject(:track_execution_end) { service.track_execution_end }
+
+    before do
+      service.track_execution_start
+    end
+
+    it 'clears to Redis hash and string' do
+      Gitlab::Redis::QueuesMetadata.with do |c|
+        expect { track_execution_end }
+          .to change { c.hexists(tracking_hash, tracking_elem) }.from(true).to(false)
+      end
+    end
+
+    context 'when Thread.current[:sidekiq_capsule] is missing' do
+      before do
+        Thread.current[:sidekiq_capsule] = nil
+      end
+
+      it 'exits early without writing to redis' do
+        Gitlab::Redis::QueuesMetadata.with do |c|
+          expect(c.hexists(tracking_hash, tracking_elem)).to eq(true)
+          track_execution_end
+          expect(c.hexists(tracking_hash, tracking_elem)).to eq(true)
+        end
+      end
+    end
+  end
+
+  describe '#concurrent_worker_count' do
+    let(:size) { 10 }
+
+    subject(:concurrent_worker_count) { service.concurrent_worker_count }
+
+    before do
+      Gitlab::Redis::QueuesMetadata.with do |c|
+        c.hset(tracking_hash, (1..size).flat_map { |i| [i, i] })
+      end
+    end
+
+    it 'returns hash size' do
+      expect(concurrent_worker_count).to eq(size)
+    end
+
+    context 'with empty hash' do
+      before do
+        Gitlab::Redis::QueuesMetadata.with { |c| c.del(tracking_hash) }
+      end
+
+      it 'returns 0' do
+        expect(concurrent_worker_count).to eq(0)
+      end
+    end
+  end
+
+  describe '#cleanup_stale_trackers' do
+    let(:dangling_tid) { 4567 }
+    let(:long_running_tid) { 5678 }
+    let(:invalid_process_thread_id) { 'proc-abc::4567' }
+    let(:dangling_process_thread_id) { 'proc-abc:tid:4567' }
+    let(:long_running_process_thread_id) { 'proc-abc:tid:5678' }
+
+    # Format from https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/api.rb#L1180
+    # The tid field in the `{pid}:work` hash contains a hash of 'payload' -> job hash.
+    def generate_sidekiq_hash(worker)
+      job_hash = { 'payload' => ::Gitlab::Json.dump({
+        'class' => worker,
+        'created_at' => Time.now.to_f - described_class::TRACKING_KEY_TTL
+      }) }
+
+      Sidekiq.dump_json(job_hash)
+    end
+
+    subject(:cleanup_stale_trackers) { service.cleanup_stale_trackers }
+
+    context 'when hash is valid' do
+      before do
+        Gitlab::Redis::QueuesMetadata.with do |r|
+          # element should not be deleted since it is within the ttl
+          r.hset(tracking_hash, tracking_elem, Time.now.utc.tv_sec - (0.1 * described_class::TRACKING_KEY_TTL.to_i))
+
+          # element should not be deleted since it is a long running process
+          r.hset(tracking_hash, long_running_process_thread_id,
+            Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
+
+          # element should be deleted since its process-thread id field format is invalid
+          r.hset(tracking_hash, invalid_process_thread_id,
+            Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
+
+          # element should be deleted: although it looks long-running, it is stale
+          # because the thread is now executing a different worker
+          r.hset(tracking_hash, dangling_process_thread_id,
+            Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
+        end
+
+        Gitlab::SidekiqSharding::Validator.allow_unrouted_sidekiq_calls do
+          Sidekiq.redis do |r|
+            r.hset("proc-abc:work", long_running_tid, generate_sidekiq_hash(worker_class_name))
+            r.hset("proc-abc:work", dangling_process_thread_id, generate_sidekiq_hash('otherworker'))
+          end
+        end
+      end
+
+      it 'only cleans up dangling keys' do
+        expect { cleanup_stale_trackers }.to change { service.concurrent_worker_count }.from(4).to(2)
+      end
+    end
+
+    context 'when hash is invalid' do
+      let(:invalid_hash) { 'invalid' }
+
+      before do
+        Gitlab::Redis::QueuesMetadata.with do |r|
+          r.hset(tracking_hash, long_running_process_thread_id,
+            Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
+        end
+
+        Gitlab::SidekiqSharding::Validator.allow_unrouted_sidekiq_calls do
+          Sidekiq.redis do |r|
+            r.hset("proc-abc:work", long_running_tid, invalid_hash)
+          end
+        end
+      end
+
+      it 'tracks exception' do
+        expect(Gitlab::ErrorTracking).to receive(:track_exception).with(instance_of(JSON::ParserError),
+          worker_class: 'DummyWorker')
+
+        expect { cleanup_stale_trackers }.to change { service.concurrent_worker_count }.from(1).to(0)
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb
index b446a8242d505924a7ad35a8c4bae65cc95074e9..8740e6d506af92c05c8ac972461110a2a456c166 100644
--- a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_concurrency_spec.rb
@@ -30,9 +30,17 @@ def perform(*); end
     ]
   end
 
+  let(:concurrency_tracking_hash_content) do
+    (1..current_concurrency).flat_map { |i| [i, i] }
+  end
+
   before do
     stub_const('TestConcurrencyLimitWorker', worker_class)
     allow(described_class).to receive(:sidekiq_workers).and_return([sidekiq_worker] * current_concurrency)
+    Gitlab::Redis::QueuesMetadata.with do |c|
+      prefix = Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService::REDIS_KEY_PREFIX
+      c.hset("#{prefix}:{#{worker_class.name.underscore}}:executing", *concurrency_tracking_hash_content)
+    end
   end
 
   describe '.current_for' do
@@ -41,10 +49,21 @@ def perform(*); end
     context 'without cache' do
       let(:skip_cache) { true }
 
-      it 'returns the current concurrency', quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/451677' do
-        expect(described_class).to receive(:workers_uncached).and_call_original
+      it 'looks up current concurrency from hash' do
+        expect(described_class).not_to receive(:workers_uncached)
         expect(current_for).to eq(current_concurrency)
       end
+
+      context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do
+        before do
+          stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false)
+        end
+
+        it 'returns the current concurrency', quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/451677' do
+          expect(described_class).to receive(:workers_uncached).and_call_original
+          expect(current_for).to eq(current_concurrency)
+        end
+      end
     end
 
     context 'with cache', :clean_gitlab_redis_cache do
@@ -55,10 +74,21 @@ def perform(*); end
         cache_setup!(tally: cached_value, lease: true)
       end
 
-      it 'returns cached current_for' do
-        expect(described_class).not_to receive(:workers_uncached)
+      it 'looks up current concurrency from hash' do
+        expect(described_class).not_to receive(:workers)
+        expect(current_for).to eq(current_concurrency)
+      end
 
-        expect(current_for).to eq(20)
+      context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do
+        before do
+          stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false)
+        end
+
+        it 'returns cached current_for' do
+          expect(described_class).not_to receive(:workers_uncached)
+
+          expect(current_for).to eq(20)
+        end
       end
     end
   end
diff --git a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb
index ea25953331d11adb22ba8569ef6cece536d20610..6b73d29bca0420c821deff6c95685a5231835352 100644
--- a/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/concurrency_limit/workers_map_spec.rb
@@ -27,21 +27,21 @@ def perform(*); end
     let(:expected_limit) { 60 }
 
     it 'accepts worker instance' do
-      expect(described_class.limit_for(worker: worker_class.new).call).to eq(expected_limit)
+      expect(described_class.limit_for(worker: worker_class.new)).to eq(expected_limit)
     end
 
     it 'accepts worker class' do
-      expect(described_class.limit_for(worker: worker_class).call).to eq(expected_limit)
+      expect(described_class.limit_for(worker: worker_class)).to eq(expected_limit)
     end
 
-    it 'returns nil for unknown worker' do
-      expect(described_class.limit_for(worker: described_class)).to be_nil
+    it 'returns 0 for unknown worker' do
+      expect(described_class.limit_for(worker: described_class)).to eq(0)
     end
 
-    it 'returns nil if the feature flag is disabled' do
+    it 'returns 0 if the feature flag is disabled' do
       stub_feature_flags(sidekiq_concurrency_limit_middleware: false)
 
-      expect(described_class.limit_for(worker: worker_class)).to be_nil
+      expect(described_class.limit_for(worker: worker_class)).to eq(0)
     end
   end
 
@@ -49,18 +49,14 @@ def perform(*); end
     subject(:over_the_limit?) { described_class.over_the_limit?(worker: worker_class) }
 
     where(:limit, :current, :result) do
-      nil        | 0   | false
-      nil        | 5   | false
-      -> { nil } | 0   | false
-      -> { nil } | 5   | false
-      -> { 0 }   | 0   | false
-      -> { 0 }   | 10  | false
-      -> { 5 }   | 10  | true
-      -> { 10 }  | 0   | false
-      -> { 10 }  | 5   | false
-      -> { -1 }  | 0   | true
-      -> { -1 }  | 1   | true
-      -> { -10 } | 10  | true
+      0   | 0   | false
+      0   | 10  | false
+      5   | 10  | true
+      10  | 0   | false
+      10  | 5   | false
+      -1  | 0   | true
+      -1  | 1   | true
+      -10 | 10  | true
     end
 
     with_them do
diff --git a/spec/lib/gitlab/sidekiq_process_spec.rb b/spec/lib/gitlab/sidekiq_process_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..2264c480e0f7ee131970d12d0a24d5b11b01314b
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_process_spec.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::SidekiqProcess, feature_category: :scalability do
+  let(:cap) { Sidekiq::Capsule.new('test', Sidekiq.default_configuration) }
+
+  before do
+    Thread.current[:sidekiq_capsule] = cap
+  end
+
+  after do
+    Thread.current[:sidekiq_capsule] = nil
+  end
+
+  describe '#tid' do
+    it 'matches sidekiq internals' do
+      expect(described_class.tid).to eq(cap.tid)
+    end
+  end
+
+  describe '#pid' do
+    it 'matches sidekiq internals' do
+      expect(described_class.pid).to eq(cap.identity)
+    end
+  end
+end
diff --git a/spec/support/sidekiq.rb b/spec/support/sidekiq.rb
index ebba6bf5fddf3cb6f7586c8e6618b94facaae3c7..8aeef7218ac01126999608cf6f7f9ef76e06ec44 100644
--- a/spec/support/sidekiq.rb
+++ b/spec/support/sidekiq.rb
@@ -11,8 +11,13 @@ def gitlab_sidekiq_inline
     # see https://github.com/sidekiq/sidekiq/issues/6069
     Sidekiq::Testing.inline!
 
+    # Set a thread-local sidekiq capsule as it may be accessed in the
+    # Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkerExecutionTracker
+    Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
+
     yield
   ensure
+    Thread.current[:sidekiq_capsule] = nil
     Sidekiq::Testing.fake! # fake is the default so we reset it to that
     redis_queues_cleanup!
     redis_queues_metadata_cleanup!
diff --git a/spec/workers/concerns/search/worker_spec.rb b/spec/workers/concerns/search/worker_spec.rb
index 2b972c15aceb901c14993eed541396f19c1274fd..0dd466b53a9a3e0257bf351086f914d9a4557ae4 100644
--- a/spec/workers/concerns/search/worker_spec.rb
+++ b/spec/workers/concerns/search/worker_spec.rb
@@ -24,6 +24,6 @@ def self.name
     limit = 55
     expect(Search).to receive(:default_concurrency_limit).and_return(limit)
 
-    expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker_class)&.call).to eq(limit)
+    expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker_class)).to eq(limit)
   end
 end