Skip to content
代码片段 群组 项目
pipeline_options.py 67.0 KB
更新 更旧
  • 了解如何忽略特定修订
  • #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    """Pipeline options obtained from command line parsing."""
    
    
    from typing import Any
    from typing import Callable
    from typing import Dict
    from typing import List
    from typing import Optional
    from typing import Type
    from typing import TypeVar
    
    import apache_beam as beam
    
    from apache_beam.options.value_provider import RuntimeValueProvider
    
    Holden Karau's avatar
    Holden Karau 已提交
    from apache_beam.options.value_provider import StaticValueProvider
    
    from apache_beam.options.value_provider import ValueProvider
    
    Holden Karau's avatar
    Holden Karau 已提交
    from apache_beam.transforms.display import HasDisplayData
    
    from apache_beam.utils import proto_utils
    
    __all__ = [
        'PipelineOptions',
        'StandardOptions',
        'TypeOptions',
        'DirectOptions',
        'GoogleCloudOptions',
    
        'HadoopFileSystemOptions',
    
        'WorkerOptions',
        'DebugOptions',
        'ProfilingOptions',
        'SetupOptions',
        'TestOptions',
    
    PipelineOptionsT = TypeVar('PipelineOptionsT', bound='PipelineOptions')
    
    
    _LOGGER = logging.getLogger(__name__)
    
    
    # Map defined with option names to flag names for boolean options
    # that have a destination(dest) in parser.add_argument() different
    # from the flag name and whose default value is `None`.
    _FLAG_THAT_SETS_FALSE_VALUE = {'use_public_ips': 'no_use_public_ips'}
    
    
    def _static_value_provider_of(value_type):
    
      """Helper function to plug a ValueProvider into argparse.
    
    
      Args:
        value_type: the type of the value. Since the type param of argparse's
                    add_argument will always be ValueProvider, we need to
                    preserve the type of the actual value.
      Returns:
        A partially constructed StaticValueProvider in the form of a function.
    
      """
      def _f(value):
    
        _f.__name__ = value_type.__name__
    
        return StaticValueProvider(value_type, value)
    
    class _BeamArgumentParser(argparse.ArgumentParser):
    
      """An ArgumentParser that supports ValueProvider options.
    
      Example Usage::
    
        class TemplateUserOptions(PipelineOptions):
          @classmethod
    
          def _add_argparse_args(cls, parser):
    
            parser.add_value_provider_argument('--vp_arg1', default='start')
            parser.add_value_provider_argument('--vp_arg2')
            parser.add_argument('--non_vp_arg')
    
    
      """
      def add_value_provider_argument(self, *args, **kwargs):
        """ValueProvider arguments can be either of type keyword or positional.
        At runtime, even positional arguments will need to be supplied in the
        key/value form.
        """
        # Extract the option name from positional argument ['pos_arg']
    
        assert args and len(args[0]) >= 1
    
        if args[0][0] != '-':
          option_name = args[0]
          if kwargs.get('nargs') is None:  # make them optionally templated
            kwargs['nargs'] = '?'
        else:
          # or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
          option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
    
        # reassign the type to make room for using
        # StaticValueProvider as the type for add_argument
        value_type = kwargs.get('type') or str
        kwargs['type'] = _static_value_provider_of(value_type)
    
        # reassign default to default_value to make room for using
        # RuntimeValueProvider as the default for add_argument
        default_value = kwargs.get('default')
        kwargs['default'] = RuntimeValueProvider(
            option_name=option_name,
            value_type=value_type,
    
            default_value=default_value)
    
    
        # have add_argument do most of the work
        self.add_argument(*args, **kwargs)
    
    
      # The argparse package by default tries to autocomplete option names. This
      # results in an "ambiguous option" error from argparse when an unknown option
      # matching multiple known ones are used. This suppresses that behavior.
      def error(self, message):
        if message.startswith('ambiguous option: '):
          return
    
    class _DictUnionAction(argparse.Action):
      """
      argparse Action take union of json loads values. If a key is specified in more
      than one of the values, the last value takes precedence.
      """
      def __call__(self, parser, namespace, values, option_string=None):
        if not hasattr(namespace,
                       self.dest) or getattr(namespace, self.dest) is None:
          setattr(namespace, self.dest, {})
        getattr(namespace, self.dest).update(values)
    
    
    
    class PipelineOptions(HasDisplayData):
    
      """This class and subclasses are used as containers for command line options.
    
      These classes are wrappers over the standard argparse Python module
    
      (see https://docs.python.org/3/library/argparse.html).  To define one option
    
      or a group of options, create a subclass from PipelineOptions.
    
      Example Usage::
    
    
        class XyzOptions(PipelineOptions):
    
          @classmethod
          def _add_argparse_args(cls, parser):
            parser.add_argument('--abc', default='start')
            parser.add_argument('--xyz', default='end')
    
      The arguments for the add_argument() method are exactly the ones
      described in the argparse public documentation.
    
      Pipeline objects require an options object during initialization.
    
      This is obtained simply by initializing an options class as defined above.
    
      Example Usage::
    
    
        p = Pipeline(options=XyzOptions())
        if p.options.xyz == 'end':
          raise ValueError('Option xyz has an invalid value.')
    
    
      Instances of PipelineOptions or any of its subclass have access to values
      defined by other PipelineOption subclasses (see get_all_options()), and
      can be converted to an instance of another PipelineOptions subclass
      (see view_as()). All views share the underlying data structure that stores
      option key-value pairs.
    
    
      By default the options classes will use command line arguments to initialize
      the options.
      """
    
      def __init__(self, flags=None, **kwargs):
        # type: (Optional[List[str]], **Any) -> None
    
        """Initialize an options class.
    
        The initializer will traverse all subclasses, add all their argparse
        arguments and then parse the command line specified by flags or by default
        the one obtained from sys.argv.
    
    
        The subclasses of PipelineOptions do not need to redefine __init__.
    
    
        Args:
          flags: An iterable of command line arguments to be used. If not specified
            then sys.argv will be used as input for parsing arguments.
    
    
          **kwargs: Add overrides for arguments passed in flags. For overrides
                    of arguments, please pass the `option names` instead of
                    flag names.
                    Option names: These are defined as dest in the
                    parser.add_argument() for each flag. Passing flags
                    like {no_use_public_ips: True}, for which the dest is
                    defined to a different flag name in the parser,
                    would be discarded. Instead, pass the dest of
                    the flag (dest of no_use_public_ips is use_public_ips).
    
        # Initializing logging configuration in case the user did not set it up.
        logging.basicConfig()
    
    
        # self._flags stores a list of not yet parsed arguments, typically,
        # command-line flags. This list is shared across different views.
        # See: view_as().
    
        # Build parser that will parse options recognized by the [sub]class of
        # PipelineOptions whose object is being instantiated.
        parser = _BeamArgumentParser()
    
        for cls in type(self).mro():
          if cls == PipelineOptions:
            break
          elif '_add_argparse_args' in cls.__dict__:
    
            cls._add_argparse_args(parser)  # type: ignore
    
    
        # The _visible_options attribute will contain options that were recognized
        # by the parser.
    
        self._visible_options, _ = parser.parse_known_args(flags)
    
    
        # self._all_options is initialized with overrides to flag values,
        # provided in kwargs, and will store key-value pairs for options recognized
        # by current PipelineOptions [sub]class and its views that may be created.
        # See: view_as().
        # This dictionary is shared across different views, and is lazily updated
        # as each new views are created.
        # Users access this dictionary store via __getattr__ / __setattr__ methods.
        self._all_options = kwargs
    
        # Initialize values of keys defined by this class.
        for option_name in self._visible_option_list():
          # Note that options specified in kwargs will not be overwritten.
          if option_name not in self._all_options:
    
            self._all_options[option_name] = getattr(
                self._visible_options, option_name)
    
      @classmethod
      def _add_argparse_args(cls, parser):
    
        # type: (_BeamArgumentParser) -> None
    
        # Override this in subclasses to provide options.
        pass
    
      @classmethod
      def from_dictionary(cls, options):
        """Returns a PipelineOptions from a dictionary of arguments.
    
        Args:
    
          options: Dictionary of argument value pairs.
    
    
        Returns:
          A PipelineOptions object representing the given arguments.
        """
        flags = []
    
        for k, v in options.items():
    
          # Note: If a boolean flag is True in the dictionary,
          # implicitly the method assumes the boolean flag is
          # specified as a command line argument. If the
          # boolean flag is False, this method simply discards them.
          # Eg: {no_auth: True} is similar to python your_file.py --no_auth
          # {no_auth: False} is similar to python your_file.py.
    
          if isinstance(v, bool):
            if v:
              flags.append('--%s' % k)
    
            elif k in _FLAG_THAT_SETS_FALSE_VALUE:
              # Capture overriding flags, which have a different dest
              # from the flag name defined in the parser.add_argument
              # Eg: no_use_public_ips, which has the dest=use_public_ips
              # different from flag name
              flag_that_disables_the_option = (_FLAG_THAT_SETS_FALSE_VALUE[k])
              flags.append('--%s' % flag_that_disables_the_option)
    
          elif isinstance(v, list):
            for i in v:
              flags.append('--%s=%s' % (k, i))
    
          elif isinstance(v, dict):
            flags.append('--%s=%s' % (k, json.dumps(v)))
    
          elif v is None:
            # Don't process None type args here, they will be treated
            # as strings when parsed by BeamArgumentParser..
            logging.warning('Not setting flag with value None: %s', k)
    
          else:
            flags.append('--%s=%s' % (k, v))
    
        return cls(flags)
    
    
      def get_all_options(
          self,
          drop_default=False,
    
          add_extra_args_fn=None,  # type: Optional[Callable[[_BeamArgumentParser], None]]
          retain_unknown_options=False
    
        # type: (...) -> Dict[str, Any]
    
        """Returns a dictionary of all defined arguments.
    
        Returns a dictionary of all defined arguments (arguments that are defined in
        any subclass of PipelineOptions) into a dictionary.
    
        Args:
          drop_default: If set to true, options that are equal to their default
            values, are not returned as part of the result dictionary.
    
          add_extra_args_fn: Callback to populate additional arguments, can be used
            by runner to supply otherwise unknown args.
    
          retain_unknown_options: If set to true, options not recognized by any
            known pipeline options class will still be included in the result. If
            set to false, they will be discarded.
    
    
        Returns:
          Dictionary of all args and values.
        """
    
    
        # TODO(https://github.com/apache/beam/issues/18197): PipelineOption
        # sub-classes in the main session might be repeated. Pick last unique
        # instance of each subclass to avoid conflicts.
    
        parser = _BeamArgumentParser()
    
        for cls in PipelineOptions.__subclasses__():
          subset[str(cls)] = cls
        for cls in subset.values():
          cls._add_argparse_args(parser)  # pylint: disable=protected-access
    
        if add_extra_args_fn:
          add_extra_args_fn(parser)
    
        known_args, unknown_args = parser.parse_known_args(self._flags)
    
          if unknown_args:
            _LOGGER.warning(
                'Unknown pipeline options received: %s. Ignore if flags are '
                'used for internal purposes.' % (','.join(unknown_args)))
    
          i = 0
          while i < len(unknown_args):
            # Treat all unary flags as booleans, and all binary argument values as
            # strings.
    
            if not unknown_args[i].startswith('-'):
              i += 1
              continue
    
            if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
    
              split = unknown_args[i].split('=', 1)
    
              if len(split) == 1:
                parser.add_argument(unknown_args[i], action='store_true')
              else:
                parser.add_argument(split[0], type=str)
    
            elif unknown_args[i].startswith('--'):
    
              parser.add_argument(unknown_args[i], type=str)
              i += 2
    
            else:
              # skip all binary flags used with '-' and not '--'.
              # ex: using -f instead of --f (or --flexrs_goal) will prevent
              # argument validation before job submission and can be incorrectly
              # submitted to job.
              _LOGGER.warning(
                  "Discarding flag %s, single dash flags are not allowed.",
                  unknown_args[i])
              i += 2
              continue
    
          parsed_args, _ = parser.parse_known_args(self._flags)
    
        else:
          if unknown_args:
            _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
          parsed_args = known_args
        result = vars(parsed_args)
    
        overrides = self._all_options.copy()
    
        # Apply the overrides if any
    
        for k in list(result):
    
          if k in self._all_options:
            result[k] = self._all_options[k]
    
          if (drop_default and parser.get_default(k) == result[k] and
    
              not isinstance(parser.get_default(k), ValueProvider)):
            del result[k]
    
    
          if retain_unknown_options:
            result.update(overrides)
          else:
            _LOGGER.warning("Discarding invalid overrides: %s", overrides)
    
      def to_runner_api(self):
        def to_struct_value(o):
          if isinstance(o, (bool, int, str)):
            return o
          elif isinstance(o, (tuple, list)):
            return [to_struct_value(e) for e in o]
          elif isinstance(o, dict):
            return {str(k): to_struct_value(v) for k, v in o.items()}
          else:
            return str(o)  # Best effort.
    
        return proto_utils.pack_Struct(
            **{
                f'beam:option:{k}:v1': to_struct_value(v)
                for (k, v) in self.get_all_options(
                    drop_default=True, retain_unknown_options=True).items()
                if v is not None
            })
    
      @classmethod
      def from_runner_api(cls, proto_options):
        def from_urn(key):
          assert key.startswith('beam:option:')
          assert key.endswith(':v1')
          return key[12:-3]
    
        return cls(
            **{from_urn(key): value
               for (key, value) in proto_options.items()})
    
    
      def display_data(self):
    
        return self.get_all_options(drop_default=True, retain_unknown_options=True)
    
        # type: (Type[PipelineOptionsT]) -> PipelineOptionsT
    
        """Returns a view of current object as provided PipelineOption subclass.
    
        Example Usage::
    
          options = PipelineOptions(['--runner', 'Direct', '--streaming'])
          standard_options = options.view_as(StandardOptions)
          if standard_options.streaming:
            # ... start a streaming job ...
    
        Note that options objects may have multiple views, and modifications
        of values in any view-object will apply to current object and other
        view-objects.
    
        Args:
          cls: PipelineOptions class or any of its subclasses.
    
        Returns:
    
          An instance of cls that is initialized using options contained in current
    
        view = cls(self._flags)
    
        for option_name in view._visible_option_list():
          # Initialize values of keys defined by a cls.
          #
          # Note that we do initialization only once per key to make sure that
          # values in _all_options dict are not-recreated with each new view.
          # This is important to make sure that values of multi-options keys are
          # backed by the same list across multiple views, and that any overrides of
          # pipeline options already stored in _all_options are preserved.
          if option_name not in self._all_options:
    
            self._all_options[option_name] = getattr(
                view._visible_options, option_name)
    
        # Note that views will still store _all_options of the source object.
    
        view._all_options = self._all_options
        return view
    
      def _visible_option_list(self):
    
        # type: () -> List[str]
    
        return sorted(
            option for option in dir(self._visible_options) if option[0] != '_')
    
        # type: () -> List[str]
    
        return sorted(
            dir(type(self)) + list(self.__dict__) + self._visible_option_list())
    
    
      def __getattr__(self, name):
        # Special methods which may be accessed before the object is
        # fully constructed (e.g. in unpickling).
        if name[:2] == name[-2:] == '__':
          return object.__getattribute__(self, name)
        elif name in self._visible_option_list():
    
          raise AttributeError(
              "'%s' object has no attribute '%s'" % (type(self).__name__, name))
    
    
      def __setattr__(self, name, value):
        if name in ('_flags', '_all_options', '_visible_options'):
    
          super().__setattr__(name, value)
    
        elif name in self._visible_option_list():
          self._all_options[name] = value
        else:
    
          raise AttributeError(
              "'%s' object has no attribute '%s'" % (type(self).__name__, name))
    
        return '%s(%s)' % (
            type(self).__name__,
            ', '.join(
                '%s=%s' % (option, getattr(self, option))
                for option in self._visible_option_list()))
    
    
    
    class StandardOptions(PipelineOptions):
    
      DEFAULT_RUNNER = 'DirectRunner'
    
    
      ALL_KNOWN_RUNNERS = (
          'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
          'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
          'apache_beam.runners.direct.direct_runner.DirectRunner',
          'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
          'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
          'apache_beam.runners.portability.flink_runner.FlinkRunner',
          'apache_beam.runners.portability.portable_runner.PortableRunner',
          'apache_beam.runners.portability.spark_runner.SparkRunner',
          'apache_beam.runners.test.TestDirectRunner',
          'apache_beam.runners.test.TestDataflowRunner',
      )
    
      KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in ALL_KNOWN_RUNNERS]
    
    
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--runner',
    
            help=(
                'Pipeline runner used to execute the workflow. Valid values are '
    
                'one of %s, or the fully qualified name of a PipelineRunner '
                'subclass. If unspecified, defaults to %s.' %
                (', '.join(cls.KNOWN_RUNNER_NAMES), cls.DEFAULT_RUNNER)))
    
        # Whether to enable streaming mode.
    
        parser.add_argument(
            '--streaming',
            default=False,
            action='store_true',
            help='Whether to enable streaming mode.')
    
            dest='resource_hints',
            action='append',
            default=[],
            help=(
                'Resource hint to set in the pipeline execution environment.'
                'Hints specified via this option override hints specified '
                'at transform level. Interpretation of hints is defined by '
                'Beam runners.'))
    
    
        parser.add_argument(
            '--auto_unique_labels',
            default=False,
            action='store_true',
            help='Whether to automatically generate unique transform labels '
            'for every transform. The default behavior is to raise an '
            'exception if a transform is created with a non-unique label.')
    
    
    class StreamingOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--update_compatibility_version',
            default=None,
            help='Attempt to produce a pipeline compatible with the given prior '
            'version of the Beam SDK. '
            'See for example, https://cloud.google.com/dataflow/docs/guides/'
            'updating-a-pipeline')
    
    
    
    class CrossLanguageOptions(PipelineOptions):
    
      @staticmethod
      def _beam_services_from_enviroment():
        return json.loads(os.environ.get('BEAM_SERVICE_OVERRIDES') or '{}')
    
    
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--beam_services',
    
            type=lambda s: {
                **cls._beam_services_from_enviroment(), **json.loads(s)
            },
            default=cls._beam_services_from_enviroment(),
    
                'For convenience, Beam provides the ability to automatically '
    
                'download and start various services (such as expansion services) '
                'used at pipeline construction and execution. These services are '
                'identified by gradle target. This option provides the ability to '
                'use pre-started services or non-default pre-existing artifacts to '
                'start the given service. '
                'Should be a json mapping of gradle build targets to pre-built '
    
    Robert Bradshaw's avatar
    Robert Bradshaw 已提交
                'artifacts (e.g. jar files) or expansion endpoints '
                '(e.g. host:port). Defaults to the value of BEAM_SERVICE_OVERRIDES '
                'from the environment.'))
    
        parser.add_argument(
            '--use_transform_service',
            default=False,
            action='store_true',
            help='Use the Docker-composed-based transform service when expanding '
            'cross-language transforms.')
    
    
    def additional_option_ptransform_fn():
      beam.transforms.ptransform.ptransform_fn_typehints_enabled = True
    
    
    # Optional type checks that aren't enabled by default.
    additional_type_checks = {
        'ptransform_fn': additional_option_ptransform_fn,
    }  # type: Dict[str, Callable[[], None]]
    
    
    def enable_all_additional_type_checks():
      """Same as passing --type_check_additional=all."""
      for f in additional_type_checks.values():
        f()
    
    
    
    class TypeOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        # TODO(laolu): Add a type inferencing option here once implemented.
    
        parser.add_argument(
            '--type_check_strictness',
            default='DEFAULT_TO_ANY',
            choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
            help='The level of exhaustive manual type-hint '
            'annotation required')
    
        parser.add_argument(
            '--type_check_additional',
            default='',
            help='Comma separated list of additional type checking features to '
            'enable. Options: all, ptransform_fn. For details see:'
            'https://beam.apache.org/documentation/sdks/python-type-safety/')
    
        parser.add_argument(
            '--no_pipeline_type_check',
            dest='pipeline_type_check',
            action='store_false',
            help='Disable type checking at pipeline construction '
            'time')
        parser.add_argument(
            '--runtime_type_check',
            default=False,
            action='store_true',
            help='Enable type checking at pipeline execution '
            'time. NOTE: only supported with the '
            'DirectRunner')
    
        parser.add_argument(
            '--performance_runtime_type_check',
            default=False,
            action='store_true',
            help='Enable faster type checking via sampling at pipeline execution '
            'time. NOTE: only supported with portable runners '
            '(including the DirectRunner)')
    
        parser.add_argument(
            '--allow_non_deterministic_key_coders',
            default=False,
            action='store_true',
            help='Use non-deterministic coders (such as pickling) for key-grouping '
    
            'operations such as GroupByKey.  This is unsafe, as runners may group '
    
            'keys based on their encoded bytes, but is available for backwards '
            'compatibility. See BEAM-11719.')
    
        parser.add_argument(
            '--allow_unsafe_triggers',
    
            action='store_true',
            help='Allow the use of unsafe triggers. Unsafe triggers have the '
            'potential to cause data loss due to finishing and/or never having '
            'their condition met. Some operations, such as GroupByKey, disallow '
            'this. This exists for cases where such loss is acceptable and for '
            'backwards compatibility. See BEAM-9487.')
    
      def validate(self, unused_validator):
        errors = []
        if beam.version.__version__ >= '3':
          errors.append(
              'Update --type_check_additional default to include all '
              'available additional checks at Beam 3.0 release time.')
        keys = self.type_check_additional.split(',')
    
        for key in keys:
          if not key:
            continue
          elif key == 'all':
            enable_all_additional_type_checks()
          elif key in additional_type_checks:
            additional_type_checks[key]()
          else:
            errors.append('Unrecognized --type_check_additional feature: %s' % key)
    
        return errors
    
    
    
    class DirectOptions(PipelineOptions):
      """DirectRunner-specific execution options."""
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--no_direct_runner_use_stacked_bundle',
            action='store_false',
            dest='direct_runner_use_stacked_bundle',
            help='DirectRunner uses stacked WindowedValues within a Bundle for '
            'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
            'avoid it.')
    
        parser.add_argument(
            '--direct_runner_bundle_repeat',
            type=int,
            default=0,
            help='replay every bundle this many extra times, for profiling'
            'and debugging')
    
        parser.add_argument(
            '--direct_num_workers',
            type=int,
            default=1,
            help='number of parallel running workers.')
    
        parser.add_argument(
            '--direct_running_mode',
            default='in_memory',
            choices=['in_memory', 'multi_threading', 'multi_processing'],
    
            help='Workers running environment.')
    
        parser.add_argument(
            '--direct_embed_docker_python',
            default=False,
            action='store_true',
            dest='direct_embed_docker_python',
            help='DirectRunner uses the embedded Python environment when '
            'the default Python docker environment is specified.')
    
            '--direct_test_splits',
    
            default={},
            type=json.loads,
            help='Split test configuration of the json form '
            '{"step_name": {"timings": [...], "fractions": [...]}, ...} '
            'where step_name is the name of a step controlling the stage to which '
            'splits will be sent, timings is a list of floating-point times '
            '(in seconds) at which the split requests will be sent, and '
            'fractions is a corresponding list of floating points to use in the '
            'split requests themselves.')
    
    
    
    class GoogleCloudOptions(PipelineOptions):
      """Google Cloud Dataflow service execution options."""
    
      BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
      COMPUTE_API_SERVICE = 'compute.googleapis.com'
      STORAGE_API_SERVICE = 'storage.googleapis.com'
      DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
    
      OAUTH_SCOPES = [
          'https://www.googleapis.com/auth/bigquery',
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/devstorage.full_control',
          'https://www.googleapis.com/auth/userinfo.email',
          'https://www.googleapis.com/auth/datastore',
    
          'https://www.googleapis.com/auth/spanner.admin',
          'https://www.googleapis.com/auth/spanner.data'
    
    
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--dataflow_endpoint',
            default=cls.DATAFLOW_ENDPOINT,
    
            help=(
                'The URL for the Dataflow API. If not set, the default public URL '
                'will be used.'))
    
        # Remote execution must check that this option is not None.
    
        parser.add_argument(
            '--project',
            default=None,
            help='Name of the Cloud project owning the Dataflow '
            'job.')
    
        # Remote execution must check that this option is not None.
    
        parser.add_argument(
            '--job_name', default=None, help='Name of the Cloud Dataflow job.')
    
        # Remote execution must check that this option is not None.
    
        parser.add_argument(
            '--staging_location',
            default=None,
            help='GCS path for staging code packages needed by '
            'workers.')
    
        # Remote execution must check that this option is not None.
        # If staging_location is not set, it defaults to temp_location.
    
        parser.add_argument(
            '--temp_location',
            default=None,
            help='GCS path for saving temporary workflow jobs.')
    
        # The Google Compute Engine region for creating Dataflow jobs. See
    
        # https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
    
        # list of valid options.
    
        parser.add_argument(
            '--region',
            default=None,
            help='The Google Compute Engine region for creating '
            'Dataflow job.')
        parser.add_argument(
            '--service_account_email',
            default=None,
            help='Identity to run virtual machines as.')
        parser.add_argument(
            '--no_auth',
            dest='no_auth',
            action='store_true',
            default=False,
            help='Skips authorizing credentials with Google Cloud.')
    
        # Option to run templated pipelines
    
        parser.add_argument(
            '--template_location',
            default=None,
            help='Save job to specified local or GCS location.')
        parser.add_argument(
            '--label',
            '--labels',
            dest='labels',
            action='append',
            default=None,
            help='Labels to be applied to this Dataflow job. '
            'Labels are key value pairs separated by = '
    
            '(e.g. --label key=value) or '
            '(--labels=\'{ "key": "value", "mass": "1_3kg", "count": "3" }\').')
    
        parser.add_argument(
            '--update',
            default=False,
            action='store_true',
            help='Update an existing streaming Cloud Dataflow job. '
            'See https://cloud.google.com/dataflow/docs/guides/'
            'updating-a-pipeline')
        parser.add_argument(
            '--transform_name_mapping',
            default=None,
            type=json.loads,
            help='The transform mapping that maps the named '
            'transforms in your prior pipeline code to names '
            'in your replacement pipeline code.'
            'See https://cloud.google.com/dataflow/docs/guides/'
            'updating-a-pipeline')
        parser.add_argument(
            '--enable_streaming_engine',
            default=False,
            action='store_true',
            help='Enable Windmill Service for this Dataflow job. ')
        parser.add_argument(
            '--dataflow_kms_key',
            default=None,
            help='Set a Google Cloud KMS key name to be used in '
            'Dataflow state operations (GBK, Streaming).')
    
        parser.add_argument(
            '--create_from_snapshot',
            default=None,
            help='The snapshot from which the job should be created.')
    
        parser.add_argument(
            '--flexrs_goal',
            default=None,
            choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'],
            help='Set the Flexible Resource Scheduling mode')
    
        parser.add_argument(
    
            '--dataflow_service_option',
            '--dataflow_service_options',
            dest='dataflow_service_options',
    
            action='append',
            default=None,
            help=(
                'Options to configure the Dataflow service. These '
    
                'options decouple service side feature availability '
    
                'from the Apache Beam release cycle.'
                'Note: If set programmatically, must be set as a '
                'list of strings'))
    
        parser.add_argument(
            '--enable_hot_key_logging',
            default=False,
            action='store_true',
            help='When true, will enable the direct logging of any detected hot '
            'keys into Cloud Logging. Warning: this will log the literal key as an '
            'unobfuscated string.')
    
        parser.add_argument(
            '--enable_artifact_caching',
            default=False,
            action='store_true',
    
            help='When true, artifacts will be cached across job submissions in '
            'the GCS staging bucket')
        parser.add_argument(
            '--impersonate_service_account',
            default=None,
            help='All API requests will be made as the given service account or '
            'target service account in an impersonation delegation chain '
            'instead of the currently selected account. You can specify '
            'either a single service account as the impersonator, or a '
            'comma-separated list of service accounts to create an '
            'impersonation delegation chain.')
    
        parser.add_argument(
            '--gcp_oauth_scope',
            '--gcp_oauth_scopes',
            dest='gcp_oauth_scopes',
            action='append',
            default=cls.OAUTH_SCOPES,
            help=(
                'Controls the OAuth scopes that will be requested when creating '
                'GCP credentials. Note: If set programmatically, must be set as a '
                'list of strings'))
    
      def _create_default_gcs_bucket(self):
        try:
          from apache_beam.io.gcp import gcsio
        except ImportError:
          _LOGGER.warning('Unable to create default GCS bucket.')
          return None
        bucket = gcsio.get_or_create_default_gcs_bucket(self)
        if bucket:
          return 'gs://%s' % bucket.id
        else:
          return None
    
    
      # If either temp or staging location has an issue, we use the valid one for
      # both locations. If both are bad we return an error.
      def _handle_temp_and_staging_locations(self, validator):
        temp_errors = validator.validate_gcs_path(self, 'temp_location')
        staging_errors = validator.validate_gcs_path(self, 'staging_location')
        if temp_errors and not staging_errors:
          setattr(self, 'temp_location', getattr(self, 'staging_location'))
          return []
        elif staging_errors and not temp_errors:
          setattr(self, 'staging_location', getattr(self, 'temp_location'))
          return []
        elif not staging_errors and not temp_errors:
          return []
        # Both staging and temp locations are bad, try to use default bucket.
        else:
          default_bucket = self._create_default_gcs_bucket()
          if default_bucket is None:
            temp_errors.extend(staging_errors)
            return temp_errors
          else:
            setattr(self, 'temp_location', default_bucket)
            setattr(self, 'staging_location', default_bucket)
            return []
    
    
      def validate(self, validator):
        errors = []
        if validator.is_service_runner():
    
          errors.extend(self._handle_temp_and_staging_locations(validator))
    
          errors.extend(validator.validate_cloud_options(self))
    
        if self.view_as(DebugOptions).dataflow_job_file:
          if self.view_as(GoogleCloudOptions).template_location:
    
            errors.append(
                '--dataflow_job_file and --template_location '
                'are mutually exclusive.')
    
        # Validate that dataflow_service_options is a list
        if self.dataflow_service_options:
          errors.extend(
              validator.validate_repeatable_argument_passed_as_list(
                  self, 'dataflow_service_options'))
    
    
      def get_cloud_profiler_service_name(self):
        _ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler'
        if self.dataflow_service_options:
          if _ENABLE_GOOGLE_CLOUD_PROFILER in self.dataflow_service_options:
            return os.environ["JOB_NAME"]
          for option_name in self.dataflow_service_options:
            if option_name.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='):
              return option_name.split('=', 1)[1]
    
        experiments = self.view_as(DebugOptions).experiments or []
        if _ENABLE_GOOGLE_CLOUD_PROFILER in experiments:
          return os.environ["JOB_NAME"]
    
        return None
    
    
    class AzureOptions(PipelineOptions):
      """Azure Blob Storage options."""
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--azure_connection_string',
            default=None,
            help='Connection string of the Azure Blob Storage Account.')
        parser.add_argument(
            '--blob_service_endpoint',
            default=None,
            help='URL of the Azure Blob Storage Account.')
        parser.add_argument(
            '--azure_managed_identity_client_id',
            default=None,
            help='Client ID of a user-assigned managed identity.')
    
      def validate(self, validator):
        errors = []
        if self.azure_connection_string:
          if self.blob_service_endpoint:
            errors.append(
                '--azure_connection_string and '
                '--blob_service_endpoint are mutually exclusive.')