Skip to content
代码片段 群组 项目
未验证 提交 a8b5f5a4 编辑于 作者: Sharath Bhat's avatar Sharath Bhat 提交者: GitHub
浏览文件

KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (#9247)

Deleted the checkpoint file before the transition from SUSPENDED state to RESTORING state

Reviewers: Guozhang Wang <wangguoz@gmail.com>
上级 fb4f2972
No related branches found
No related tags found
无相关合并请求
......@@ -663,4 +663,10 @@ public class ProcessorStateManager implements StateManager {
public String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
public void deleteCheckPointFileIfEOSEnabled() throws IOException {
if (eosEnabled) {
checkpointFile.delete();
}
}
}
......@@ -333,6 +333,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
case SUSPENDED:
// just transit the state without any logical changes: suspended and restoring states
// are not actually any different for inner modules
// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
try {
stateMgr.deleteCheckPointFileIfEOSEnabled();
log.debug("Deleted check point file upon resuming with EOS enabled");
} catch (final IOException ioe) {
log.error("Encountered error while deleting the checkpoint file due to this exception", ioe);
}
transitionTo(State.RESTORING);
log.info("Resumed to restoring state");
......
......@@ -989,6 +989,36 @@ public class ProcessorStateManagerTest {
stateMgr.close();
}
@Test
public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
final long checkpointOffset = 10L;
final Map<TopicPartition, Long> offsets = mkMap(
mkEntry(persistentStorePartition, checkpointOffset),
mkEntry(nonPersistentStorePartition, checkpointOffset),
mkEntry(irrelevantPartition, 999L)
);
checkpoint.write(offsets);
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE, true);
stateMgr.deleteCheckPointFileIfEOSEnabled();
stateMgr.close();
assertFalse(checkpointFile.exists());
}
@Test
public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws IOException {
final long checkpointOffset = 10L;
final Map<TopicPartition, Long> offsets = mkMap(
mkEntry(persistentStorePartition, checkpointOffset),
mkEntry(nonPersistentStorePartition, checkpointOffset),
mkEntry(irrelevantPartition, 999L)
);
checkpoint.write(offsets);
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE, false);
stateMgr.deleteCheckPointFileIfEOSEnabled();
stateMgr.close();
assertTrue(checkpointFile.exists());
}
private ProcessorStateManager getStateManager(final Task.TaskType taskType, final boolean eosEnabled) {
return new ProcessorStateManager(
taskId,
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册