-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-18913: Remove all code related to non-stateupdater path #21059
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
| final boolean eosEnabled, | ||
| final LogContext logContext, | ||
| final StateDirectory stateDirectory, | ||
| final ChangelogRegister changelogReader, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also remove changelogReader as suggested here
| * TODO: after we complete switching to state updater, we could rename this function as allRunningTasks | ||
| * to be differentiated from allTasks including running and restoring tasks | ||
| */ | ||
| Map<TaskId, Task> allOwnedTasks() { | ||
| Map<TaskId, Task> allRunningTasks() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Completed TODO: Replaced all occurences of allOwnedTasks with allRunningTasks
|
@lucasbru, tagging for review :) This is comparatively a larger PR. Since, we are just removing the old code, I thought it would be best if we push in a single PR. Also, this should, hopefully, be the last PR before we finally say goodbye to the old code path 😄 |
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments
| } else { | ||
| return null; | ||
| } | ||
| private static StateUpdater maybeCreateStateUpdater(final StreamsMetricsImpl streamsMetrics, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to createStateUpdater
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in 6cff200
| } | ||
| } | ||
|
|
||
| if (mainConsumerInstanceIdFuture.isDone() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax Is this boolean condition correct like this? Looks wrong to me. Shouldn't it be !stateUpdaterEnabled || restoreConsumerInstanceIdFuture.isDone()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the condition is correct. -- But we should not remove this whole if, but only remove line && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone()).
The condition is correct, because we only care about restoreConsumerInstanceIdFuture.isDone() here if state-updater is disabled. What we do here is, we check if StreamThread did complete all it's futures, and if it did, we set fetchDeadlineClientInstanceId = -1L; indicating we are done.
If state-updater is enabled, it's not StreamsThread's duty to complete the restoreConsumerInstanceIdFuture, but it's StateUpdater's duty, so we don't care if the future is completed or not, to determine if StreamsThread is done and we can reset the fetch deadline or not.
If we remove this whole block, we would never reset fetch deadline, implying unnecessary busy work to re-check if the futures got completed -- after the futures are completed, there is nothing to be checked any longer, and we want to make maybeGetClientInstanceIds a no-op (even if it might not make a difference in practice, as the overhead to check the futures should be small -- the intention was just to make it "clean")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But suppose you replace !stateUpdaterEnabled with false (so state updater enabled), then this condition will always be false, and we never reset the fetchDeadline. So I think the condition didn't do what was intended. But I agree that we probably shoudn't remove the whole block if this logic you describe was intended.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes... It's a smaller optimization, so not sure if we would need to go back and fix in older versions? I am fine either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, I fully understand the impact of the optimization, so I will let you judge what's best w.r.t. older versions.
@shashankhs11 Can you please bring back the if block, but without the line that refers to the restoreConsumer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, ofcourse! Done in 24a4ea1
| long totalPunctuateLatency = 0L; | ||
| if (state == State.RUNNING | ||
| || (stateUpdaterEnabled && isStartingRunningOrPartitionAssigned())) { | ||
| if (state == State.RUNNING || isStartingRunningOrPartitionAssigned()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the second condition implies the first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in 6cff200
| @cluster(num_nodes=6) | ||
| @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft]) | ||
| def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum): | ||
| @matrix(from_version=[str(LATEST_3_7)], metadata_quorum=[quorum.combined_kraft]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, why have you removed the upgrade parameter?
Shouldn't we also test versions 3.8,3.9,4.0,4.1 with state updater disabled via the internal config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right! addressed in 33e3a8a
88e103c to
33e3a8a
Compare
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR removes all legacy code paths related to the non-state-updater implementation in Kafka Streams. The state updater, which handles state restoration on a dedicated thread, is now mandatory and always enabled. The internal configuration option __state.updater.enabled__ has been removed, simplifying the codebase significantly.
Key Changes
- Removed the
__state.updater.enabled__configuration option and all associated conditional logic - Renamed methods for clarity:
allOwnedTasks()→allRunningTasks(),maybeCreateStateUpdater()→createStateUpdater(),handleTasksWithStateUpdater()→handleTasks() - Cleaned up constructor signatures across multiple classes by removing the now-unused
ChangelogRegisterandstateUpdaterEnabledparameters
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/kafkatest/tests/streams/streams_upgrade_test.py | Updated upgrade/downgrade test to reflect that state updater is always enabled in DEV_VERSION (4.3+); added version matrix from 3.7 to 4.2 |
| streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java | Removed unused MockChangelogRegister and related parameters from task setup |
| streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java | Cleaned up test by removing StoreChangelogReader initialization and imports |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java | Updated test assertions to use allRunningTasks() instead of allOwnedTasks(); removed changelogPartitions() verification calls |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java | Removed STATE_UPDATER_ENABLED config from tests; updated method calls to allRunningTasks(); fixed grammar in test messages |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java | Removed MockChangelogReader usage; renamed tests for clarity; added verification for store reinitialization |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java | Removed changeLogReader parameter from task creator instantiation |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java | Removed replaceActiveWithStandby() interface method |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java | Removed replaceActiveWithStandby() implementation |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java | Removed all state-updater conditional checks; simplified task handling to always use state updater path; renamed allOwnedTasks() to allRunningTasks() |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java | Removed stateUpdaterEnabled field and all related conditional logic; createStateUpdater() now always creates a state updater; simplified polling and client instance ID logic |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java | Removed stateUpdaterEnabled field; always use normal poll time instead of conditional non-blocking poll |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java | Removed stateUpdaterEnabled parameter from startup task initialization |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java | Removed ChangelogReader and stateUpdaterEnabled parameters from constructor and task creation |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java | Removed ChangelogRegister and stateUpdaterEnabled parameters; removed changelog registration/unregistration logic |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java | Removed ChangelogReader and stateUpdaterEnabled parameters from constructor and task creation |
| streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Removed STATE_UPDATER_ENABLED configuration constant and stateUpdaterEnabled() method |
| committer-tools/README.md | Updated example test output to reflect removal of stateUpdaterEnabled test parameter |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| stateUpdater.standbyTasks().stream(), | ||
| standbyTasksInTaskRegistry | ||
| ); | ||
|
|
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an unnecessary blank line before the closing brace. Consider removing it for consistent code formatting.
| // other useful work while waiting for the join response | ||
| records = pollRequests(Duration.ZERO); | ||
| } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED && stateUpdaterEnabled)) { | ||
| } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED)) { |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parentheses around (state == State.PARTITIONS_ASSIGNED) are unnecessary. The condition can be simplified to state == State.RUNNING || state == State.STARTING || state == State.PARTITIONS_ASSIGNED for consistency with the rest of the codebase.
| } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED)) { | |
| } else if (state == State.RUNNING || state == State.STARTING || state == State.PARTITIONS_ASSIGNED) { |
| @matrix(from_version=[str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(LATEST_4_1), str(LATEST_4_2)], | ||
| upgrade=[True, False], | ||
| metadata_quorum=[quorum.combined_kraft]) | ||
| def test_upgrade_downgrade_state_updater(self, from_version, metadata_quorum): |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method signature is missing the upgrade parameter that is declared in the @matrix decorator (line 178) and used in the method body (line 191). The signature should be def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):.
| def test_upgrade_downgrade_state_updater(self, from_version, metadata_quorum): | |
| def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Silly me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shashankhs11 Could you please run the system test using the instructions in the tests/README.md to make sure we are not breaking them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, sure! I ran the tests and here is the test report:
Image 1
Image 2 -- Continued
Here, we see the tests with version = LATEST_4_2 fail, which seems to be expected since version 4.2 is not officially released yet.
I am thinking that we can temporarily remove the LATEST_4_2 from the matrix for this PR and I can create a follow-up PR adding just LATEST_4_2 which can be merged once 4.2 is officially released. Please advise.
| // If we are updating only standby tasks, and are not using a separate thread, we should | ||
| // use a non-blocking poll to unblock the processing as soon as possible. |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is now outdated. Since the state updater is always enabled (running in a separate thread), the comment about "not using a separate thread" and "non-blocking poll" is no longer relevant. This comment should be updated or removed.
| // If we are updating only standby tasks, and are not using a separate thread, we should | |
| // use a non-blocking poll to unblock the processing as soon as possible. | |
| activeRunningTaskStream(), | ||
| stateUpdater.tasks().stream().filter(Task::isActive) | ||
| ); | ||
|
|
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an unnecessary blank line before the closing brace. Consider removing it for consistent code formatting.
| final TaskId taskId = entry.getKey(); | ||
| final boolean taskIsOwned = tasks.allTaskIds().contains(taskId) | ||
| || (stateUpdater != null && stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId)); | ||
| || (stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId)); |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comparison task.id() == taskId uses reference equality (==) instead of .equals() for comparing TaskId objects. This should be task.id().equals(taskId) to ensure correct object equality comparison.
| || (stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId)); | |
| || (stateUpdater.tasks().stream().anyMatch(task -> task.id().equals(taskId))); |
| * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With | ||
| * state updater enabled, this does not return any tasks currently owned by the state updater. |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation comment is now outdated. Since the state updater is always enabled, the comment should be updated to reflect the current behavior: "Returns tasks owned by the stream thread. This does not return any tasks currently owned by the state updater."
| * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With | |
| * state updater enabled, this does not return any tasks currently owned by the state updater. | |
| * Returns tasks owned by the stream thread. | |
| * This does not return any tasks currently owned by the state updater. |
|
Waiting for @mjsax to respond on #21059 (comment) before merging. Also checkout copilot comments @shashankhs11 |
33e3a8a to
9a28455
Compare
Addressed all copilot comments in 9a28455 |
9a28455 to
24a4ea1
Compare
This PR cleans up all non stateupdater code. We also remove the config
__state.updater.enabled__which means users will no longer have theoption to disable the stateupdater.
Reviewers: Lucas Brutschy [email protected]