更新
更旧
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Pipeline options obtained from command line parsing."""
# pytype: skip-file
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.utils import proto_utils
__all__ = [
'PipelineOptions',
'StandardOptions',
'TypeOptions',
'DirectOptions',
'GoogleCloudOptions',

creste
已提交
'AzureOptions',
'WorkerOptions',
'DebugOptions',
'ProfilingOptions',
'SetupOptions',
'TestOptions',

dandy10
已提交
'S3Options'
PipelineOptionsT = TypeVar('PipelineOptionsT', bound='PipelineOptions')
_LOGGER = logging.getLogger(__name__)
# Map defined with option names to flag names for boolean options
# that have a destination(dest) in parser.add_argument() different
# from the flag name and whose default value is `None`.
_FLAG_THAT_SETS_FALSE_VALUE = {'use_public_ips': 'no_use_public_ips'}
def _static_value_provider_of(value_type):
"""Helper function to plug a ValueProvider into argparse.
Args:
value_type: the type of the value. Since the type param of argparse's
add_argument will always be ValueProvider, we need to
preserve the type of the actual value.
Returns:
A partially constructed StaticValueProvider in the form of a function.
"""
def _f(value):
return StaticValueProvider(value_type, value)
class _BeamArgumentParser(argparse.ArgumentParser):
"""An ArgumentParser that supports ValueProvider options.
Example Usage::
class TemplateUserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):

tvalentyn
已提交
parser.add_value_provider_argument('--vp_arg1', default='start')
parser.add_value_provider_argument('--vp_arg2')
parser.add_argument('--non_vp_arg')
"""
def add_value_provider_argument(self, *args, **kwargs):
"""ValueProvider arguments can be either of type keyword or positional.
At runtime, even positional arguments will need to be supplied in the
key/value form.
"""
# Extract the option name from positional argument ['pos_arg']
assert args and len(args[0]) >= 1
if args[0][0] != '-':
option_name = args[0]
if kwargs.get('nargs') is None: # make them optionally templated
kwargs['nargs'] = '?'
else:
# or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
# reassign the type to make room for using
# StaticValueProvider as the type for add_argument
value_type = kwargs.get('type') or str
kwargs['type'] = _static_value_provider_of(value_type)
# reassign default to default_value to make room for using
# RuntimeValueProvider as the default for add_argument
default_value = kwargs.get('default')
kwargs['default'] = RuntimeValueProvider(
option_name=option_name,
value_type=value_type,
# have add_argument do most of the work
self.add_argument(*args, **kwargs)
# The argparse package by default tries to autocomplete option names. This
# results in an "ambiguous option" error from argparse when an unknown option
# matching multiple known ones are used. This suppresses that behavior.
def error(self, message):
if message.startswith('ambiguous option: '):
return
super().error(message)

Yi Hu
已提交
class _DictUnionAction(argparse.Action):
"""
argparse Action take union of json loads values. If a key is specified in more
than one of the values, the last value takes precedence.
"""
def __call__(self, parser, namespace, values, option_string=None):
if not hasattr(namespace,
self.dest) or getattr(namespace, self.dest) is None:
setattr(namespace, self.dest, {})
getattr(namespace, self.dest).update(values)
class PipelineOptions(HasDisplayData):

tvalentyn
已提交
"""This class and subclasses are used as containers for command line options.

