Skip to content
代码片段 群组 项目
未验证 提交 d1c82a9b 编辑于 作者: Igor Soarez's avatar Igor Soarez 提交者: GitHub
浏览文件

KAFKA-10205: Documentation and handling of non deterministic Topologies (#9064)

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
上级 6e0a10b4
No related branches found
No related tags found
无相关合并请求
......@@ -51,6 +51,14 @@ import java.util.regex.Pattern;
/**
* {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
*
* <p>
* It is a requirement that the processing logic ({@link Topology}) be defined in a deterministic way,
* as in, the order in which all operators are added must be predictable and the same across all application
* instances.
* Topologies are only identical if all operators are added in the same order.
* If different {@link KafkaStreams} instances of the same application build different topologies the result may be
* incompatible runtime code and unexpected results or errors
*
* @see Topology
* @see KStream
* @see KTable
......
......@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
......@@ -1109,6 +1110,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
public RecordQueue createQueue(final TopicPartition partition) {
final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic());
if (source == null) {
throw new TopologyException(
"Topic is unkown to the topology. " +
"This may happen if different KafkaStreams instances of the same application execute different Topologies. " +
"Note that Topologies are only identical if all operators are added in the same order."
);
}
final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor();
final TimestampExtractor timestampExtractor = sourceTimestampExtractor != null ? sourceTimestampExtractor : defaultTimestampExtractor;
return new RecordQueue(
......
......@@ -46,6 +46,7 @@ import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
......@@ -2032,6 +2033,46 @@ public class StreamTaskTest {
assertThat(task.state(), equalTo(SUSPENDED));
}
@Test
public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
final InternalProcessorContext context = new ProcessorContextImpl(
taskId,
createConfig(false, "100"),
stateManager,
streamsMetrics,
null
);
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time);
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
EasyMock.replay(stateManager);
// The processor topology is missing the topics
final ProcessorTopology topology = withSources(asList(), mkMap());
final TopologyException exception = assertThrows(
TopologyException.class,
() -> new StreamTask(
taskId,
partitions,
topology,
consumer,
createConfig(false, "100"),
metrics,
stateDirectory,
cache,
time,
stateManager,
recordCollector,
context
)
);
assertThat(exception.getMessage(), equalTo("Invalid topology: " +
"Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same " +
"application execute different Topologies. Note that Topologies are only identical if all operators " +
"are added in the same order."));
}
private List<MetricName> getTaskMetrics() {
return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册