Skip to content
代码片段 群组 项目
未验证 提交 b4ffbfe7 编辑于 作者: Robert Bradshaw's avatar Robert Bradshaw 提交者: GitHub
浏览文件

Merge pull request #29828 Also pull default beam services from an environment variable.

No related branches found
No related tags found
无相关合并请求
......@@ -572,12 +572,18 @@ class StreamingOptions(PipelineOptions):
class CrossLanguageOptions(PipelineOptions):
@staticmethod
def _beam_services_from_enviroment():
return json.loads(os.environ.get('BEAM_SERVICE_OVERRIDES') or '{}')
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--beam_services',
type=json.loads,
default={},
type=lambda s: {
**cls._beam_services_from_enviroment(), **json.loads(s)
},
default=cls._beam_services_from_enviroment(),
help=(
'For convenience, Beam provides the ability to automatically '
'download and start various services (such as expansion services) '
......@@ -586,7 +592,9 @@ class CrossLanguageOptions(PipelineOptions):
'use pre-started services or non-default pre-existing artifacts to '
'start the given service. '
'Should be a json mapping of gradle build targets to pre-built '
'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).'))
'artifacts (e.g. jar files) or expansion endpoints '
'(e.g. host:port). Defaults to the value of BEAM_SERVICE_OVERRIDES '
'from the environment.'))
parser.add_argument(
'--use_transform_service',
......
......@@ -21,11 +21,14 @@
import json
import logging
import os
import unittest
import hamcrest as hc
import mock
from parameterized import parameterized
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
......@@ -395,6 +398,36 @@ class PipelineOptionsTest(unittest.TestCase):
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
def test_beam_services_empty(self):
with mock.patch.dict(os.environ, {}, clear=True):
options = PipelineOptions().view_as(CrossLanguageOptions)
self.assertEqual(options.beam_services, {})
def test_beam_services_from_env(self):
with mock.patch.dict(os.environ,
{'BEAM_SERVICE_OVERRIDES': '{"foo": "bar"}'},
clear=True):
options = PipelineOptions().view_as(CrossLanguageOptions)
self.assertEqual(options.beam_services, {'foo': 'bar'})
def test_beam_services_from_flag(self):
with mock.patch.dict(os.environ, {}, clear=True):
options = PipelineOptions(['--beam_services={"foo": "bar"}'
]).view_as(CrossLanguageOptions)
self.assertEqual(options.beam_services, {'foo': 'bar'})
def test_beam_services_from_env_and_flag(self):
with mock.patch.dict(
os.environ,
{'BEAM_SERVICE_OVERRIDES': '{"foo": "bar", "other": "zzz"}'},
clear=True):
options = PipelineOptions(['--beam_services={"foo": "override"}'
]).view_as(CrossLanguageOptions)
self.assertEqual(
options.beam_services, {
'foo': 'override', 'other': 'zzz'
})
def test_option_modifications_are_shared_between_views(self):
pipeline_options = PipelineOptions([
'--mock_option',
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册