# frozen_string_literal: true

# Swaps the partial index on value_stream_dashboard_aggregations
# (last_run_at, namespace_id) WHERE enabled IS TRUE for an equivalent index
# that sorts last_run_at with NULLS FIRST.
#
# In both directions the replacement index is created before the other one is
# dropped, so one of the two indexes is present throughout the migration.
class ReplaceVsdIndexWithNullsFirstOrder < Gitlab::Database::Migration[2.1]
  # The add/remove helpers used below are the "concurrent" variants.
  # NOTE(review): PostgreSQL cannot build an index concurrently inside a
  # transaction block, which is presumably why the DDL transaction is disabled.
  disable_ddl_transaction!

  OLD_INDEX = 'index_on_value_stream_dashboard_aggregations_last_run_at_id'
  NEW_INDEX = 'index_on_value_stream_dashboard_aggregations_last_run_at_and_id'

  TABLE_NAME = :value_stream_dashboard_aggregations
  INDEX_COLUMNS = [:last_run_at, :namespace_id].freeze
  INDEX_PREDICATE = 'enabled IS TRUE'

  # Creates the NULLS FIRST index, then drops the old default-ordered index.
  def up
    add_concurrent_index(
      TABLE_NAME,
      INDEX_COLUMNS,
      where: INDEX_PREDICATE,
      name: NEW_INDEX,
      order: { last_run_at: 'ASC NULLS FIRST' }
    )

    remove_concurrent_index_by_name(TABLE_NAME, OLD_INDEX)
  end

  # Recreates the old index (no explicit ordering), then drops the NULLS FIRST index.
  def down
    add_concurrent_index(
      TABLE_NAME,
      INDEX_COLUMNS,
      where: INDEX_PREDICATE,
      name: OLD_INDEX
    )

    remove_concurrent_index_by_name(TABLE_NAME, NEW_INDEX)
  end
end
# ee/app/models/analytics/value_stream_dashboard/aggregation.rb
module Analytics
  module ValueStreamDashboard
    # ActiveRecord model backed by the value_stream_dashboard_aggregations
    # table. Each record belongs to a namespace and carries an `enabled` flag
    # and a `last_run_at` timestamp (nil until the record has been processed).
    class Aggregation < ApplicationRecord
      include FromUnion

      # Age of `last_run_at` beyond which a record is selected by the `outdated` scope.
      OUTDATED_AFTER = 10.days

      self.table_name = :value_stream_dashboard_aggregations

      belongs_to :namespace, optional: false

      validates_inclusion_of :enabled, in: [true, false]
      validates_presence_of :namespace_id

      # Despite the name, this sorts by last_run_at ascending with NULLs first
      # (never-run records, then the oldest runs), using namespace_id as tie-breaker.
      scope :latest_first_order, -> { order(arel_table[:last_run_at].asc.nulls_first, arel_table[:namespace_id].asc) }
      scope :outdated, -> { where('last_run_at < ?', OUTDATED_AFTER.ago) }
      # Written as `IS TRUE` to mirror the predicate of the partial index on this table.
      scope :enabled, -> { where('enabled IS TRUE') }
      scope :not_aggregated, -> { where(last_run_at: nil) }
      # ensures that a given namespace_id will show up as the first result
      scope :specific_namespace_id_first_order, ->(namespace_id) {
        order(arel_table[:namespace_id].not_eq(namespace_id))
      }

      # Loads the next batch of records to process.
      #
      # The batch is the union of enabled records that were never aggregated
      # and enabled records whose last run is older than OUTDATED_AFTER,
      # ordered by `latest_first_order` and capped at `batch_size`.
      #
      # When the cursor carries :top_level_namespace_id, the record with that
      # primary key is added to the union (without the enabled/outdated
      # filters) and sorted to the front so interrupted work resumes first.
      # An id that matches no record simply contributes nothing.
      #
      # @param cursor [Hash, nil] optional cursor; only :top_level_namespace_id
      #   is read. nil is treated like an empty cursor.
      # @param batch_size [Integer] maximum number of records to return
      # @return [Array<Aggregation>]
      def self.load_batch(cursor = {}, batch_size = 100)
        # NOTE(review): the cursor key is called top_level_namespace_id but is
        # matched against the model's primary key below — confirm they coincide.
        resume_id = cursor && cursor[:top_level_namespace_id]

        unions = [
          enabled.not_aggregated.latest_first_order.limit(batch_size),
          enabled.outdated.latest_first_order.limit(batch_size)
        ]
        unions.unshift(primary_key_in(resume_id)) if resume_id

        # remove_order: false presumably keeps each sub-query's ORDER BY so its
        # LIMIT applies to the sorted rows — verify against FromUnion.
        query = from_union(unions, remove_order: false)
        query = query.specific_namespace_id_first_order(resume_id) if resume_id

        query.latest_first_order.limit(batch_size).to_a
      end
    end
  end
end

# ee/app/services/analytics/value_stream_dashboard/count_service.rb
module Analytics
  module ValueStreamDashboard
    # Service invoked once per aggregation record. The counting logic is not
    # implemented yet: `execute` only echoes the cursor back in a successful
    # ServiceResponse.
    class CountService
      # @param aggregation the aggregation record to process
      # @param cursor [Hash] cursor state; returned unchanged in the payload
      def initialize(aggregation:, cursor:)
        @aggregation = aggregation
        @cursor = cursor
      end

      # @return [ServiceResponse] success response whose payload is `{ cursor: cursor }`
      def execute
        # count logic comes here
        ServiceResponse.success(payload: { cursor: cursor })
      end

      private

      attr_reader :aggregation, :cursor
    end
  end
