Skip to content
代码片段 群组 项目
提交 b65c8ff9 编辑于 作者: Adam Hegyi's avatar Adam Hegyi
浏览文件

Schedule materialized view refresh

This change adds a cron job to periodically rebuild the contributions
materialized view.
上级 011a7173
No related branches found
No related tags found
无相关合并请求
......@@ -10,29 +10,34 @@ class RebuildMaterializedViewService
table_schema = {database_name:String}
SQL
# Sets up the service that rebuilds a ClickHouse materialized view.
#
# @param connection [ClickHouse::Connection] connection used for all queries
# @param runtime_limiter [Gitlab::Metrics::RuntimeLimiter] budget tracker that
#   lets the backfill stop early and report an :over_time status
# @param state [Hash] view configuration; requires :view_name, :tmp_view_name,
#   :view_table_name, :tmp_view_table_name and :source_table_name. The
#   optional :next_value cursor resumes a previously interrupted backfill.
#
# Fix: removed the leftover pre-change `def initialize(connection:, state: {})`
# header that nested the real definition and left an unbalanced `def`/`end`.
def initialize(connection:, runtime_limiter: Gitlab::Metrics::RuntimeLimiter.new, state: {})
  @connection = connection
  @runtime_limiter = runtime_limiter
  @view_name = state.fetch(:view_name)
  @tmp_view_name = state.fetch(:tmp_view_name)
  @view_table_name = state.fetch(:view_table_name)
  @tmp_view_table_name = state.fetch(:tmp_view_table_name)
  @source_table_name = state.fetch(:source_table_name)
  @next_value = state[:next_value] # nil when starting a rebuild from scratch
end
# Rebuilds the materialized view via temporary objects and a batched backfill.
# The table swap (and optional drop of the old tables, behind the
# rebuild_mv_drop_old_tables FF) happens only when the backfill reports a
# :finished status; an :over_time response leaves the temporary objects in
# place so a later invocation can resume from the persisted cursor.
#
# Fix: removed the leftover pre-change body (unconditional backfill_data /
# rename_table / drop_tmp_tables) that was merged in front of the new
# tap-based flow and would have run the rebuild path twice.
def execute
  create_tmp_materialized_view_table
  create_tmp_materialized_view

  backfill_data.tap do |service_response|
    if service_response.payload[:status] == :finished
      rename_table
      drop_tmp_tables if Feature.enabled?(:rebuild_mv_drop_old_tables, type: :gitlab_com_derisk)
    end
  end
end
private
attr_reader :connection, :view_name, :tmp_view_name, :view_table_name, :tmp_view_table_name, :source_table_name
attr_reader :connection, :view_name, :tmp_view_name, :view_table_name, :tmp_view_table_name, :source_table_name,
:next_value, :runtime_limiter
def create_tmp_materialized_view_table
# Create a tmp table from the existing table, use IF NOT EXISTS to avoid failure when the table exists.
......@@ -64,7 +69,8 @@ def backfill_data
})
view_query = connection.select(query).first['view_definition']
iterator.each_batch(column: :id, of: INSERT_BATCH_SIZE) do |scope|
payload = { status: :finished }
iterator.each_batch(column: :id, of: INSERT_BATCH_SIZE) do |scope, _min, max|
# Use the materialized view query to backfill the new temporary table.
# The materialized view query selects from the source table, example: FROM events.
# Replace the FROM part and select data from a batched subquery.
......@@ -76,7 +82,14 @@ def backfill_data
# Insert the batch
connection.execute("INSERT INTO #{quote(tmp_view_table_name)} #{query}")
if runtime_limiter.over_time?
payload.merge!(status: :over_time, next_value: max + 1)
break
end
end
ServiceResponse.success(payload: payload)
end
def rename_table
......@@ -103,7 +116,8 @@ def quote(table)
# Builds the batched iterator over the source table. When resuming an
# interrupted rebuild, min_value continues from the persisted :next_value
# cursor; the :order_limit strategy avoids a full min/max scan per run.
#
# Fix: removed the leftover pre-change `ClickHouse::Iterator.new(...)` call
# whose result was constructed and immediately discarded.
def iterator
  builder = ClickHouse::QueryBuilder.new(source_table_name)
  ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: next_value,
    min_max_strategy: :order_limit)
