From 189f707aaefeb37a74daee102c0c2c4dea5f3073 Mon Sep 17 00:00:00 2001
From: Sean McGivern <sean@gitlab.com>
Date: Thu, 6 Feb 2020 15:05:35 +0000
Subject: [PATCH] Fix and document --queue-query-syntax

This ensures that --queue-query-syntax works correctly when invoked from
the command line, without the rest of the Rails environment, and adds
documentation and unit-level tests.
---
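The query evaluation moved into CliMethods splits a query on whitespace
(OR), then on commas (AND), then on `|` within a term's values. The sketch
below illustrates that order in isolation. It is a simplified assumption-laden
example, not the code in this patch: `QUEUES`, `term_matches?` and
`select_queue_names` are hypothetical names, values are compared as strings
rather than converted to symbols or booleans, and unknown attributes match
nothing instead of raising UnknownPredicate.

```ruby
# Hypothetical stand-in for entries loaded from app/workers/all_queues.yml.
QUEUES = [
  { name: 'a',   feature_category: :category_a, latency_sensitive: false },
  { name: 'a_2', feature_category: :category_a, latency_sensitive: true },
  { name: 'c',   feature_category: :category_c, latency_sensitive: false }
].freeze

# attribute, then = or !=, then one or more values joined by |
TERM = /\A(\w+)(!?=)([\w|]+)\z/

# Evaluate a single term such as 'feature_category=category_a|category_c'.
# Simplification: both sides are compared as strings.
def term_matches?(queue, term)
  match = TERM.match(term)
  raise ArgumentError, "Invalid term: #{term}" unless match

  _, attribute, operator, values = *match
  included = values.split('|').include?(queue[attribute.to_sym].to_s)

  operator == '=' ? included : !included
end

# A queue is selected if ANY whitespace-separated clause matches, and a
# clause matches only if ALL of its comma-separated terms match.
def select_queue_names(query, queues)
  or_clauses = query.split(/\s+/).map { |clause| clause.split(',') }

  queues
    .select { |queue| or_clauses.any? { |terms| terms.all? { |term| term_matches?(queue, term) } } }
    .map { |queue| queue[:name] }
end
```

With this sketch, a query such as
'feature_category=category_a,latency_sensitive=true feature_category=category_c'
is read as "(category_a AND latency sensitive) OR category_c", which mirrors
the last 'combinations' case in the new cli_methods_spec.rb table.
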
 changelogs/unreleased/an-sidekiq-query.yml    |   5 +
 .../operations/extra_sidekiq_processes.md     |  75 +++++++++++++
 ee/lib/gitlab/sidekiq_cluster/cli.rb          |  30 ++---
 .../lib/gitlab/sidekiq_cluster/cli_spec.rb    |  22 +++-
 lib/gitlab/sidekiq_config.rb                  |  54 ---------
 lib/gitlab/sidekiq_config/cli_methods.rb      |  85 +++++++++++++-
 .../gitlab/sidekiq_config/cli_methods_spec.rb | 105 +++++++++++++++++-
 7 files changed, 296 insertions(+), 80 deletions(-)
 create mode 100644 changelogs/unreleased/an-sidekiq-query.yml

diff --git a/changelogs/unreleased/an-sidekiq-query.yml b/changelogs/unreleased/an-sidekiq-query.yml
new file mode 100644
index 0000000000000..1e759cc395470
--- /dev/null
+++ b/changelogs/unreleased/an-sidekiq-query.yml
@@ -0,0 +1,5 @@
+---
+title: Add experimental --queue-query-syntax option to sidekiq-cluster
+merge_request: 18877
+author:
+type: changed
diff --git a/doc/administration/operations/extra_sidekiq_processes.md b/doc/administration/operations/extra_sidekiq_processes.md
index 5cdd33ba507be..7829f0af1783c 100644
--- a/doc/administration/operations/extra_sidekiq_processes.md
+++ b/doc/administration/operations/extra_sidekiq_processes.md
@@ -82,6 +82,81 @@ you list:
    sudo gitlab-ctl reconfigure
    ```
 
+## Queue query syntax (experimental)
+
+> [Introduced](https://gitlab.com/gitlab-com/gl-infra/scalability/issues/45) in [GitLab Starter](https://about.gitlab.com/pricing/) 12.8.
+
+In addition to selecting queues by name, as above, the `queue_query_syntax`
+option allows queue groups to be selected in a more general way using
+the following components:
+
+- Attributes that can be selected.
+- Operators used to construct a query.
+
+### Available attributes
+
+From the [list of all available
+attributes](https://gitlab.com/gitlab-org/gitlab/-/blob/master/app/workers/all_queues.yml),
+`queue_query_syntax` allows selecting queues by the following attributes:
+
+- `feature_category` - the [GitLab feature
+  category](https://about.gitlab.com/direction/maturity/#category-maturity) the
+  queue belongs to. For example, the `merge` queue belongs to the
+  `source_code_management` category.
+- `has_external_dependencies` - whether or not the queue connects to external
+  services. For example, all importers have this set to `true`.
+- `latency_sensitive` - whether or not the queue is particularly sensitive to
+  latency, which also means that its jobs should run quickly. For example, the
+  `authorized_projects` queue is used to refresh user permissions, and is
+  latency sensitive.
+- `resource_boundary` - if the worker is bound by `cpu`, `memory`, or
+  `unknown`. For example, the `project_export` queue is memory bound as it has
+  to load data in memory before saving it for export.
+
+Both `has_external_dependencies` and `latency_sensitive` are boolean attributes:
+only the exact string `true` is considered true, and everything else is
+considered false.
+
+### Available operators
+
+`queue_query_syntax` supports the following operators, listed from lowest to
+highest precedence:
+
+- <code> </code>&nbsp;(space) - the logical OR operator. For example, `query_a
+  query_b` (where `query_a` and `query_b` are queries made up of the other
+  operators here) will include queues that match either query.
+- `,` - the logical AND operator. For example, `query_a,query_b` (where
+  `query_a` and `query_b` are queries made up of the other operators here) will
+  only include queues that match both queries.
+- `!=` - the not equal to operator. For example,
+  `feature_category!=issue_tracking` excludes all queues from the
+  `issue_tracking` feature category.
+- `=` - the equal to operator. For example, `resource_boundary=cpu` includes all
+  queues that are CPU bound.
+- `|` - the concatenate set operator. For example,
+  `feature_category=continuous_integration|pages` includes all queues from
+  either the `continuous_integration` category or the `pages` category. This
+  example is also possible using the OR operator, but `|` is more concise and,
+  unlike OR, binds more tightly than AND.
+
+The operator precedence for this syntax is fixed: it's not possible to make OR
+have higher precedence than AND.
+
+### Example queries
+
+In `/etc/gitlab/gitlab.rb`:
+
+```ruby
+sidekiq_cluster['enable'] = true
+sidekiq_cluster['queue_query_syntax'] = true
+sidekiq_cluster['queue_groups'] = [
+  # Run all non-CPU-bound queues that are latency sensitive
+  'resource_boundary!=cpu,latency_sensitive=true',
+  # Run all continuous integration and pages queues that are not latency sensitive
+  'feature_category=continuous_integration|pages,latency_sensitive=false'
+]
+```
+
 ## Ignore all GitHub import queues
 
 When [importing from GitHub](../../user/project/import/github.md), Sidekiq might
diff --git a/ee/lib/gitlab/sidekiq_cluster/cli.rb b/ee/lib/gitlab/sidekiq_cluster/cli.rb
index ac22af45c8e00..1ee88ac44b7d1 100644
--- a/ee/lib/gitlab/sidekiq_cluster/cli.rb
+++ b/ee/lib/gitlab/sidekiq_cluster/cli.rb
@@ -41,21 +41,25 @@ def run(argv = ARGV)
 
         option_parser.parse!(argv)
 
-        queue_groups = SidekiqCluster.parse_queues(argv)
-
-        all_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)
-
-        # When using the experimental queue query syntax, we treat each queue
-        # group as a worker attribute query, and resolve the queues for the
-        # queue group using this query.
-        if @queue_query_syntax
-          queue_groups = argv.map { |queues| SidekiqConfig.query_workers(queues).map(&:queue) }
-        else
-          queue_groups.map! { |queues| SidekiqConfig::CliMethods.expand_queues(queues, all_queues) }
-        end
+        all_queues = SidekiqConfig::CliMethods.all_queues(@rails_path)
+        queue_names = SidekiqConfig::CliMethods.worker_queues(@rails_path)
+
+        queue_groups =
+          if @queue_query_syntax
+            # When using the experimental queue query syntax, we treat
+            # each queue group as a worker attribute query, and resolve
+            # the queues for the queue group using this query.
+            argv.map do |queues|
+              SidekiqConfig::CliMethods.query_workers(queues, all_queues)
+            end
+          else
+            SidekiqCluster.parse_queues(argv).map do |queues|
+              SidekiqConfig::CliMethods.expand_queues(queues, queue_names)
+            end
+          end
 
         if @negate_queues
-          queue_groups.map! { |queues| all_queues - queues }
+          queue_groups.map! { |queues| queue_names - queues }
         end
 
         @logger.info("Starting cluster with #{queue_groups.length} processes")
diff --git a/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb b/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb
index 8aa3129e327e5..1ab589af51f59 100644
--- a/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb
+++ b/ee/spec/lib/gitlab/sidekiq_cluster/cli_spec.rb
@@ -22,9 +22,9 @@
 
     context 'with arguments' do
       before do
-        expect(cli).to receive(:write_pid)
-        expect(cli).to receive(:trap_signals)
-        expect(cli).to receive(:start_loop)
+        allow(cli).to receive(:write_pid)
+        allow(cli).to receive(:trap_signals)
+        allow(cli).to receive(:start_loop)
       end
 
       it 'starts the Sidekiq workers' do
@@ -140,6 +140,22 @@
             cli.run(%W(--negate --queue-query-syntax #{query}))
           end
         end
+
+        it 'expands multiple queue groups correctly' do
+          expect(Gitlab::SidekiqCluster)
+            .to receive(:start)
+                  .with([['chat_notification'], ['project_export']], default_options)
+                  .and_return([])
+
+          cli.run(%w(--queue-query-syntax feature_category=chatops,latency_sensitive=true resource_boundary=memory,feature_category=source_code_management))
+        end
+
+        it 'raises an error on an invalid query' do
+          expect(Gitlab::SidekiqCluster).not_to receive(:start)
+
+          expect { cli.run(%w(--queue-query-syntax unknown_field=chatops)) }
+            .to raise_error(Gitlab::SidekiqConfig::CliMethods::QueryError)
+        end
       end
     end
   end
diff --git a/lib/gitlab/sidekiq_config.rb b/lib/gitlab/sidekiq_config.rb
index 7e40145752d18..4e0d3da186822 100644
--- a/lib/gitlab/sidekiq_config.rb
+++ b/lib/gitlab/sidekiq_config.rb
@@ -104,59 +104,5 @@ def worker_from_path(path, root)
         ns.camelize.constantize
       end
     end
-
-    def self.query_workers(query_string)
-      predicate = query_string_to_lambda(query_string)
-
-      workers.filter(&predicate)
-    end
-
-    def self.query_string_to_lambda(query_string)
-      or_clauses = query_string.split(%r{\s+}).map do |and_clauses_string|
-        and_clauses_predicates = and_clauses_string.split(',').map do |term|
-          match = term.match(%r{^(\w+)(!?=)([\w|]+)})
-          raise "invalid term #{term}" unless match
-
-          lhs = match[1]
-          op = match[2]
-          rhs = match[3]
-
-          predicate_for_op(op, predicate_factory(lhs, rhs.split('|')))
-        end
-
-        lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
-      end
-
-      lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
-    end
-
-    def self.predicate_for_op(op, predicate)
-      case op
-      when "="
-        predicate
-      when "!="
-        lambda { |worker| !predicate.call(worker) }
-      else
-        raise "unknown op #{op}"
-      end
-    end
-
-    def self.predicate_factory(lhs, values)
-      case lhs
-      when "resource_boundary"
-        values_sym = values.map(&:to_sym)
-        lambda { |worker| values_sym.include? worker.get_worker_resource_boundary }
-
-      when "latency_sensitive"
-        values_bool = values.map { |v| v.casecmp("true").zero? }
-        lambda { |worker| values_bool.include? worker.latency_sensitive_worker? }
-
-      when "feature_category"
-        values_sym = values.map(&:to_sym)
-        lambda { |worker| values_sym.include? worker.get_feature_category }
-      else
-        raise "unknown predicate #{lhs}"
-      end
-    end
   end
 end
diff --git a/lib/gitlab/sidekiq_config/cli_methods.rb b/lib/gitlab/sidekiq_config/cli_methods.rb
index 0676e9df9c5c8..9fb31253a2fc7 100644
--- a/lib/gitlab/sidekiq_config/cli_methods.rb
+++ b/lib/gitlab/sidekiq_config/cli_methods.rb
@@ -18,17 +18,31 @@ module CliMethods
         result
       end.freeze
 
-      def worker_queues(rails_path = Rails.root.to_s)
+      QUERY_OR_OPERATOR = %r{\s+}.freeze
+      QUERY_AND_OPERATOR = ','
+      QUERY_CONCATENATE_OPERATOR = '|'
+      QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w|]+)}.freeze
+
+      QueryError = Class.new(StandardError)
+      InvalidTerm = Class.new(QueryError)
+      UnknownOperator = Class.new(QueryError)
+      UnknownPredicate = Class.new(QueryError)
+
+      def all_queues(rails_path = Rails.root.to_s)
         @worker_queues ||= {}
 
         @worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
           full_path = File.join(rails_path, path)
-          queues = File.exist?(full_path) ? YAML.load_file(full_path) : []
 
-          # https://gitlab.com/gitlab-org/gitlab/issues/199230
-          queues.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
+          File.exist?(full_path) ? YAML.load_file(full_path) : []
         end
       end
+      # rubocop:enable Gitlab/ModuleWithInstanceVariables
+
+      def worker_queues(rails_path = Rails.root.to_s)
+        # https://gitlab.com/gitlab-org/gitlab/issues/199230
+        worker_names(all_queues(rails_path))
+      end
 
       def expand_queues(queues, all_queues = self.worker_queues)
         return [] if queues.empty?
@@ -40,12 +54,73 @@ def expand_queues(queues, all_queues = self.worker_queues)
         end
       end
 
+      def query_workers(query_string, queues)
+        worker_names(queues.select(&query_string_to_lambda(query_string)))
+      end
+
       def clear_memoization!
         if instance_variable_defined?('@worker_queues')
           remove_instance_variable('@worker_queues')
         end
       end
-      # rubocop:enable Gitlab/ModuleWithInstanceVariables
+
+      private
+
+      def worker_names(workers)
+        workers.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
+      end
+
+      def query_string_to_lambda(query_string)
+        or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
+          and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
+            match = term.match(QUERY_TERM_REGEX)
+
+            raise InvalidTerm.new("Invalid term: #{term}") unless match
+
+            _, lhs, op, rhs = *match
+
+            predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
+          end
+
+          lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
+        end
+
+        lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
+      end
+
+      def predicate_for_op(op, predicate)
+        case op
+        when '='
+          predicate
+        when '!='
+          lambda { |worker| !predicate.call(worker) }
+        else
+          # This is unreachable because InvalidTerm will be raised instead, but
+          # keeping it guards against that changing in the future.
+          raise UnknownOperator.new("Unknown operator: #{op}")
+        end
+      end
+
+      def predicate_factory(lhs, values)
+        to_bool = lambda { |value| value == 'true' }
+
+        case lhs
+        when 'feature_category'
+          lambda { |worker| values.map(&:to_sym).include?(worker[:feature_category]) }
+
+        when 'has_external_dependencies'
+          lambda { |worker| values.map(&to_bool).include?(worker[:has_external_dependencies]) }
+
+        when 'latency_sensitive'
+          lambda { |worker| values.map(&to_bool).include?(worker[:latency_sensitive]) }
+
+        when 'resource_boundary'
+          lambda { |worker| values.map(&:to_sym).include?(worker[:resource_boundary]) }
+
+        else
+          raise UnknownPredicate.new("Unknown predicate: #{lhs}")
+        end
+      end
     end
   end
 end
diff --git a/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb b/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb
index 60d946db744b3..feab6943e2644 100644
--- a/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb
+++ b/spec/lib/gitlab/sidekiq_config/cli_methods_spec.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require 'fast_spec_helper'
+require 'rspec-parameterized'
 
 describe Gitlab::SidekiqConfig::CliMethods do
   let(:dummy_root) { '/tmp/' }
@@ -82,7 +83,7 @@ def stub_contents(foss_queues, ee_queues)
   end
 
   describe '.expand_queues' do
-    let(:all_queues) do
+    let(:worker_queues) do
       ['cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs', 'post_receive']
     end
 
@@ -92,25 +93,119 @@ def stub_contents(foss_queues, ee_queues)
       expect(described_class.expand_queues(['cronjob']))
         .to contain_exactly('cronjob')
 
-      allow(described_class).to receive(:worker_queues).and_return(all_queues)
+      allow(described_class).to receive(:worker_queues).and_return(worker_queues)
 
       expect(described_class.expand_queues(['cronjob']))
         .to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
     end
 
     it 'expands queue namespaces to concrete queue names' do
-      expect(described_class.expand_queues(['cronjob'], all_queues))
+      expect(described_class.expand_queues(['cronjob'], worker_queues))
         .to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
     end
 
     it 'lets concrete queue names pass through' do
-      expect(described_class.expand_queues(['post_receive'], all_queues))
+      expect(described_class.expand_queues(['post_receive'], worker_queues))
         .to contain_exactly('post_receive')
     end
 
     it 'lets unknown queues pass through' do
-      expect(described_class.expand_queues(['unknown'], all_queues))
+      expect(described_class.expand_queues(['unknown'], worker_queues))
         .to contain_exactly('unknown')
     end
   end
+
+  describe '.query_workers' do
+    using RSpec::Parameterized::TableSyntax
+
+    let(:queues) do
+      [
+        {
+          name: 'a',
+          feature_category: :category_a,
+          has_external_dependencies: false,
+          latency_sensitive: false,
+          resource_boundary: :cpu
+        },
+        {
+          name: 'a_2',
+          feature_category: :category_a,
+          has_external_dependencies: false,
+          latency_sensitive: true,
+          resource_boundary: :none
+        },
+        {
+          name: 'b',
+          feature_category: :category_b,
+          has_external_dependencies: true,
+          latency_sensitive: true,
+          resource_boundary: :memory
+        },
+        {
+          name: 'c',
+          feature_category: :category_c,
+          has_external_dependencies: false,
+          latency_sensitive: false,
+          resource_boundary: :memory
+        }
+      ]
+    end
+
+    context 'with valid input' do
+      where(:query, :selected_queues) do
+        # feature_category
+        'feature_category=category_a' | %w(a a_2)
+        'feature_category=category_a|category_c' | %w(a a_2 c)
+        'feature_category=category_a feature_category=category_c' | %w(a a_2 c)
+        'feature_category!=category_a' | %w(b c)
+
+        # has_external_dependencies
+        'has_external_dependencies=true' | %w(b)
+        'has_external_dependencies=false' | %w(a a_2 c)
+        'has_external_dependencies=true|false' | %w(a a_2 b c)
+        'has_external_dependencies=true has_external_dependencies=false' | %w(a a_2 b c)
+        'has_external_dependencies!=true' | %w(a a_2 c)
+
+        # latency_sensitive
+        'latency_sensitive=true' | %w(a_2 b)
+        'latency_sensitive=false' | %w(a c)
+        'latency_sensitive=true|false' | %w(a a_2 b c)
+        'latency_sensitive=true latency_sensitive=false' | %w(a a_2 b c)
+        'latency_sensitive!=true' | %w(a c)
+
+        # resource_boundary
+        'resource_boundary=memory' | %w(b c)
+        'resource_boundary=memory|cpu' | %w(a b c)
+        'resource_boundary=memory resource_boundary=cpu' | %w(a b c)
+        'resource_boundary!=memory|cpu' | %w(a_2)
+
+        # combinations
+        'feature_category=category_a,latency_sensitive=true' | %w(a_2)
+        'feature_category=category_a,latency_sensitive=true feature_category=category_c' | %w(a_2 c)
+      end
+
+      with_them do
+        it do
+          expect(described_class.query_workers(query, queues))
+            .to match_array(selected_queues)
+        end
+      end
+    end
+
+    context 'with invalid input' do
+      where(:query, :error) do
+        'feature_category="category_a"' | described_class::InvalidTerm
+        'feature_category=' | described_class::InvalidTerm
+        'feature_category~category_a' | described_class::InvalidTerm
+        'name=a' | described_class::UnknownPredicate
+      end
+
+      with_them do
+        it do
+          expect { described_class.query_workers(query, queues) }
+            .to raise_error(error)
+        end
+      end
+    end
+  end
 end
-- 
GitLab