KAFKA-10199: Further refactor task lifecycle management (#12439)
1. Consolidate the task recycle procedure into a single function within the task. The current procedure now becomes: a) task.recycleStateAndConvert, at end of it the task is in closed while its stateManager is retained, and the manager type has been converted; 2) create the new task with old task's fields and the stateManager inside the creators. 2. Move the task execution related metadata into the corresponding TaskExecutionMetadata class, including the task idle related metadata (e.g. successfully processed tasks); reduce the number of params needed for TaskExecutor as well as Tasks. 3. Move the task execution related fields (embedded producer and consumer) and task creators out of Tasks and migrated into TaskManager. Now the Tasks is only a bookkeeping place without any task mutation logic. 4. When adding tests, I realized that we should not add task to state updater right after creation, since it was not initialized yet, while state updater would validate that the task's state is already restoring / running. So I updated that logic while adding unit tests. Reviewers: Bruno Cadonna <cadonna@apache.org>
显示
- streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java 3 个添加, 2 个删除...pache/kafka/streams/processor/internals/AbstractTask.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java 30 个添加, 6 个删除.../kafka/streams/processor/internals/ActiveTaskCreator.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java 8 个添加, 43 个删除...apache/kafka/streams/processor/internals/StandbyTask.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java 26 个添加, 6 个删除...kafka/streams/processor/internals/StandbyTaskCreator.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java 6 个添加, 46 个删除.../apache/kafka/streams/processor/internals/StreamTask.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java 2 个添加, 3 个删除...va/org/apache/kafka/streams/processor/internals/Task.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java 34 个添加, 2 个删除...ka/streams/processor/internals/TaskExecutionMetadata.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java 22 个添加, 27 个删除...pache/kafka/streams/processor/internals/TaskExecutor.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java 127 个添加, 61 个删除...apache/kafka/streams/processor/internals/TaskManager.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 54 个添加, 149 个删除...a/org/apache/kafka/streams/processor/internals/Tasks.java
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java 2 个添加, 2 个删除...e/kafka/streams/processor/internals/TopologyMetadata.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java 3 个添加, 3 个删除...he/kafka/streams/processor/internals/StandbyTaskTest.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java 6 个添加, 6 个删除...che/kafka/streams/processor/internals/StreamTaskTest.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java 1 个添加, 0 个删除...e/kafka/streams/processor/internals/StreamThreadTest.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java 5 个添加, 6 个删除...treams/processor/internals/TaskExecutionMetadataTest.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java 2 个添加, 3 个删除...e/kafka/streams/processor/internals/TaskExecutorTest.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java 177 个添加, 116 个删除...he/kafka/streams/processor/internals/TaskManagerTest.java
- streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java 4 个添加, 72 个删除...g/apache/kafka/streams/processor/internals/TasksTest.java
加载中
想要评论请 注册 或 登录