tvalentyn
已提交
These classes are wrappers over the standard argparse Python module
(see https://docs.python.org/3/library/argparse.html). To define one option

tvalentyn
已提交
or a group of options, create a subclass from PipelineOptions.
Example Usage::
class XyzOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--abc', default='start')
parser.add_argument('--xyz', default='end')
The arguments for the add_argument() method are exactly the ones
described in the argparse public documentation.
Pipeline objects require an options object during initialization.

tvalentyn
已提交
This is obtained simply by initializing an options class as defined above.
Example Usage::
p = Pipeline(options=XyzOptions())
if p.options.xyz == 'end':
raise ValueError('Option xyz has an invalid value.')

tvalentyn
已提交
Instances of PipelineOptions or any of its subclass have access to values
defined by other PipelineOption subclasses (see get_all_options()), and
can be converted to an instance of another PipelineOptions subclass
(see view_as()). All views share the underlying data structure that stores
option key-value pairs.
By default the options classes will use command line arguments to initialize
the options.
"""
def __init__(self, flags=None, **kwargs):
# type: (Optional[List[str]], **Any) -> None
"""Initialize an options class.
The initializer will traverse all subclasses, add all their argparse
arguments and then parse the command line specified by flags or by default
the one obtained from sys.argv.

tvalentyn
已提交
The subclasses of PipelineOptions do not need to redefine __init__.
Args:
flags: An iterable of command line arguments to be used. If not specified
then sys.argv will be used as input for parsing arguments.
**kwargs: Add overrides for arguments passed in flags. For overrides
of arguments, please pass the `option names` instead of
flag names.
Option names: These are defined as dest in the
parser.add_argument() for each flag. Passing flags
like {no_use_public_ips: True}, for which the dest is
defined to a different flag name in the parser,
would be discarded. Instead, pass the dest of
the flag (dest of no_use_public_ips is use_public_ips).
# Initializing logging configuration in case the user did not set it up.
logging.basicConfig()

tvalentyn
已提交
# self._flags stores a list of not yet parsed arguments, typically,
# command-line flags. This list is shared across different views.
# See: view_as().

tvalentyn
已提交
# Build parser that will parse options recognized by the [sub]class of
# PipelineOptions whose object is being instantiated.
parser = _BeamArgumentParser()
for cls in type(self).mro():
if cls == PipelineOptions:
break
elif '_add_argparse_args' in cls.__dict__:
cls._add_argparse_args(parser) # type: ignore

tvalentyn
已提交
# The _visible_options attribute will contain options that were recognized
# by the parser.
self._visible_options, _ = parser.parse_known_args(flags)

tvalentyn
已提交
# self._all_options is initialized with overrides to flag values,
# provided in kwargs, and will store key-value pairs for options recognized
# by current PipelineOptions [sub]class and its views that may be created.
# See: view_as().
# This dictionary is shared across different views, and is lazily updated
# as each new views are created.
# Users access this dictionary store via __getattr__ / __setattr__ methods.
self._all_options = kwargs
# Initialize values of keys defined by this class.
for option_name in self._visible_option_list():
# Note that options specified in kwargs will not be overwritten.
if option_name not in self._all_options:
self._all_options[option_name] = getattr(
self._visible_options, option_name)

tvalentyn
已提交
@classmethod
def _add_argparse_args(cls, parser):
# type: (_BeamArgumentParser) -> None
# Override this in subclasses to provide options.
pass
@classmethod
def from_dictionary(cls, options):
"""Returns a PipelineOptions from a dictionary of arguments.
Args:
options: Dictionary of argument value pairs.
Returns:
A PipelineOptions object representing the given arguments.
"""
flags = []
# Note: If a boolean flag is True in the dictionary,
# implicitly the method assumes the boolean flag is
# specified as a command line argument. If the
# boolean flag is False, this method simply discards them.
# Eg: {no_auth: True} is similar to python your_file.py --no_auth
# {no_auth: False} is similar to python your_file.py.
if isinstance(v, bool):
if v:
flags.append('--%s' % k)
elif k in _FLAG_THAT_SETS_FALSE_VALUE:
# Capture overriding flags, which have a different dest
# from the flag name defined in the parser.add_argument
# Eg: no_use_public_ips, which has the dest=use_public_ips
# different from flag name
flag_that_disables_the_option = (_FLAG_THAT_SETS_FALSE_VALUE[k])
flags.append('--%s' % flag_that_disables_the_option)
elif isinstance(v, list):
for i in v:
flags.append('--%s=%s' % (k, i))
elif isinstance(v, dict):
flags.append('--%s=%s' % (k, json.dumps(v)))
elif v is None:
# Don't process None type args here, they will be treated
# as strings when parsed by BeamArgumentParser..
logging.warning('Not setting flag with value None: %s', k)
else:
flags.append('--%s=%s' % (k, v))
return cls(flags)
def get_all_options(
self,
drop_default=False,
add_extra_args_fn=None, # type: Optional[Callable[[_BeamArgumentParser], None]]
retain_unknown_options=False
# type: (...) -> Dict[str, Any]
"""Returns a dictionary of all defined arguments.
Returns a dictionary of all defined arguments (arguments that are defined in
any subclass of PipelineOptions) into a dictionary.
Args:
drop_default: If set to true, options that are equal to their default
values, are not returned as part of the result dictionary.
add_extra_args_fn: Callback to populate additional arguments, can be used
by runner to supply otherwise unknown args.
retain_unknown_options: If set to true, options not recognized by any
known pipeline options class will still be included in the result. If
set to false, they will be discarded.
Returns:
Dictionary of all args and values.
"""
# TODO(https://github.com/apache/beam/issues/18197): PipelineOption
# sub-classes in the main session might be repeated. Pick last unique
# instance of each subclass to avoid conflicts.
parser = _BeamArgumentParser()
for cls in PipelineOptions.__subclasses__():
subset[str(cls)] = cls
for cls in subset.values():
cls._add_argparse_args(parser) # pylint: disable=protected-access
if add_extra_args_fn:
add_extra_args_fn(parser)
known_args, unknown_args = parser.parse_known_args(self._flags)
if retain_unknown_options:
if unknown_args:
_LOGGER.warning(
'Unknown pipeline options received: %s. Ignore if flags are '
'used for internal purposes.' % (','.join(unknown_args)))
i = 0
while i < len(unknown_args):
# Treat all unary flags as booleans, and all binary argument values as
# strings.
if not unknown_args[i].startswith('-'):
i += 1
continue
if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
if len(split) == 1:
parser.add_argument(unknown_args[i], action='store_true')
else:
parser.add_argument(split[0], type=str)
elif unknown_args[i].startswith('--'):
parser.add_argument(unknown_args[i], type=str)
i += 2
else:
# skip all binary flags used with '-' and not '--'.
# ex: using -f instead of --f (or --flexrs_goal) will prevent
# argument validation before job submission and can be incorrectly
# submitted to job.
_LOGGER.warning(
"Discarding flag %s, single dash flags are not allowed.",
unknown_args[i])
i += 2
continue
parsed_args, _ = parser.parse_known_args(self._flags)
else:
if unknown_args:
_LOGGER.warning("Discarding unparseable args: %s", unknown_args)
parsed_args = known_args
result = vars(parsed_args)
overrides = self._all_options.copy()
# Apply the overrides if any
overrides.pop(k, None)
if k in self._all_options:
result[k] = self._all_options[k]
if (drop_default and parser.get_default(k) == result[k] and
not isinstance(parser.get_default(k), ValueProvider)):
del result[k]
if retain_unknown_options:
result.update(overrides)
else:
_LOGGER.warning("Discarding invalid overrides: %s", overrides)
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def to_runner_api(self):
def to_struct_value(o):
if isinstance(o, (bool, int, str)):
return o
elif isinstance(o, (tuple, list)):
return [to_struct_value(e) for e in o]
elif isinstance(o, dict):
return {str(k): to_struct_value(v) for k, v in o.items()}
else:
return str(o) # Best effort.
return proto_utils.pack_Struct(
**{
f'beam:option:{k}:v1': to_struct_value(v)
for (k, v) in self.get_all_options(
drop_default=True, retain_unknown_options=True).items()
if v is not None
})
@classmethod
def from_runner_api(cls, proto_options):
def from_urn(key):
assert key.startswith('beam:option:')
assert key.endswith(':v1')
return key[12:-3]
return cls(
**{from_urn(key): value
for (key, value) in proto_options.items()})
return self.get_all_options(drop_default=True, retain_unknown_options=True)
def view_as(self, cls):
# type: (Type[PipelineOptionsT]) -> PipelineOptionsT

tvalentyn
已提交
"""Returns a view of current object as provided PipelineOption subclass.
Example Usage::
options = PipelineOptions(['--runner', 'Direct', '--streaming'])
standard_options = options.view_as(StandardOptions)
if standard_options.streaming:
# ... start a streaming job ...
Note that options objects may have multiple views, and modifications
of values in any view-object will apply to current object and other
view-objects.
Args:
cls: PipelineOptions class or any of its subclasses.
Returns:
An instance of cls that is initialized using options contained in current

tvalentyn
已提交
object.
"""

tvalentyn
已提交
for option_name in view._visible_option_list():
# Initialize values of keys defined by a cls.
#
# Note that we do initialization only once per key to make sure that
# values in _all_options dict are not-recreated with each new view.
# This is important to make sure that values of multi-options keys are
# backed by the same list across multiple views, and that any overrides of
# pipeline options already stored in _all_options are preserved.
if option_name not in self._all_options:
self._all_options[option_name] = getattr(
view._visible_options, option_name)

tvalentyn
已提交
# Note that views will still store _all_options of the source object.
view._all_options = self._all_options
return view
def _visible_option_list(self):
return sorted(
option for option in dir(self._visible_options) if option[0] != '_')
return sorted(
dir(type(self)) + list(self.__dict__) + self._visible_option_list())
def __getattr__(self, name):
# Special methods which may be accessed before the object is
# fully constructed (e.g. in unpickling).
if name[:2] == name[-2:] == '__':
return object.__getattribute__(self, name)
elif name in self._visible_option_list():

tvalentyn
已提交
return self._all_options[name]
raise AttributeError(
"'%s' object has no attribute '%s'" % (type(self).__name__, name))
def __setattr__(self, name, value):
if name in ('_flags', '_all_options', '_visible_options'):
super().__setattr__(name, value)
elif name in self._visible_option_list():
self._all_options[name] = value
else:
raise AttributeError(
"'%s' object has no attribute '%s'" % (type(self).__name__, name))
return '%s(%s)' % (
type(self).__name__,
', '.join(
'%s=%s' % (option, getattr(self, option))
for option in self._visible_option_list()))
class StandardOptions(PipelineOptions):
DEFAULT_RUNNER = 'DirectRunner'
ALL_KNOWN_RUNNERS = (
'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
'apache_beam.runners.direct.direct_runner.DirectRunner',
'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
'apache_beam.runners.portability.flink_runner.FlinkRunner',
'apache_beam.runners.portability.portable_runner.PortableRunner',
'apache_beam.runners.portability.spark_runner.SparkRunner',
'apache_beam.runners.test.TestDirectRunner',
'apache_beam.runners.test.TestDataflowRunner',
)
KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in ALL_KNOWN_RUNNERS]
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--runner',
help=(
'Pipeline runner used to execute the workflow. Valid values are '
'one of %s, or the fully qualified name of a PipelineRunner '
'subclass. If unspecified, defaults to %s.' %
(', '.join(cls.KNOWN_RUNNER_NAMES), cls.DEFAULT_RUNNER)))
# Whether to enable streaming mode.
parser.add_argument(
'--streaming',
default=False,
action='store_true',
help='Whether to enable streaming mode.')

