Skip to content
代码片段 群组 项目
提交 5aabc409 编辑于 作者: Adam Hegyi's avatar Adam Hegyi
浏览文件

Persist cursor for VSD count worker

This change persists the cursor and makes the VSD count worker
usable in production.
上级 55cc2efe
No related branches found
No related tags found
无相关合并请求
...@@ -21,18 +21,18 @@ class Aggregation < ApplicationRecord ...@@ -21,18 +21,18 @@ class Aggregation < ApplicationRecord
order(arel_table[:namespace_id].not_eq(namespace_id)) order(arel_table[:namespace_id].not_eq(namespace_id))
} }
def self.load_batch(cursor = {}, batch_size = 100) def self.load_batch(cursor = nil, batch_size = 100)
top_level_namespace_id = cursor && cursor[:top_level_namespace_id]
unions = [ unions = [
enabled.not_aggregated.latest_first_order.limit(batch_size), enabled.not_aggregated.latest_first_order.limit(batch_size),
enabled.outdated.latest_first_order.limit(batch_size) enabled.outdated.latest_first_order.limit(batch_size)
].compact ].compact
unions.unshift(primary_key_in(cursor[:top_level_namespace_id])) if cursor[:top_level_namespace_id] unions.unshift(primary_key_in(top_level_namespace_id)) if top_level_namespace_id
query = from_union(unions, remove_order: false) query = from_union(unions, remove_order: false)
if cursor[:top_level_namespace_id] query = query.specific_namespace_id_first_order(top_level_namespace_id) if top_level_namespace_id
query = query.specific_namespace_id_first_order(cursor[:top_level_namespace_id])
end
query.latest_first_order.limit(batch_size).to_a query.latest_first_order.limit(batch_size).to_a
end end
......
...@@ -105,6 +105,8 @@ def ensure_cursor(countable_config:) ...@@ -105,6 +105,8 @@ def ensure_cursor(countable_config:)
end end
def metrics_to_count def metrics_to_count
return COUNTS_TO_COLLECT.values unless cursor
# Skip count configs before the provided metric # Skip count configs before the provided metric
COUNTS_TO_COLLECT.values.drop_while { |v| v[:metric] != cursor[:metric] } COUNTS_TO_COLLECT.values.drop_while { |v| v[:metric] != cursor[:metric] }
end end
......
...@@ -15,7 +15,7 @@ class CountWorker ...@@ -15,7 +15,7 @@ class CountWorker
data_consistency :sticky data_consistency :sticky
feature_category :value_stream_management feature_category :value_stream_management
CACHE_KEY = 'value_stream_dasboard_count_cursor' CURSOR_KEY = 'value_stream_dashboard_count_cursor'
CUTOFF_DAYS = 5 CUTOFF_DAYS = 5
def perform def perform
...@@ -23,19 +23,32 @@ def perform ...@@ -23,19 +23,32 @@ def perform
runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new
batch = Analytics::ValueStreamDashboard::Aggregation.load_batch cursor = load_cursor
return if batch.empty? batch = Analytics::ValueStreamDashboard::Aggregation.load_batch(cursor)
if batch.empty?
persist_cursor(nil)
return
end
batch.each do |aggregation| batch.each do |aggregation|
cursor = Analytics::ValueStreamDashboard::TopLevelGroupCounterService next unless feature_flag_enabled_for_aggregation?(aggregation)
.load_cursor(raw_cursor: { top_level_namespace_id: aggregation.id })
Analytics::ValueStreamDashboard::TopLevelGroupCounterService unless licensed?(aggregation)
aggregation.update!(enabled: false)
next
end
service_response = Analytics::ValueStreamDashboard::TopLevelGroupCounterService
.new(aggregation: aggregation, cursor: cursor, runtime_limiter: runtime_limiter) .new(aggregation: aggregation, cursor: cursor, runtime_limiter: runtime_limiter)
.execute .execute
break if runtime_limiter.over_time? cursor = service_response[:cursor]
break if service_response[:result] == :interrupted || runtime_limiter.over_time?
end end
persist_cursor(cursor)
end end
private private
...@@ -44,6 +57,30 @@ def should_perform? ...@@ -44,6 +57,30 @@ def should_perform?
Time.current.day >= (Time.current.end_of_month.day - CUTOFF_DAYS) && Time.current.day >= (Time.current.end_of_month.day - CUTOFF_DAYS) &&
License.feature_available?(:group_level_analytics_dashboard) License.feature_available?(:group_level_analytics_dashboard)
end end
# Restores the cursor stored under CURSOR_KEY, if there is one.
#
# Reads the raw value from Gitlab::Redis::SharedState, parses it as JSON,
# symbolizes the top-level keys and hands the resulting hash to
# TopLevelGroupCounterService.load_cursor.
#
# Returns nil when nothing is stored under the key; otherwise returns whatever
# TopLevelGroupCounterService.load_cursor returns.
def load_cursor
  serialized = Gitlab::Redis::SharedState.with { |redis| redis.get(CURSOR_KEY) }
  return if serialized.nil?

  Analytics::ValueStreamDashboard::TopLevelGroupCounterService.load_cursor(
    raw_cursor: Gitlab::Json.parse(serialized).symbolize_keys
  )
