Skip to content
代码片段 群组 项目
未验证 提交 f3571748 编辑于 作者: Jan Lukavský's avatar Jan Lukavský 提交者: GitHub
浏览文件

[flink] FlinkRunner initializes the same split twice (#31313) (#33606)

* [flink] FlinkRunner initializes the same split twice (#31313)
上级 2af60583
No related branches found
No related tags found
无相关合并请求
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
  "runFor": "#33606"
}
......@@ -29,8 +29,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.im
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
......@@ -73,18 +71,6 @@ public abstract class FlinkSource<T, OutputT>
return new FlinkUnboundedSource<>(stepName, source, serializablePipelineOptions, numSplits);
}
/**
 * Creates an unbounded Impulse source emitting a single element.
 *
 * <p>Impulse is inherently bounded, so it is wrapped in the bounded-to-unbounded adapter; the
 * resulting source shuts itself down after the configured idle period.
 *
 * @param shutdownSourceAfterIdleMs how long an idle source lives before shutting down
 * @return an unbounded Flink source producing the single impulse element
 */
public static FlinkUnboundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
  FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
  options.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
  return new FlinkUnboundedSource<>(
      "Impulse",
      new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(new BeamImpulseSource()),
      new SerializablePipelineOptions(options),
      1,
      // Impulse's single element is timestamped at the minimum watermark value.
      record -> BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
}
public static FlinkBoundedSource<byte[]> boundedImpulse() {
return new FlinkBoundedSource<>(
"Impulse",
......@@ -117,7 +103,8 @@ public abstract class FlinkSource<T, OutputT>
@Override
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) {
return new FlinkSourceSplitEnumerator<>(
enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
}
......@@ -126,11 +113,11 @@ public abstract class FlinkSource<T, OutputT>
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
    restoreEnumerator(
        SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
        Map<Integer, List<FlinkSourceSplit<T>>> checkpoint) {
  // Restore path: pass splitsInitialized = true so start() does not split the
  // source a second time; only the checkpointed splits are handed back to their
  // subtasks. (The stale duplicate signature line with "throws Exception" — a
  // leftover diff artifact — is removed.)
  FlinkSourceSplitEnumerator<T> enumerator =
      new FlinkSourceSplitEnumerator<>(
          enumContext, beamSource, serializablePipelineOptions.get(), numSplits, true);
  checkpoint.forEach(
      (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
  return enumerator;
......
......@@ -63,16 +63,40 @@ public class FlinkSourceSplitEnumerator<T>
Source<T> beamSource,
PipelineOptions pipelineOptions,
int numSplits) {
this(context, beamSource, pipelineOptions, numSplits, false);
}
/**
 * Creates a split enumerator for the given Beam source.
 *
 * @param context Flink enumerator context used for async calls and split assignment
 * @param beamSource the Beam source to be split across readers
 * @param pipelineOptions options forwarded to the source's split/read methods
 * @param numSplits desired number of splits
 * @param splitsInitialized whether splits already exist (true when restoring from a
 *     checkpoint); when true, {@code start()} skips splitting the source again
 */
public FlinkSourceSplitEnumerator(
    SplitEnumeratorContext<FlinkSourceSplit<T>> context,
    Source<T> beamSource,
    PipelineOptions pipelineOptions,
    int numSplits,
    boolean splitsInitialized) {
  this.context = context;
  this.beamSource = beamSource;
  this.pipelineOptions = pipelineOptions;
  this.numSplits = numSplits;
  this.pendingSplits = new HashMap<>(numSplits);
  // Assign the flag exactly once from the parameter; the stale duplicate
  // assignment "this.splitsInitialized = false;" (a leftover removed diff line)
  // would otherwise be dead code shadowed by this line.
  this.splitsInitialized = splitsInitialized;
  LOG.info(
      "Created new enumerator with parallelism {}, source {}, numSplits {}, initialized {}",
      context.currentParallelism(),
      beamSource,
      numSplits,
      splitsInitialized);
}
@Override
public void start() {
  // When restored from a checkpoint the splits already exist; splitting the
  // source again here would hand out the same split twice (#31313).
  if (splitsInitialized) {
    return;
  }
  initializeSplits();
}
private void initializeSplits() {
context.callAsync(
() -> {
try {
......
......@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
......@@ -130,6 +132,49 @@ public class FlinkSourceSplitEnumeratorTest {
}
}
@Test
public void testAddSplitsBackAfterRescale() throws Exception {
  // Regression test for the rescale/restore path: splits captured from a previous
  // run are added back to a restored enumerator, which must only redistribute
  // them and must NOT split the source a second time.
  final int numSubtasks = 2;
  final int numSplits = 10;
  final int totalNumRecords = 10;
  TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> testContext =
      new TestingSplitEnumeratorContext<>(numSubtasks);
  TestBoundedCountingSource testSource =
      new TestBoundedCountingSource(numSplits, totalNumRecords);
  final Map<Integer, List<FlinkSourceSplit<KV<Integer, Integer>>>> assignment;
  // Phase 1: a fresh enumerator splits the source and assigns splits to both
  // registered readers; capture the per-subtask assignment as the "checkpoint".
  try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
      new FlinkSourceSplitEnumerator<>(
          testContext, testSource, FlinkPipelineOptions.defaults(), numSplits)) {
    splitEnumerator.start();
    for (int i = 0; i < numSubtasks; i++) {
      testContext.registerReader(i, String.valueOf(i));
      splitEnumerator.addReader(i);
    }
    // Run the async split-initialization callback scheduled by start().
    testContext.getExecutorService().triggerAll();
    assignment =
        testContext.getSplitAssignments().entrySet().stream()
            .map(e -> KV.of(e.getKey(), e.getValue().getAssignedSplits()))
            .collect(Collectors.toMap(KV::getKey, KV::getValue));
  }
  // Phase 2: simulate restore after rescale — new context, enumerator created with
  // splitsInitialized = true, and the checkpointed splits added back.
  testContext = new TestingSplitEnumeratorContext<>(numSubtasks);
  try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
      new FlinkSourceSplitEnumerator<>(
          testContext, testSource, FlinkPipelineOptions.defaults(), numSplits, true)) {
    splitEnumerator.start();
    assignment.forEach(
        (splitId, assignedSplits) -> splitEnumerator.addSplitsBack(assignedSplits, splitId));
    testContext.registerReader(0, "0");
    splitEnumerator.addReader(0);
    testContext.getExecutorService().triggerAll();
    List<FlinkSourceSplit<KV<Integer, Integer>>> splitsForReader =
        testContext.getSplitAssignments().get(0).getAssignedSplits();
    // Reader 0 must receive exactly its checkpointed share (10 splits / 2
    // subtasks = 5), not a freshly re-split set on top of it.
    assertEquals(numSplits / numSubtasks, splitsForReader.size());
  }
}
private void assignSplits(
TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context,
Source<KV<Integer, Integer>> source,
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册