diff --git a/ee/lib/audit_events/streaming/base_streamer.rb b/ee/lib/audit_events/streaming/base_streamer.rb new file mode 100644 index 0000000000000000000000000000000000000000..10fc86ab948f19147dda645a0800aaa9cdf0746a --- /dev/null +++ b/ee/lib/audit_events/streaming/base_streamer.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +module AuditEvents + module Streaming + class BaseStreamer + include Gitlab::InternalEventsTracking + + INTERNAL_EVENTS = %w[delete_epic delete_issue delete_merge_request delete_work_item].freeze + STREAMABLE_NOT_IMPLEMENTED_MESSAGE = 'Subclasses must implement the `streamable?` method' + DESTINATIONS_NOT_IMPLEMENTED_MESSAGE = 'Subclasses must implement the `destinations` method' + STREAMER_CATEGORY_NOT_FOUND_MESSAGE = 'Streamer class for category not found' + + attr_reader :event_type, :audit_event + + STREAMER_DESTINATIONS = { + 'aws' => AuditEvents::Streaming::Destinations::AmazonS3StreamDestination, + 'gcp' => AuditEvents::Streaming::Destinations::GoogleCloudLoggingStreamDestination, + 'http' => AuditEvents::Streaming::Destinations::HttpStreamDestination + }.freeze + + def initialize(event_type, audit_event) + @event_type = event_type + @audit_event = audit_event + end + + def streamable? + raise NotImplementedError, _(STREAMABLE_NOT_IMPLEMENTED_MESSAGE) + end + + def execute + return unless streamable? + + destinations.each do |destination| + track_and_stream(destination) + end + end + + private + + def destinations + raise NotImplementedError, _(DESTINATIONS_NOT_IMPLEMENTED_MESSAGE) + end + + def track_and_stream(destination) + track_audit_event + stream_to_destination(destination) + rescue StandardError => e + Gitlab::ErrorTracking.track_exception(e) + end + + def stream_to_destination(destination) + streamer_cls = STREAMER_DESTINATIONS[destination.category] + + raise ArgumentError, _(STREAMER_CATEGORY_NOT_FOUND_MESSAGE) unless streamer_cls + + streamer = streamer_cls.new(event_type, audit_event, destination) + streamer.stream + end + + def track_audit_event + return unless Gitlab::UsageDataCounters::StreamingAuditEventTypeCounter::KNOWN_EVENTS.include? event_type + + if event_type.in?(INTERNAL_EVENTS) + track_internal_event("trigger_audit_event", additional_properties: { label: event_type }) + else + Gitlab::UsageDataCounters::StreamingAuditEventTypeCounter.count(event_type) + end + rescue Redis::CannotConnectError => e + Gitlab::ErrorTracking.log_exception(e) + end + end + end +end diff --git a/ee/lib/audit_events/streaming/group/streamer.rb b/ee/lib/audit_events/streaming/group/streamer.rb new file mode 100644 index 0000000000000000000000000000000000000000..ed2402eac41a789ead7272376d88615b638f0648 --- /dev/null +++ b/ee/lib/audit_events/streaming/group/streamer.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module AuditEvents + module Streaming + module Group + class Streamer < BaseStreamer + def streamable? + group = audit_event.root_group_entity + group.present? && + group.licensed_feature_available?(:external_audit_events) && + group.external_audit_event_streaming_destinations.exists? + end + + def destinations + group = audit_event.root_group_entity + group.present? ? group.external_audit_event_streaming_destinations.to_a : [] + end + end + end + end +end diff --git a/ee/lib/audit_events/streaming/instance/streamer.rb b/ee/lib/audit_events/streaming/instance/streamer.rb new file mode 100644 index 0000000000000000000000000000000000000000..1d495b1a106f3c3792d3f7afaaecd64079c70567 --- /dev/null +++ b/ee/lib/audit_events/streaming/instance/streamer.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module AuditEvents + module Streaming + module Instance + class Streamer < BaseStreamer + def streamable? + ::License.feature_available?(:external_audit_events) && + AuditEvents::Instance::ExternalStreamingDestination.exists? + end + + def destinations + AuditEvents::Instance::ExternalStreamingDestination.all + end + end + end + end +end diff --git a/ee/spec/lib/audit_events/streaming/base_streamer_spec.rb b/ee/spec/lib/audit_events/streaming/base_streamer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..4fce6533937192669e38f414059980c06a6dbba1 --- /dev/null +++ b/ee/spec/lib/audit_events/streaming/base_streamer_spec.rb @@ -0,0 +1,177 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe AuditEvents::Streaming::BaseStreamer, feature_category: :audit_events do + let_it_be(:audit_event) { create(:audit_event, :group_event) } + let(:event_type) { 'event_type ' } + let(:streamer) { described_class.new(event_type, audit_event) } + + describe '#initialize' do + it 'sets audit operation and event' do + expect(streamer.event_type).to eq(event_type) + expect(streamer.audit_event).to eq(audit_event) + end + end + + describe '#streamable?' do + it 'raises NotImplementedError' do + expect { streamer.streamable? }.to raise_error(NotImplementedError) + end + end + + describe '#destinations' do + it 'raises NotImplementedError' do + expect { streamer.send(:destinations) }.to raise_error(NotImplementedError) + end + end + + describe '#execute' do + let(:destination) { build(:audit_events_group_external_streaming_destination, :http) } + let(:http_streamer) { instance_double(AuditEvents::Streaming::Destinations::HttpStreamDestination) } + let(:test_streamer) do + dest = destination + Class.new(described_class) do + def streamable? + true + end + + define_method(:destinations) { [dest] } + end + end + + subject(:streamer_execute) { test_streamer.new(event_type, audit_event).execute } + + before do + allow(AuditEvents::Streaming::Destinations::HttpStreamDestination) + .to receive(:new) + .and_return(http_streamer) + allow(http_streamer).to receive(:stream) + end + + context 'when not streamable' do + before do + instance = instance_double(described_class, streamable?: false, destinations: [destination], execute: nil) + allow(test_streamer).to receive(:new).and_return(instance) + end + + it 'does not stream to destinations' do + expect(http_streamer).not_to receive(:stream) + + streamer_execute + end + end + + context 'when streamable' do + specify do + expect(http_streamer).to receive(:stream) + + streamer_execute + end + end + end + + describe '#track_and_stream' do + let(:destination) { build(:audit_events_group_external_streaming_destination, :http) } + + it 'tracks exception when error occurs' do + allow(streamer).to receive(:track_audit_event).and_raise(StandardError) + + expect(Gitlab::ErrorTracking).to receive(:track_exception).with(instance_of(StandardError)) + + streamer.send(:track_and_stream, destination) + end + end + + describe '#stream_to_destination' do + let(:destination) { create(:audit_events_group_external_streaming_destination, :http) } + let(:http_streamer) { instance_double(AuditEvents::Streaming::Destinations::HttpStreamDestination) } + + subject(:stream_to_destination) { streamer.send(:stream_to_destination, destination) } + + before do + allow(AuditEvents::Streaming::Destinations::HttpStreamDestination) + .to receive(:new) + .and_return(http_streamer) + allow(http_streamer).to receive(:stream) + end + + context 'when destination category is valid' do + it 'streams to destination', :aggregate_failures do + expect(AuditEvents::Streaming::Destinations::HttpStreamDestination) + .to receive(:new) + .with(event_type, audit_event, destination) + expect(http_streamer).to receive(:stream) + + stream_to_destination + end + end + + context 'when destination category is invalid' do + before do + allow(destination).to receive(:category).and_return('invalid') + end + + it 'does not stream to destination' do + expect { stream_to_destination }.to raise_error(ArgumentError, 'Streamer class for category not found') + end + end + end + + describe '#track_audit_event', :aggregate_failures do + subject(:track_audit_event) { streamer.send(:track_audit_event) } + + using RSpec::Parameterized::TableSyntax + + context 'with different audit operations' do + where(:operation, :internal, :counter_called) do + 'delete_epic' | true | false + 'delete_issue' | true | false + 'project_created' | false | true + 'unknown_operation' | false | false + end + with_them do + let(:event_type) { operation } + + it 'tracks the event appropriately' do + if counter_called + expect(Gitlab::UsageDataCounters::StreamingAuditEventTypeCounter) + .to receive(:count) + .with(operation) + else + expect(Gitlab::UsageDataCounters::StreamingAuditEventTypeCounter) + .not_to receive(:count) + .with(operation) + end + + expectation = expect { track_audit_event } + if internal + expectation.to trigger_internal_events('trigger_audit_event') + .with({ additional_properties: { label: operation } }) + .and increment_usage_metrics("counts.#{operation}") + else + expectation.not_to trigger_internal_events('trigger_audit_event') + end + end + end + end + + context 'when Redis connection fails' do + let(:event_type) { 'project_created' } + + before do + allow(Gitlab::UsageDataCounters::StreamingAuditEventTypeCounter) + .to receive(:count) + .with(event_type) + .and_raise(Redis::CannotConnectError) + end + + it 'logs the exception' do + expect(Gitlab::ErrorTracking).to receive(:log_exception) + .with(instance_of(Redis::CannotConnectError)) + + track_audit_event + end + end + end +end diff --git a/ee/spec/lib/audit_events/streaming/group/streamer_spec.rb b/ee/spec/lib/audit_events/streaming/group/streamer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..fce6a2da614817f707fbfd37c34d29727034ab89 --- /dev/null +++ b/ee/spec/lib/audit_events/streaming/group/streamer_spec.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe AuditEvents::Streaming::Group::Streamer, feature_category: :audit_events do + context 'when group is not present' do + let_it_be(:audit_event) { create(:audit_event) } + let(:event_type) { 'event_type' } + let(:streamer) { described_class.new(event_type, audit_event) } + + describe '#streamable?' do + subject(:check_streamable) { streamer.streamable? } + + it { is_expected.to be_falsey } + end + + describe '#destinations' do + subject(:get_streamer_destinations) { streamer.destinations } + + it { is_expected.to be_empty } + end + end + + it_behaves_like 'streamer streaming audit events', :group +end diff --git a/ee/spec/lib/audit_events/streaming/instance/streamer_spec.rb b/ee/spec/lib/audit_events/streaming/instance/streamer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..86aa3439a3d2fa170c939369293127683a4bd35a --- /dev/null +++ b/ee/spec/lib/audit_events/streaming/instance/streamer_spec.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe AuditEvents::Streaming::Instance::Streamer, feature_category: :audit_events do + it_behaves_like 'streamer streaming audit events', :instance +end diff --git a/ee/spec/support/shared_examples/audit_events/streaming/streamer_shared_examples.rb b/ee/spec/support/shared_examples/audit_events/streaming/streamer_shared_examples.rb new file mode 100644 index 0000000000000000000000000000000000000000..072e13407d60e0fce494de3567fe76e92dc1b5f5 --- /dev/null +++ b/ee/spec/support/shared_examples/audit_events/streaming/streamer_shared_examples.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'streamer streaming audit events' do |scope| + let_it_be(:group) { create(:group) if scope == :group } + let_it_be(:audit_event) do + scope == :group ? create(:audit_event, :group_event, target_group: group) : create(:audit_event, :instance_event) + end + + let(:event_type) { 'event_type ' } + let(:streamer) { described_class.new(event_type, audit_event) } + + describe '#streamable?' do + subject(:check_streamable) { streamer.streamable? } + + context 'when audit events licensed feature is false' do + before do + if scope == :group + allow(audit_event.root_group_entity).to receive(:licensed_feature_available?) + .with(:external_audit_events).and_return(false) + else + stub_licensed_features(external_audit_events: false) + end + end + + it { is_expected.to be_falsey } + end + + context 'when audit events licensed feature is true' do + before do + if scope == :group + allow(audit_event.root_group_entity).to receive(:licensed_feature_available?) + .with(:external_audit_events).and_return(true) + else + stub_licensed_features(external_audit_events: true) + end + end + + context 'when audit event type is not valid for streaming' do + before do + if scope == :group + create(:audit_events_group_external_streaming_destination, :http, group: audit_event.root_group_entity) + else + create(:audit_events_instance_external_streaming_destination, :http) + end + end + + it { is_expected.to be_truthy } + end + end + end + + describe '#destinations' do + subject(:get_streamer_destinations) { streamer.destinations } + + context 'when no valid destinations exist' do + it { is_expected.to be_empty } + end + + context 'when valid destinations exist' do + before do + audit_event.root_group_entity_id = group.id if scope == :group + end + + let!(:destination) do + if scope == :group + group = audit_event.root_group_entity.reload + create(:audit_events_group_external_streaming_destination, :http, group: group) + else + create(:audit_events_instance_external_streaming_destination, :http) + end + end + + it 'returns the correct destination' do + expect(get_streamer_destinations).to contain_exactly(destination) + end + end + end +end