From 6981a61e8ae05050906b69c052f69ddd4ce6ea0a Mon Sep 17 00:00:00 2001
From: Gregorius Marco <gmarco@gitlab.com>
Date: Thu, 26 Oct 2023 17:47:16 +0000
Subject: [PATCH] Revert "Disable keywatcher in setuphelper if workhorse uses
 Redis cluster"

This reverts commit e504f5943faff02c71464d71db3f951a05a52f99.
---
 app/models/active_session.rb                  | 36 +++++++++++++++----
 .../ci/build_trace_chunks/redis_base.rb       |  6 +++-
 config/initializers/7_redis.rb                |  4 +++
 config/initializers/action_cable.rb           |  8 +++++
 config/redis.yml.example                      | 12 +++++++
 .../elastic/process_bookkeeping_service.rb    |  6 +++-
 lib/gitlab/issues/rebalancing/state.rb        |  2 +-
 lib/gitlab/redis/cluster_util.rb              |  9 +++++
 lib/gitlab/sidekiq_status.rb                  | 13 +++++--
 scripts/prepare_build.sh                      |  1 +
 .../lib/gitlab/instrumentation_helper_spec.rb | 12 +++++--
 .../gitlab/issues/rebalancing/state_spec.rb   | 12 +++++--
 spec/models/ci/runner_spec.rb                 |  2 +-
 13 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/app/models/active_session.rb b/app/models/active_session.rb
index e42f9eeef238d..9756e1b7dd3ec 100644
--- a/app/models/active_session.rb
+++ b/app/models/active_session.rb
@@ -84,7 +84,7 @@ def self.set(user, request)
       )
 
       Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-        redis.pipelined do |pipeline|