end
# Saves the given cursor under CURSOR_KEY in Gitlab::Redis::SharedState.
#
# cursor - nil, or an object responding to #dump; the dumped value is
#          serialized with Gitlab::Json.dump before being stored.
#
# A nil cursor deletes the key instead of writing a value.
def persist_cursor(cursor)
  Gitlab::Redis::SharedState.with do |redis|
    next redis.del(CURSOR_KEY) if cursor.nil?

    redis.set(CURSOR_KEY, Gitlab::Json.dump(cursor.dump))
  end
end
# Asks Feature whether the :value_stream_dashboard_on_off_setting flag is
# enabled, passing the aggregation's namespace as the second argument.
def feature_flag_enabled_for_aggregation?(aggregation)
  namespace = aggregation.namespace
  Feature.enabled?(:value_stream_dashboard_on_off_setting, namespace)
end
# Asks the aggregation's namespace whether the
# :group_level_analytics_dashboard licensed feature is available to it.
def licensed?(aggregation)
  namespace = aggregation.namespace
  namespace.licensed_feature_available?(:group_level_analytics_dashboard)
end
end end
end end
end end
...@@ -7,10 +7,10 @@ class NamespaceCursor ...@@ -7,10 +7,10 @@ class NamespaceCursor
NAMESPACE_BATCH_SIZE = 300 NAMESPACE_BATCH_SIZE = 300
def initialize(namespace_scope:, inner_namespace_query:, cursor_data:) def initialize(namespace_scope:, inner_namespace_query:, cursor_data:)
@top_level_namespace_id = cursor_data.fetch(:top_level_namespace_id)
@namespace_scope = namespace_scope @namespace_scope = namespace_scope
@inner_namespace_query = inner_namespace_query @inner_namespace_query = inner_namespace_query
@cursor_data = cursor_data @cursor_data = cursor_data
@top_level_namespace_id = cursor_data.fetch(:top_level_namespace_id)
end end
def next def next
...@@ -35,7 +35,7 @@ def dump ...@@ -35,7 +35,7 @@ def dump
private private
attr_reader :top_level_namespace_id, :namespace_scope, :inner_namespace_query, :cursor_data attr_reader :namespace_scope, :inner_namespace_query, :cursor_data, :top_level_namespace_id
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def enumerator def enumerator
......
...@@ -39,4 +39,12 @@ ...@@ -39,4 +39,12 @@
expect(namespace_ids).to eq([subgroup2.id, subsubgroup.id]) expect(namespace_ids).to eq([subgroup2.id, subsubgroup.id])
end end
end end
# Building the cursor must fail when the cursor data lacks :top_level_namespace_id.
context 'when top_level_namespace_id is missing' do
# An empty hash stands in for cursor data without the key.
let(:cursor_data) { {} }
it 'raises error on initialize' do
# NOTE(review): `cursor` is defined outside this excerpt; presumably it builds
# the NamespaceCursor from cursor_data — confirm against the surrounding spec.
expect { cursor }.to raise_error(KeyError)
end
end
end end
...@@ -48,6 +48,16 @@ ...@@ -48,6 +48,16 @@
]) ])
end end
end end
# Exercises load_batch with a cursor produced by TopLevelGroupCounterService.load_cursor.
context 'when a cursor is a Gitlab::Analytics::ValueStreamDashboard::NamespaceCursor' do
it 'returns correct data' do
# The cursor points at aggregation3 through :top_level_namespace_id.
cursor = Analytics::ValueStreamDashboard::TopLevelGroupCounterService
.load_cursor(raw_cursor: { top_level_namespace_id: aggregation3.id })
# The aggregation referenced by the cursor is expected first in the batch.
expect(load_batch(cursor)).to eq([aggregation3, aggregation1,
aggregation4, aggregation2])
end
end
end end
end end
end end
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Analytics::ValueStreamDashboard::CountWorker, feature_category: :value_stream_management do RSpec.describe Analytics::ValueStreamDashboard::CountWorker, :clean_gitlab_redis_shared_state, feature_category: :value_stream_management do
def run_job def run_job
described_class.new.perform described_class.new.perform
end end
...@@ -17,7 +17,7 @@ def run_job ...@@ -17,7 +17,7 @@ def run_job
context 'when the group_level_analytics_dashboard feature is available' do context 'when the group_level_analytics_dashboard feature is available' do
before do before do
stub_licensed_features(group_level_analytics_dashboard: true) allow(License).to receive(:feature_available?).and_return(true)
end end
context 'when the current time is not close to the end of month' do context 'when the current time is not close to the end of month' do
...@@ -69,6 +69,98 @@ def run_job ...@@ -69,6 +69,98 @@ def run_job
end end
end end
# Checks how the job treats an aggregation whose namespace reports the
# :group_level_analytics_dashboard feature as unavailable.
context 'when a group downgraded and has no license', :saas do
it 'disables the aggregation' do
aggregation1 = create(:value_stream_dashboard_aggregation, last_run_at: nil)
aggregation2 = create(:value_stream_dashboard_aggregation, last_run_at: nil)
# aggregation1's namespace reports the feature as unavailable.
allow(aggregation1.namespace).to receive(:licensed_feature_available?)
.with(:group_level_analytics_dashboard)
.and_return(false)
# aggregation2's namespace reports the feature as available.
allow(aggregation2.namespace).to receive(:licensed_feature_available?)
.with(:group_level_analytics_dashboard)
.and_return(true)
# load_batch is stubbed so the exact stubbed objects above are the ones processed:
# the first call returns both aggregations, any later call returns an empty batch.
allow(Analytics::ValueStreamDashboard::Aggregation).to receive(:load_batch).and_return(
[aggregation1, aggregation2], [])
run_job
# The unlicensed aggregation ends up disabled; the licensed one has a last_run_at set.
expect(aggregation1.reload).not_to be_enabled
expect(aggregation2.reload.last_run_at).to be_present
end
end
# Covers a job run that starts with a cursor already stored in Redis.
context 'when loading a persisted cursor' do
let_it_be(:first) { create(:value_stream_dashboard_aggregation, last_run_at: 15.days.ago) }
let_it_be(:second) { create(:value_stream_dashboard_aggregation, last_run_at: nil) }
# Raw cursor pointing at the namespace of the `first` aggregation.
let(:cursor) { { top_level_namespace_id: first.namespace_id } }
before do
# Store the cursor as JSON under the worker's CURSOR_KEY before each example.
Gitlab::Redis::SharedState.with do |redis|
redis.set(described_class::CURSOR_KEY, Gitlab::Json.dump(cursor))
end
end
# Expects TopLevelGroupCounterService.new to be called for the given aggregation
# with a NamespaceCursor and a RuntimeLimiter, and makes it return a double whose
# #execute yields a successful ServiceResponse carrying returned_payload.
def expect_service_invocation_for(aggregation, returned_payload)
response = ServiceResponse.success(payload: returned_payload)
service = instance_double('Analytics::ValueStreamDashboard::TopLevelGroupCounterService',
execute: response)
expect(Analytics::ValueStreamDashboard::TopLevelGroupCounterService).to receive(:new).with(
aggregation: aggregation,
cursor: an_instance_of(Gitlab::Analytics::ValueStreamDashboard::NamespaceCursor),
runtime_limiter: an_instance_of(Analytics::CycleAnalytics::RuntimeLimiter)
).and_return(service)
end
it 'passes the cursor to the aggregation service' do
cursor1 = Analytics::ValueStreamDashboard::TopLevelGroupCounterService.load_cursor(raw_cursor: {
top_level_namespace_id: first.id
})
cursor2 = Analytics::ValueStreamDashboard::TopLevelGroupCounterService.load_cursor(raw_cursor: {
top_level_namespace_id: second.id
})
# Both aggregations are expected to be handed to the service; each reports :finished.
expect_service_invocation_for(first, { cursor: cursor1, result: :finished })
expect_service_invocation_for(second, { cursor: cursor2, result: :finished })
run_job
end
it 'persists the new cursor' do
cursor1 = Analytics::ValueStreamDashboard::TopLevelGroupCounterService.load_cursor(raw_cursor: {
top_level_namespace_id: first.id
})
expect_service_invocation_for(first, { cursor: cursor1, result: :finished })
# Cursor data the service double returns for `second` together with an :interrupted result.
interrupted_cursor = {
top_level_namespace_id: second.namespace_id,
metric: Analytics::ValueStreamDashboard::Count.metrics[:issues],
last_value: 1,
last_count: 2
}
cursor2 = Analytics::ValueStreamDashboard::TopLevelGroupCounterService
.load_cursor(raw_cursor: interrupted_cursor)
expect_service_invocation_for(second, { cursor: cursor2, result: :interrupted })
# over_time? is expected exactly once (RSpec's default for `expect ... to receive`).
expect_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do |runtime_limiter|
# first aggregation, trigger no overtime
expect(runtime_limiter).to receive(:over_time?).and_return(false)
end
run_job
# The value left in Redis must round-trip to the interrupted cursor data.
persisted_cursor = Gitlab::Redis::SharedState.with { |redis| redis.get(described_class::CURSOR_KEY) }
parsed_cursor = Gitlab::Json.parse(persisted_cursor).symbolize_keys
expect(parsed_cursor).to eq(interrupted_cursor)
end
end
context 'when the execution is over time' do context 'when the execution is over time' do
it 'stops the processing' do it 'stops the processing' do
create_list(:value_stream_dashboard_aggregation, 3, last_run_at: nil) create_list(:value_stream_dashboard_aggregation, 3, last_run_at: nil)
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册