Skip to content
代码片段 群组 项目
未验证 提交 157c79a0 编辑于 作者: Eric Wu's avatar Eric Wu 提交者: GitHub
浏览文件

MINOR: Use milliseconds for topic catalog event timestamps (#7079)

Use milliseconds for topic catalog event timestamps, instead of nanoseconds, which cannot be used to reflect system or wall-clock time.
上级 e37a6539
No related branches found
No related tags found
无相关合并请求
......@@ -36,7 +36,7 @@ public abstract class MetadataCollectorEvent implements EventQueue.Event {
protected static final Logger LOG = LoggerFactory.getLogger(MetadataCollectorEvent.class);
protected ZKTopicMetadataCollector collector;
protected long eventObserveTime;
protected long eventObservedTimeMillis;
protected final Time time;
......@@ -46,7 +46,7 @@ public abstract class MetadataCollectorEvent implements EventQueue.Event {
public MetadataCollectorEvent(ZKTopicMetadataCollector collector, Time time) {
this.collector = collector;
this.time = time;
this.eventObserveTime = time.nanoseconds();
this.eventObservedTimeMillis = time.milliseconds();
}
@Override
......
......@@ -48,7 +48,7 @@ public class TopicConfigChangeEvent extends MetadataCollectorEvent {
LOG.debug("Updating Topic Config for topic {} with new config {}", topic, newConfig);
MetadataEvent metadataEvent = context.localStore().metadataEvent(topic);
Timestamp observeTimestamp = Timestamps.fromNanos(this.eventObserveTime);
Timestamp eventTimestamp = Timestamps.fromMillis(this.eventObservedTimeMillis);
Uuid id;
int partitions;
int replicas;
......@@ -78,7 +78,7 @@ public class TopicConfigChangeEvent extends MetadataCollectorEvent {
partitions,
replicas,
null,
observeTimestamp);
eventTimestamp);
context.localStore().addMetadataEvent(logicalClusterId, topic, metadataEvent);
MetadataChange topicMetadataChange =
......
......@@ -44,7 +44,7 @@ public class TopicCreationEvent extends MetadataCollectorEvent {
LOG.debug("Creating topics {}", newTopicWithInfo.keySet());
String logicalClusterId;
Timestamp observeTimestamp = Timestamps.fromNanos(this.eventObserveTime);
Timestamp eventTimestamp = Timestamps.fromMillis(this.eventObservedTimeMillis);
for (String topic : newTopicWithInfo.keySet()) {
TopicInfo topicInfo = newTopicWithInfo.get(topic);
logicalClusterId = topicInfo.logicalClusterId();
......@@ -58,7 +58,7 @@ public class TopicCreationEvent extends MetadataCollectorEvent {
topicInfo.topicId(),
topicInfo.partitions(),
topicInfo.replicationFactors(),
observeTimestamp,
eventTimestamp,
null
);
context.localStore()
......
......@@ -39,7 +39,7 @@ public class TopicDeletionEvent extends MetadataCollectorEvent {
LOG.debug("Deleting topics {}", deleteTopics);
String logicalClusterId;
Timestamp observeTimestamp = Timestamps.fromNanos(this.eventObserveTime);
Timestamp eventTimestamp = Timestamps.fromMillis(this.eventObservedTimeMillis);
for (String deleteTopic : deleteTopics) {
logicalClusterId = TenantHelpers.extractTenantPrefix(deleteTopic, false);
MetadataEvent deleted = context.localStore()
......@@ -59,7 +59,7 @@ public class TopicDeletionEvent extends MetadataCollectorEvent {
MetadataEventUtils.topicMetadataEventForDeletion(
TenantHelpers.extractLogicalName(deleteTopic),
topicId,
observeTimestamp);
eventTimestamp);
MetadataChange topicDeleteChange =
MetadataEventUtils.topicDeleteEvent(logicalClusterId, topicMetadataEvent);
emitDeltaEvent(context, topicDeleteChange);
......
......@@ -50,12 +50,12 @@ public class TopicPartitionChangeEvent extends MetadataCollectorEvent {
LOG.debug("Updating Topic partitions for topic {} with new partitions {}", topic, newPartition);
MetadataEvent metadataEvent = context.localStore().metadataEvent(topic);
Timestamp observeTimestamp = Timestamps.fromNanos(this.eventObserveTime);
Timestamp eventTimestamp = Timestamps.fromMillis(this.eventObservedTimeMillis);
if (metadataEvent != null) {
TopicMetadata topicMetadata = TopicMetadata.newBuilder()
.mergeFrom(metadataEvent.getTopicMetadata())
.setPartitionsCount(newPartition)
.setUpdateTime(observeTimestamp)
.setUpdateTime(eventTimestamp)
.build();
metadataEvent = MetadataEvent.newBuilder().setTopicMetadata(topicMetadata).build();
} else {
......@@ -86,7 +86,7 @@ public class TopicPartitionChangeEvent extends MetadataCollectorEvent {
newPartition,
replicas,
null,
observeTimestamp);
eventTimestamp);
}
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册