end
end
end
......@@ -12,21 +12,75 @@ class RebuildMaterializedViewCronWorker
worker_has_external_dependencies! # the worker interacts with a ClickHouse database
feature_category :value_stream_management
# NOTE(review): this array appears superseded by the singular MATERIALIZED_VIEW
# constant defined below — confirm no remaining callers before removing it.
MATERIALIZED_VIEWS = [
{
view_name: 'contributions_mv',
view_table_name: 'contributions',
tmp_view_name: 'tmp_contributions_mv',
tmp_view_table_name: 'tmp_contributions',
source_table_name: 'events'
}.freeze
].freeze
# Exclusive-lock TTL; kept above MAX_RUNTIME so the worker can persist its
# state to Redis before the lock can lapse.
MAX_TTL = 5.minutes
# Per-invocation time budget handed to Gitlab::Metrics::RuntimeLimiter.
MAX_RUNTIME = 4.minutes
# Minimum interval between full rebuilds of the materialized view.
REBUILDING_SCHEDULE = 1.week
# Static configuration for the single view this worker rebuilds; merged into
# the persisted state on every run.
MATERIALIZED_VIEW = {
view_name: 'contributions_mv',
view_table_name: 'contributions',
tmp_view_name: 'tmp_contributions_mv',
tmp_view_table_name: 'tmp_contributions',
source_table_name: 'events'
}.freeze
# Redis key under which the rebuild progress state for this view is stored.
def self.redis_key
  ['rebuild_click_house_materialized_view', MATERIALIZED_VIEW[:view_name]].join(':')
end
# Cron entry point: rebuilds the contributions materialized view in bounded
# slices. Progress is persisted to Redis so an over-time run can resume, and
# a recent finished_at short-circuits the whole job.
#
# Fix: removed the leftover pre-change body (direct service invocation with
# MATERIALIZED_VIEWS.first) that was merged in front of the new guarded,
# locked implementation and would have run the service twice per tick.
def perform
  return if Feature.disabled?(:rebuild_contributions_mv, type: :gitlab_com_derisk)

  # Exclusive lock: only one rebuild per view may run at a time.
  in_lock("#{self.class}:#{MATERIALIZED_VIEW[:view_name]}", ttl: MAX_TTL, retries: 0) do
    state = build_state

    # finished_at was stored via Time#to_json, hence the Json.parse before
    # DateTime.parse. Skip the rebuild if it completed within the schedule.
    if state[:finished_at] && DateTime.parse(Gitlab::Json.parse(state[:finished_at])) > REBUILDING_SCHEDULE.ago
      break
    end

    service_response = ClickHouse::RebuildMaterializedViewService
      .new(
        connection: ClickHouse::Connection.new(:main),
        runtime_limiter: Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME),
        state: state)
      .execute

    payload = service_response.payload
    current_time = Time.current.to_json
    if payload[:status] == :over_time
      # Interrupted: keep the cursor so the next tick resumes the backfill.
      state.merge!(
        next_value: payload[:next_value],
        last_update_at: current_time,
        finished_at: nil
      )
    else
      # Finished: clear the cursor and record completion time.
      state.merge!(
        next_value: nil,
        last_update_at: current_time,
        finished_at: current_time,
        started_at: nil
      )
    end

    Gitlab::Redis::SharedState.with do |redis|
      redis.set(self.class.redis_key, Gitlab::Json.dump(state))
    end

    log_extra_metadata_on_done(:state, state)
  end
end
private
# Loads the persisted rebuild state from Redis (empty hash when absent) and
# overlays the static view configuration plus a fresh started_at timestamp.
def build_state
  Gitlab::Redis::SharedState.with do |redis|
    persisted = redis.get(self.class.redis_key)
    stored_state = persisted.present? ? Gitlab::Json.parse(persisted) : {}
    stored_state.merge(initial_state).symbolize_keys
  end
end
# Static view configuration plus the timestamp of the current attempt.
def initial_state
  MATERIALIZED_VIEW.dup.tap { |state| state[:started_at] = Time.current }
