Unverified · Commit 858f0b9b · authored by annaqin418, committed by GitHub

[BEAM-9872] Moved Spark validates tests to shared file (#12002)

* [BEAM-9872] Moved Spark validates tests to shared file

* combine python versions in wrapper task

* spark test only python 3

* changed env type

* changed env config

* changed environment_config

* wrapper task in beam/build

* update groovy file to run wrapper
Parent f22279bf
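The change replaces the Jenkins job's single per-version Gradle target with a wrapper task, :pythonSparkPostCommit, that fans out to each Python version's sparkValidatesRunner suite. As a rough sketch (assuming a Beam checkout with the Gradle wrapper at the repository root), the new entry point would be invoked locally the same way the Jenkins job does:

    # Run the Spark validates-runner suites for all configured Python versions:
    ./gradlew :pythonSparkPostCommit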
@@ -31,7 +31,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_VR_Spark',
     steps {
       gradle {
         rootBuildScriptDir(commonJobProperties.checkoutDir)
-        tasks(':sdks:python:test-suites:portable:py2:sparkValidatesRunner')
+        tasks(':pythonSparkPostCommit')
         commonJobProperties.setGradleSwitches(delegate)
       }
     }
@@ -280,6 +280,13 @@ task portablePythonPreCommit() {
   dependsOn ":sdks:python:test-suites:portable:py37:preCommitPy37"
 }
 
+task pythonSparkPostCommit() {
+  dependsOn ":sdks:python:test-suites:portable:py2:sparkValidatesRunner"
+  dependsOn ":sdks:python:test-suites:portable:py35:sparkValidatesRunner"
+  dependsOn ":sdks:python:test-suites:portable:py36:sparkValidatesRunner"
+  dependsOn ":sdks:python:test-suites:portable:py37:sparkValidatesRunner"
+}
+
 task websitePreCommit() {
   dependsOn ":website:preCommit"
 }
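Since pythonSparkPostCommit is a pure aggregation task, one way to confirm its fan-out without actually running the suites is Gradle's standard dry-run flag (a generic Gradle feature, not something this change adds):

    # List the task graph that :pythonSparkPostCommit would execute:
    ./gradlew :pythonSparkPostCommit --dry-run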
@@ -1,3 +1,5 @@
+import org.apache.tools.ant.taskdefs.condition.Os
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -110,11 +112,11 @@ task crossLanguagePythonJavaKafkaIOFlink {
   doLast {
     def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
     def options = [
-        "--runner=FlinkRunner",
-        "--parallelism=2",
-        "--environment_type=DOCKER",
-        "--environment_cache_millis=10000",
-        "--experiment=beam_fn_api",
+      "--runner=FlinkRunner",
+      "--parallelism=2",
+      "--environment_type=DOCKER",
+      "--environment_cache_millis=10000",
+      "--experiment=beam_fn_api",
     ]
     exec {
       environment "LOCAL_KAFKA_JAR", kafkaJar
@@ -131,34 +133,103 @@ task crossLanguagePythonJavaKafkaIOFlink {
   }
 }
 
+task createProcessWorker {
+  dependsOn ':sdks:python:container:build'
+  dependsOn 'setupVirtualenv'
+  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
+  def osType = 'linux'
+  if (Os.isFamily(Os.FAMILY_MAC))
+    osType = 'darwin'
+  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
+  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \""
+  outputs.file sdkWorkerFile
+  doLast {
+    sdkWorkerFile.write sdkWorkerFileCode
+    exec {
+      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
+    }
+    exec {
+      commandLine('chmod', '+x', sdkWorkerFile)
+    }
+  }
+}
+
+def sparkCompatibilityMatrix = {
+  def config = it ? it as CompatibilityMatrixConfig : new CompatibilityMatrixConfig()
+  def workerType = config.workerType.name()
+  def streaming = config.streaming
+  def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
+  def name = "sparkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
+  tasks.create(name: name) {
+    dependsOn 'createProcessWorker'
+    dependsOn 'setupVirtualenv'
+    dependsOn ':runners:spark:job-server:shadowJar'
+    doLast {
+      def argMap = [
+          "environment_type" : workerType,
+          "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
+          "environment_cache_millis": 10000,
+      ]
+      def argString = mapToArgString(argMap)
+      // Optionally specify test function names separated by space e.g.:
+      // ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
+      // Otherwise run all test functions under SparkRunnerTest
+      def tests = project.hasProperty('tests') ?
+          project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''
+      exec {
+        executable 'sh'
+        args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString ${environment_config}"
+      }
+    }
+  }
+}
+
+task sparkCompatibilityMatrixDocker() {
+  dependsOn sparkCompatibilityMatrix(streaming: false)
+}
+
+task sparkCompatibilityMatrixProcess() {
+  dependsOn sparkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
+}
+
+task sparkCompatibilityMatrixLoopback() {
+  dependsOn sparkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
+}
+
+task sparkValidatesRunner() {
+  dependsOn 'sparkCompatibilityMatrixLoopback'
+}
+
 project.task("preCommitPy${pythonVersionSuffix}") {
-  dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
-               ':runners:flink:1.10:job-server:shadowJar',
-               'portableWordCountFlinkRunnerBatch',
-               'portableWordCountFlinkRunnerStreaming']
+  dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
+      ':runners:flink:1.10:job-server:shadowJar',
+      'portableWordCountFlinkRunnerBatch',
+      'portableWordCountFlinkRunnerStreaming']
 }
 
 project.task("postCommitPy${pythonVersionSuffix}") {
-  dependsOn = ['setupVirtualenv',
-               "postCommitPy${pythonVersionSuffix}IT",
-               ':runners:spark:job-server:shadowJar',
-               'portableWordCountSparkRunnerBatch']
+  dependsOn = ['setupVirtualenv',
+      "postCommitPy${pythonVersionSuffix}IT",
+      ':runners:spark:job-server:shadowJar',
+      'portableWordCountSparkRunnerBatch']
 }
 
 project.task("postCommitPy${pythonVersionSuffix}IT") {
   dependsOn = ['setupVirtualenv',
-               'installGcpTest',
-               ':runners:flink:1.10:job-server:shadowJar']
+      'installGcpTest',
+      ':runners:flink:1.10:job-server:shadowJar']
   doLast {
     def tests = [
-        "apache_beam.io.gcp.bigquery_read_it_test",
+      "apache_beam.io.gcp.bigquery_read_it_test",
     ]
     def testOpts = ["--tests=${tests.join(',')}"]
     def cmdArgs = mapToArgString([
-        "test_opts": testOpts,
-        "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
-        "pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
+      "test_opts": testOpts,
+      "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
+      "pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
     ])
     exec {
       executable 'sh'
......
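Note that the shared sparkCompatibilityMatrix closure also registers the worker-type variants individually, so the Docker, Process, and Loopback flavors stay directly invocable per Python version; for example (py36 chosen purely for illustration):

    # Run the PROCESS-worker variant of the Spark compatibility matrix for one interpreter:
    ./gradlew :sdks:python:test-suites:portable:py36:sparkCompatibilityMatrixProcess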
@@ -132,52 +132,4 @@ task chicagoTaxiExample {
 
 /*************************************************************************************************/
 
-task createProcessWorker {
-  dependsOn ':sdks:python:container:build'
-  dependsOn 'setupVirtualenv'
-  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
-  def osType = 'linux'
-  if (Os.isFamily(Os.FAMILY_MAC))
-    osType = 'darwin'
-  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
-  def sdkWorkerFileCode = "sh -c \"pip=`which pip` . ${envdir}/bin/activate && ${workerScript} \$* \""
-  outputs.file sdkWorkerFile
-  doLast {
-    sdkWorkerFile.write sdkWorkerFileCode
-    exec {
-      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
-    }
-    exec {
-      commandLine('chmod', '+x', sdkWorkerFile)
-    }
-  }
-}
-
-task sparkValidatesRunner() {
-  dependsOn 'createProcessWorker'
-  dependsOn 'setupVirtualenv'
-  dependsOn ':runners:spark:job-server:shadowJar'
-  doLast {
-    def environment_config = "'{\"command\": \"${buildDir.absolutePath}/sdk_worker.sh\"}'"
-    def argMap = [
-        "environment_type" : "PROCESS",
-        "spark_job_server_jar": project(":runners:spark:job-server:").shadowJar.archivePath,
-        "environment_config": environment_config,
-        "environment_cache_millis": 10000,
-    ]
-    def argString = mapToArgString(argMap)
-    // Optionally specify test function names separated by space e.g.:
-    // ./gradlew :sdks:python:test-suites:portable:py2:sparkValidatesRunner -Ptests="test_external_transforms test_read"
-    // Otherwise run all test functions under SparkRunnerTest
-    def tests = project.hasProperty('tests') ?
-        project.property('tests').split().collect{ "SparkRunnerTest.$it" }.join(' ') : ''
-    exec {
-      executable 'sh'
-      args '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test] && python -m apache_beam.runners.portability.spark_runner_test $tests $argString"
-    }
-  }
-}
-
 apply from: "../common.gradle"
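As the in-line comment in the shared task notes, individual test methods can still be selected with -Ptests; with the suite now shared across versions, the same switch should work against any of the per-version tasks wired into the wrapper, e.g.:

    # Run only two SparkRunnerTest methods under the Python 3.7 suite:
    ./gradlew :sdks:python:test-suites:portable:py37:sparkValidatesRunner -Ptests="test_external_transforms test_read"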