Unverified · Commit 65c29a9d authored by Matthias J. Sax, committed by GitHub

KAFKA-9274: fix incorrect default value for `task.timeout.ms` config (#9385)

 - part of KIP-572
 - also add handler method to trigger/reset the timeout on a task

Reviewer: John Roesler <john@confluent.io>
Parent a8b5f5a4
No related branches found
No related tags found
No related merge requests found
Showing 152 additions and 9 deletions
......@@ -694,7 +694,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
Type.LONG,
- Duration.ofSeconds(5L).toMillis(),
+ Duration.ofMinutes(5L).toMillis(),
atLeast(0L),
Importance.MEDIUM,
TASK_TIMEOUT_MS_DOC)
......
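For context, a minimal sketch of what the corrected default means and how `task.timeout.ms` can be overridden through `StreamsConfig`. This is hypothetical application code, not part of this commit; the application id and bootstrap server are placeholder assumptions.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TaskTimeoutConfigSketch {
    public static void main(final String[] args) {
        // Corrected default: 5 minutes (300000 ms), not the 5 seconds used before this fix.
        final long defaultTaskTimeoutMs = Duration.ofMinutes(5L).toMillis();
        System.out.println("default task.timeout.ms = " + defaultTaskTimeoutMs);

        // Optional override; the atLeast(0L) validator above requires a value >= 0 ms.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "task-timeout-sketch"); // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, Duration.ofMinutes(10L).toMillis());
    }
}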
......@@ -16,23 +16,29 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
public abstract class AbstractTask implements Task {
private final static long NO_DEADLINE = -1L;
private Task.State state = CREATED;
private long deadlineMs = NO_DEADLINE;
protected Set<TopicPartition> inputPartitions;
/**
......@@ -47,17 +53,20 @@ public abstract class AbstractTask implements Task {
protected final ProcessorTopology topology;
protected final StateDirectory stateDirectory;
protected final ProcessorStateManager stateMgr;
private final long taskTimeoutMs;
AbstractTask(final TaskId id,
final ProcessorTopology topology,
final StateDirectory stateDirectory,
final ProcessorStateManager stateMgr,
final Set<TopicPartition> inputPartitions) {
final Set<TopicPartition> inputPartitions,
final long taskTimeoutMs) {
this.id = id;
this.stateMgr = stateMgr;
this.topology = topology;
this.inputPartitions = inputPartitions;
this.stateDirectory = stateDirectory;
this.taskTimeoutMs = taskTimeoutMs;
}
/**
......@@ -137,4 +146,46 @@ public abstract class AbstractTask implements Task {
this.inputPartitions = topicPartitions;
topology.updateSourceTopics(nodeToSourceTopics);
}
void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException,
final Logger log) throws StreamsException {
if (deadlineMs == NO_DEADLINE) {
deadlineMs = currentWallClockMs + taskTimeoutMs;
} else if (currentWallClockMs > deadlineMs) {
final String errorMessage = String.format(
"Task %s did not make progress within %d ms. Adjust `%s` if needed.",
id,
currentWallClockMs - deadlineMs + taskTimeoutMs,
StreamsConfig.TASK_TIMEOUT_MS_CONFIG
);
if (timeoutException != null) {
throw new TimeoutException(errorMessage, timeoutException);
} else {
throw new TimeoutException(errorMessage);
}
}
if (timeoutException != null) {
log.debug(
"Timeout exception. Remaining time to deadline {}; retrying.",
deadlineMs - currentWallClockMs,
timeoutException
);
} else {
log.debug(
"Task did not make progress. Remaining time to deadline {}; retrying.",
deadlineMs - currentWallClockMs
);
}
}
void clearTaskTimeout(final Logger log) {
if (deadlineMs != NO_DEADLINE) {
log.debug("Clearing task timeout.");
deadlineMs = NO_DEADLINE;
}
}
}
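The new `maybeInitTaskTimeoutOrThrow`/`clearTaskTimeout` pair is meant to bracket retriable client calls: the first `TimeoutException` starts the `task.timeout.ms` deadline, a later success resets it, and once the deadline is exceeded a `TimeoutException` is rethrown. A hedged sketch of a hypothetical caller follows; it is not taken from this commit, and `doRetriableBrokerCall` is an assumed placeholder.

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.Task;

final class TaskTimeoutRetrySketch {
    private final Time time = Time.SYSTEM;

    void runOnce(final Task task) {
        try {
            doRetriableBrokerCall();   // hypothetical operation that may throw TimeoutException
            task.clearTaskTimeout();   // progress was made, so reset the deadline
        } catch (final TimeoutException timeoutException) {
            // The first failure starts the task.timeout.ms deadline; once the deadline
            // has passed, this call rethrows a TimeoutException to the caller.
            task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException);
        }
    }

    private void doRetriableBrokerCall() {
        // placeholder, e.g. committing offsets or fetching committed offsets
    }
}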
......@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
......@@ -64,7 +65,7 @@ public class StandbyTask extends AbstractTask implements Task {
final StateDirectory stateDirectory,
final ThreadCache cache,
final InternalProcessorContext processorContext) {
- super(id, topology, stateDirectory, stateMgr, partitions);
+ super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG));
this.processorContext = processorContext;
this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache);
......@@ -286,6 +287,17 @@ public class StandbyTask extends AbstractTask implements Task {
throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
}
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException {
maybeInitTaskTimeoutOrThrow(currentWallClockMs, timeoutException, log);
}
@Override
public void clearTaskTimeout() {
clearTaskTimeout(log);
}
InternalProcessorContext processorContext() {
return processorContext;
}
......
......@@ -119,7 +119,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
final ProcessorStateManager stateMgr,
final RecordCollector recordCollector,
final InternalProcessorContext processorContext) {
- super(id, topology, stateDirectory, stateMgr, partitions);
+ super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG));
this.mainConsumer = mainConsumer;
this.processorContext = processorContext;
......@@ -1014,6 +1014,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return commitRequested;
}
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException {
maybeInitTaskTimeoutOrThrow(currentWallClockMs, timeoutException, log);
}
@Override
public void clearTaskTimeout() {
clearTaskTimeout(log);
}
static String encodeTimestamp(final long partitionTime) {
final ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put(LATEST_MAGIC_BYTE);
......
......@@ -16,10 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
......@@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -205,4 +206,8 @@ public interface Task {
return false;
}
void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException;
void clearTaskTimeout();
}
......@@ -20,6 +20,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
......@@ -48,9 +49,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
......@@ -557,6 +560,34 @@ public class StandbyTaskTest {
assertThat(task.state(), equalTo(SUSPENDED));
}
@Test
public void shouldInitTaskTimeoutAndEventuallyThrow() {
EasyMock.replay(stateManager);
final Logger log = new LogContext().logger(StreamTaskTest.class);
task = createStandbyTask();
task.maybeInitTaskTimeoutOrThrow(0L, null, log);
task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null, log);
assertThrows(
TimeoutException.class,
() -> task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log)
);
}
@Test
public void shouldClearTaskTimeout() {
EasyMock.replay(stateManager);
final Logger log = new LogContext().logger(StreamTaskTest.class);
task = createStandbyTask();
task.maybeInitTaskTimeoutOrThrow(0L, null, log);
task.clearTaskTimeout(log);
task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log);
}
private StandbyTask createStandbyTask() {
final ThreadCache cache = new ThreadCache(
......
......@@ -39,6 +39,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
......@@ -69,6 +70,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
......@@ -2075,6 +2077,30 @@ public class StreamTaskTest {
"are added in the same order."));
}
@Test
public void shouldInitTaskTimeoutAndEventuallyThrow() {
final Logger log = new LogContext().logger(StreamTaskTest.class);
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.maybeInitTaskTimeoutOrThrow(0L, null, log);
task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null, log);
assertThrows(
TimeoutException.class,
() -> task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log)
);
}
@Test
public void shouldClearTaskTimeout() {
final Logger log = new LogContext().logger(StreamTaskTest.class);
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.maybeInitTaskTimeoutOrThrow(0L, null, log);
task.clearTaskTimeout(log);
task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log);
}
private List<MetricName> getTaskMetrics() {
return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
}
......
......@@ -2692,7 +2692,7 @@ public class TaskManagerTest {
final Set<TopicPartition> partitions,
final boolean active,
final ProcessorStateManager processorStateManager) {
- super(id, null, null, processorStateManager, partitions);
+ super(id, null, null, processorStateManager, partitions, 0L);
this.active = active;
}
......@@ -2765,6 +2765,13 @@ public class TaskManagerTest {
}
}
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException {}
@Override
public void clearTaskTimeout() {}
@Override
public void closeClean() {
transitionTo(State.CLOSED);
......