Skip to content
代码片段 群组 项目
提交 3ac92b44 编辑于 作者: Adam Hegyi's avatar Adam Hegyi
浏览文件

Add a new min_max strategy to ClickHouse iterator

Add a new min_max strategy to ClickHouse iterator
上级 63b37287
No related branches found
No related tags found
无相关合并请求
......@@ -249,6 +249,29 @@ iterator.each_batch(column: :id, of: 10) do |scope, min, max|
end
```
### Min-max strategies
As the first step, the iterator determines the data range that is used as the condition in the iteration database queries. By default, the data range is
determined using `MIN(column)` and `MAX(column)` aggregations. For some database tables, this strategy causes inefficient database queries (full table scans). One example is partitioned database tables.
Example query:
```sql
SELECT MIN(id) AS min, MAX(id) AS max FROM events;
```
Alternatively, a different min-max strategy can be selected with `min_max_strategy: :order_limit`, which uses `ORDER BY` + `LIMIT` to determine the data range.
```ruby
iterator = ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_max_strategy: :order_limit)
```
Example query:
```sql
SELECT (SELECT id FROM events ORDER BY id ASC LIMIT 1) AS min, (SELECT id FROM events ORDER BY id DESC LIMIT 1) AS max;
```
## Implementing Sidekiq workers
Sidekiq workers leveraging ClickHouse databases should include the `ClickHouseWorker` module.
......
......@@ -22,24 +22,16 @@ module ClickHouse
# builder = ClickHouse::QueryBuilder.new('event_authors').where(type: 'some_type')
class Iterator
# rubocop: disable CodeReuse/ActiveRecord -- this is a ClickHouse query builder class using Arel
def initialize(query_builder:, connection:, min_value: nil)
def initialize(query_builder:, connection:, min_value: nil, min_max_strategy: :min_max)
@query_builder = query_builder
@connection = connection
@min_value = min_value
@min_max_strategy = min_max_strategy
end
def each_batch(column: :id, of: 10_000)
min_max_query = query_builder.select(
table[column].minimum.as('min'),
table[column].maximum.as('max')
)
row = connection.select(min_max_query.to_sql).first
return if row.nil?
min = min_value || row['min']
max = row['max']
return if max == 0
min, max = min_max(column)
return if min.nil? || max == 0
loop do
break if min > max
......@@ -57,7 +49,35 @@ def each_batch(column: :id, of: 10_000)
delegate :table, to: :query_builder
attr_reader :query_builder, :connection, :min_value
attr_reader :query_builder, :connection, :min_value, :min_max_strategy
# Looks up the lower and upper boundary of `column` for the iteration.
#
# The SQL used for the lookup depends on `min_max_strategy`:
# * `:min_max`     - a single query selecting the column's minimum and maximum
#                    through the query builder.
# * `:order_limit` - two `ORDER BY column ASC/DESC LIMIT 1` subqueries wrapped
#                    in one outer SELECT.
#
# @param column [Symbol] the column whose range is looked up
# @return [Array, nil] `[min, max]`, where a configured `min_value` takes
#   precedence over the queried minimum; `nil` when the query yields no row
# @raise [ArgumentError] when `min_max_strategy` is not one of the above
def min_max(column)
  # Each branch only builds SQL; the query is executed once below. An unknown
  # strategy raises here, before any query is sent.
  range_sql =
    case min_max_strategy
    when :min_max
      query_builder.select(
        table[column].minimum.as('min'),
        table[column].maximum.as('max')
      ).to_sql
    when :order_limit
      lowest_sql, highest_sql = %i[asc desc].map do |direction|
        query_builder.select(table[column]).order(column, direction).limit(1).to_sql
      end

      "SELECT (#{lowest_sql}) AS min, (#{highest_sql}) AS max"
    else
      raise ArgumentError, "Unknown min_max strategy is given: #{min_max_strategy}"
    end

  boundaries = connection.select(range_sql).first
  return if boundaries.nil?

  [min_value || boundaries['min'], boundaries['max']]
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
......@@ -5,7 +5,14 @@
RSpec.describe ClickHouse::Iterator, :click_house, feature_category: :database do
let(:query_builder) { ClickHouse::QueryBuilder.new('event_authors') }
let(:connection) { ClickHouse::Connection.new(:main) }
let(:iterator) { described_class.new(query_builder: query_builder, connection: connection) }
let(:min_max_strategy) { :min_max }
let(:iterator) do
described_class.new(
query_builder: query_builder,
connection: connection,
min_max_strategy: min_max_strategy
)
end
before do
connection.execute('INSERT INTO event_authors (author_id) SELECT number + 1 FROM numbers(10)')
......@@ -29,6 +36,27 @@ def collect_ids_with_batch_size(of)
expect(collect_ids_with_batch_size(15)).to match_array(expected_values)
end
context 'when invalid min_max_strategy is given' do
  let(:min_max_strategy) { :unknown }

  it 'raises ArgumentError' do
    # Running the iteration is what triggers the error in this example.
    iteration = -> { collect_ids_with_batch_size(3) }

    expect(&iteration).to raise_error(ArgumentError, /Unknown min_max/)
  end
end
context 'when order_limit min_max_strategy is given' do
  let(:min_max_strategy) { :order_limit }

  it 'iterates correctly' do
    all_ids = (1..10).to_a

    # Every batch size must yield the complete set of ids.
    [3, 5, 10, 15].each do |batch_size|
      expect(collect_ids_with_batch_size(batch_size)).to match_array(all_ids)
    end
  end
end
it 'yields the boundary values' do
min_values = []
max_values = []
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册