+        Gitlab::Redis::CrossSlot::Pipeline.new(redis).pipelined do |pipeline|
           pipeline.setex(
             key_name(user.id, session_private_id),
             expiry,
@@ -135,9 +135,15 @@ def self.destroy_sessions(redis, user, session_ids)
 
     redis.srem(lookup_key_name(user.id), session_ids)
 
+    session_keys = rack_session_keys(session_ids)
     Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-      redis.del(key_names)
-      redis.del(rack_session_keys(session_ids))
+      if Gitlab::Redis::ClusterUtil.cluster?(redis)
+        Gitlab::Redis::ClusterUtil.batch_unlink(key_names, redis)
+        Gitlab::Redis::ClusterUtil.batch_unlink(session_keys, redis)
+      else
+        redis.del(key_names)
+        redis.del(session_keys)
+      end
     end
   end
 
@@ -206,7 +212,13 @@ def self.sessions_from_ids(session_ids)
 
       session_keys.each_slice(SESSION_BATCH_SIZE).flat_map do |session_keys_batch|
         Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-          redis.mget(session_keys_batch).compact.map do |raw_session|
+          raw_sessions = if Gitlab::Redis::ClusterUtil.cluster?(redis)
+                           Gitlab::Redis::ClusterUtil.batch_get(session_keys_batch, redis)
+                         else
+                           redis.mget(session_keys_batch)
+                         end
+
+          raw_sessions.compact.map do |raw_session|
             load_raw_session(raw_session)
           end
         end
@@ -249,7 +261,13 @@ def dump
 
     found = Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
       entry_keys = session_ids.map { |session_id| key_name(user_id, session_id) }
-      session_ids.zip(redis.mget(entry_keys)).to_h
+      entries = if Gitlab::Redis::ClusterUtil.cluster?(redis)
+                  Gitlab::Redis::ClusterUtil.batch_get(entry_keys, redis)
+                else
+                  redis.mget(entry_keys)
+                end
+
+      session_ids.zip(entries).to_h
     end
 
     found.compact!
@@ -258,7 +276,13 @@ def dump
 
     fallbacks = Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
       entry_keys = missing.map { |session_id| key_name_v1(user_id, session_id) }
-      missing.zip(redis.mget(entry_keys)).to_h
+      entries = if Gitlab::Redis::ClusterUtil.cluster?(redis)
+                  Gitlab::Redis::ClusterUtil.batch_get(entry_keys, redis)
+                else
+                  redis.mget(entry_keys)
+                end
+
+      missing.zip(entries).to_h
     end
 
     fallbacks.merge(found.compact)
diff --git a/app/models/ci/build_trace_chunks/redis_base.rb b/app/models/ci/build_trace_chunks/redis_base.rb
index 3b7a844d12250..5f6b5c30a6ae4 100644
--- a/app/models/ci/build_trace_chunks/redis_base.rb
+++ b/app/models/ci/build_trace_chunks/redis_base.rb
@@ -71,7 +71,11 @@ def delete_keys(keys)
         with_redis do |redis|
           # https://gitlab.com/gitlab-org/gitlab/-/issues/224171
           Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-            redis.del(keys)
+            if Gitlab::Redis::ClusterUtil.cluster?(redis)
+              Gitlab::Redis::ClusterUtil.batch_unlink(keys, redis)
+            else
+              redis.del(keys)
+            end
           end
         end
       end
diff --git a/config/initializers/7_redis.rb b/config/initializers/7_redis.rb
index 060d0a8a67bd3..25c2c6aa11f0a 100644
--- a/config/initializers/7_redis.rb
+++ b/config/initializers/7_redis.rb
@@ -27,6 +27,10 @@
 Redis::Cluster::CommandLoader.prepend(Gitlab::Patch::CommandLoader)
 Redis::Cluster.prepend(Gitlab::Patch::RedisCluster)
 
+if Gitlab::Redis::Workhorse.params[:cluster].present?
+  raise "Do not configure workhorse with a Redis Cluster as pub/sub commands are not cluster-compatible."
+end
+
 # Make sure we initialize a Redis connection pool before multi-threaded
 # execution starts by
 # 1. Sidekiq
diff --git a/config/initializers/action_cable.rb b/config/initializers/action_cable.rb
index 0d2073586be4d..fb52ac6eb8adc 100644
--- a/config/initializers/action_cable.rb
+++ b/config/initializers/action_cable.rb
@@ -11,6 +11,14 @@
 
 ActionCable::SubscriptionAdapter::Base.prepend(Gitlab::Patch::ActionCableSubscriptionAdapterIdentifier)
 
+using_redis_cluster = begin
+  Rails.application.config_for(:cable)[:cluster].present?
+rescue RuntimeError
+  # config/cable.yml does not exist, but that is not the purpose of this check
+end
+
+raise "Do not configure cable.yml with a Redis Cluster as ActionCable only works with Redis." if using_redis_cluster
+
 # https://github.com/rails/rails/blob/bb5ac1623e8de08c1b7b62b1368758f0d3bb6379/actioncable/lib/action_cable/subscription_adapter/redis.rb#L18
 ActionCable::SubscriptionAdapter::Redis.redis_connector = lambda do |config|
   args = config.except(:adapter, :channel_prefix)
diff --git a/config/redis.yml.example b/config/redis.yml.example
index 9d884038af78e..a391ae36a65fa 100644
--- a/config/redis.yml.example
+++ b/config/redis.yml.example
@@ -18,6 +18,11 @@ development:
   queues_metadata:
     cluster:
       - redis://localhost:7001
+  shared_state:
+    cluster:
+      - redis://localhost:7001
+  workhorse:
+    url: redis://localhost:6379
 
 test:
   chat:
@@ -38,3 +43,10 @@ test:
   queues_metadata:
     cluster:
       - redis://localhost:7001
+  shared_state:
+    cluster:
+      - redis://localhost:7001
+  # pubsub and workhorse are not redis-cluster compatible
+  # even though they fall-back to shared_state
+  workhorse:
+    url: redis://localhost:6379
diff --git a/ee/app/services/elastic/process_bookkeeping_service.rb b/ee/app/services/elastic/process_bookkeeping_service.rb
index dbec655f51890..a74043278052f 100644
--- a/ee/app/services/elastic/process_bookkeeping_service.rb
+++ b/ee/app/services/elastic/process_bookkeeping_service.rb
@@ -79,7 +79,11 @@ def clear_tracking!
           Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
             keys = SHARDS.map { |m| [redis_set_key(m), redis_score_key(m)] }.flatten
 
-            redis.unlink(*keys)
+            if Gitlab::Redis::ClusterUtil.cluster?(redis)
+              Gitlab::Redis::ClusterUtil.batch_unlink(keys, redis)
+            else
+              redis.unlink(*keys)
+            end
           end
         end
       end
diff --git a/lib/gitlab/issues/rebalancing/state.rb b/lib/gitlab/issues/rebalancing/state.rb
index 12cc5f6e5ddf4..c60dac6f571e5 100644
--- a/lib/gitlab/issues/rebalancing/state.rb
+++ b/lib/gitlab/issues/rebalancing/state.rb
@@ -100,7 +100,7 @@ def remove_current_project_id_cache
         def refresh_keys_expiration
           with_redis do |redis|
             Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-              redis.pipelined do |pipeline|
+              Gitlab::Redis::CrossSlot::Pipeline.new(redis).pipelined do |pipeline|
                 pipeline.expire(issue_ids_key, REDIS_EXPIRY_TIME)
                 pipeline.expire(current_index_key, REDIS_EXPIRY_TIME)
                 pipeline.expire(current_project_key, REDIS_EXPIRY_TIME)
diff --git a/lib/gitlab/redis/cluster_util.rb b/lib/gitlab/redis/cluster_util.rb
index 5f1f39b523761..9e307940de3d1 100644
--- a/lib/gitlab/redis/cluster_util.rb
+++ b/lib/gitlab/redis/cluster_util.rb
@@ -26,6 +26,15 @@ def batch_unlink(keys, redis)
           end
           expired_count
         end
+
+        # Redis cluster alternative to mget
+        def batch_get(keys, redis)
+          keys.each_slice(1000).flat_map do |subset|
+            Gitlab::Redis::CrossSlot::Pipeline.new(redis).pipelined do |pipeline|
+              subset.map { |key| pipeline.get(key) }
+            end
+          end
+        end
       end
     end
   end
diff --git a/lib/gitlab/sidekiq_status.rb b/lib/gitlab/sidekiq_status.rb
index 778d278146d2f..ae4aca7ff921b 100644
--- a/lib/gitlab/sidekiq_status.rb
+++ b/lib/gitlab/sidekiq_status.rb
@@ -94,8 +94,17 @@ def self.job_status(job_ids)
 
       keys = job_ids.map { |jid| key_for(jid) }
 
-      with_redis { |redis| redis.mget(*keys) }
-        .map { |result| !result.nil? }
+      status = with_redis do |redis|
+        Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
+          if Gitlab::Redis::ClusterUtil.cluster?(redis)
+            Gitlab::Redis::ClusterUtil.batch_get(keys, redis)
+          else
+            redis.mget(*keys)
+          end
+        end
+      end
+
+      status.map { |result| !result.nil? }
     end
 
     # Returns the JIDs that are completed
diff --git a/scripts/prepare_build.sh b/scripts/prepare_build.sh
index 36fe4a010a0d3..4f644812aa706 100644
--- a/scripts/prepare_build.sh
+++ b/scripts/prepare_build.sh
@@ -21,6 +21,7 @@ sed -i 's|url:.*$|url: redis://redis:6379|g' config/resque.yml
 if [[ "$USE_REDIS_CLUSTER" != "false" ]] && [[ "$SETUP_DB" != "false" ]]; then
   cp config/redis.yml.example config/redis.yml
   sed -i 's|- .*$|- redis://rediscluster:7001|g' config/redis.yml
+  sed -i 's|url:.*$|url: redis://redis:6379|g' config/redis.yml
 fi
 
 setup_database_yml
diff --git a/spec/lib/gitlab/instrumentation_helper_spec.rb b/spec/lib/gitlab/instrumentation_helper_spec.rb
index 698c8a37d48b8..072a9c5f00894 100644
--- a/spec/lib/gitlab/instrumentation_helper_spec.rb
+++ b/spec/lib/gitlab/instrumentation_helper_spec.rb
@@ -7,6 +7,7 @@
 RSpec.describe Gitlab::InstrumentationHelper, :clean_gitlab_redis_repository_cache, :clean_gitlab_redis_cache,
                :use_null_store_as_repository_cache, feature_category: :scalability do
   using RSpec::Parameterized::TableSyntax
+  include RedisHelpers
 
   describe '.add_instrumentation_data', :request_store do
     let(:payload) { {} }
@@ -39,11 +40,18 @@
     end
 
     context 'when Redis calls are made' do
+      let_it_be(:redis_store_class) { define_helper_redis_store_class }
+
+      before do # init redis connection with `test` env details
+        redis_store_class.with(&:ping)
+        RequestStore.clear!
+      end
+
       it 'adds Redis data and omits Gitaly data' do
         stub_rails_env('staging') # to avoid raising CrossSlotError
-        Gitlab::Redis::Sessions.with { |redis| redis.mset('test-cache', 123, 'test-cache2', 123) }
+        redis_store_class.with { |redis| redis.mset('test-cache', 123, 'test-cache2', 123) }
         Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-          Gitlab::Redis::Sessions.with { |redis| redis.mget('cache-test', 'cache-test-2') }
+          redis_store_class.with { |redis| redis.mget('cache-test', 'cache-test-2') }
         end
         Gitlab::Redis::Queues.with { |redis| redis.set('test-queues', 321) }
 
diff --git a/spec/lib/gitlab/issues/rebalancing/state_spec.rb b/spec/lib/gitlab/issues/rebalancing/state_spec.rb
index b457e0bf1dc99..a0ea5fec8ec86 100644
--- a/spec/lib/gitlab/issues/rebalancing/state_spec.rb
+++ b/spec/lib/gitlab/issues/rebalancing/state_spec.rb
@@ -231,8 +231,16 @@ def generate_and_cache_issues_ids(count:, position_offset: 0, position_direction
 
   def check_existing_keys
     index = 0
-    # spec only, we do not actually scan keys in the code
-    recently_finished_keys_count = Gitlab::Redis::SharedState.with { |redis| redis.scan(0, match: "#{described_class::RECENTLY_FINISHED_REBALANCE_PREFIX}:*") }.last.count
+    cursor = '0'
+    recently_finished_keys_count = 0
+
+    # loop to scan since it may run against a Redis Cluster
+    loop do
+      # spec only, we do not actually scan keys in the code
+      cursor, items = Gitlab::Redis::SharedState.with { |redis| redis.scan(cursor, match: "#{described_class::RECENTLY_FINISHED_REBALANCE_PREFIX}:*") }
+      recently_finished_keys_count += items.count
+      break if cursor == '0'
+    end
 
     index += 1 if rebalance_caching.get_current_index > 0
     index += 1 if rebalance_caching.get_current_project_id.present?
diff --git a/spec/models/ci/runner_spec.rb b/spec/models/ci/runner_spec.rb
index 7a88b133a98d4..538eb51387ef7 100644
--- a/spec/models/ci/runner_spec.rb
+++ b/spec/models/ci/runner_spec.rb
@@ -1059,7 +1059,7 @@ def stub_redis_runner_contacted_at(value)
     end
 
     def value_in_queues
-      Gitlab::Redis::SharedState.with do |redis|
+      Gitlab::Redis::Workhorse.with do |redis|
         runner_queue_key = runner.send(:runner_queue_key)
         redis.get(runner_queue_key)
       end
-- 
GitLab