Skip to content
代码片段 群组 项目
未验证 提交 6fa9f3c9 编辑于 作者: Vikas Singh's avatar Vikas Singh 提交者: GitHub
浏览文件

KAFKA-10531: Check for negative values to Thread.sleep call (#9347)

System.currentTimeMillis() is not monotonic, so using that to calculate time to sleep can result in negative values. That will throw IllegalArgumentException.

This change checks for that and sleeps for a second (to avoid tight loop) if the value returned is negative.

Author: Shaik Zakir Hussain <zhussain@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
上级 69790a14
无相关合并请求
......@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,6 +44,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
......@@ -70,7 +70,8 @@ import java.util.concurrent.Future;
*/
public class KafkaBasedLog<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
private Time time;
private final String topic;
......@@ -133,11 +134,13 @@ public class KafkaBasedLog<K, V> {
List<TopicPartition> partitions = new ArrayList<>();
// We expect that the topics will have been created either manually by the user or automatically by the herder
List<PartitionInfo> partitionInfos = null;
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
long started = time.nanoseconds();
long sleepMs = 100;
while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
time.sleep(sleepMs);
sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
partitionInfos = consumer.partitionsFor(topic);
Utils.sleep(Math.min(time.milliseconds() - started, 1000));
}
if (partitionInfos == null)
throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册