tvalentyn
已提交
parser.add_argument(
'--resource_hint',

tvalentyn
已提交
dest='resource_hints',
action='append',
default=[],
help=(
'Resource hint to set in the pipeline execution environment.'
'Hints specified via this option override hints specified '
'at transform level. Interpretation of hints is defined by '
'Beam runners.'))
parser.add_argument(
'--auto_unique_labels',
default=False,
action='store_true',
help='Whether to automatically generate unique transform labels '
'for every transform. The default behavior is to raise an '
'exception if a transform is created with a non-unique label.')
class StreamingOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--update_compatibility_version',
default=None,
help='Attempt to produce a pipeline compatible with the given prior '
'version of the Beam SDK. '
'See for example, https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
class CrossLanguageOptions(PipelineOptions):
@staticmethod
def _beam_services_from_enviroment():
return json.loads(os.environ.get('BEAM_SERVICE_OVERRIDES') or '{}')
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--beam_services',
type=lambda s: {
**cls._beam_services_from_enviroment(), **json.loads(s)
},
default=cls._beam_services_from_enviroment(),
'For convenience, Beam provides the ability to automatically '
'download and start various services (such as expansion services) '
'used at pipeline construction and execution. These services are '
'identified by gradle target. This option provides the ability to '
'use pre-started services or non-default pre-existing artifacts to '
'start the given service. '
'Should be a json mapping of gradle build targets to pre-built '
'artifacts (e.g. jar files) or expansion endpoints '
'(e.g. host:port). Defaults to the value of BEAM_SERVICE_OVERRIDES '
'from the environment.'))
parser.add_argument(
'--use_transform_service',
default=False,
action='store_true',
help='Use the Docker-composed-based transform service when expanding '
'cross-language transforms.')
def additional_option_ptransform_fn():
beam.transforms.ptransform.ptransform_fn_typehints_enabled = True
# Optional type checks that aren't enabled by default.
additional_type_checks = {
'ptransform_fn': additional_option_ptransform_fn,
} # type: Dict[str, Callable[[], None]]
def enable_all_additional_type_checks():
"""Same as passing --type_check_additional=all."""
for f in additional_type_checks.values():
f()
class TypeOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
# TODO(laolu): Add a type inferencing option here once implemented.
parser.add_argument(
'--type_check_strictness',
default='DEFAULT_TO_ANY',
choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
help='The level of exhaustive manual type-hint '
'annotation required')
parser.add_argument(
'--type_check_additional',
default='',
help='Comma separated list of additional type checking features to '
'enable. Options: all, ptransform_fn. For details see:'
'https://beam.apache.org/documentation/sdks/python-type-safety/')
parser.add_argument(
'--no_pipeline_type_check',
dest='pipeline_type_check',
action='store_false',
help='Disable type checking at pipeline construction '
'time')
parser.add_argument(
'--runtime_type_check',
default=False,
action='store_true',
help='Enable type checking at pipeline execution '
'time. NOTE: only supported with the '
'DirectRunner')