end
# frozen_string_literal: true

module Analytics
  module ValueStreamDashboard
    # Cron-queued worker that loads a batch of aggregation records, runs
    # CountService for each one and stamps `last_run_at` on the records that
    # were processed successfully. It only does work near the end of the month
    # and when the licensed feature is available (see #should_perform?).
    class CountWorker
      include ApplicationWorker

      # rubocop:disable Scalability/CronWorkerContext
      # This worker does not perform work scoped to a context
      include CronjobQueue
      # rubocop:enable Scalability/CronWorkerContext

      idempotent!

      data_consistency :sticky
      feature_category :value_stream_management

      # Not referenced in this class yet. The original value was misspelled
      # ("dasboard"); corrected here before anything starts relying on it.
      CACHE_KEY = 'value_stream_dashboard_count_cursor'
      # The worker is active on the last CUTOFF_DAYS + 1 calendar days of a month.
      CUTOFF_DAYS = 5

      # Processes one batch of aggregation records, stopping early once the
      # runtime limiter reports that the job is over time.
      def perform
        return unless should_perform?

        runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new

        # An empty batch makes this loop a no-op, so no separate guard is needed.
        Analytics::ValueStreamDashboard::Aggregation.load_batch.each do |aggregation|
          response = Analytics::ValueStreamDashboard::CountService.new(
            aggregation: aggregation,
            cursor: {}
          ).execute

          # Only mark the record as processed when the service succeeded, so a
          # failed record is still selected by a later run.
          aggregation.update!(last_run_at: Time.current) if response.success?

          break if runtime_limiter.over_time?
        end
      end

      private

      # True when today's day-of-month is within CUTOFF_DAYS of the month's
      # last day and the :group_level_analytics_dashboard feature is licensed.
      def should_perform?
        # Sample the clock once so both sides of the comparison use the same instant.
        now = Time.current

        now.day >= (now.end_of_month.day - CUTOFF_DAYS) &&
          License.feature_available?(:group_level_analytics_dashboard)
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

# ee/spec/services/analytics/value_stream_dashboard/count_service_spec.rb
RSpec.describe Analytics::ValueStreamDashboard::CountService, feature_category: :value_stream_management do
  it 'returns successful response' do
    aggregation = create(:value_stream_dashboard_aggregation)

    service_response = described_class.new(aggregation: aggregation, cursor: {}).execute

    expect(service_response).to be_success
    expect(service_response.payload[:cursor]).to eq({})
  end
end

# ee/spec/workers/analytics/value_stream_dashboard/count_worker_spec.rb
RSpec.describe Analytics::ValueStreamDashboard::CountWorker, feature_category: :value_stream_management do
  let(:aggregation_model) { Analytics::ValueStreamDashboard::Aggregation }
  let(:count_service) { Analytics::ValueStreamDashboard::CountService }

  # Runs the worker synchronously on a fresh instance.
  def run_job
    described_class.new.perform
  end

  context 'when the group_level_analytics_dashboard feature is not available' do
    it 'does nothing' do
      expect(aggregation_model).not_to receive(:load_batch)

      run_job
    end
  end

  context 'when the group_level_analytics_dashboard feature is available' do
    before do
      stub_licensed_features(group_level_analytics_dashboard: true)
    end

    context 'when the current time is not close to the end of month' do
      # Mid-month date: outside the worker's end-of-month window.
      around do |example|
        travel_to(Date.new(2022, 5, 15)) { example.run }
      end

      it 'does nothing' do
        expect(aggregation_model).not_to receive(:load_batch)

        run_job
      end
    end

    context 'when the current time is close to the end of month' do
      # 26 February 2022: inside the worker's end-of-month window.
      around do |example|
        travel_to(Date.new(2022, 2, 26)) { example.run }
      end

      context 'when no records present' do
        it 'does nothing' do
          expect(count_service).not_to receive(:new)

          run_job
        end
      end

      context 'when records are returned' do
        it 'invokes the count service' do
          create_list(:value_stream_dashboard_aggregation, 3, last_run_at: nil)

          expect(count_service).to receive(:new).exactly(3).times.and_call_original
          run_job

          expect(aggregation_model.pluck(:last_run_at)).to all(eq(Time.current))
        end
      end

      context 'when some records were processed recently' do
        it 'skips the recently processed record' do
          create(:value_stream_dashboard_aggregation, last_run_at: 3.days.ago) # should not be processed
          aggregation = create(:value_stream_dashboard_aggregation, last_run_at: 15.days.ago).reload

          expect(count_service)
            .to receive(:new)
            .with(aggregation: aggregation, cursor: {})
            .and_call_original

          run_job
        end
      end

      context 'when the execution is over time' do
        it 'stops the processing' do
          create_list(:value_stream_dashboard_aggregation, 3, last_run_at: nil)

          # First check lets processing continue, second check stops it.
          expect_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do |runtime_limiter|
            expect(runtime_limiter).to receive(:over_time?).twice.and_return(false, true)
          end

          expect(count_service).to receive(:new).exactly(2).times.and_call_original

          run_job
        end
      end
    end
  end
end