Skip to content
代码片段 群组 项目
未验证 提交 cb66fe01 编辑于 作者: Pedro Pombeiro's avatar Pedro Pombeiro 提交者: GitLab
浏览文件

Implement service to compute pipeline analytics

# Conflicts:
#	lib/click_house/models/ci/finished_pipelines_hourly.rb
#	spec/lib/click_house/models/ci/finished_pipelines_hourly_spec.rb
上级 3b9ac142
No related branches found
No related tags found
无相关合并请求
# frozen_string_literal: true
module Ci
class CollectPipelineAnalyticsService
TIME_BUCKETS_LIMIT = 1.week.in_hours + 1 # +1 to add some error margin
STATUS_GROUP_TO_STATUSES = { success: %w[success], failed: %w[failed], other: %w[canceled skipped] }.freeze
STATUS_GROUPS = STATUS_GROUP_TO_STATUSES.keys.freeze
STATUS_TO_STATUS_GROUP = STATUS_GROUP_TO_STATUSES.flat_map { |k, v| v.product([k]) }.to_h
def initialize(current_user:, project:, from_time:, to_time:, status_groups: [:all])
@current_user = current_user
@project = project
@status_groups = status_groups
@from_time = from_time || 1.week.ago.utc
@to_time = to_time || Time.now.utc
end
def execute
return ServiceResponse.error(message: 'Project must be specified') unless @project
unless ::Gitlab::ClickHouse.configured?
return ServiceResponse.error(message: 'ClickHouse database is not configured')
end
return ServiceResponse.error(message: 'Not allowed') unless allowed?
if (@to_time - @from_time) / 1.hour > TIME_BUCKETS_LIMIT
return ServiceResponse.error(message: "Maximum of #{TIME_BUCKETS_LIMIT} 1-hour intervals can be requested")
end
ServiceResponse.success(payload: { aggregate: calculate_aggregate })
end
private
def allowed?
@current_user&.can?(:read_ci_cd_analytics, @project)
end
def clickhouse_model
::ClickHouse::Models::Ci::FinishedPipelinesHourly
end
def calculate_aggregate
result = @status_groups.index_with(0)
query = clickhouse_model.for_project(@project).within_dates(@from_time, @to_time)
if @status_groups.include?(:all)
all_query = query.select(query.count_pipelines_function.as('all'))
result[:all] = ::ClickHouse::Client.select(all_query.to_sql, :main).first['all']
end
if @status_groups.intersect?(STATUS_GROUPS)
query = query
.select(:status, query.count_pipelines_function.as('count'))
.by_status(@status_groups.flat_map(&STATUS_GROUP_TO_STATUSES).compact)
.group_by_status
result_by_status = ::ClickHouse::Client.select(query.to_sql, :main).map(&:values).to_h
result_by_status.each_pair { |status, count| result[STATUS_TO_STATUS_GROUP[status]] += count }
end
result
end
end
end
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
let(:percentiles) { [50, 75, 90, 95, 99] } let(:percentiles) { [50, 75, 90, 95, 99] }
let(:runner_type) { nil } let(:runner_type) { nil }
let(:from_time) { starting_time } let(:from_time) { starting_time }
let(:to_time) { starting_time + 3.hours } let(:to_time) { 3.hours.after(starting_time) }
let(:owner_namespace) { nil } let(:owner_namespace) { nil }
let(:service) do let(:service) do
...@@ -200,13 +200,13 @@ ...@@ -200,13 +200,13 @@
builds = [from_time - 1.second, builds = [from_time - 1.second,
from_time, from_time,
to_time, to_time,
to_time + 5.minutes + 1.second].map do |started_at| (5.minutes + 1.second).after(to_time)].map do |started_at|
build_stubbed(:ci_build, build_stubbed(:ci_build,
:success, :success,
created_at: started_at - 1.minute, created_at: 1.minute.before(started_at),
queued_at: started_at - 1.minute, queued_at: 1.minute.before(started_at),
started_at: started_at, started_at: started_at,
finished_at: started_at + 10.minutes, finished_at: 10.minutes.after(started_at),
runner: runner, runner: runner,
runner_manager: runner.runner_managers.first) runner_manager: runner.runner_managers.first)
end end
...@@ -264,7 +264,7 @@ ...@@ -264,7 +264,7 @@
end end
context 'when requesting more that TIME_BUCKETS_LIMIT' do context 'when requesting more that TIME_BUCKETS_LIMIT' do
let(:to_time) { starting_time + 190.minutes } let(:to_time) { 190.minutes.after(starting_time) }
it 'returns error' do it 'returns error' do
expect(result.error?).to eq(true) expect(result.error?).to eq(true)
......
...@@ -20,6 +20,18 @@ def for_project(project) ...@@ -20,6 +20,18 @@ def for_project(project)
where(path: project.project_namespace.traversal_path) where(path: project.project_namespace.traversal_path)
end end
def within_dates(from_time, to_time)
query = self
started_at_bucket = @query_builder.table[:started_at_bucket]
# rubocop: disable CodeReuse/ActiveRecord -- this is a ClickHouse model
query = query.where(started_at_bucket.gteq(format_time(from_time))) if from_time
query = query.where(started_at_bucket.lt(format_time(to_time))) if to_time
# rubocop: enable CodeReuse/ActiveRecord
query
end
def by_status(statuses) def by_status(statuses)
where(status: statuses) where(status: statuses)
end end
...@@ -31,6 +43,16 @@ def group_by_status ...@@ -31,6 +43,16 @@ def group_by_status
def count_pipelines_function def count_pipelines_function
Arel::Nodes::NamedFunction.new('countMerge', [@query_builder.table[:count_pipelines]]) Arel::Nodes::NamedFunction.new('countMerge', [@query_builder.table[:count_pipelines]])
end end
private
def format_time(date)
Arel::Nodes::NamedFunction.new('toDateTime64', [
Arel::Nodes::SqlLiteral.new(date.utc.strftime("'%Y-%m-%d %H:%M:%S'")),
6,
Arel::Nodes.build_quoted('UTC')
])
end
end end
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ::Ci::CollectPipelineAnalyticsService, :click_house, :enable_admin_mode,
feature_category: :fleet_visibility do
include ClickHouseHelpers
let_it_be(:project1) { create(:project).tap(&:reload) } # reload required to calculate traversal path
let_it_be(:project2) { create(:project).tap(&:reload) }
let_it_be(:current_user) { create(:user, reporter_of: [project1, project2]) }
let_it_be(:starting_time) { Time.utc(2023) }
let_it_be(:ending_time) { 1.week.after(Time.utc(2023)) }
let(:project) { project1 }
let(:status_groups) { [:all] }
let(:from_time) { starting_time }
let(:to_time) { ending_time }
let(:service) do
described_class.new(
current_user: current_user,
project: project,
from_time: from_time,
to_time: to_time,
status_groups: status_groups)
end
let(:pipelines) do
[
create_pipeline(project1, :running, 35.minutes.before(ending_time), 30.minutes),
create_pipeline(project1, :success, 1.day.before(ending_time), 30.minutes),
create_pipeline(project1, :canceled, 1.hour.before(ending_time), 1.minute),
create_pipeline(project1, :failed, 5.days.before(ending_time), 2.hours),
create_pipeline(project1, :failed, 1.week.before(ending_time), 45.minutes),
create_pipeline(project1, :skipped, 5.days.before(ending_time), 1.second),
create_pipeline(project1, :skipped, 1.second.before(starting_time), 45.minutes),
create_pipeline(project1, :success, ending_time, 30.minutes)
]
end
subject(:result) { service.execute }
before do
insert_ci_pipelines_to_click_house(pipelines)
end
context 'when ClickHouse database is not configured' do
before do
allow(::Gitlab::ClickHouse).to receive(:configured?).and_return(false)
end
it 'returns error' do
expect(result.error?).to eq(true)
expect(result.errors).to contain_exactly('ClickHouse database is not configured')
end
end
shared_examples 'returns Not allowed error' do
it 'returns error' do
expect(result.error?).to eq(true)
expect(result.errors).to contain_exactly('Not allowed')
end
end
shared_examples 'a service returning aggregate analytics' do
using RSpec::Parameterized::TableSyntax
where(:status_groups, :expected_aggregate) do
%i[all] | { all: 6 }
%i[all success] | { all: 6, success: 1 }
%i[success other] | { success: 1, other: 2 }
%i[failed] | { failed: 2 }
end
with_them do
it 'returns aggregate analytics' do
expect(result.success?).to eq(true)
expect(result.errors).to eq([])
expect(result.payload[:aggregate]).to eq(expected_aggregate)
end
end
context 'when dates are not specified' do
let(:from_time) { nil }
let(:to_time) { nil }
context 'and there are pipelines in the last week', time_travel_to: '2023-01-08' do
it 'returns aggregate analytics from last week' do
expect(result.errors).to eq([])
expect(result.success?).to eq(true)
expect(result.payload[:aggregate]).to eq({ all: 6 })
end
end
context 'and there are no pipelines in the last week', time_travel_to: '2023-01-15 00:00:01' do
it 'returns empty aggregate analytics' do
expect(result.errors).to eq([])
expect(result.success?).to eq(true)
expect(result.payload[:aggregate]).to eq({ all: 0 })
end
end
end
context 'when requesting statistics starting one second before beginning of week' do
let(:from_time) { 1.second.before(starting_time) }
it 'does not include job starting 1 second before start of week' do
expect(result.errors).to eq([])
expect(result.success?).to eq(true)
expect(result.payload[:aggregate]).to eq({ all: 6 })
end
end
context 'when requesting statistics starting one hour before beginning of week' do
let(:from_time) { 1.hour.before(starting_time) }
it 'includes job starting 1 second before start of week' do
expect(result.errors).to eq([])
expect(result.success?).to eq(true)
expect(result.payload[:aggregate]).to eq({ all: 7 })
end
end
context 'when requesting hourly statistics that span more than one week' do
let(:from_time) { (1.hour + 1.second).before(starting_time) }
it 'returns an error' do
expect(result.errors).to contain_exactly(
"Maximum of #{described_class::TIME_BUCKETS_LIMIT} 1-hour intervals can be requested")
expect(result.error?).to eq(true)
end
end
context 'when a different project is specified' do
let(:project) { project2 }
let(:status_groups) { %i[all success failed] }
before do
insert_ci_pipelines_to_click_house([
create_pipeline(project2, :failed, 1.week.before(ending_time), 45.minutes)
])
end
it 'returns aggregate analytics for specified project only' do
expect(result.success?).to eq(true)
expect(result.errors).to eq([])
expect(result.payload[:aggregate]).to eq({ all: 1, success: 0, failed: 1 })
end
end
end
it_behaves_like 'a service returning aggregate analytics'
context 'when user is nil' do
let(:current_user) { nil }
include_examples 'returns Not allowed error'
end
context 'when project has analytics disabled' do
let_it_be(:project) { create(:project, :analytics_disabled) }
include_examples 'returns Not allowed error'
end
context 'when project is not specified' do
let(:project) { nil }
it 'returns error' do
expect(result.error?).to eq(true)
expect(result.errors).to contain_exactly('Project must be specified')
end
end
context 'when user is an admin' do
let(:current_user) { create(:admin) }
it_behaves_like 'a service returning aggregate analytics'
end
context 'when user is a guest' do
let_it_be(:current_user) { create(:user, guest_of: project1) }
include_examples 'returns Not allowed error'
end
def create_pipeline(project, status, started_at, duration)
build_stubbed(:ci_pipeline, status,
project: project,
created_at: 1.second.before(started_at), started_at: started_at, finished_at: duration.after(started_at),
duration: duration)
end
end
...@@ -41,6 +41,18 @@ def insert_ci_builds_to_click_house(builds) ...@@ -41,6 +41,18 @@ def insert_ci_builds_to_click_house(builds)
# rubocop:enable Metrics/CyclomaticComplexity # rubocop:enable Metrics/CyclomaticComplexity
# rubocop:enable Metrics/PerceivedComplexity # rubocop:enable Metrics/PerceivedComplexity
def insert_ci_pipelines_to_click_house(pipelines)
result = clickhouse_fixture(:ci_finished_pipelines, pipelines.map do |pipeline|
pipeline.slice(
%i[id duration status source ref committed_at created_at started_at finished_at]).symbolize_keys
.merge(
path: pipeline.project&.project_namespace&.traversal_path || '0/'
)
end)
expect(result).to eq(true)
end
def self.default_timezone def self.default_timezone
ActiveRecord.default_timezone ActiveRecord.default_timezone
end end
......
...@@ -22,6 +22,51 @@ ...@@ -22,6 +22,51 @@
end end
end end
describe '#within_dates' do
let(:from_time) { 1.hour.ago }
let(:to_time) { Time.current }
subject(:result_sql) { instance.within_dates(from_time, to_time).to_sql }
it 'builds the correct SQL' do
expected_sql = <<~SQL.lines(chomp: true).join(' ')
SELECT * FROM "#{table_name}"
WHERE "#{table_name}"."started_at_bucket" >= toDateTime64('#{from_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
AND "#{table_name}"."started_at_bucket" < toDateTime64('#{to_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
SQL
expect(result_sql.strip).to eq(expected_sql.strip)
end
context 'when only from_date is passed' do
let(:from_time) { 1.hour.ago }
let(:to_time) { nil }
it 'builds the correct SQL' do
expected_sql = <<~SQL.lines(chomp: true).join(' ')
SELECT * FROM "#{table_name}"
WHERE "#{table_name}"."started_at_bucket" >= toDateTime64('#{from_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
SQL
expect(result_sql.strip).to eq(expected_sql.strip)
end
end
context 'when only to_date is passed' do
let(:from_time) { nil }
let(:to_time) { Time.current }
it 'builds the correct SQL' do
expected_sql = <<~SQL.lines(chomp: true).join(' ')
SELECT * FROM "#{table_name}"
WHERE "#{table_name}"."started_at_bucket" < toDateTime64('#{to_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
SQL
expect(result_sql.strip).to eq(expected_sql.strip)
end
end
end
describe '#by_status' do describe '#by_status' do
subject(:result_sql) { instance.by_status(%i[failed success]).to_sql } subject(:result_sql) { instance.by_status(%i[failed success]).to_sql }
...@@ -98,13 +143,25 @@ ...@@ -98,13 +143,25 @@
end end
it 'builds the correct SQL with chained methods' do it 'builds the correct SQL with chained methods' do
from_time = 1.hour.ago
to_time = Time.current
expected_sql = <<~SQL.lines(chomp: true).join(' ') expected_sql = <<~SQL.lines(chomp: true).join(' ')
SELECT "#{table_name}"."status" FROM "#{table_name}" SELECT "#{table_name}"."status" FROM "#{table_name}"
WHERE "#{table_name}"."path" = '#{path}' WHERE "#{table_name}"."path" = '#{path}'
AND "#{table_name}"."started_at_bucket" >= toDateTime64('#{from_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
AND "#{table_name}"."started_at_bucket" < toDateTime64('#{to_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
AND "#{table_name}"."status" IN ('failed', 'success') AND "#{table_name}"."status" IN ('failed', 'success')
GROUP BY "#{table_name}"."status" GROUP BY "#{table_name}"."status"
SQL SQL
result_sql = instance
.for_project(project)
.select(:status)
.within_dates(from_time, to_time)
.by_status(%i[failed success])
.group_by_status.to_sql
expect(result_sql.strip).to eq(expected_sql.strip) expect(result_sql.strip).to eq(expected_sql.strip)
end end
end end
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册