Unverified commit a05aa45d, authored by Etienne Chauchot, committed by GitHub

Merge pull request #15381 from egalpin/BEAM-10990-elasticsearch-response-filtering

Merge pull request #15381: [BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness
Showing 647 additions and 111 deletions
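
Note on the API surface exercised by the new tests below (a minimal usage sketch, not part of the diff; the cluster addresses, index name, document type, and sample documents are placeholders, and the ConnectionConfiguration.create signature is assumed): with this change, ElasticsearchIO.write() configured with withThrowWriteErrors(false) filters bulk-response errors and returns them instead of failing the bundle, so the transform's output is a PCollectionTuple carrying Write.SUCCESSFUL_WRITES and Write.FAILED_WRITES collections of ElasticsearchIO.Document; withAllowableResponseErrors additionally treats the named error types as successes.

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;

    public class ResponseFilteringSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Placeholder connection settings; adjust addresses, index, and type to your cluster.
        ElasticsearchIO.ConnectionConfiguration connection =
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] {"http://localhost:9200"}, "my-index", "_doc");

        // withThrowWriteErrors(false) returns failures instead of throwing;
        // withAllowableResponseErrors treats the listed error types as successes.
        Write write =
            ElasticsearchIO.write()
                .withConnectionConfiguration(connection)
                .withMaxBatchSize(100)
                .withThrowWriteErrors(false)
                .withAllowableResponseErrors(Collections.singleton("json_parse_exception"));

        PCollectionTuple results =
            pipeline
                .apply(Create.of(Arrays.asList("{\"id\":1}", "{\"id\":2}")))
                .apply(write);

        // Documents indexed successfully and documents rejected by Elasticsearch.
        PCollection<ElasticsearchIO.Document> successes = results.get(Write.SUCCESSFUL_WRITES);
        PCollection<ElasticsearchIO.Document> failures = results.get(Write.FAILED_WRITES);

        pipeline.run().waitUntilFinish();
      }
    }
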
......@@ -39,6 +39,10 @@ configurations.all {
}
}
test {
maxParallelForks = 1
}
dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
......
......@@ -33,7 +33,6 @@ import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
......@@ -84,7 +83,6 @@ public class ElasticsearchIOTest implements Serializable {
@Rule public TestPipeline pipeline = TestPipeline.create();
@Ignore("https://issues.apache.org/jira/browse/BEAM-5172")
@Test
public void testSizes() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
......@@ -134,6 +132,18 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithErrors();
}
@Test
public void testWriteWithErrorsReturned() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithErrorsReturned();
}
@Test
public void testWriteWithErrorsReturnedAllowedErrors() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithErrorsReturnedAllowedErrors();
}
@Test
public void testWriteWithMaxBatchSize() throws Exception {
elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
......@@ -144,7 +154,6 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
}
@Ignore("https://issues.apache.org/jira/browse/BEAM-5172")
@Test
public void testSplit() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
......@@ -254,4 +263,15 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
}
@Test
public void testDocumentCoder() throws Exception {
elasticsearchIOTestCommon.testDocumentCoder();
}
@Test
public void testPDone() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testPipelineDone();
}
}
......@@ -39,6 +39,10 @@ configurations.all {
}
}
test {
maxParallelForks = 1
}
dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
......
......@@ -33,7 +33,6 @@ import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
......@@ -83,7 +82,6 @@ public class ElasticsearchIOTest implements Serializable {
@Rule public TestPipeline pipeline = TestPipeline.create();
@Ignore("https://issues.apache.org/jira/browse/BEAM-5172")
@Test
public void testSizes() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
......@@ -133,6 +131,18 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithErrors();
}
@Test
public void testWriteWithErrorsReturned() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithErrorsReturned();
}
@Test
public void testWriteWithErrorsReturnedAllowedErrors() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithErrorsReturnedAllowedErrors();
}
@Test
public void testWriteWithMaxBatchSize() throws Exception {
elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
......@@ -143,7 +153,6 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
}
@Ignore("https://issues.apache.org/jira/browse/BEAM-5172")
@Test
public void testSplit() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
......@@ -247,4 +256,15 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
}
@Test
public void testDocumentCoder() throws Exception {
elasticsearchIOTestCommon.testDocumentCoder();
}
@Test
public void testPDone() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testPipelineDone();
}
}
......@@ -28,7 +28,7 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 7.x"
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 7.x"
def log4j_version = "2.14.1"
def elastic_search_version = "7.9.2"
def elastic_search_version = "7.13.4"
configurations.all {
resolutionStrategy {
......@@ -39,6 +39,10 @@ configurations.all {
}
}
test {
maxParallelForks = 1
}
dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
......
......@@ -33,7 +33,6 @@ import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
......@@ -84,7 +83,6 @@ public class ElasticsearchIOTest implements Serializable {
@Rule public TestPipeline pipeline = TestPipeline.create();
@Ignore("https://issues.apache.org/jira/browse/BEAM-5172")
@Test
public void testSizes() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
......@@ -134,6 +132,18 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithErrors();
}
@Test
public void testWriteWithErrorsReturned() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithErrorsReturned();
}
@Test
public void testWriteWithErrorsReturnedAllowedErrors() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithErrorsReturnedAllowedErrors();
}
@Test
public void testWriteWithMaxBatchSize() throws Exception {
elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
......@@ -144,7 +154,6 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
}
@Ignore("https://issues.apache.org/jira/browse/BEAM-5172")
@Test
public void testSplit() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
......@@ -248,4 +257,15 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
}
@Test
public void testDocumentCoder() throws Exception {
elasticsearchIOTestCommon.testDocumentCoder();
}
@Test
public void testPDone() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testPipelineDone();
}
}
......@@ -26,11 +26,13 @@ import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfigur
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.FAMOUS_SCIENTISTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.INVALID_DOCS_IDS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.NUM_SCIENTISTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.SCRIPT_SOURCE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByMatch;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByScientistName;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.insertTestDocuments;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.mapToInputId;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshAllIndices;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshIndexAndGetCurrentNumDocs;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
......@@ -48,6 +50,8 @@ import static org.junit.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
......@@ -55,11 +59,19 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BulkIO.StatefulBatching;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Document;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.DocumentCoder;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.InjectionMode;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
......@@ -73,6 +85,7 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
......@@ -85,6 +98,7 @@ import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -111,7 +125,7 @@ class ElasticsearchIOTestCommon implements Serializable {
}
static final String ES_TYPE = "test";
static final long NUM_DOCS_UTESTS = 100L;
static final long NUM_DOCS_UTESTS = 40L;
static final long NUM_DOCS_ITESTS = 50000L;
static final float ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE = 0.5f;
private static final long AVERAGE_DOC_SIZE = 25L;
......@@ -281,13 +295,19 @@ class ElasticsearchIOTestCommon implements Serializable {
executeWriteTest(write);
}
List<String> serializeDocs(ElasticsearchIO.Write write, List<String> jsonDocs)
List<Document> serializeDocs(ElasticsearchIO.Write write, List<String> jsonDocs)
throws IOException {
List<String> serializedInput = new ArrayList<>();
List<Document> serializedInput = new ArrayList<>();
for (String doc : jsonDocs) {
serializedInput.add(
String bulkDoc =
DocToBulk.createBulkApiEntity(
write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration)));
write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration));
Document r =
Document.create()
.withInputDoc(doc)
.withBulkDirective(bulkDoc)
.withTimestamp(Instant.now());
serializedInput.add(r);
}
return serializedInput;
}
......@@ -323,13 +343,78 @@ class ElasticsearchIOTestCommon implements Serializable {
// write bundles size is the runner decision, we cannot force a bundle size,
// so we test the Writer as a DoFn outside of a runner.
try (DoFnTester<String, Void> fnTester =
try (DoFnTester<Document, Document> fnTester =
DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) {
// inserts into Elasticsearch
fnTester.processBundle(serializeDocs(write, input));
}
}
void testWriteWithErrorsReturned() throws Exception {
Write write =
ElasticsearchIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withMaxBatchSize(BATCH_SIZE)
.withThrowWriteErrors(false);
List<String> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);
PCollection<Integer> success =
outputs
.get(Write.SUCCESSFUL_WRITES)
.apply("Convert success to input ID", MapElements.via(mapToInputId));
PCollection<Integer> fail =
outputs
.get(Write.FAILED_WRITES)
.apply("Convert fails to input ID", MapElements.via(mapToInputId));
Set<Integer> successfulIds =
IntStream.range(0, data.size()).boxed().collect(Collectors.toSet());
successfulIds.removeAll(INVALID_DOCS_IDS);
PAssert.that(success).containsInAnyOrder(successfulIds);
PAssert.that(fail).containsInAnyOrder(INVALID_DOCS_IDS);
pipeline.run();
}
void testWriteWithErrorsReturnedAllowedErrors() throws Exception {
Write write =
ElasticsearchIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withMaxBatchSize(BATCH_SIZE)
.withThrowWriteErrors(false)
.withAllowableResponseErrors(Collections.singleton("json_parse_exception"));
List<String> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);
PCollection<Integer> success =
outputs
.get(Write.SUCCESSFUL_WRITES)
.apply("Convert success to input ID", MapElements.via(mapToInputId));
PCollection<Integer> fail =
outputs
.get(Write.FAILED_WRITES)
.apply("Convert fails to input ID", MapElements.via(mapToInputId));
// Successful IDs should be all IDs, as we're explicitly telling the ES transform that we
// want to ignore failures of a certain kind, therefore treat those failures as having been
// successfully processed
Set<Integer> successfulIds =
IntStream.range(0, data.size()).boxed().collect(Collectors.toSet());
PAssert.that(success).containsInAnyOrder(successfulIds);
PAssert.that(fail).empty();
pipeline.run();
}
void testWriteWithAllowedErrors() throws Exception {
Write write =
ElasticsearchIO.write()
......@@ -342,7 +427,7 @@ class ElasticsearchIOTestCommon implements Serializable {
// write bundles size is the runner decision, we cannot force a bundle size,
// so we test the Writer as a DoFn outside of a runner.
try (DoFnTester<String, Void> fnTester =
try (DoFnTester<Document, Document> fnTester =
DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) {
// inserts into Elasticsearch
fnTester.processBundle(serializeDocs(write, input));
......@@ -357,21 +442,27 @@ class ElasticsearchIOTestCommon implements Serializable {
// write bundles size is the runner decision, we cannot force a bundle size,
// so we test the Writer as a DoFn outside of a runner.
try (DoFnTester<String, Void> fnTester =
try (DoFnTester<Document, Document> fnTester =
DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) {
List<String> input =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
List<String> serializedInput = new ArrayList<>();
List<Document> serializedInput = new ArrayList<>();
for (String doc : input) {
serializedInput.add(
String bulkDoc =
DocToBulk.createBulkApiEntity(
write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration)));
write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration));
Document r =
Document.create()
.withInputDoc(doc)
.withBulkDirective(bulkDoc)
.withTimestamp(Instant.now());
serializedInput.add(r);
}
long numDocsProcessed = 0;
long numDocsInserted = 0;
for (String document : serializedInput) {
for (Document document : serializedInput) {
fnTester.processElement(document);
numDocsProcessed++;
// test every 100 docs to avoid overloading ES
......@@ -406,25 +497,31 @@ class ElasticsearchIOTestCommon implements Serializable {
.withMaxBatchSizeBytes(BATCH_SIZE_BYTES);
// write bundles size is the runner decision, we cannot force a bundle size,
// so we test the Writer as a DoFn outside of a runner.
try (DoFnTester<String, Void> fnTester =
try (DoFnTester<Document, Document> fnTester =
DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) {
List<String> input =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
List<String> serializedInput = new ArrayList<>();
List<Document> serializedInput = new ArrayList<>();
for (String doc : input) {
serializedInput.add(
String bulkDoc =
DocToBulk.createBulkApiEntity(
write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration)));
write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration));
Document r =
Document.create()
.withInputDoc(doc)
.withBulkDirective(bulkDoc)
.withTimestamp(Instant.now());
serializedInput.add(r);
}
long numDocsProcessed = 0;
long sizeProcessed = 0;
long numDocsInserted = 0;
long batchInserted = 0;
for (String document : serializedInput) {
for (Document document : serializedInput) {
fnTester.processElement(document);
numDocsProcessed++;
sizeProcessed += document.getBytes(StandardCharsets.UTF_8).length;
sizeProcessed += document.getBulkDirective().getBytes(StandardCharsets.UTF_8).length;
// test every 40 docs to avoid overloading ES
if ((numDocsProcessed % 40) == 0) {
// force the index to upgrade after inserting for the inserted docs
......@@ -802,22 +899,25 @@ class ElasticsearchIOTestCommon implements Serializable {
}
void testMaxParallelRequestsPerWindow() throws Exception {
List<String> data =
List<Document> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS)
.stream()
.map(doc -> Document.create().withInputDoc(doc).withTimestamp(Instant.now()))
.collect(Collectors.toList());
Write write =
ElasticsearchIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withMaxParallelRequestsPerWindow(1);
PCollection<KV<Integer, Iterable<String>>> batches =
PCollection<KV<Integer, Iterable<Document>>> batches =
pipeline.apply(Create.of(data)).apply(StatefulBatching.fromSpec(write.getBulkIO()));
PCollection<Integer> keyValues =
batches.apply(
MapElements.into(integers())
.via((SerializableFunction<KV<Integer, Iterable<String>>, Integer>) KV::getKey));
.via((SerializableFunction<KV<Integer, Iterable<Document>>, Integer>) KV::getKey));
// Number of unique keys produced should be number of maxParallelRequestsPerWindow * numWindows
// There is only 1 request (key) per window, and 1 (global) window ie. one key total where
......@@ -829,19 +929,51 @@ class ElasticsearchIOTestCommon implements Serializable {
pipeline.run();
}
void testDocumentCoder() throws Exception {
List<String> data =
ElasticsearchIOTestUtils.createDocuments(numDocs, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
int randomNum = ThreadLocalRandom.current().nextInt(0, data.size());
Instant now = Instant.now();
Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration);
Document expected =
serializeDocs(write, data)
.get(randomNum)
.withTimestamp(now)
.withHasError(randomNum % 2 == 0);
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
DocumentCoder coder = DocumentCoder.of();
coder.encode(expected, out);
Document actual = coder.decode(in);
assertEquals(expected, actual);
}
void testPipelineDone() throws Exception {
Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration);
List<String> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
pipeline.apply(Create.of(data)).apply(write);
assertEquals(State.DONE, pipeline.run().waitUntilFinish());
}
private static class AssertThatHasExpectedContents
implements SerializableFunction<Iterable<KV<Integer, Iterable<String>>>, Void> {
implements SerializableFunction<Iterable<KV<Integer, Iterable<Document>>>, Void> {
private final int key;
private final List<String> expectedContents;
private final List<Document> expectedContents;
AssertThatHasExpectedContents(int key, List<String> expected) {
AssertThatHasExpectedContents(int key, List<Document> expected) {
this.key = key;
this.expectedContents = expected;
}
@Override
public Void apply(Iterable<KV<Integer, Iterable<String>>> actual) {
public Void apply(Iterable<KV<Integer, Iterable<Document>>> actual) {
assertThat(
actual,
IsIterableContainingInAnyOrder.containsInAnyOrder(
......
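
Side note on the Document/DocumentCoder round trip checked by testDocumentCoder above: the coder can also be exercised directly against in-memory streams. The sketch below is not from the patch; it uses byte-array streams instead of the test's PipedInputStream/PipedOutputStream pair, and the input document and bulk directive strings are placeholders.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Document;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.DocumentCoder;
    import org.joda.time.Instant;

    public class DocumentCoderRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder input document and bulk directive.
        Document original =
            Document.create()
                .withInputDoc("{\"scientist\":\"einstein\", \"id\":0}")
                .withBulkDirective("{\"index\":{}}\n{\"scientist\":\"einstein\", \"id\":0}\n")
                .withTimestamp(Instant.now())
                .withHasError(false);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DocumentCoder.of().encode(original, out);

        Document decoded =
            DocumentCoder.of().decode(new ByteArrayInputStream(out.toByteArray()));

        // A correct coder round trip yields an equal Document.
        System.out.println(original.equals(decoded));
      }
    }
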
......@@ -30,9 +30,14 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Document;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
......@@ -50,6 +55,7 @@ class ElasticsearchIOTestUtils {
static final int ELASTICSEARCH_DEFAULT_PORT = 9200;
static final String ELASTICSEARCH_PASSWORD = "superSecure";
static final String ELASTIC_UNAME = "elastic";
static final Set<Integer> INVALID_DOCS_IDS = new HashSet<>(Arrays.asList(6, 7));
static final String[] FAMOUS_SCIENTISTS = {
"einstein",
......@@ -163,7 +169,8 @@ class ElasticsearchIOTestUtils {
request.addParameters(Collections.singletonMap("refresh", "wait_for"));
request.setEntity(requestBody);
Response response = restClient.performRequest(request);
ElasticsearchIO.checkForErrors(response.getEntity(), Collections.emptySet());
ElasticsearchIO.createWriteReport(response.getEntity(), Collections.emptySet(), true);
refreshAllIndices(restClient);
}
/** Inserts the given number of test documents into Elasticsearch. */
......@@ -285,7 +292,8 @@ class ElasticsearchIOTestUtils {
for (int i = 0; i < numDocs; i++) {
int index = i % FAMOUS_SCIENTISTS.length;
// insert 2 malformed documents
if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode) && (i == 6 || i == 7)) {
if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode)
&& INVALID_DOCS_IDS.contains(i)) {
data.add(String.format("{\"scientist\";\"%s\", \"id\":%s}", FAMOUS_SCIENTISTS[index], i));
} else {
data.add(String.format("{\"scientist\":\"%s\", \"id\":%s}", FAMOUS_SCIENTISTS[index], i));
......@@ -452,4 +460,18 @@ class ElasticsearchIOTestUtils {
return container;
}
static SimpleFunction<Document, Integer> mapToInputId =
new SimpleFunction<Document, Integer>() {
@Override
public Integer apply(Document document) {
try {
// Account for intentionally invalid input json docs
String fixedJson = document.getInputDoc().replaceAll(";", ":");
return MAPPER.readTree(fixedJson).path("id").asInt();
} catch (JsonProcessingException e) {
return -1;
}
}
};
}
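
Design note on the test utilities above: INVALID_DOCS_IDS centralizes which generated documents are intentionally malformed (a ';' instead of ':' after "scientist"), so testWriteWithErrorsReturned can assert the exact partition of SUCCESSFUL_WRITES versus FAILED_WRITES, and mapToInputId can still recover each document's id, even for the malformed inputs, by normalizing the separator before parsing the JSON.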