From 57e18d128c7f12740c12a0702bb8c4b2f605c251 Mon Sep 17 00:00:00 2001
From: Sashi Kumar Kumaresan <skumar@gitlab.com>
Date: Mon, 11 Dec 2023 15:10:06 +0000
Subject: [PATCH] Add publish_group feature to Gitlab::EventStore

This change adds functionality to publish
group of events to EventStore to reduce the number of
sidekiq jobs triggered for an event type.

Changelog: added
---
 doc/development/event_store.md                |  20 +++
 ee/spec/lib/ee/gitlab/event_store_spec.rb     |  12 +-
 lib/gitlab/event_store.rb                     |   4 +
 lib/gitlab/event_store/store.rb               |  16 +-
 lib/gitlab/event_store/subscriber.rb          |  14 +-
 lib/gitlab/event_store/subscription.rb        |  40 ++++-
 spec/lib/gitlab/event_store/store_spec.rb     |  70 ++++++++-
 .../gitlab/event_store/subscription_spec.rb   | 142 ++++++++++++++++++
 8 files changed, 307 insertions(+), 11 deletions(-)
 create mode 100644 spec/lib/gitlab/event_store/subscription_spec.rb

diff --git a/doc/development/event_store.md b/doc/development/event_store.md
index 357962246e5ac..ea7ccb9f7268c 100644
--- a/doc/development/event_store.md
+++ b/doc/development/event_store.md
@@ -319,6 +319,26 @@ on the subscriber Sidekiq worker, instead of `perform_async`.
 
 This technique is useful when publishing many events and leverage the Sidekiq deduplication.
 
+### Publishing group of events
+
+In some scenarios we publish multiple events of same type in a single business transaction.
+This puts additional load to Sidekiq by invoking a job for each event. In such cases, we can
+publish a group of events by calling `Gitlab::EventStore.publish_group`. This method accepts an
+array of events of similar type. By default the subscriber worker receives a group of max 10 events,
+but this can be configured by defining `group_size` parameter while creating the subscription.
+The number of published events are dispatched to the subscriber in batches based on the
+configured `group_size`. If the number of groups exceeds 100, we schedule each group with a delay
+of 10 seconds, to reduce the load on Sidekiq.
+
+```ruby
+store.subscribe ::Security::RefreshProjectPoliciesWorker,
+  to: ::ProjectAuthorizations::AuthorizationsChangedEvent,
+  delay: 1.minute,
+  group_size: 25
+```
+
+The `handle_event` method in the subscriber worker is called for each of the events in the group.
+
 ## Testing
 
 ### Testing the publisher
diff --git a/ee/spec/lib/ee/gitlab/event_store_spec.rb b/ee/spec/lib/ee/gitlab/event_store_spec.rb
index 533371a5076c7..a19406b0810a1 100644
--- a/ee/spec/lib/ee/gitlab/event_store_spec.rb
+++ b/ee/spec/lib/ee/gitlab/event_store_spec.rb
@@ -2,7 +2,7 @@
 
 require 'spec_helper'
 
-RSpec.describe Gitlab::EventStore do
+RSpec.describe Gitlab::EventStore, feature_category: :shared do
   describe '.instance' do
     it 'returns a store with CE and EE subscriptions' do
       instance = described_class.instance
@@ -23,4 +23,14 @@
       )
     end
   end
+
+  describe '.publish_group' do
+    let(:events) { [] }
+
+    it 'calls publish_group of instance' do
+      expect(described_class.instance).to receive(:publish_group).with(events)
+
+      described_class.publish_group(events)
+    end
+  end
 end
diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb
index 48af9d2671439..b422fd061ff93 100644
--- a/lib/gitlab/event_store.rb
+++ b/lib/gitlab/event_store.rb
@@ -17,6 +17,10 @@ def self.publish(event)
       instance.publish(event)
     end
 
+    def self.publish_group(events)
+      instance.publish_group(events)
+    end
+
     def self.instance
       @instance ||= Store.new { |store| configure!(store) }
     end
diff --git a/lib/gitlab/event_store/store.rb b/lib/gitlab/event_store/store.rb
index 318745cc19262..c558362122b12 100644
--- a/lib/gitlab/event_store/store.rb
+++ b/lib/gitlab/event_store/store.rb
@@ -15,12 +15,12 @@ def initialize
         lock!
       end
 
