Commit 256a8ae9 authored by Boyuan Zhang, committed by Luke Cwik

[BEAM-5801] Support postcommit ITs using fn-api worker (#6762)

* Support postcommit ITs using fn-api worker
Parent 7a487e72
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder
// This job runs the Java postcommit tests, including the suite of integration
// tests.
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PortabilityApi_GradleBuild', 'Run Java PortabilityApi PostCommit',
'Java SDK PortabilityApi Post Commit Tests', this) {
description('Runs PostCommit tests on the Java SDK using Portability APIs.')
// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 240)
// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}
// Gradle goals for this job.
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':javaPostCommitPortabilityApi')
commonJobProperties.setGradleSwitches(delegate)
// Specify maven home on Jenkins, needed by Maven archetype integration tests.
switches('-Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.2')
// BEAM-5035: Parallel builds are very flaky
switches('--no-parallel')
// To catch as many failures as possible
switches('--continue')
}
}
}
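A sketch of the resulting Gradle invocation (assuming setGradleSwitches adds no conflicting flags; the switches are copied verbatim from the job definition above):

// Effective command line issued by this job, roughly:
//   ./gradlew :javaPostCommitPortabilityApi --no-parallel --continue \
//       -Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.2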
@@ -208,6 +208,10 @@ task javaPostCommit() {
dependsOn ":beam-sdks-java-extensions-sql-jdbc:postCommit"
}
task javaPostCommitPortabilityApi() {
dependsOn ":beam-runners-google-cloud-dataflow-java:postCommitPortabilityApi"
}
task goPreCommit() {
dependsOn ":beam-sdks-go:test"
......
@@ -26,12 +26,14 @@ description = "Apache Beam :: Runners :: Google Cloud Dataflow"
/*
* We need to rely on manually specifying these evaluationDependsOn to ensure that
* the following projects are evaluated before we evaluate this project. This is because
* we are attempting to reference the "sourceSets.test.output" directly.
* we are attempting to reference parameters such as "sourceSets.test.output" directly.
*/
evaluationDependsOn(":beam-sdks-java-io-google-cloud-platform")
evaluationDependsOn(":beam-sdks-java-core")
evaluationDependsOn(":beam-examples-java")
evaluationDependsOn(":beam-runners-google-cloud-dataflow-java-legacy-worker")
evaluationDependsOn(":beam-runners-google-cloud-dataflow-java-fn-api-worker")
evaluationDependsOn(":beam-sdks-java-container")
processResources {
filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
@@ -104,20 +106,42 @@ test {
systemProperties = [ "beamUseDummyRunner" : "true" ]
}
// For the following test tasks, set workerHarnessContainerImage to empty to make Dataflow pick up
// the non-versioned container image, which handles a staged worker jar.
task validatesRunnerTest(type: Test) {
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowValidatesTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-validates-runner-tests/'
def dataflowPostCommitTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def dataflowLegacyWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-legacy-worker").shadowJar.archivePath
def dataflowFnApiWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-fn-api-worker").shadowJar.archivePath
def dockerImageRoot = project.findProperty('dockerImageRoot') ?: "us.gcr.io/${dataflowProject}/java-postcommit-it"
def dockerImageContainer = "${dockerImageRoot}/java"
def dockerTag = new Date().format('yyyyMMddHHmmss')
def dockerImageName = "${dockerImageContainer}:${dockerTag}"
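// Illustration (hypothetical values): with the default 'apache-beam-testing'
// project and a build started at 2018-11-02 13:45:09, the definitions above
// evaluate to
//   dockerImageContainer = "us.gcr.io/apache-beam-testing/java-postcommit-it/java"
//   dockerImageName      = "us.gcr.io/apache-beam-testing/java-postcommit-it/java:20181102134509"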
def commonExcludeCategories = [
'org.apache.beam.sdk.testing.LargeKeys$Above10MB',
'org.apache.beam.sdk.testing.UsesAttemptedMetrics',
'org.apache.beam.sdk.testing.UsesDistributionMetrics',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs',
'org.apache.beam.sdk.testing.UsesUnboundedPCollections',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
]
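// These shared exclusions are applied below via
//   commonExcludeCategories.each { excludeCategories it }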
// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
// make Dataflow pick up the non-versioned container image, which handles a staged worker jar.
task validatesRunnerLegacyWorkerTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-validates-runner-tests/'
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-legacy-worker").shadowJar.archivePath
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowTempRoot}",
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
@@ -129,37 +153,39 @@ task validatesRunnerTest(type: Test) {
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
commonExcludeCategories.each {
excludeCategories it
}
}
}
task buildAndPushDockerContainer() {
dependsOn ":beam-sdks-java-container:docker"
def defaultDockerImageName = containerImageName(name: "java")
doLast {
exec {
commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerImageName}"
}
exec {
commandLine "gcloud", "docker", "--", "push", "${dockerImageContainer}"
}
}
}
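// Note: pushing the repository path without a tag uploads every local tag for
// that repository (docker's push behavior at the time), so the freshly tagged
// ${dockerImageName} ends up in the registry.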
task validatesRunner {
group = "Verification"
description "Validates Dataflow runner"
dependsOn validatesRunnerTest
dependsOn validatesRunnerLegacyWorkerTest
}
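// Usage sketch: ./gradlew :beam-runners-google-cloud-dataflow-java:validatesRunner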
task googleCloudPlatformIntegrationTest(type: Test) {
task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-legacy-worker").shadowJar.archivePath
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowTempRoot}",
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
@@ -172,18 +198,47 @@ task googleCloudPlatformIntegrationTest(type: Test) {
useJUnit { }
}
task examplesJavaIntegrationTest(type: Test) {
task googleCloudPlatformFnApiWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
"--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
"--experiments=beam_fn_api",
])
include '**/*IT.class'
exclude '**/BigQueryIOReadIT.class'
exclude '**/PubsubReadIT.class'
exclude '**/SpannerReadIT.class'
exclude '**/BigtableReadIT.class'
exclude '**/V1ReadIT.class'
exclude '**/SpannerWriteIT.class'
exclude '**/BigQueryNestedRecordsIT.class'
exclude '**/SplitQueryFnIT.class'
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-io-google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit {
excludeCategories 'org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported'
}
}
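// Tests annotated with @Category(DataflowPortabilityApiUnsupported.class), the
// marker interface added later in this commit, are skipped by the
// excludeCategories call above.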
task examplesJavaLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-legacy-worker").shadowJar.archivePath
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowTempRoot}",
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
@@ -198,18 +253,48 @@ task examplesJavaIntegrationTest(type: Test) {
useJUnit { }
}
task coreSDKJavaIntegrationTest(type: Test) {
// For the fn-api worker, run only the ITs that can currently pass.
// More ITs should be supported in the future.
task examplesJavaFnApiWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
"--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
"--experiments=beam_fn_api",
])
// The examples/java preCommit task already covers running WordCountIT/WindowedWordCountIT so
// this postCommit integration test excludes them.
include '**/*IT.class'
exclude '**/WordCountIT.class'
exclude '**/WindowedWordCountIT.class'
exclude '**/TopWikipediaSessionsIT.class'
exclude '**/TfIdfIT.class'
exclude '**/AutoCompleteIT.class'
exclude '**/TrafficMaxLaneFlowIT.class'
exclude '**/TrafficRoutesIT.class'
maxParallelForks 4
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":beam-examples-java").sourceSets.test.output.classesDirs)
useJUnit { }
}
task coreSDKJavaLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":beam-runners-google-cloud-dataflow-java-legacy-worker").shadowJar.archivePath
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowTempRoot}",
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
])
@@ -220,12 +305,50 @@ task coreSDKJavaIntegrationTest(type: Test) {
useJUnit { }
}
task coreSDKJavaFnApiWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
dependsOn buildAndPushDockerContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
"--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
"--experiments=beam_fn_api",
])
include '**/*IT.class'
maxParallelForks 4
classpath = configurations.coreSDKJavaIntegrationTest
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
useJUnit { }
}
task postCommit {
group = "Verification"
description = "Various integration tests using the Dataflow runner."
dependsOn googleCloudPlatformIntegrationTest
dependsOn examplesJavaIntegrationTest
dependsOn coreSDKJavaIntegrationTest
dependsOn googleCloudPlatformLegacyWorkerIntegrationTest
dependsOn examplesJavaLegacyWorkerIntegrationTest
dependsOn coreSDKJavaLegacyWorkerIntegrationTest
}
task postCommitPortabilityApi {
group = "Verification"
description = "Various integration tests using the Dataflow FnApi runner."
dependsOn googleCloudPlatformFnApiWorkerIntegrationTest
dependsOn examplesJavaFnApiWorkerIntegrationTest
dependsOn coreSDKJavaFnApiWorkerIntegrationTest
// Clean up the Docker image: remove the local tag, then delete the pushed
// image from the container registry.
doLast {
exec {
commandLine "docker", "rmi", "${dockerImageName}"
}
exec {
commandLine "gcloud", "--quiet", "container", "images", "delete", "${dockerImageName}"
}
}
}
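// This task backs the new top-level :javaPostCommitPortabilityApi alias, which
// the beam_PostCommit_Java_PortabilityApi_GradleBuild Jenkins job invokes.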
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;
/** Category tag for tests that the Dataflow fn-api worker does not yet support. */
public interface DataflowPortabilityApiUnsupported {}
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Reshuffle;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -289,6 +291,7 @@ public class BigQueryToTableIT {
}
@Test
@Category(DataflowPortabilityApiUnsupported.class)
public void testNewTypesQueryWithoutReshuffleWithCustom() throws Exception {
this.setupNewTypesQueryTest();
this.options.setExperiments(
@@ -300,6 +303,7 @@
}
@Test
@Category(DataflowPortabilityApiUnsupported.class)
public void testLegacyQueryWithoutReshuffleWithCustom() throws Exception {
this.setupLegacyQueryTest();
this.options.setExperiments(
@@ -311,6 +315,7 @@
}
@Test
@Category(DataflowPortabilityApiUnsupported.class)
public void testStandardQueryWithoutReshuffleWithCustom() throws Exception {
this.setupStandardQueryTest();
this.options.setExperiments(
......