diff --git a/app/services/click_house/rebuild_materialized_view_service.rb b/app/services/click_house/rebuild_materialized_view_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..9072f4bfd5fb6b79620a1a4b828549a70a4d8147
--- /dev/null
+++ b/app/services/click_house/rebuild_materialized_view_service.rb
@@ -0,0 +1,124 @@
+# frozen_string_literal: true
+
+module ClickHouse
+  # Rebuilds the table behind a ClickHouse materialized view from its source table.
+  #
+  # A temporary table and a temporary materialized view are created from the existing
+  # definitions, the temporary table is backfilled in batches using the view's own
+  # query, then the temporary table is swapped with the live one and the temporary
+  # objects are dropped.
+  #
+  # `state` must provide :view_name, :tmp_view_name, :view_table_name,
+  # :tmp_view_table_name and :source_table_name; a missing key raises KeyError.
+  class RebuildMaterializedViewService
+    INSERT_BATCH_SIZE = 10_000_000
+
+    VIEW_DEFINITION_QUERY = <<~SQL
+      SELECT view_definition FROM information_schema.views
+      WHERE table_name = {view_name:String} AND
+      table_schema = {database_name:String}
+    SQL
+
+    def initialize(connection:, state: {})
+      @connection = connection
+
+      @view_name = state.fetch(:view_name)
+      @tmp_view_name = state.fetch(:tmp_view_name)
+      @view_table_name = state.fetch(:view_table_name)
+      @tmp_view_table_name = state.fetch(:tmp_view_table_name)
+      @source_table_name = state.fetch(:source_table_name)
+    end
+
+    def execute
+      create_tmp_materialized_view_table
+      create_tmp_materialized_view
+
+      backfill_data
+
+      rename_table
+      drop_tmp_tables
+    end
+
+    private
+
+    attr_reader :connection, :view_name, :tmp_view_name, :view_table_name, :tmp_view_table_name, :source_table_name
+
+    def create_tmp_materialized_view_table
+      # Create a tmp table from the existing table, use IF NOT EXISTS to avoid failure when the table exists.
+      create_statement = show_create_table(view_table_name)
+        .gsub("#{connection.database_name}.#{view_table_name}",
+          "#{connection.database_name}.#{quote(tmp_view_table_name)}")
+        .gsub('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
+
+      connection.execute(create_statement)
+    end
+
+    def create_tmp_materialized_view
+      # Create a tmp materialized view from the existing view, use IF NOT EXISTS to avoid failure when the view exists.
+      create_statement = show_create_table(view_name)
+        .gsub("#{connection.database_name}.#{view_name}",
+          "#{connection.database_name}.#{quote(tmp_view_name)}")
+        .gsub("#{connection.database_name}.#{view_table_name}",
+          "#{connection.database_name}.#{quote(tmp_view_table_name)}")
+        .gsub('CREATE MATERIALIZED VIEW', 'CREATE MATERIALIZED VIEW IF NOT EXISTS')
+
+      connection.execute(create_statement)
+    end
+
+    def backfill_data
+      # Take the query from the materialized view definition.
+      query = ClickHouse::Client::Query.new(raw_query: VIEW_DEFINITION_QUERY, placeholders: {
+        view_name: view_name,
+        database_name: connection.database_name
+      })
+      view_row = connection.select(query).first
+      # Fail with a clear message instead of a NoMethodError on nil when the view is missing.
+      raise "Materialized view definition not found: #{view_name}" unless view_row
+
+      view_query = view_row['view_definition']
+
+      iterator.each_batch(column: :id, of: INSERT_BATCH_SIZE) do |scope|
+        # Use the materialized view query to backfill the new temporary table.
+        # The materialized view query selects from the source table, example: FROM events.
+        # Replace the FROM part and select data from a batched subquery.
+        # Old: FROM events
+        # New: FROM (SELECT .. FROM events WHERE id > x and id < y) events
+        inner_query = "(#{scope.to_sql}) #{quote(source_table_name)}"
+
+        query = view_query.gsub("FROM #{connection.database_name}.#{source_table_name}", "FROM #{inner_query}")
+
+        # Insert the batch
+        connection.execute("INSERT INTO #{quote(tmp_view_table_name)} #{query}")
+      end
+    end
+
+    def rename_table
+      # Swap the tables
+      connection.execute("EXCHANGE TABLES #{quote(view_table_name)} AND #{quote(tmp_view_table_name)}")
+    end
+
+    def drop_tmp_tables
+      # Drop the temporary view before its target table so the view is never left
+      # pointing at a table that no longer exists.
+      connection.execute("DROP TABLE IF EXISTS #{quote(tmp_view_name)}")
+      connection.execute("DROP TABLE IF EXISTS #{quote(tmp_view_table_name)}")
+    end
+
+    def show_create_table(table)
+      result = connection.select("SHOW CREATE TABLE #{quote(table)}")
+
+      raise "Table or view not found: #{table}" if result.empty?
+
+      result.first['statement']
+    end
+
+    def quote(table)
+      ApplicationRecord.connection.quote_table_name(table)
+    end
+
+    def iterator
+      builder = ClickHouse::QueryBuilder.new(source_table_name)
+      ClickHouse::Iterator.new(query_builder: builder, connection: connection)
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 1413638e903dbd2c6059b509dcf70fab1fcdaf8d..76680071360d9fd4ffd1f8230b2085fef679ebac 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -390,6 +390,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: cronjob:click_house_rebuild_materialized_view_cron
+  :worker_name: ClickHouse::RebuildMaterializedViewCronWorker
+  :feature_category: :value_stream_management
+  :has_external_dependencies: true
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: cronjob:concurrency_limit_resume
   :worker_name: ConcurrencyLimit::ResumeWorker
   :feature_category: :global_search
diff --git a/app/workers/click_house/rebuild_materialized_view_cron_worker.rb b/app/workers/click_house/rebuild_materialized_view_cron_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..2b7513803e8f302dd233e1c1ac94c91daf0c29f0
--- /dev/null
+++ b/app/workers/click_house/rebuild_materialized_view_cron_worker.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+module ClickHouse
+  # Cron worker that rebuilds the first materialized view listed in MATERIALIZED_VIEWS
+  # on the :main ClickHouse connection via RebuildMaterializedViewService.
+  #
+  # NOTE(review): Gitlab::ExclusiveLeaseHelpers is included but #perform takes no lease;
+  # confirm whether overlapping runs need to be prevented.
+  class RebuildMaterializedViewCronWorker
+    include ApplicationWorker
+    include ClickHouseWorker
+    include Gitlab::ExclusiveLeaseHelpers
+
+    idempotent!
+    queue_namespace :cronjob
+    data_consistency :delayed
+    worker_has_external_dependencies! # the worker interacts with a ClickHouse database
+    feature_category :value_stream_management
+
+    MATERIALIZED_VIEWS = [
+      {
+        view_name: 'contributions_mv',
+        view_table_name: 'contributions',
+        tmp_view_name: 'tmp_contributions_mv',
+        tmp_view_table_name: 'tmp_contributions',
+        source_table_name: 'events'
+      }.freeze
+    ].freeze
+
+    def perform
+      connection = ClickHouse::Connection.new(:main)
+      ClickHouse::RebuildMaterializedViewService
+        .new(connection: connection, state: MATERIALIZED_VIEWS.first)
+        .execute
+    end
+  end
+end
diff --git a/lib/click_house/connection.rb b/lib/click_house/connection.rb
index 79551326d2da312b36beeaf3b4ae9a5567f4e673..30c71870b1f095214826e98a6b6f806d65b1682b 100644
--- a/lib/click_house/connection.rb
+++ b/lib/click_house/connection.rb
@@ -15,13 +15,17 @@ def execute(query)
       ClickHouse::Client.execute(query, database, configuration)
     end
 
+    # Name of the ClickHouse database configured for this connection, or nil when it is not configured.
+    def database_name
+      configuration.databases[database]&.database
+    end
+
     def table_exists?(table_name)
       raw_query = <<~SQL.squish
         SELECT 1 FROM system.tables
         WHERE name = {table_name: String} AND database = {database_name: String}
       SQL
 
-      database_name = configuration.databases[database]&.database
       placeholders = { table_name: table_name, database_name: database_name }
 
       query = ClickHouse::Client::Query.new(raw_query: raw_query, placeholders: placeholders)
diff --git a/spec/lib/click_house/connection_spec.rb b/spec/lib/click_house/connection_spec.rb
index dda736dfaa88f75ce01a3449b8478f6977b11fc5..a3866e46471bfe94a401d44857fbaf2273b80ed8 100644
--- a/spec/lib/click_house/connection_spec.rb
+++ b/spec/lib/click_house/connection_spec.rb
@@ -5,6 +5,13 @@
 RSpec.describe ClickHouse::Connection, click_house: :without_migrations, feature_category: :database do
   let(:connection) { described_class.new(:main) }
 
+  describe '#database_name' do
+    it 'returns the configured database name' do
+      name = ClickHouse::Client.configuration.databases[:main].database
+      expect(connection.database_name).to eq(name)
+    end
+  end
+
   describe '#select' do
     it 'proxies select to client' do
       expect(
diff --git a/spec/services/click_house/rebuild_materialized_view_service_spec.rb b/spec/services/click_house/rebuild_materialized_view_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..895cd965041f69b73827f32abe7eec90e2388917
--- /dev/null
+++ b/spec/services/click_house/rebuild_materialized_view_service_spec.rb
@@ -0,0 +1,62 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ClickHouse::RebuildMaterializedViewService, :click_house, feature_category: :database do
+  include ClickHouseHelpers
+
+  let_it_be(:event1) { create(:event, :pushed) }
+  let_it_be(:event2) { create(:event, :pushed) }
+  let_it_be(:event3) { create(:closed_issue_event) }
+
+  let(:connection) { ClickHouse::Connection.new(:main) }
+
+  before do
+    insert_events_into_click_house
+  end
+
+  def invoke_service
+    described_class.new(connection: connection, state: {
+      view_name: 'contributions_mv',
+      view_table_name: 'contributions',
+      tmp_view_name: 'tmp_contributions_mv',
+      tmp_view_table_name: 'tmp_contributions',
+      source_table_name: 'events'
+    }).execute
+  end
+
+  it 're-creates the materialized view with correct data from the source table' do
+    stub_const("#{described_class}::INSERT_BATCH_SIZE", 1)
+    # Delete two records from the contributions table so it becomes inconsistent with the source table
+    connection.execute("DELETE FROM contributions WHERE id IN (#{event2.id}, #{event3.id})")
+
+    # The current MV should have one record left
+    ids = connection.select('SELECT id FROM contributions FINAL').pluck('id')
+    expect(ids).to eq([event1.id])
+
+    # Rebuild the MV so we get the inconsistency corrected
+    invoke_service
+
+    ids = connection.select('SELECT id FROM contributions FINAL').pluck('id')
+    expect(ids).to match_array([event1.id, event2.id, event3.id])
+  end
+
+  it 'does not leave temporary tables around' do
+    invoke_service
+
+    view_query = <<~SQL
+      SELECT view_definition FROM information_schema.views
+      WHERE table_name = 'tmp_contributions_mv' AND
+      table_schema = '#{connection.database_name}'
+    SQL
+
+    table_query = <<~SQL
+      SELECT table_name FROM information_schema.tables
+      WHERE table_name = 'tmp_contributions' AND
+      table_schema = '#{connection.database_name}'
+    SQL
+
+    expect(connection.select(view_query)).to be_empty
+    expect(connection.select(table_query)).to be_empty
+  end
+end
diff --git a/ee/spec/support/helpers/click_house_helpers.rb b/spec/support/helpers/click_house_helpers.rb
similarity index 100%
rename from ee/spec/support/helpers/click_house_helpers.rb
rename to spec/support/helpers/click_house_helpers.rb
diff --git a/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb b/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..93631e9d4d53eb620a29b6cf0d1dac9721c3c316
--- /dev/null
+++ b/spec/workers/click_house/rebuild_materialized_view_cron_worker_spec.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ClickHouse::RebuildMaterializedViewCronWorker, feature_category: :database do
+  it 'invokes the RebuildMaterializedViewService' do
+    expect_next_instance_of(ClickHouse::RebuildMaterializedViewService) do |instance|
+      expect(instance).to receive(:execute)
+    end
+
+    described_class.new.perform
+  end
+end