-      def subscribe(worker, to:, if: nil, delay: nil)
+      def subscribe(worker, to:, if: nil, delay: nil, group_size: nil)
         condition = binding.local_variable_get('if')
 
         Array(to).each do |event|
           validate_subscription!(worker, event)
-          subscriptions[event] << Gitlab::EventStore::Subscription.new(worker, condition, delay)
+          subscriptions[event] << Gitlab::EventStore::Subscription.new(worker, condition, delay, group_size)
         end
       end
 
@@ -34,6 +34,18 @@ def publish(event)
         end
       end
 
+      def publish_group(events)
+        event_class = events.first.class
+
+        unless events.all? { |e| e.class < Event && e.instance_of?(event_class) }
+          raise InvalidEvent, "Not all events being published are valid"
+        end
+
+        subscriptions.fetch(event_class, []).each do |subscription|
+          subscription.consume_events(events)
+        end
+      end
+
       private
 
       def lock!
diff --git a/lib/gitlab/event_store/subscriber.rb b/lib/gitlab/event_store/subscriber.rb
index da95d3cfcfa8b..81770624cd9ca 100644
--- a/lib/gitlab/event_store/subscriber.rb
+++ b/lib/gitlab/event_store/subscriber.rb
@@ -29,16 +29,22 @@ module Subscriber
       def perform(event_type, data)
         raise InvalidEvent, event_type unless self.class.const_defined?(event_type)
 
-        event = event_type.constantize.new(
-          data: data.with_indifferent_access
-        )
+        event_type_class = event_type.constantize
 
-        handle_event(event)
+        Array.wrap(data).each do |single_event_data|
+          handle_event(construct_event(event_type_class, single_event_data))
+        end
       end
 
       def handle_event(event)
         raise NotImplementedError, 'you must implement this methods in order to handle events'
       end
+
+      private
+
+      def construct_event(event_type, event_data)
+        event_type.new(data: event_data.with_indifferent_access)
+      end
     end
   end
 end
diff --git a/lib/gitlab/event_store/subscription.rb b/lib/gitlab/event_store/subscription.rb
index 81a65f9a8ff05..f39bbc2aaf0dc 100644
--- a/lib/gitlab/event_store/subscription.rb
+++ b/lib/gitlab/event_store/subscription.rb
@@ -3,12 +3,17 @@
 module Gitlab
   module EventStore
     class Subscription
-      attr_reader :worker, :condition, :delay
+      DEFAULT_GROUP_SIZE = 10
+      SCHEDULING_BATCH_SIZE = 100
+      SCHEDULING_BATCH_DELAY = 10.seconds
 
-      def initialize(worker, condition, delay)
+      attr_reader :worker, :condition, :delay, :group_size
+
+      def initialize(worker, condition, delay, group_size)
         @worker = worker
         @condition = condition
         @delay = delay
+        @group_size = group_size || DEFAULT_GROUP_SIZE
       end
 
       def consume_event(event)
@@ -29,6 +34,30 @@ def consume_event(event)
         Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event.class.name, event_data: event.data)
       end
 
+      def consume_events(events)
+        event_class = events.first.class
+        unless events.all? { |e| e.class < Event && e.instance_of?(event_class) }
+          raise InvalidEvent, "Events being published are not an instance of Gitlab::EventStore::Event"
+        end
+
+        matched_events = events.select { |event| condition_met?(event) }
+        worker_args = events_worker_args(event_class, matched_events)
+
+        # rubocop:disable Scalability/BulkPerformWithContext -- Context info is already available in `ApplicationContext` here.
+        if worker_args.size > SCHEDULING_BATCH_SIZE
+          # To reduce the number of concurrent jobs, we batch the group of events and add delay between each batch.
+          # We add a delay of 1s as bulk_perform_in does not support 0s delay.
+          worker.bulk_perform_in(delay || 1.second, worker_args, batch_size: SCHEDULING_BATCH_SIZE, batch_delay: SCHEDULING_BATCH_DELAY)
+        elsif delay
+          worker.bulk_perform_in(delay, worker_args)
+        else
+          worker.bulk_perform_async(worker_args)
+        end
+        # rubocop:enable Scalability/BulkPerformWithContext
+      rescue StandardError => e
+        Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event_class, events: events.map(&:data))
+      end
+
       private
 
       def condition_met?(event)