Saavan Nanavati
已提交
parser.add_argument(
'--performance_runtime_type_check',
default=False,
action='store_true',
help='Enable faster type checking via sampling at pipeline execution '
'time. NOTE: only supported with portable runners '
'(including the DirectRunner)')
parser.add_argument(
'--allow_non_deterministic_key_coders',
default=False,
action='store_true',
help='Use non-deterministic coders (such as pickling) for key-grouping '
'operations such as GroupByKey. This is unsafe, as runners may group '
'keys based on their encoded bytes, but is available for backwards '
'compatibility. See BEAM-11719.')
parser.add_argument(
'--allow_unsafe_triggers',
action='store_true',
help='Allow the use of unsafe triggers. Unsafe triggers have the '
'potential to cause data loss due to finishing and/or never having '
'their condition met. Some operations, such as GroupByKey, disallow '
'this. This exists for cases where such loss is acceptable and for '
'backwards compatibility. See BEAM-9487.')
def validate(self, unused_validator):
errors = []
if beam.version.__version__ >= '3':
errors.append(
'Update --type_check_additional default to include all '
'available additional checks at Beam 3.0 release time.')
keys = self.type_check_additional.split(',')
for key in keys:
if not key:
continue
elif key == 'all':
enable_all_additional_type_checks()
elif key in additional_type_checks:
additional_type_checks[key]()
else:
errors.append('Unrecognized --type_check_additional feature: %s' % key)
return errors
class DirectOptions(PipelineOptions):
"""DirectRunner-specific execution options."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--no_direct_runner_use_stacked_bundle',
action='store_false',
dest='direct_runner_use_stacked_bundle',
help='DirectRunner uses stacked WindowedValues within a Bundle for '
'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
'avoid it.')
parser.add_argument(
'--direct_runner_bundle_repeat',
type=int,
default=0,
help='replay every bundle this many extra times, for profiling'
'and debugging')
parser.add_argument(
'--direct_num_workers',
type=int,
default=1,
help='number of parallel running workers.')
parser.add_argument(
'--direct_running_mode',
default='in_memory',
choices=['in_memory', 'multi_threading', 'multi_processing'],
parser.add_argument(
'--direct_embed_docker_python',
default=False,
action='store_true',
dest='direct_embed_docker_python',
help='DirectRunner uses the embedded Python environment when '
'the default Python docker environment is specified.')
parser.add_argument(
default={},
type=json.loads,
help='Split test configuration of the json form '
'{"step_name": {"timings": [...], "fractions": [...]}, ...} '
'where step_name is the name of a step controlling the stage to which '
'splits will be sent, timings is a list of floating-point times '
'(in seconds) at which the split requests will be sent, and '
'fractions is a corresponding list of floating points to use in the '
'split requests themselves.')
class GoogleCloudOptions(PipelineOptions):
"""Google Cloud Dataflow service execution options."""
BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
COMPUTE_API_SERVICE = 'compute.googleapis.com'
STORAGE_API_SERVICE = 'storage.googleapis.com'
DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
OAUTH_SCOPES = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/spanner.admin',
'https://www.googleapis.com/auth/spanner.data'
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--dataflow_endpoint',
default=cls.DATAFLOW_ENDPOINT,
help=(
'The URL for the Dataflow API. If not set, the default public URL '
'will be used.'))
# Remote execution must check that this option is not None.
parser.add_argument(
'--project',
default=None,
help='Name of the Cloud project owning the Dataflow '
'job.')
# Remote execution must check that this option is not None.
parser.add_argument(
'--job_name', default=None, help='Name of the Cloud Dataflow job.')
# Remote execution must check that this option is not None.
parser.add_argument(
'--staging_location',
default=None,
help='GCS path for staging code packages needed by '
'workers.')
# Remote execution must check that this option is not None.
# If staging_location is not set, it defaults to temp_location.
parser.add_argument(
'--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
# The Google Compute Engine region for creating Dataflow jobs. See
# https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
parser.add_argument(
'--region',
default=None,
help='The Google Compute Engine region for creating '
'Dataflow job.')
parser.add_argument(
'--service_account_email',
default=None,
help='Identity to run virtual machines as.')
parser.add_argument(
'--no_auth',
dest='no_auth',
action='store_true',
default=False,
help='Skips authorizing credentials with Google Cloud.')
# Option to run templated pipelines
parser.add_argument(
'--template_location',
default=None,
help='Save job to specified local or GCS location.')
parser.add_argument(
'--label',
'--labels',
dest='labels',
action='append',
default=None,
help='Labels to be applied to this Dataflow job. '
'Labels are key value pairs separated by = '
'(e.g. --label key=value) or '
'(--labels=\'{ "key": "value", "mass": "1_3kg", "count": "3" }\').')
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
parser.add_argument(
'--update',
default=False,
action='store_true',
help='Update an existing streaming Cloud Dataflow job. '
'See https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
parser.add_argument(
'--transform_name_mapping',
default=None,
type=json.loads,
help='The transform mapping that maps the named '
'transforms in your prior pipeline code to names '
'in your replacement pipeline code.'
'See https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
parser.add_argument(
'--enable_streaming_engine',
default=False,
action='store_true',
help='Enable Windmill Service for this Dataflow job. ')
parser.add_argument(
'--dataflow_kms_key',
default=None,
help='Set a Google Cloud KMS key name to be used in '
'Dataflow state operations (GBK, Streaming).')
parser.add_argument(
'--create_from_snapshot',
default=None,
help='The snapshot from which the job should be created.')
parser.add_argument(
'--flexrs_goal',
default=None,
choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'],
help='Set the Flexible Resource Scheduling mode')
'--dataflow_service_option',
'--dataflow_service_options',
dest='dataflow_service_options',
action='append',
default=None,
help=(
'Options to configure the Dataflow service. These '
'options decouple service side feature availability '
'from the Apache Beam release cycle.'
'Note: If set programmatically, must be set as a '
'list of strings'))
parser.add_argument(
'--enable_hot_key_logging',
default=False,
action='store_true',
help='When true, will enable the direct logging of any detected hot '
'keys into Cloud Logging. Warning: this will log the literal key as an '
'unobfuscated string.')
parser.add_argument(
'--enable_artifact_caching',
default=False,
action='store_true',
help='When true, artifacts will be cached across job submissions in '
'the GCS staging bucket')
parser.add_argument(
'--impersonate_service_account',
default=None,
help='All API requests will be made as the given service account or '
'target service account in an impersonation delegation chain '
'instead of the currently selected account. You can specify '
'either a single service account as the impersonator, or a '
'comma-separated list of service accounts to create an '
'impersonation delegation chain.')
parser.add_argument(
'--gcp_oauth_scope',
'--gcp_oauth_scopes',
dest='gcp_oauth_scopes',
action='append',
default=cls.OAUTH_SCOPES,
help=(
'Controls the OAuth scopes that will be requested when creating '
'GCP credentials. Note: If set programmatically, must be set as a '
'list of strings'))

Pablo Estrada
已提交
def _create_default_gcs_bucket(self):
try:
from apache_beam.io.gcp import gcsio
except ImportError:
_LOGGER.warning('Unable to create default GCS bucket.')
return None
bucket = gcsio.get_or_create_default_gcs_bucket(self)
if bucket:
return 'gs://%s' % bucket.id
else:
return None
# If either temp or staging location has an issue, we use the valid one for
# both locations. If both are bad we return an error.
def _handle_temp_and_staging_locations(self, validator):
temp_errors = validator.validate_gcs_path(self, 'temp_location')
staging_errors = validator.validate_gcs_path(self, 'staging_location')
if temp_errors and not staging_errors:
setattr(self, 'temp_location', getattr(self, 'staging_location'))
return []
elif staging_errors and not temp_errors:
setattr(self, 'staging_location', getattr(self, 'temp_location'))
return []
elif not staging_errors and not temp_errors:
return []
# Both staging and temp locations are bad, try to use default bucket.
else:
default_bucket = self._create_default_gcs_bucket()
if default_bucket is None:
temp_errors.extend(staging_errors)
return temp_errors
else:
setattr(self, 'temp_location', default_bucket)
setattr(self, 'staging_location', default_bucket)
return []
def validate(self, validator):
errors = []
if validator.is_service_runner():
errors.extend(self._handle_temp_and_staging_locations(validator))
errors.extend(validator.validate_cloud_options(self))

Pablo Estrada
已提交
if self.view_as(DebugOptions).dataflow_job_file:
if self.view_as(GoogleCloudOptions).template_location:
errors.append(
'--dataflow_job_file and --template_location '
'are mutually exclusive.')
# Validate that dataflow_service_options is a list
if self.dataflow_service_options:
errors.extend(
validator.validate_repeatable_argument_passed_as_list(
self, 'dataflow_service_options'))
def get_cloud_profiler_service_name(self):
_ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler'
if self.dataflow_service_options:
if _ENABLE_GOOGLE_CLOUD_PROFILER in self.dataflow_service_options:
return os.environ["JOB_NAME"]
for option_name in self.dataflow_service_options:
if option_name.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='):
return option_name.split('=', 1)[1]
experiments = self.view_as(DebugOptions).experiments or []
if _ENABLE_GOOGLE_CLOUD_PROFILER in experiments:
return os.environ["JOB_NAME"]
return None

creste
已提交
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
class AzureOptions(PipelineOptions):
"""Azure Blob Storage options."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--azure_connection_string',
default=None,
help='Connection string of the Azure Blob Storage Account.')
parser.add_argument(
'--blob_service_endpoint',
default=None,
help='URL of the Azure Blob Storage Account.')
parser.add_argument(
'--azure_managed_identity_client_id',
default=None,
help='Client ID of a user-assigned managed identity.')
def validate(self, validator):
errors = []
if self.azure_connection_string:
if self.blob_service_endpoint:
errors.append(
'--azure_connection_string and '
'--blob_service_endpoint are mutually exclusive.')