From b4c3a4ff927c052d503378aeb1584c82fe457c47 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Wed, 8 Jan 2025 16:43:20 +0000
Subject: [PATCH] [BigQueryIO] fetch updated schema for newly created Storage
 API stream writers (#33231)

* add dynamic dest test

* fix and add some tests

* add to changes.md

* fix whitespace

* trigger postcommits

* address comments
---
 .../beam_PostCommit_Java_DataflowV2.json      |   2 +-
 CHANGES.md                                    |   1 +
 .../sdk/io/gcp/bigquery/BigQueryServices.java |   2 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java |  13 +-
 .../StorageApiWriteUnshardedRecords.java      |  20 +-
 .../StorageApiWritesShardedRecords.java       |  22 ++
 .../io/gcp/testing/FakeDatasetService.java    |   4 +-
 .../StorageApiSinkSchemaUpdateIT.java         | 206 ++++++++++++++++--
 8 files changed, 236 insertions(+), 34 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index 3f63c0c9975..bbdc3a3910e 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to run",
-    "modification": 2
+    "modification": 3
 }
diff --git a/CHANGES.md b/CHANGES.md
index d5cbb76fb3d..ac3992a872d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -79,6 +79,7 @@
 ## Bugfixes
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [BigQueryIO] Fixed an issue where the Storage Write API sink would sometimes not pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)).
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 374288de656..8dbc47359b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -213,7 +213,7 @@ public interface BigQueryServices extends Serializable {
         throws IOException, InterruptedException;
 
     @Nullable
-    WriteStream getWriteStream(String writeStream);
+    TableSchema getWriteStreamSchema(String writeStream);
 
     /**
      * Create an append client for a given Storage API write stream. The stream must be created
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 432f31e81c9..0688694fb9b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -76,6 +76,7 @@ import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
 import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
 import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
 import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
+import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.ProtoSchema;
 import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
@@ -86,6 +87,7 @@ import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
 import com.google.cloud.bigquery.storage.v1.StreamWriter;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.WriteStreamView;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import com.google.protobuf.DescriptorProtos;
@@ -1418,8 +1420,15 @@ public class BigQueryServicesImpl implements BigQueryServices {
     }
 
     @Override
-    public @Nullable WriteStream getWriteStream(String writeStream) {
-      return newWriteClient.getWriteStream(writeStream);
+    public @Nullable TableSchema getWriteStreamSchema(String writeStream) {
+      @Nullable
+      WriteStream stream =
+          newWriteClient.getWriteStream(
+              GetWriteStreamRequest.newBuilder()
+                  .setView(WriteStreamView.FULL)
+                  .setName(writeStream)
+                  .build());
+      return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null;
     }
 
     @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 30a9ecd274d..1b5f32f7e43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,7 +29,6 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.Exceptions;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
-import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DescriptorProtos;
@@ -475,15 +474,18 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
             () -> {
               if (autoUpdateSchema) {
                 @Nullable
-                WriteStream writeStream =
+                TableSchema streamSchema =
                     Preconditions.checkStateNotNull(maybeWriteStreamService)
-                        .getWriteStream(streamName);
-                if (writeStream != null && writeStream.hasTableSchema()) {
-                  TableSchema updatedFromStream = writeStream.getTableSchema();
-                  currentSchema.set(updatedFromStream);
-                  updated.set(true);
-                  LOG.debug(
-                      "Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
+                        .getWriteStreamSchema(streamName);
+                if (streamSchema != null) {
+                  Optional<TableSchema> newSchema =
+                      TableSchemaUpdateUtils.getUpdatedSchema(initialTableSchema, streamSchema);
+                  if (newSchema.isPresent()) {
+                    currentSchema.set(newSchema.get());
+                    updated.set(true);
+                    LOG.debug(
+                        "Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
+                  }
                 }
               }
               return null;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 738a52b69cb..d905c4bf93c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -531,6 +531,28 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
                       element.getKey().getKey(), dynamicDestinations, datasetService);
               tableSchema = converter.getTableSchema();
               descriptor = converter.getDescriptor(false);
+
+              if (autoUpdateSchema) {
+                // A StreamWriter ignores table schema updates that happen prior to its creation.
+                // So before creating a StreamWriter below, we fetch the table schema to check if we
+                // missed an update.
+                // If so, use the new schema instead of the base schema.
+                @Nullable
+                TableSchema streamSchema =
+                    MoreObjects.firstNonNull(
+                        writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
+                        TableSchema.getDefaultInstance());
+                Optional<TableSchema> newSchema =
+                    TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);
+
+                if (newSchema.isPresent()) {
+                  tableSchema = newSchema.get();
+                  descriptor =
+                      TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+                          tableSchema, true, false);
+                  updatedSchema.write(tableSchema);
+                }
+              }
             }
             AppendClientInfo info =
                 AppendClientInfo.of(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index a99e4bea37a..24229643eca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -590,11 +590,11 @@ public class FakeDatasetService implements DatasetService, WriteStreamService, S
 
   @Override
   @Nullable
-  public WriteStream getWriteStream(String streamName) {
+  public com.google.cloud.bigquery.storage.v1.TableSchema getWriteStreamSchema(String streamName) {
     synchronized (FakeDatasetService.class) {
       @Nullable Stream stream = writeStreams.get(streamName);
       if (stream != null) {
-        return stream.toWriteStream();
+        return stream.toWriteStream().getTableSchema();
       }
     }
     // TODO(relax): Return the exact error that BigQuery returns.
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
index ae9e9cefb15..3118e97b2b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
@@ -33,7 +35,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.function.Function;
@@ -60,6 +64,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.joda.time.Duration;
@@ -124,6 +129,9 @@ public class StorageApiSinkSchemaUpdateIT {
   private static final int TOTAL_N = 70;
   // Number of rows with the original schema
   private static final int ORIGINAL_N = 60;
+  // For the dynamic destinations test
+  private static final int NUM_DESTINATIONS = 3;
+  private static final int TOTAL_NUM_STREAMS = 9;
 
   private final Random randomGenerator = new Random();
 
@@ -145,6 +153,11 @@ public class StorageApiSinkSchemaUpdateIT {
   }
 
   private String createTable(TableSchema tableSchema) throws IOException, InterruptedException {
+    return createTable(tableSchema, "");
+  }
+
+  private String createTable(TableSchema tableSchema, String suffix)
+      throws IOException, InterruptedException {
     String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0);
     if (useInputSchema) {
       tableId += "WithInputSchema";
@@ -152,6 +165,8 @@ public class StorageApiSinkSchemaUpdateIT {
     if (changeTableSchema) {
       tableId += "OnSchemaChange";
     }
+    tableId += suffix;
+
     BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId);
     BQ_CLIENT.createNewTable(
         PROJECT,
@@ -170,9 +185,8 @@ public class StorageApiSinkSchemaUpdateIT {
 
     private final String projectId;
     private final String datasetId;
-    private final String tableId;
     // represent as String because TableSchema is not serializable
-    private final String newSchema;
+    private final Map<String, String> newSchemas;
 
     private transient BigqueryClient bqClient;
 
@@ -183,11 +197,14 @@ public class StorageApiSinkSchemaUpdateIT {
     private final StateSpec<ValueState<Integer>> counter;
 
     public UpdateSchemaDoFn(
-        String projectId, String datasetId, String tableId, TableSchema newSchema) {
+        String projectId, String datasetId, Map<String, TableSchema> newSchemas) {
       this.projectId = projectId;
       this.datasetId = datasetId;
-      this.tableId = tableId;
-      this.newSchema = BigQueryHelpers.toJsonString(newSchema);
+      Map<String, String> serializableSchemas = new HashMap<>();
+      for (Map.Entry<String, TableSchema> entry : newSchemas.entrySet()) {
+        serializableSchemas.put(entry.getKey(), BigQueryHelpers.toJsonString(entry.getValue()));
+      }
+      this.newSchemas = serializableSchemas;
       this.bqClient = null;
       this.counter = StateSpecs.value();
     }
@@ -201,14 +218,17 @@ public class StorageApiSinkSchemaUpdateIT {
     public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
         throws Exception {
       int current = firstNonNull(counter.read(), 0);
-      // We update schema early on to leave a healthy amount of time for
-      // StreamWriter to recognize it.
-      if (current == 10) {
-        bqClient.updateTableSchema(
-            projectId,
-            datasetId,
-            tableId,
-            BigQueryHelpers.fromJsonString(newSchema, TableSchema.class));
+      // We update the schema early on to leave a healthy amount of time for StreamWriters to
+      // recognize it, but only partway through so that some writers are created *after* the
+      // schema update.
+      if (current == TOTAL_NUM_STREAMS / 2) {
+        for (Map.Entry<String, String> entry : newSchemas.entrySet()) {
+          bqClient.updateTableSchema(
+              projectId,
+              datasetId,
+              entry.getKey(),
+              BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class));
+        }
       }
 
       counter.write(++current);
@@ -304,7 +324,7 @@ public class StorageApiSinkSchemaUpdateIT {
     p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
     // Limit parallelism so that all streams recognize the new schema in an expected short amount
     // of time (before we start writing rows with updated schema)
-    p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3);
+    p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS);
     // Need to manually enable streaming engine for legacy dataflow runner
     ExperimentalOptions.addExperiment(
         p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
@@ -394,7 +414,8 @@ public class StorageApiSinkSchemaUpdateIT {
               .apply(
                   "Update Schema",
                   ParDo.of(
-                      new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, updatedSchema)));
+                      new UpdateSchemaDoFn(
+                          PROJECT, BIG_QUERY_DATASET_ID, ImmutableMap.of(tableId, updatedSchema))));
     }
     WriteResult result = rows.apply("Stream to BigQuery", write);
     if (useIgnoreUnknownValues) {
@@ -494,13 +515,13 @@ public class StorageApiSinkSchemaUpdateIT {
       if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N
           || !useAutoSchemaUpdate
           || !changeTableSchema) {
-        assertTrue(
+        assertNull(
             String.format("Expected row to NOT have field %s:\n%s", extraField, row),
-            row.get(extraField) == null);
+            row.get(extraField));
       } else {
-        assertTrue(
+        assertNotNull(
             String.format("Expected row to have field %s:\n%s", extraField, row),
-            row.get(extraField) != null);
+            row.get(extraField));
       }
     }
   }
@@ -539,4 +560,151 @@ public class StorageApiSinkSchemaUpdateIT {
   public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception {
     runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, true);
   }
+
+  public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) throws Exception {
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    // 0 threshold so that the stream tries fetching an updated schema after each append
+    p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
+    // Number of streams per destination, so the total across destinations is TOTAL_NUM_STREAMS
+    p.getOptions()
+        .as(BigQueryOptions.class)
+        .setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS / NUM_DESTINATIONS);
+    // Need to manually enable streaming engine for legacy dataflow runner
+    ExperimentalOptions.addExperiment(
+        p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+    // Only run the most relevant test case on Dataflow
+    if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
+      assumeTrue(
+          "Skipping in favor of more relevant test case", changeTableSchema && useInputSchema);
+    }
+
+    List<String> fieldNamesOrigin = new ArrayList<String>(Arrays.asList(FIELDS));
+
+    // Shuffle the fields in the write schema to do fuzz testing on field order
+    List<String> fieldNamesShuffled = new ArrayList<String>(fieldNamesOrigin);
+    Collections.shuffle(fieldNamesShuffled, randomGenerator);
+    TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin, null);
+    TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null);
+
+    Map<Long, String> destinations = new HashMap<>(NUM_DESTINATIONS);
+    Map<String, TableSchema> updatedSchemas = new HashMap<>(NUM_DESTINATIONS);
+    Map<String, String> extraFields = new HashMap<>(NUM_DESTINATIONS);
+    Map<Long, GenerateRowFunc> rowFuncs = new HashMap<>(NUM_DESTINATIONS);
+    for (int i = 0; i < NUM_DESTINATIONS; i++) {
+      // The updated schema includes all fields in the original schema plus a random new field
+      List<String> fieldNamesWithExtra = new ArrayList<String>(fieldNamesOrigin);
+      String extraField =
+          fieldNamesOrigin.get(randomGenerator.nextInt(fieldNamesOrigin.size())) + "_EXTRA";
+      fieldNamesWithExtra.add(extraField);
+      TableSchema updatedSchema =
+          makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField));
+      GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra);
+
+      String tableId = createTable(bqTableSchema, "_dynamic_" + i);
+      String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId;
+
+      rowFuncs.put((long) i, generateRowFunc);
+      destinations.put((long) i, tableSpec);
+      updatedSchemas.put(tableId, updatedSchema);
+      extraFields.put(tableSpec, extraField);
+    }
+
+    // build write transform
+    Write<TableRow> write =
+        BigQueryIO.writeTableRows()
+            .to(
+                row -> {
+                  long l = (int) row.getValue().get("id") % NUM_DESTINATIONS;
+                  String destination = destinations.get(l);
+                  return new TableDestination(destination, null);
+                })
+            .withAutoSchemaUpdate(true)
+            .ignoreUnknownValues()
+            .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
+            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+            .withWriteDisposition(WriteDisposition.WRITE_APPEND);
+    if (useInputSchema) {
+      write = write.withSchema(inputSchema);
+    }
+    if (!useAtLeastOnce) {
+      write =
+          write
+              .withMethod(Write.Method.STORAGE_WRITE_API)
+              .withTriggeringFrequency(Duration.standardSeconds(1));
+    }
+
+    int numRows = TOTAL_N;
+    // set up and build pipeline
+    Instant start = new Instant(0);
+    // We give a healthy waiting period between each element to give Storage API streams a chance
+    // to recognize the new schema. Applied only for the relevant tests.
+    Duration interval = changeTableSchema ? Duration.standardSeconds(1) : Duration.millis(1);
+    Duration stop =
+        changeTableSchema ? Duration.standardSeconds(numRows - 1) : Duration.millis(numRows - 1);
+    Function<Instant, Long> getIdFromInstant =
+        changeTableSchema
+            ? (Function<Instant, Long> & Serializable)
+                (Instant instant) -> instant.getMillis() / 1000
+            : (Function<Instant, Long> & Serializable) Instant::getMillis;
+
+    // Generates rows with the original schema for row IDs under ORIGINAL_N,
+    // then rows with the updated schema for the rest.
+    // Rows with the updated schema should only reach the table if ignoreUnknownValues is set,
+    // and the extra field should be present only when autoSchemaUpdate is set.
+    PCollection<Instant> instants =
+        p.apply(
+            "Generate Instants",
+            PeriodicImpulse.create()
+                .startAt(start)
+                .stopAt(start.plus(stop))
+                .withInterval(interval)
+                .catchUpToNow(false));
+    PCollection<TableRow> rows =
+        instants.apply(
+            "Create TableRows",
+            MapElements.into(TypeDescriptor.of(TableRow.class))
+                .via(
+                    instant -> {
+                      long rowId = getIdFromInstant.apply(instant);
+                      long dest = rowId % NUM_DESTINATIONS;
+                      return rowFuncs.get(dest).apply(rowId);
+                    }));
+    if (changeTableSchema) {
+      rows =
+          rows
+              // UpdateSchemaDoFn uses state, so it needs a KV input
+              .apply("Add a dummy key", WithKeys.of(1))
+              .apply(
+                  "Update Schema",
+                  ParDo.of(new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, updatedSchemas)));
+    }
+
+    WriteResult result = rows.apply("Stream to BigQuery", write);
+    // We ignore the extra fields, so no rows should have been sent to DLQ
+    PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty();
+    p.run().waitUntilFinish();
+
+    Map<String, Integer> expectedCounts = new HashMap<>(NUM_DESTINATIONS);
+    for (int i = 0; i < numRows; i++) {
+      long mod = i % NUM_DESTINATIONS;
+      String destination = destinations.get(mod);
+      expectedCounts.merge(destination, 1, Integer::sum);
+    }
+
+    for (Map.Entry<String, Integer> expectedCount : expectedCounts.entrySet()) {
+      String dest = expectedCount.getKey();
+      checkRowCompleteness(dest, expectedCount.getValue(), true);
+      checkRowsWithUpdatedSchema(dest, extraFields.get(dest), true);
+    }
+  }
+
+  @Test
+  public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception {
+    runDynamicDestinationsWithAutoSchemaUpdate(false);
+  }
+
+  @Test
+  public void testAtLeastOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception {
+    runDynamicDestinationsWithAutoSchemaUpdate(true);
+  }
 }
-- 
GitLab