@@ -36,6 +65,13 @@ def condition_met?(event)
 
         condition.call(event)
       end
+
+      def events_worker_args(event_class, events)
+        events
+          .map { |event| event.data.deep_stringify_keys }
+          .each_slice(group_size)
+          .map { |events_data_group| [event_class.name, events_data_group] }
+      end
     end
   end
 end
diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb
index 04d0706c1306a..e747027db980c 100644
--- a/spec/lib/gitlab/event_store/store_spec.rb
+++ b/spec/lib/gitlab/event_store/store_spec.rb
@@ -263,12 +263,59 @@ def schema
     end
   end
 
+  describe '#publish_group' do
+    let(:event1) { event_klass.new(data: { name: 'Bob', id: 123 }) }
+    let(:event2) { event_klass.new(data: { name: 'Alice', id: 456 }) }
+    let(:event3) { event_klass.new(data: { name: 'Eva', id: 789 }) }
+
+    let(:group_size) { 3 }
+    let(:events) { [event1, event2, event3] }
+    let(:serialized_data) { events.map(&:data).map(&:deep_stringify_keys) }
+
+    let(:store) do
+      described_class.new do |s|
+        s.subscribe worker, to: event_klass, group_size: group_size
+      end
+    end
+
+    subject { store.publish_group(events) }
+
+    context 'with valid events' do
+      it 'calls consume_events of subscription' do
+        expect(store.subscriptions[event_klass].first).to receive(:consume_events).with(events)
+
+        subject
+      end
+    end
+
+    context 'when there is invalid event' do
+      let(:events) { [event1, invalid_event] }
+
+      context 'when event is invalid' do
+        let(:invalid_event) { stub_const('TestEvent', {}) }
+
+        it 'raises InvalidEvent error' do
+          expect { subject }.to raise_error(Gitlab::EventStore::InvalidEvent)
+        end
+      end
+
+      context 'when one of the events is a different event' do
+        let(:invalid_event) { stub_const('DifferentEvent', Class.new(Gitlab::EventStore::Event)) }
+
+        it 'raises InvalidEvent error' do
+          expect { subject }.to raise_error(Gitlab::EventStore::InvalidEvent)
+        end
+      end
+    end
+  end
+
   describe 'subscriber' do
     let(:data) { { name: 'Bob', id: 123 } }
+    let(:event_data) { data }
     let(:event_name) { event.class.name }
     let(:worker_instance) { worker.new }
 
-    subject { worker_instance.perform(event_name, data) }
+    subject { worker_instance.perform(event_name, event_data) }
 
     it 'is a Sidekiq worker' do
       expect(worker_instance).to be_a(ApplicationWorker)
@@ -278,7 +325,7 @@ def schema
       expect(worker_instance).to receive(:handle_event).with(instance_of(event.class))
 
       expect_any_instance_of(event.class) do |event|
-        expect(event).to receive(:data).and_return(data)
+        expect(event).to receive(:data).and_return(event_data)
       end
 
       subject
@@ -299,5 +346,24 @@ def schema
         expect { subject }.to raise_error(NotImplementedError)
       end
     end
+
+    context 'when there are multiple events' do
+      let(:event_data) { [{ name: 'Bob', id: 123 }, { name: 'Alice', id: 456 }] }
+
+      let(:first_event) { event_klass.new(data: event_data.first) }
+      let(:second_event) { event_klass.new(data: event_data.last) }
+
+      before do
+        allow(worker_instance).to receive(:construct_event).with(event_klass, event_data.first).and_return(first_event)
+        allow(worker_instance).to receive(:construct_event).with(event_klass, event_data.last).and_return(second_event)
+      end
+
+      it 'calls handle_event multiple times' do
+        expect(worker_instance).to receive(:handle_event).once.with(first_event)
+        expect(worker_instance).to receive(:handle_event).once.with(second_event)
+
+        subject
+      end
+    end
   end
 end
