From 825363b865b2c64ea745532436460e1702376968 Mon Sep 17 00:00:00 2001
From: Michal Walenia <32354134+mwalenia@users.noreply.github.com>
Date: Wed, 8 Apr 2020 16:10:36 +0200
Subject: [PATCH] [BEAM-9147] Add a VideoIntelligence transform to Java SDK
 (#11261)

* [BEAM-9147] Add Google Cloud AI VideoIntelligence integration transform
---
 build.gradle                                  |   3 +-
 sdks/java/extensions/ml/build.gradle          |  40 ++++
 .../beam/sdk/extensions/ml/AnnotateVideo.java | 103 ++++++++++
 .../sdk/extensions/ml/VideoIntelligence.java  | 184 ++++++++++++++++++
 .../beam/sdk/extensions/ml/package-info.java  |  19 ++
 .../sdk/extensions/ml/AnnotateVideoTest.java  |  73 +++++++
 .../extensions/ml/VideoIntelligenceIT.java    |  83 ++++++++
 .../org.mockito.plugins.MockMaker             |   1 +
 settings.gradle                               |   1 +
 9 files changed, 506 insertions(+), 1 deletion(-)
 create mode 100644 sdks/java/extensions/ml/build.gradle
 create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
 create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
 create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
 create mode 100644 sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
 create mode 100644 sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
 create mode 100644 sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker

diff --git a/build.gradle b/build.gradle
index 14d7104d25d..043f8d9dc92 100644
--- a/build.gradle
+++ b/build.gradle
@@ -107,7 +107,8 @@ rat {
     "learning/katas/*/IO/**/*.txt",
 
     // Mockito extensions
-    "sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker"
+    "sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker",
+    "sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker"
   ]
 
   // Add .gitignore excludes to the Apache Rat exclusion list. We re-create the behavior
diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle
new file mode 100644
index 00000000000..274c0747e00
--- /dev/null
+++ b/sdks/java/extensions/ml/build.gradle
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf')
+
+description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
+
+dependencies {
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    compile project(":sdks:java:expansion-service")
+    testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
+    compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile library.java.mockito_core
+    testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile library.java.junit
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+    testRuntimeOnly project(":runners:google-cloud-dataflow-java")
+}
+
+project.test {
+    include "**/**IT.class"
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
new file mode 100644
index 00000000000..56e863843ee
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
+import com.google.cloud.videointelligence.v1.AnnotateVideoRequest;
+import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Base class for Video Intelligence transform.
+ *
+ * @param <T> Class of input data being passed in - either ByteString - video data encoded into.
+ *     String or String - a GCS URI of the video to be annotated.
+ */
+public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
+
+  protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
+  protected final List<Feature> featureList;
+  VideoIntelligenceServiceClient videoIntelligenceServiceClient;
+
+  public AnnotateVideo(
+      PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
+    this.contextSideInput = contextSideInput;
+    this.featureList = featureList;
+  }
+
+  public AnnotateVideo(List<Feature> featureList) {
+    contextSideInput = null;
+    this.featureList = featureList;
+  }
+
+  @StartBundle
+  public void startBundle() throws IOException {
+    videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
+  }
+
+  @Teardown
+  public void teardown() {
+    videoIntelligenceServiceClient.close();
+  }
+
+  /**
+   * Call the Video Intelligence Cloud AI service and return annotation results.
+   *
+   * @param elementURI This or elementContents is required. GCS address of video to be annotated
+   * @param elementContents this or elementURI is required. Hex-encoded contents of video to be
+   *     annotated
+   * @param videoContext Optional context for video annotation.
+   * @return
+   */
+  List<VideoAnnotationResults> getVideoAnnotationResults(
+      String elementURI, ByteString elementContents, VideoContext videoContext)
+      throws InterruptedException, ExecutionException {
+    AnnotateVideoRequest.Builder requestBuilder =
+        AnnotateVideoRequest.newBuilder().addAllFeatures(featureList);
+    if (elementURI != null) {
+      requestBuilder.setInputUri(elementURI);
+    } else if (elementContents != null) {
+      requestBuilder.setInputContent(elementContents);
+    } else {
+      throw new IllegalArgumentException("Either elementURI or elementContents should be non-null");
+    }
+    if (videoContext != null) {
+      requestBuilder.setVideoContext(videoContext);
+    }
+    AnnotateVideoRequest annotateVideoRequest = requestBuilder.build();
+    OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync =
+        videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest);
+    return annotateVideoAsync.get().getAnnotationResultsList();
+  }
+
+  /** Process element implementation required. */
+  @ProcessElement
+  public abstract void processElement(ProcessContext context)
+      throws ExecutionException, InterruptedException;
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
new file mode 100644
index 00000000000..0f447da6498
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Factory class for AnnotateVideo subclasses. allows integration with Google Cloud AI -
+ * VideoIntelligence service. Converts GCS URIs of videos or ByteStrings with video contents into
+ * Lists of VideoAnnotationResults.
+ *
+ * <p>Adding a side input of Maps of elements to VideoContext objects is allowed, so is using KVs of
+ * element and VideoContext as input.
+ *
+ * <p>Service account with proper permissions is required to use these transforms.
+ */
+public class VideoIntelligence {
+
+  /**
+   * Annotates videos from GCS URIs.
+   *
+   * @param featureList List of features to be annotated
+   * @param contextSideInput Optional side input with map of contexts to URIs
+   * @return DoFn performing the necessary operations
+   */
+  public static AnnotateVideoFromURI annotateFromURI(
+      List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) {
+    return new AnnotateVideoFromURI(contextSideInput, featureList);
+  }
+
+  /**
+   * Annotates videos from ByteStrings of their contents.
+   *
+   * @param featureList List of features to be annotated
+   * @param contextSideInput Optional side input with map of contexts to ByteStrings
+   * @return DoFn performing the necessary operations
+   */
+  public static AnnotateVideoFromBytes annotateFromBytes(
+      PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
+    return new AnnotateVideoFromBytes(contextSideInput, featureList);
+  }
+
+  /**
+   * Annotates videos from key-value pairs of GCS URI and VideoContext.
+   *
+   * @param featureList List of features to be annotated
+   * @return DoFn performing the necessary operations
+   */
+  public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) {
+    return new AnnotateVideoURIWithContext(featureList);
+  }
+
+  /**
+   * Annotates videos from key-value pairs of ByteStrings and VideoContext.
+   *
+   * @param featureList List of features to be annotated
+   * @return DoFn performing the necessary operations
+   */
+  public static AnnotateVideoBytesWithContext annotateFromBytesWithContext(
+      List<Feature> featureList) {
+    return new AnnotateVideoBytesWithContext(featureList);
+  }
+
+  /**
+   * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates
+   * videos found on GCS based on URIs from input PCollection.
+   */
+  public static class AnnotateVideoFromURI extends AnnotateVideo<String> {
+
+    public AnnotateVideoFromURI(
+        PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
+      super(contextSideInput, featureList);
+    }
+
+    /** ProcessElement implementation. */
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      String elementURI = context.element();
+      VideoContext videoContext = null;
+      if (contextSideInput != null) {
+        videoContext = context.sideInput(contextSideInput).get(elementURI);
+      }
+      List<VideoAnnotationResults> annotationResultsList =
+          getVideoAnnotationResults(elementURI, null, videoContext);
+      context.output(annotationResultsList);
+    }
+  }
+
+  /**
+   * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos
+   * decoded from the ByteStrings are annotated.
+   */
+  public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> {
+
+    public AnnotateVideoFromBytes(
+        PCollectionView<Map<ByteString, VideoContext>> contextSideInput,
+        List<Feature> featureList) {
+      super(contextSideInput, featureList);
+    }
+
+    /** Implementation of ProcessElement. */
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      ByteString element = context.element();
+      VideoContext videoContext = null;
+      if (contextSideInput != null) {
+        videoContext = context.sideInput(contextSideInput).get(element);
+      }
+      List<VideoAnnotationResults> videoAnnotationResults =
+          getVideoAnnotationResults(null, element, videoContext);
+      context.output(videoAnnotationResults);
+    }
+  }
+
+  /**
+   * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
+   * GCS URIs, values - VideoContext objects.
+   */
+  public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> {
+
+    public AnnotateVideoURIWithContext(List<Feature> featureList) {
+      super(featureList);
+    }
+
+    /** ProcessElement implementation. */
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      String elementURI = context.element().getKey();
+      VideoContext videoContext = context.element().getValue();
+      List<VideoAnnotationResults> videoAnnotationResults =
+          getVideoAnnotationResults(elementURI, null, videoContext);
+      context.output(videoAnnotationResults);
+    }
+  }
+
+  /**
+   * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
+   * ByteString encoded video contents, values - VideoContext objects.
+   */
+  public static class AnnotateVideoBytesWithContext
+      extends AnnotateVideo<KV<ByteString, VideoContext>> {
+
+    public AnnotateVideoBytesWithContext(List<Feature> featureList) {
+      super(featureList);
+    }
+
+    /** ProcessElement implementation. */
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      ByteString element = context.element().getKey();
+      VideoContext videoContext = context.element().getValue();
+      List<VideoAnnotationResults> videoAnnotationResults =
+          getVideoAnnotationResults(null, element, videoContext);
+      context.output(videoAnnotationResults);
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
new file mode 100644
index 00000000000..ad5216dd8cc
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */
+package org.apache.beam.sdk.extensions.ml;
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
new file mode 100644
index 00000000000..57400e48d06
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
+import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
+import com.google.protobuf.ByteString;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AnnotateVideoTest {
+
+  private static final String TEST_URI = "fake_uri";
+  private static final ByteString TEST_BYTES = ByteString.copyFromUtf8("12345");
+
+  @Mock private VideoIntelligenceServiceClient serviceClient;
+  @Mock private OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> future;
+  @Mock private AnnotateVideoResponse response;
+
+  @Test
+  public void shouldReturnAListOfAnnotations() throws ExecutionException, InterruptedException {
+    when(response.getAnnotationResultsList())
+        .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build()));
+    when(future.get()).thenReturn(response);
+    when(serviceClient.annotateVideoAsync(any())).thenReturn(future);
+    VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
+        VideoIntelligence.annotateFromBytes(
+            null, Collections.singletonList(Feature.LABEL_DETECTION));
+
+    annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient;
+    List<VideoAnnotationResults> videoAnnotationResults =
+        annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null);
+    assertEquals(1, videoAnnotationResults.size());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldThrowErrorWhenBothInputTypesNull()
+      throws ExecutionException, InterruptedException {
+    VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
+        VideoIntelligence.annotateFromBytes(
+            null, Collections.singletonList(Feature.LABEL_DETECTION));
+    annotateVideoFromBytes.getVideoAnnotationResults(null, null, null);
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
new file mode 100644
index 00000000000..b0b74eeccc2
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class VideoIntelligenceIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+  private static final String VIDEO_URI =
+      "gs://apache-beam-samples/advanced_analytics/video/gbikes_dinosaur.mp4";
+  private List<Feature> featureList = Collections.singletonList(Feature.LABEL_DETECTION);
+
+  @Test
+  public void annotateVideoFromURINoContext() {
+    PCollection<List<VideoAnnotationResults>> annotationResults =
+        testPipeline
+            .apply(Create.of(VIDEO_URI))
+            .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null)));
+    PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  private static class VerifyVideoAnnotationResult
+      implements SerializableFunction<Iterable<List<VideoAnnotationResults>>, Void> {
+
+    @Override
+    public Void apply(Iterable<List<VideoAnnotationResults>> input) {
+      List<Boolean> labelEvaluations = new ArrayList<>();
+      input.forEach(findStringMatchesInVideoAnnotationResultList(labelEvaluations, "dinosaur"));
+      assertEquals(Boolean.TRUE, labelEvaluations.contains(Boolean.TRUE));
+      return null;
+    }
+
+    private Consumer<List<VideoAnnotationResults>> findStringMatchesInVideoAnnotationResultList(
+        List<Boolean> labelEvaluations, String toMatch) {
+      return videoAnnotationResults ->
+          labelEvaluations.add(
+              videoAnnotationResults.stream()
+                  .anyMatch(result -> entityWithDescriptionFoundInSegmentLabels(toMatch, result)));
+    }
+
+    private boolean entityWithDescriptionFoundInSegmentLabels(
+        String toMatch, VideoAnnotationResults result) {
+      return result.getSegmentLabelAnnotationsList().stream()
+          .anyMatch(
+              labelAnnotation -> labelAnnotation.getEntity().getDescription().equals(toMatch));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 00000000000..1f0955d450f
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
diff --git a/settings.gradle b/settings.gradle
index a25f3570df0..ea18ec490a0 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -71,6 +71,7 @@ include ":sdks:java:extensions:kryo"
 include ":sdks:java:extensions:google-cloud-platform-core"
 include ":sdks:java:extensions:jackson"
 include ":sdks:java:extensions:join-library"
+include ":sdks:java:extensions:ml"
 include ":sdks:java:extensions:protobuf"
 include ":sdks:java:extensions:sketching"
 include ":sdks:java:extensions:sorter"
-- 
GitLab