end
end
end
---
name: rebuild_contributions_mv
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431453
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/144478
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/441620
milestone: '16.10'
group: group::optimize
type: gitlab_com_derisk
default_enabled: false
---
name: rebuild_mv_drop_old_tables
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431453
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/144478
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/441584
milestone: '16.10'
group: group::optimize
type: gitlab_com_derisk
default_enabled: false
......@@ -918,6 +918,9 @@
Settings.cron_jobs['click_house_audit_events_sync_worker'] ||= {}
Settings.cron_jobs['click_house_audit_events_sync_worker']['cron'] ||= "*/3 * * * *"
Settings.cron_jobs['click_house_audit_events_sync_worker']['job_class'] = 'ClickHouse::AuditEventsSyncWorker'
# Tick the ClickHouse materialized view rebuild worker every 10 minutes; the
# worker itself decides via its persisted Redis state whether a rebuild is due.
Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker'] ||= {}
Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker']['cron'] ||= "*/10 * * * *"
Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker']['job_class'] = 'ClickHouse::RebuildMaterializedViewCronWorker'
Settings.cron_jobs['vertex_ai_refresh_access_token_worker'] ||= {}
Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['cron'] ||= '*/50 * * * *'
Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['job_class'] = 'Llm::VertexAiAccessTokenRefreshWorker'
......
......@@ -31,7 +31,7 @@ def initialize(query_builder:, connection:, min_value: nil, min_max_strategy: :m
def each_batch(column: :id, of: 10_000)
min, max = min_max(column)
return if min.nil? || max == 0
return if min.nil? || max.nil? || max == 0
loop do
break if min > max
......
......@@ -10,19 +10,30 @@
let_it_be(:event3) { create(:closed_issue_event) }
let(:connection) { ClickHouse::Connection.new(:main) }
let(:runtime_limiter) { Gitlab::Metrics::RuntimeLimiter.new }
before do
insert_events_into_click_house
end
def invoke_service
described_class.new(connection: connection, state: {
let(:state) do
{
view_name: 'contributions_mv',
view_table_name: 'contributions',
tmp_view_name: 'tmp_contributions_mv',
tmp_view_table_name: 'tmp_contributions',
source_table_name: 'events'
}).execute
}
end
subject(:service_response) { run_service }
def run_service(new_state = state)
described_class.new(
connection: connection,
runtime_limiter: runtime_limiter,
state: new_state
).execute
end
before do
insert_events_into_click_house
end
it 're-creates the materialized view with correct data from the source table' do
......@@ -35,14 +46,16 @@ def invoke_service
expect(ids).to eq([event1.id])
# Rebuild the MV so we get the inconsistency corrected
invoke_service
expect(service_response).to be_success
payload = service_response.payload
expect(payload[:status]).to eq(:finished)
ids = connection.select('SELECT id FROM contributions FINAL').pluck('id')
expect(ids).to match_array([event1.id, event2.id, event3.id])
end
it 'does not leave temporary tables around' do
invoke_service
expect(service_response).to be_success
view_query = <<~SQL
SELECT view_definition FROM information_schema.views
......@@ -59,4 +72,59 @@ def invoke_service
expect(connection.select(view_query)).to be_empty
expect(connection.select(table_query)).to be_empty
end
# With the drop FF disabled the rebuild must leave the tmp view/table behind
# so they can be inspected or cleaned up manually.
context 'when the rebuild_mv_drop_old_tables FF is off' do
it 'preserves the old tables' do
# Stub before touching service_response: the subject is lazy, so the service
# only runs at the first expect below, after the stub is in place.
stub_feature_flags(rebuild_mv_drop_old_tables: false)
expect(service_response).to be_success
view_query = <<~SQL
SELECT view_definition FROM information_schema.views
WHERE table_name = 'tmp_contributions_mv' AND
table_schema = '#{connection.database_name}'
SQL
table_query = <<~SQL
SELECT table_name FROM information_schema.tables
WHERE table_name = 'tmp_contributions' AND
table_schema = '#{connection.database_name}'
SQL
expect(connection.select(view_query)).not_to be_empty
expect(connection.select(table_query)).not_to be_empty
end
end
# Single-row batches make the runtime limiter check fire after every insert.
context 'when the processing is stopped due to over time' do
before do
stub_const("#{described_class}::INSERT_BATCH_SIZE", 1)
end
it 'returns time_limit status and the cursor' do
allow(runtime_limiter).to receive(:over_time?).and_return(true)
expect(service_response).to be_success
payload = service_response.payload
expect(payload[:status]).to eq(:over_time)
# Cursor points just past the last processed id so the next run resumes there.
expect(payload[:next_value]).to eq(event1.id + 1)
end
# NOTE(review): there are four run_service calls below — three over-time
# batches plus a final resume past the max id that reports :finished; the
# description says "three times". Consider renaming for accuracy.
context 'when the service is invoked three times' do
it 'finishes the processing' do
allow(runtime_limiter).to receive(:over_time?).and_return(true)
service_response = run_service
expect(service_response.payload[:status]).to eq(:over_time)
service_response = run_service(state.merge(next_value: service_response.payload[:next_value]))
expect(service_response.payload[:status]).to eq(:over_time)
service_response = run_service(state.merge(next_value: service_response.payload[:next_value]))
expect(service_response.payload[:status]).to eq(:over_time)
service_response = run_service(state.merge(next_value: service_response.payload[:next_value]))
expect(service_response.payload[:status]).to eq(:finished)
end
end
end
end
......@@ -2,12 +2,81 @@
require 'spec_helper'
RSpec.describe ClickHouse::RebuildMaterializedViewCronWorker, feature_category: :database do
it 'invokes the RebuildMaterializedViewService' do
allow_next_instance_of(ClickHouse::RebuildMaterializedViewService) do |instance|
allow(instance).to receive(:execute)
RSpec.describe ClickHouse::RebuildMaterializedViewCronWorker, :clean_gitlab_redis_shared_state, :freeze_time, feature_category: :database do
def run_job
described_class.new.perform
end
context 'when the previous run was just recently' do
before do
Gitlab::Redis::SharedState.with do |redis|
state = { finished_at: 1.day.ago.to_json }
redis.set(described_class.redis_key, Gitlab::Json.dump(state))
end
end
described_class.new.perform
it 'does not invoke the service' do
expect(ClickHouse::RebuildMaterializedViewService).not_to receive(:new)
run_job
end
end
context 'when the rebuild_contributions_mv feature flag is disabled' do
it 'does not invoke the service' do
stub_feature_flags(rebuild_contributions_mv: false)
expect(ClickHouse::RebuildMaterializedViewService).not_to receive(:new)
run_job
end
end
# Happy path against a real ClickHouse (:click_house tag): the rebuild
# completes and the worker records the completion time (frozen via :freeze_time).
context 'when the service is finished', :click_house do
it 'persists the finished_at timestamp' do
run_job
Gitlab::Redis::SharedState.with do |redis|
data = Gitlab::Json.parse(redis.get(described_class.redis_key))
expect(DateTime.parse(data['finished_at'])).to eq(Time.current)
end
end
end
context 'when the service is interrupted' do
it 'persists the next value to continue the processing from' do
allow_next_instance_of(ClickHouse::RebuildMaterializedViewService) do |instance|
allow(instance).to receive(:execute).and_return(ServiceResponse.success(payload: { status: :over_time,
next_value: 100 }))
end
run_job
Gitlab::Redis::SharedState.with do |redis|
data = Gitlab::Json.parse(redis.get(described_class.redis_key))
# finished_at stays nil for interrupted runs; next_value is the resume cursor.
expect(data['finished_at']).to eq(nil)
expect(data['next_value']).to eq(100)
end
end
end
# When a previous run persisted a next_value cursor, the worker must hand
# that cursor back to the service so the backfill resumes where it stopped.
# Fix: corrected the duplicated word in the example description
# ("from the the previously" -> "from the previously").
context 'when the previous run was interrupted' do
  before do
    Gitlab::Redis::SharedState.with do |redis|
      state = { started_at: 1.day.ago.to_json, next_value: 200 }
      redis.set(described_class.redis_key, Gitlab::Json.dump(state))
    end
  end

  it 'continues from the previously persisted next_value' do
    service = instance_double('ClickHouse::RebuildMaterializedViewService',
      execute: ServiceResponse.success(payload: { status: :finished }))

    # Verify the persisted cursor is forwarded into the service's state.
    expect(ClickHouse::RebuildMaterializedViewService).to receive(:new) do |args|
      expect(args[:state][:next_value]).to eq(200)
    end.and_return(service)

    run_job
  end
end
end
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册