Skip to content
代码片段 群组 项目
未验证 提交 825363b8 编辑于 作者: Michal Walenia's avatar Michal Walenia 提交者: GitHub
浏览文件

[BEAM-9147] Add a VideoIntelligence transform to Java SDK (#11261)

* [BEAM-9147] Add Google Cloud AI VideoIntelligence integration transform
上级 a2766fb8
No related branches found
No related tags found
无相关合并请求
......@@ -107,7 +107,8 @@ rat {
"learning/katas/*/IO/**/*.txt",
// Mockito extensions
"sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker"
"sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker",
"sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker"
]
// Add .gitignore excludes to the Apache Rat exclusion list. We re-create the behavior
......
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
plugins { id 'org.apache.beam.module' }
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf')
description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:expansion-service")
testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
testCompile library.java.mockito_core
testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
testCompile library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(":runners:google-cloud-dataflow-java")
}
project.test {
include "**/**IT.class"
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
import com.google.cloud.videointelligence.v1.AnnotateVideoRequest;
import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
/**
* Base class for Video Intelligence transform.
*
* @param <T> Class of input data being passed in - either ByteString - video data encoded into.
* String or String - a GCS URI of the video to be annotated.
*/
public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
protected final List<Feature> featureList;
VideoIntelligenceServiceClient videoIntelligenceServiceClient;
public AnnotateVideo(
PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
this.contextSideInput = contextSideInput;
this.featureList = featureList;
}
public AnnotateVideo(List<Feature> featureList) {
contextSideInput = null;
this.featureList = featureList;
}
@StartBundle
public void startBundle() throws IOException {
videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
}
@Teardown
public void teardown() {
videoIntelligenceServiceClient.close();
}
/**
* Call the Video Intelligence Cloud AI service and return annotation results.
*
* @param elementURI This or elementContents is required. GCS address of video to be annotated
* @param elementContents this or elementURI is required. Hex-encoded contents of video to be
* annotated
* @param videoContext Optional context for video annotation.
* @return
*/
List<VideoAnnotationResults> getVideoAnnotationResults(
String elementURI, ByteString elementContents, VideoContext videoContext)
throws InterruptedException, ExecutionException {
AnnotateVideoRequest.Builder requestBuilder =
AnnotateVideoRequest.newBuilder().addAllFeatures(featureList);
if (elementURI != null) {
requestBuilder.setInputUri(elementURI);
} else if (elementContents != null) {
requestBuilder.setInputContent(elementContents);
} else {
throw new IllegalArgumentException("Either elementURI or elementContents should be non-null");
}
if (videoContext != null) {
requestBuilder.setVideoContext(videoContext);
}
AnnotateVideoRequest annotateVideoRequest = requestBuilder.build();
OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync =
videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest);
return annotateVideoAsync.get().getAnnotationResultsList();
}
/** Process element implementation required. */
@ProcessElement
public abstract void processElement(ProcessContext context)
throws ExecutionException, InterruptedException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;
import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
/**
* Factory class for AnnotateVideo subclasses. allows integration with Google Cloud AI -
* VideoIntelligence service. Converts GCS URIs of videos or ByteStrings with video contents into
* Lists of VideoAnnotationResults.
*
* <p>Adding a side input of Maps of elements to VideoContext objects is allowed, so is using KVs of
* element and VideoContext as input.
*
* <p>Service account with proper permissions is required to use these transforms.
*/
public class VideoIntelligence {
/**
* Annotates videos from GCS URIs.
*
* @param featureList List of features to be annotated
* @param contextSideInput Optional side input with map of contexts to URIs
* @return DoFn performing the necessary operations
*/
public static AnnotateVideoFromURI annotateFromURI(
List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) {
return new AnnotateVideoFromURI(contextSideInput, featureList);
}
/**
* Annotates videos from ByteStrings of their contents.
*
* @param featureList List of features to be annotated
* @param contextSideInput Optional side input with map of contexts to ByteStrings
* @return DoFn performing the necessary operations
*/
public static AnnotateVideoFromBytes annotateFromBytes(
PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
return new AnnotateVideoFromBytes(contextSideInput, featureList);
}
/**
* Annotates videos from key-value pairs of GCS URI and VideoContext.
*
* @param featureList List of features to be annotated
* @return DoFn performing the necessary operations
*/
public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) {
return new AnnotateVideoURIWithContext(featureList);
}
/**
* Annotates videos from key-value pairs of ByteStrings and VideoContext.
*
* @param featureList List of features to be annotated
* @return DoFn performing the necessary operations
*/
public static AnnotateVideoBytesWithContext annotateFromBytesWithContext(
List<Feature> featureList) {
return new AnnotateVideoBytesWithContext(featureList);
}
/**
* Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates
* videos found on GCS based on URIs from input PCollection.
*/
public static class AnnotateVideoFromURI extends AnnotateVideo<String> {
public AnnotateVideoFromURI(
PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
super(contextSideInput, featureList);
}
/** ProcessElement implementation. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
String elementURI = context.element();
VideoContext videoContext = null;
if (contextSideInput != null) {
videoContext = context.sideInput(contextSideInput).get(elementURI);
}
List<VideoAnnotationResults> annotationResultsList =
getVideoAnnotationResults(elementURI, null, videoContext);
context.output(annotationResultsList);
}
}
/**
* Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos
* decoded from the ByteStrings are annotated.
*/
public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> {
public AnnotateVideoFromBytes(
PCollectionView<Map<ByteString, VideoContext>> contextSideInput,
List<Feature> featureList) {
super(contextSideInput, featureList);
}
/** Implementation of ProcessElement. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
ByteString element = context.element();
VideoContext videoContext = null;
if (contextSideInput != null) {
videoContext = context.sideInput(contextSideInput).get(element);
}
List<VideoAnnotationResults> videoAnnotationResults =
getVideoAnnotationResults(null, element, videoContext);
context.output(videoAnnotationResults);
}
}
/**
* Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
* GCS URIs, values - VideoContext objects.
*/
public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> {
public AnnotateVideoURIWithContext(List<Feature> featureList) {
super(featureList);
}
/** ProcessElement implementation. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
String elementURI = context.element().getKey();
VideoContext videoContext = context.element().getValue();
List<VideoAnnotationResults> videoAnnotationResults =
getVideoAnnotationResults(elementURI, null, videoContext);
context.output(videoAnnotationResults);
}
}
/**
* Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
* ByteString encoded video contents, values - VideoContext objects.
*/
public static class AnnotateVideoBytesWithContext
extends AnnotateVideo<KV<ByteString, VideoContext>> {
public AnnotateVideoBytesWithContext(List<Feature> featureList) {
super(featureList);
}
/** ProcessElement implementation. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
ByteString element = context.element().getKey();
VideoContext videoContext = context.element().getValue();
List<VideoAnnotationResults> videoAnnotationResults =
getVideoAnnotationResults(null, element, videoContext);
context.output(videoAnnotationResults);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */
package org.apache.beam.sdk.extensions.ml;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class AnnotateVideoTest {
private static final String TEST_URI = "fake_uri";
private static final ByteString TEST_BYTES = ByteString.copyFromUtf8("12345");
@Mock private VideoIntelligenceServiceClient serviceClient;
@Mock private OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> future;
@Mock private AnnotateVideoResponse response;
@Test
public void shouldReturnAListOfAnnotations() throws ExecutionException, InterruptedException {
when(response.getAnnotationResultsList())
.thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build()));
when(future.get()).thenReturn(response);
when(serviceClient.annotateVideoAsync(any())).thenReturn(future);
VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
VideoIntelligence.annotateFromBytes(
null, Collections.singletonList(Feature.LABEL_DETECTION));
annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient;
List<VideoAnnotationResults> videoAnnotationResults =
annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null);
assertEquals(1, videoAnnotationResults.size());
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowErrorWhenBothInputTypesNull()
throws ExecutionException, InterruptedException {
VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
VideoIntelligence.annotateFromBytes(
null, Collections.singletonList(Feature.LABEL_DETECTION));
annotateVideoFromBytes.getVideoAnnotationResults(null, null, null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;
import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI;
import static org.junit.Assert.assertEquals;
import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class VideoIntelligenceIT {
@Rule public TestPipeline testPipeline = TestPipeline.create();
private static final String VIDEO_URI =
"gs://apache-beam-samples/advanced_analytics/video/gbikes_dinosaur.mp4";
private List<Feature> featureList = Collections.singletonList(Feature.LABEL_DETECTION);
@Test
public void annotateVideoFromURINoContext() {
PCollection<List<VideoAnnotationResults>> annotationResults =
testPipeline
.apply(Create.of(VIDEO_URI))
.apply("Annotate video", ParDo.of(annotateFromURI(featureList, null)));
PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult());
testPipeline.run().waitUntilFinish();
}
private static class VerifyVideoAnnotationResult
implements SerializableFunction<Iterable<List<VideoAnnotationResults>>, Void> {
@Override
public Void apply(Iterable<List<VideoAnnotationResults>> input) {
List<Boolean> labelEvaluations = new ArrayList<>();
input.forEach(findStringMatchesInVideoAnnotationResultList(labelEvaluations, "dinosaur"));
assertEquals(Boolean.TRUE, labelEvaluations.contains(Boolean.TRUE));
return null;
}
private Consumer<List<VideoAnnotationResults>> findStringMatchesInVideoAnnotationResultList(
List<Boolean> labelEvaluations, String toMatch) {
return videoAnnotationResults ->
labelEvaluations.add(
videoAnnotationResults.stream()
.anyMatch(result -> entityWithDescriptionFoundInSegmentLabels(toMatch, result)));
}
private boolean entityWithDescriptionFoundInSegmentLabels(
String toMatch, VideoAnnotationResults result) {
return result.getSegmentLabelAnnotationsList().stream()
.anyMatch(
labelAnnotation -> labelAnnotation.getEntity().getDescription().equals(toMatch));
}
}
}
mock-maker-inline
......@@ -71,6 +71,7 @@ include ":sdks:java:extensions:kryo"
include ":sdks:java:extensions:google-cloud-platform-core"
include ":sdks:java:extensions:jackson"
include ":sdks:java:extensions:join-library"
include ":sdks:java:extensions:ml"
include ":sdks:java:extensions:protobuf"
include ":sdks:java:extensions:sketching"
include ":sdks:java:extensions:sorter"
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册