Skip to content
代码片段 群组 项目
未验证 提交 758d821a 编辑于 作者: omarismail94's avatar omarismail94 提交者: GitHub
浏览文件

[BEAM-10037] BeamSqlExample.java fails to build (#11754)


* [BEAM-10037] BeamSqlExample.java fails to build when running ./gradlew command

* [BEAM-10037] Added BeamSQLExample and PojoExample to sqlPreCommit(), and fixed coder in BeamSqlExample

* [BEAM-10037] Reverted first query to c1>1 as opposed to c3. I added c3 for testing and forgot to change it back :)

Co-authored-by: default avatarOmar Ismail <omarismail@omarismail-macbookpro.roam.corp.google.com>
上级 9818fa82
No related branches found
No related tags found
无相关合并请求
......@@ -152,6 +152,8 @@ task javaPreCommit() {
}
task sqlPreCommit() {
dependsOn ":sdks:java:extensions:sql:runBasicExample"
dependsOn ":sdks:java:extensions:sql:runPojoExample"
dependsOn ":sdks:java:extensions:sql:build"
dependsOn ":sdks:java:extensions:sql:buildDependents"
}
......
......@@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.TupleTag;
* Beam documentation on how to run pipelines.
*/
class BeamSqlExample {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
......@@ -66,19 +67,21 @@ class BeamSqlExample {
inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
// print the output record of case 1;
outputStream.apply(
"log_result",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
// expect output:
// PCOLLECTION: [3, row, 3.0]
// PCOLLECTION: [2, row, 2.0]
System.out.println("PCOLLECTION: " + input.getValues());
return input;
}
}));
outputStream
.apply(
"log_result",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
// expect output:
// PCOLLECTION: [3, row, 3.0]
// PCOLLECTION: [2, row, 2.0]
System.out.println("PCOLLECTION: " + input.getValues());
return input;
}
}))
.setRowSchema(type);
// Case 2. run the query with SqlTransform.query over result PCollection of case 1.
PCollection<Row> outputStream2 =
......@@ -86,18 +89,21 @@ class BeamSqlExample {
.apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
// print the output record of case 2;
outputStream2.apply(
"log_result",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
// expect output:
// CASE1_RESULT: [row, 5.0]
System.out.println("CASE1_RESULT: " + input.getValues());
return input;
}
}));
outputStream2
.apply(
"log_result",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
// expect output:
// CASE1_RESULT: [row, 5.0]
System.out.println("CASE1_RESULT: " + input.getValues());
return input;
}
}))
.setRowSchema(
Schema.builder().addStringField("stringField").addDoubleField("doubleField").build());
p.run().waitUntilFinish();
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册