From 1beb0c24a95ab489cadb5659bc44ca73b79f7e5d Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen <qmnguyen@gitlab.com> Date: Mon, 19 Apr 2021 10:27:11 +0000 Subject: [PATCH] Extract Sidekiq query selector matching logic to a new class --- bin/sidekiq-cluster | 1 + lib/gitlab/sidekiq_cluster/cli.rb | 14 +- lib/gitlab/sidekiq_config/cli_methods.rb | 90 +++--------- lib/gitlab/sidekiq_config/worker_matcher.rb | 86 ++++++++++++ spec/lib/gitlab/sidekiq_cluster/cli_spec.rb | 2 +- .../gitlab/sidekiq_config/cli_methods_spec.rb | 84 ++---------- .../sidekiq_config/worker_matcher_spec.rb | 129 ++++++++++++++++++ 7 files changed, 249 insertions(+), 157 deletions(-) create mode 100644 lib/gitlab/sidekiq_config/worker_matcher.rb create mode 100644 spec/lib/gitlab/sidekiq_config/worker_matcher_spec.rb diff --git a/bin/sidekiq-cluster b/bin/sidekiq-cluster index 2204a222b885a..47f8e82d2283b 100755 --- a/bin/sidekiq-cluster +++ b/bin/sidekiq-cluster @@ -5,6 +5,7 @@ require 'optparse' require_relative '../lib/gitlab' require_relative '../lib/gitlab/utils' require_relative '../lib/gitlab/sidekiq_config/cli_methods' +require_relative '../lib/gitlab/sidekiq_config/worker_matcher' require_relative '../lib/gitlab/sidekiq_cluster' require_relative '../lib/gitlab/sidekiq_cluster/cli' diff --git a/lib/gitlab/sidekiq_cluster/cli.rb b/lib/gitlab/sidekiq_cluster/cli.rb index e471517c50ad9..9490d543dd1b7 100644 --- a/lib/gitlab/sidekiq_cluster/cli.rb +++ b/lib/gitlab/sidekiq_cluster/cli.rb @@ -53,11 +53,11 @@ def run(argv = ARGV) 'You cannot specify --queue-selector and --experimental-queue-selector together' end - all_queues = SidekiqConfig::CliMethods.all_queues(@rails_path) - queue_names = SidekiqConfig::CliMethods.worker_queues(@rails_path) + worker_metadatas = SidekiqConfig::CliMethods.worker_metadatas(@rails_path) + worker_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path) - queue_groups = argv.map do |queues| - next queue_names if queues == '*' + queue_groups = argv.map do |queues_or_query_string| + next worker_queues if queues_or_query_string == SidekiqConfig::WorkerMatcher::WILDCARD_MATCH # When using the queue query syntax, we treat each queue group # as a worker attribute query, and resolve the queues for the @@ -65,14 +65,14 @@ def run(argv = ARGV) # Simplify with https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/646 if @queue_selector || @experimental_queue_selector - SidekiqConfig::CliMethods.query_workers(queues, all_queues) + SidekiqConfig::CliMethods.query_queues(queues_or_query_string, worker_metadatas) else - SidekiqConfig::CliMethods.expand_queues(queues.split(','), queue_names) + SidekiqConfig::CliMethods.expand_queues(queues_or_query_string.split(','), worker_queues) end end if @negate_queues - queue_groups.map! { |queues| queue_names - queues } + queue_groups.map! { |queues| worker_queues - queues } end if queue_groups.all?(&:empty?) diff --git a/lib/gitlab/sidekiq_config/cli_methods.rb b/lib/gitlab/sidekiq_config/cli_methods.rb index a256632bc1232..8eef15f9ccb25 100644 --- a/lib/gitlab/sidekiq_config/cli_methods.rb +++ b/lib/gitlab/sidekiq_config/cli_methods.rb @@ -12,35 +12,19 @@ module CliMethods # rubocop:disable Gitlab/ModuleWithInstanceVariables extend self + # The file names are misleading. Those files contain the metadata of the + # workers. They should be renamed to all_workers instead. + # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1018 QUEUE_CONFIG_PATHS = begin result = %w[app/workers/all_queues.yml] result << 'ee/app/workers/all_queues.yml' if Gitlab.ee? result end.freeze - QUERY_OR_OPERATOR = '|' - QUERY_AND_OPERATOR = '&' - QUERY_CONCATENATE_OPERATOR = ',' - QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze + def worker_metadatas(rails_path = Rails.root.to_s) + @worker_metadatas ||= {} - QUERY_PREDICATES = { - feature_category: :to_sym, - has_external_dependencies: lambda { |value| value == 'true' }, - name: :to_s, - resource_boundary: :to_sym, - tags: :to_sym, - urgency: :to_sym - }.freeze - - QueryError = Class.new(StandardError) - InvalidTerm = Class.new(QueryError) - UnknownOperator = Class.new(QueryError) - UnknownPredicate = Class.new(QueryError) - - def all_queues(rails_path = Rails.root.to_s) - @worker_queues ||= {} - - @worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path| + @worker_metadatas[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path| full_path = File.join(rails_path, path) File.exist?(full_path) ? YAML.load_file(full_path) : [] @@ -49,7 +33,7 @@ def all_queues(rails_path = Rails.root.to_s) # rubocop:enable Gitlab/ModuleWithInstanceVariables def worker_queues(rails_path = Rails.root.to_s) - worker_names(all_queues(rails_path)) + worker_names(worker_metadatas(rails_path)) end def expand_queues(queues, all_queues = self.worker_queues) @@ -62,13 +46,18 @@ def expand_queues(queues, all_queues = self.worker_queues) end end - def query_workers(query_string, queues) - worker_names(queues.select(&query_string_to_lambda(query_string))) + def query_queues(query_string, worker_metadatas) + matcher = SidekiqConfig::WorkerMatcher.new(query_string) + selected_metadatas = worker_metadatas.select do |worker_metadata| + matcher.match?(worker_metadata) + end + + worker_names(selected_metadatas) end def clear_memoization! - if instance_variable_defined?('@worker_queues') - remove_instance_variable('@worker_queues') + if instance_variable_defined?('@worker_metadatas') + remove_instance_variable('@worker_metadatas') end end @@ -77,53 +66,6 @@ def clear_memoization! def worker_names(workers) workers.map { |queue| queue[:name] } end - - def query_string_to_lambda(query_string) - or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string| - and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term| - predicate_for_term(term) - end - - lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } } - end - - lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } } - end - - def predicate_for_term(term) - match = term.match(QUERY_TERM_REGEX) - - raise InvalidTerm.new("Invalid term: #{term}") unless match - - _, lhs, op, rhs = *match - - predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR))) - end - - def predicate_for_op(op, predicate) - case op - when '=' - predicate - when '!=' - lambda { |worker| !predicate.call(worker) } - else - # This is unreachable because InvalidTerm will be raised instead, but - # keeping it allows to guard against that changing in future. - raise UnknownOperator.new("Unknown operator: #{op}") - end - end - - def predicate_factory(lhs, values) - values_block = QUERY_PREDICATES[lhs.to_sym] - - raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block - - lambda do |queue| - comparator = Array(queue[lhs.to_sym]).to_set - - values.map(&values_block).to_set.intersect?(comparator) - end - end end end end diff --git a/lib/gitlab/sidekiq_config/worker_matcher.rb b/lib/gitlab/sidekiq_config/worker_matcher.rb new file mode 100644 index 0000000000000..fe5ac10c65ac0 --- /dev/null +++ b/lib/gitlab/sidekiq_config/worker_matcher.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqConfig + class WorkerMatcher + WILDCARD_MATCH = '*' + QUERY_OR_OPERATOR = '|' + QUERY_AND_OPERATOR = '&' + QUERY_CONCATENATE_OPERATOR = ',' + QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze + + QUERY_PREDICATES = { + feature_category: :to_sym, + has_external_dependencies: lambda { |value| value == 'true' }, + name: :to_s, + resource_boundary: :to_sym, + tags: :to_sym, + urgency: :to_sym + }.freeze + + QueryError = Class.new(StandardError) + InvalidTerm = Class.new(QueryError) + UnknownOperator = Class.new(QueryError) + UnknownPredicate = Class.new(QueryError) + + def initialize(query_string) + @match_lambda = query_string_to_lambda(query_string) + end + + def match?(worker_metadata) + @match_lambda.call(worker_metadata) + end + + private + + def query_string_to_lambda(query_string) + return lambda { |_worker| true } if query_string.strip == WILDCARD_MATCH + + or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string| + and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term| + predicate_for_term(term) + end + + lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } } + end + + lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } } + end + + def predicate_for_term(term) + match = term.match(QUERY_TERM_REGEX) + + raise InvalidTerm.new("Invalid term: #{term}") unless match + + _, lhs, op, rhs = *match + + predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR))) + end + + def predicate_for_op(op, predicate) + case op + when '=' + predicate + when '!=' + lambda { |worker| !predicate.call(worker) } + else + # This is unreachable because InvalidTerm will be raised instead, but + # keeping it allows to guard against that changing in future. + raise UnknownOperator.new("Unknown operator: #{op}") + end + end + + def predicate_factory(lhs, values) + values_block = QUERY_PREDICATES[lhs.to_sym] + + raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block + + lambda do |queue| + comparator = Array(queue[lhs.to_sym]).to_set + + values.map(&values_block).to_set.intersect?(comparator) + end + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb b/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb index 74834fb9014f6..43cbe71dd6b2a 100644 --- a/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb +++ b/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb @@ -214,7 +214,7 @@ expect(Gitlab::SidekiqCluster).not_to receive(:start) expect { cli.run(%W(#{flag} unknown_field=chatops)) } - .to raise_error(Gitlab::SidekiqConfig::CliMethods::QueryError) + .to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::QueryError) end end end diff --git a/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb b/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb index 01e7c06249acd..bc63289a344a4 100644 --- a/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb +++ b/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require 'fast_spec_helper' -require 'rspec-parameterized' RSpec.describe Gitlab::SidekiqConfig::CliMethods do let(:dummy_root) { '/tmp/' } @@ -122,10 +121,8 @@ def stub_contents(foss_queues, ee_queues) end end - describe '.query_workers' do - using RSpec::Parameterized::TableSyntax - - let(:queues) do + describe '.query_queues' do + let(:worker_metadatas) do [ { name: 'a', @@ -162,79 +159,16 @@ def stub_contents(foss_queues, ee_queues) ] end - context 'with valid input' do - where(:query, :selected_queues) do - # feature_category - 'feature_category=category_a' | %w(a a:2) - 'feature_category=category_a,category_c' | %w(a a:2 c) - 'feature_category=category_a|feature_category=category_c' | %w(a a:2 c) - 'feature_category!=category_a' | %w(b c) - - # has_external_dependencies - 'has_external_dependencies=true' | %w(b) - 'has_external_dependencies=false' | %w(a a:2 c) - 'has_external_dependencies=true,false' | %w(a a:2 b c) - 'has_external_dependencies=true|has_external_dependencies=false' | %w(a a:2 b c) - 'has_external_dependencies!=true' | %w(a a:2 c) - - # urgency - 'urgency=high' | %w(a:2 b) - 'urgency=low' | %w(a) - 'urgency=high,low,throttled' | %w(a a:2 b c) - 'urgency=low|urgency=throttled' | %w(a c) - 'urgency!=high' | %w(a c) - - # name - 'name=a' | %w(a) - 'name=a,b' | %w(a b) - 'name=a,a:2|name=b' | %w(a a:2 b) - 'name!=a,a:2' | %w(b c) - - # resource_boundary - 'resource_boundary=memory' | %w(b c) - 'resource_boundary=memory,cpu' | %w(a b c) - 'resource_boundary=memory|resource_boundary=cpu' | %w(a b c) - 'resource_boundary!=memory,cpu' | %w(a:2) - - # tags - 'tags=no_disk_io' | %w(a b) - 'tags=no_disk_io,git_access' | %w(a a:2 b) - 'tags=no_disk_io|tags=git_access' | %w(a a:2 b) - 'tags=no_disk_io&tags=git_access' | %w(a) - 'tags!=no_disk_io' | %w(a:2 c) - 'tags!=no_disk_io,git_access' | %w(c) - 'tags=unknown_tag' | [] - 'tags!=no_disk_io' | %w(a:2 c) - 'tags!=no_disk_io,git_access' | %w(c) - 'tags!=unknown_tag' | %w(a a:2 b c) - - # combinations - 'feature_category=category_a&urgency=high' | %w(a:2) - 'feature_category=category_a&urgency=high|feature_category=category_c' | %w(a:2 c) - end + let(:worker_matcher) { double(:WorkerMatcher) } + let(:query) { 'feature_category=category_a,category_c' } - with_them do - it do - expect(described_class.query_workers(query, queues)) - .to match_array(selected_queues) - end - end + before do + allow(::Gitlab::SidekiqConfig::WorkerMatcher).to receive(:new).with(query).and_return(worker_matcher) + allow(worker_matcher).to receive(:match?).and_return(true, true, false, true) end - context 'with invalid input' do - where(:query, :error) do - 'feature_category="category_a"' | described_class::InvalidTerm - 'feature_category=' | described_class::InvalidTerm - 'feature_category~category_a' | described_class::InvalidTerm - 'worker_name=a' | described_class::UnknownPredicate - end - - with_them do - it do - expect { described_class.query_workers(query, queues) } - .to raise_error(error) - end - end + it 'returns the queue names of matched workers' do + expect(described_class.query_queues(query, worker_metadatas)).to match(%w(a a:2 c)) end end end diff --git a/spec/lib/gitlab/sidekiq_config/worker_matcher_spec.rb b/spec/lib/gitlab/sidekiq_config/worker_matcher_spec.rb new file mode 100644 index 0000000000000..75e9c8c100b48 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_config/worker_matcher_spec.rb @@ -0,0 +1,129 @@ +# frozen_string_literal: true + +require 'fast_spec_helper' +require 'rspec-parameterized' + +RSpec.describe Gitlab::SidekiqConfig::WorkerMatcher do + describe '#match?' do + using RSpec::Parameterized::TableSyntax + + let(:worker_metadatas) do + [ + { + name: 'a', + feature_category: :category_a, + has_external_dependencies: false, + urgency: :low, + resource_boundary: :cpu, + tags: [:no_disk_io, :git_access] + }, + { + name: 'a:2', + feature_category: :category_a, + has_external_dependencies: false, + urgency: :high, + resource_boundary: :none, + tags: [:git_access] + }, + { + name: 'b', + feature_category: :category_b, + has_external_dependencies: true, + urgency: :high, + resource_boundary: :memory, + tags: [:no_disk_io] + }, + { + name: 'c', + feature_category: :category_c, + has_external_dependencies: false, + urgency: :throttled, + resource_boundary: :memory, + tags: [] + } + ] + end + + context 'with valid input' do + where(:query, :expected_metadatas) do + # feature_category + 'feature_category=category_a' | %w(a a:2) + 'feature_category=category_a,category_c' | %w(a a:2 c) + 'feature_category=category_a|feature_category=category_c' | %w(a a:2 c) + 'feature_category!=category_a' | %w(b c) + + # has_external_dependencies + 'has_external_dependencies=true' | %w(b) + 'has_external_dependencies=false' | %w(a a:2 c) + 'has_external_dependencies=true,false' | %w(a a:2 b c) + 'has_external_dependencies=true|has_external_dependencies=false' | %w(a a:2 b c) + 'has_external_dependencies!=true' | %w(a a:2 c) + + # urgency + 'urgency=high' | %w(a:2 b) + 'urgency=low' | %w(a) + 'urgency=high,low,throttled' | %w(a a:2 b c) + 'urgency=low|urgency=throttled' | %w(a c) + 'urgency!=high' | %w(a c) + + # name + 'name=a' | %w(a) + 'name=a,b' | %w(a b) + 'name=a,a:2|name=b' | %w(a a:2 b) + 'name!=a,a:2' | %w(b c) + + # resource_boundary + 'resource_boundary=memory' | %w(b c) + 'resource_boundary=memory,cpu' | %w(a b c) + 'resource_boundary=memory|resource_boundary=cpu' | %w(a b c) + 'resource_boundary!=memory,cpu' | %w(a:2) + + # tags + 'tags=no_disk_io' | %w(a b) + 'tags=no_disk_io,git_access' | %w(a a:2 b) + 'tags=no_disk_io|tags=git_access' | %w(a a:2 b) + 'tags=no_disk_io&tags=git_access' | %w(a) + 'tags!=no_disk_io' | %w(a:2 c) + 'tags!=no_disk_io,git_access' | %w(c) + 'tags=unknown_tag' | [] + 'tags!=no_disk_io' | %w(a:2 c) + 'tags!=no_disk_io,git_access' | %w(c) + 'tags!=unknown_tag' | %w(a a:2 b c) + + # combinations + 'feature_category=category_a&urgency=high' | %w(a:2) + 'feature_category=category_a&urgency=high|feature_category=category_c' | %w(a:2 c) + + # Match all + '*' | %w(a a:2 b c) + end + + with_them do + it do + matched_metadatas = worker_metadatas.select do |metadata| + described_class.new(query).match?(metadata) + end + expect(matched_metadatas.map { |m| m[:name] }).to match_array(expected_metadatas) + end + end + end + + context 'with invalid input' do + where(:query, :error) do + 'feature_category="category_a"' | described_class::InvalidTerm + 'feature_category=' | described_class::InvalidTerm + 'feature_category~category_a' | described_class::InvalidTerm + 'worker_name=a' | described_class::UnknownPredicate + end + + with_them do + it do + worker_metadatas.each do |metadata| + expect { described_class.new(query).match?(metadata) } + .to raise_error(error) + end + end + end + end + end +end -- GitLab