diff --git a/spec/lib/gitlab/event_store/subscription_spec.rb b/spec/lib/gitlab/event_store/subscription_spec.rb
new file mode 100644
index 0000000000000..2a87f48be10e3
--- /dev/null
+++ b/spec/lib/gitlab/event_store/subscription_spec.rb
@@ -0,0 +1,142 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::EventStore::Subscription, feature_category: :shared do
+  let(:worker) do
+    stub_const('EventSubscriber', Class.new).tap do |klass|
+      klass.class_eval do
+        include Gitlab::EventStore::Subscriber
+
+        def handle_event(event)
+          event.data
+        end
+      end
+    end
+  end
+
+  let(:event_klass) { stub_const('TestEvent', Class.new(Gitlab::EventStore::Event)) }
+  let(:event) { event_klass.new(data: data) }
+
+  let(:delay) { nil }
+  let(:condition) { nil }
+  let(:group_size) { nil }
+
+  subject(:subscription) { described_class.new(worker, condition, delay, group_size) }
+
+  before do
+    event_klass.class_eval do
+      def schema
+        {
+          'required' => %w[name id],
+          'type' => 'object',
+          'properties' => {
+            'name' => { 'type' => 'string' },
+            'id' => { 'type' => 'integer' }
+          }
+        }
+      end
+    end
+  end
+
+  describe '#consume_events' do
+    let(:event1) { event_klass.new(data: { name: 'Bob', id: 123 }) }
+    let(:event2) { event_klass.new(data: { name: 'Alice', id: 456 }) }
+    let(:event3) { event_klass.new(data: { name: 'Eva', id: 789 }) }
+
+    let(:group_size) { 3 }
+    let(:events) { [event1, event2, event3] }
+    let(:serialized_data) { events.map(&:data).map(&:deep_stringify_keys) }
+
+    subject(:consume_events) { subscription.consume_events(events) }
+
+    context 'with invalid events' do
+      let(:events) { [event1, invalid_event] }
+
+      context 'when event is invalid' do
+        let(:invalid_event) { stub_const('TestEvent', Class.new { attr_reader :data }).new }
+
+        it 'raises InvalidEvent error' do
+          expect { consume_events }.to raise_error(Gitlab::EventStore::InvalidEvent)
+        end
+      end
+
+      context 'when one of the events is a different event' do
+        let(:invalid_event_klass) { stub_const('DifferentEvent', Class.new(Gitlab::EventStore::Event)) }
+        let(:invalid_event) { invalid_event_klass.new(data: {}) }
+
+        before do
+          invalid_event_klass.class_eval do
+            def schema
+              {
+                'type' => 'object',
+                'properties' => {}
+              }
+            end
+          end
+        end
+
+        it 'raises InvalidEvent error' do
+          expect { consume_events }.to raise_error(Gitlab::EventStore::InvalidEvent)
+        end
+      end
+    end
+
+    context 'when grouped events size is more than batch scheduling size' do
+      let(:group_size) { 2 }
+
+      before do
+        stub_const("#{described_class}::SCHEDULING_BATCH_SIZE", 1)
+      end
+
+      it 'dispatches the events to the worker with batch parameters' do
+        expect(worker).to receive(:bulk_perform_in).with(
+          1.second,
+          [['TestEvent', serialized_data.take(2)], ['TestEvent', serialized_data.drop(2)]],
+          batch_size: 1,
+          batch_delay: 10.seconds
+        )
+
+        consume_events
+      end
+
+      context 'with delayed dispatching of event' do
+        let(:delay) { 1.minute }
+
+        it 'dispatches the events to the worker with batch parameters and delay' do
+          expect(worker).to receive(:bulk_perform_in).with(
+            1.minute,
+            [['TestEvent', serialized_data.take(2)], ['TestEvent', serialized_data.drop(2)]],
+            batch_size: 1,
+            batch_delay: 10.seconds
+          )
+
+          consume_events
+        end
+      end
+    end
+
+    context 'when subscription has grouped dispatching of events' do
+      let(:group_size) { 2 }
+
+      it 'dispatches the events to the worker in group' do
+        expect(worker).to receive(:bulk_perform_async).once.with([
+          ['TestEvent', serialized_data.take(2)],
+          ['TestEvent', serialized_data.drop(2)]
+        ])
+
+        consume_events
+      end
+    end
+
+    context 'when subscription has delayed dispatching of event' do
+      let(:delay) { 1.minute }
+
+      it 'dispatches the events to the worker after some time' do
+        expect(worker).to receive(:bulk_perform_in).with(1.minute, [['TestEvent', serialized_data]])
+
+        consume_events
+      end
+    end
+  end
+end
-- 
GitLab