Contributor

@shashankhs11 shashankhs11 commented Dec 2, 2025

This PR cleans up all non-state-updater code. We also remove the config
__state.updater.enabled__, which means users will no longer have the
option to disable the state updater.

Reviewers: Lucas Brutschy

@github-actions github-actions bot added the triage (PRs from the community), streams, and tools labels Dec 2, 2025
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final ChangelogRegister changelogReader,

Contributor Author

We also remove changelogReader as suggested here

Comment on lines -1810 to +1727
* TODO: after we complete switching to state updater, we could rename this function as allRunningTasks
* to be differentiated from allTasks including running and restoring tasks
*/
- Map<TaskId, Task> allOwnedTasks() {
+ Map<TaskId, Task> allRunningTasks() {

Contributor Author

@shashankhs11 shashankhs11 Dec 2, 2025

Completed TODO: Replaced all occurrences of allOwnedTasks with allRunningTasks

@shashankhs11
Contributor Author

@lucasbru, tagging for review :)

This is a comparatively large PR. Since we are just removing the old code, I thought it would be best to push it as a single PR.

Also, this should, hopefully, be the last PR before we finally say goodbye to the old code path 😄

@github-actions github-actions bot removed the triage PRs from the community label Dec 3, 2025

Member

@lucasbru lucasbru left a comment

Left some comments

} else {
return null;
}
private static StateUpdater maybeCreateStateUpdater(final StreamsMetricsImpl streamsMetrics,

Member

rename to createStateUpdater

Contributor Author

addressed in 6cff200

}
}

if (mainConsumerInstanceIdFuture.isDone()

Member

@mjsax Is this boolean condition correct like this? Looks wrong to me. Shouldn't it be !stateUpdaterEnabled || restoreConsumerInstanceIdFuture.isDone()?

Member

@mjsax mjsax Dec 4, 2025

I think the condition is correct. But we should not remove this whole if block; we should only remove the line && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone()).

The condition is correct because we only care about restoreConsumerInstanceIdFuture.isDone() here if the state updater is disabled. What we do here is check whether StreamThread has completed all its futures, and if it has, we set fetchDeadlineClientInstanceId = -1L; to indicate that we are done.

If the state updater is enabled, it is not StreamThread's duty to complete the restoreConsumerInstanceIdFuture but StateUpdater's, so we don't care whether that future is completed when determining if StreamThread is done and we can reset the fetch deadline.

If we remove this whole block, we would never reset the fetch deadline, implying unnecessary busy work to re-check whether the futures got completed. After the futures are completed, there is nothing left to check, and we want to make maybeGetClientInstanceIds a no-op (even if it might not make a difference in practice, as the overhead of checking the futures should be small; the intention was just to make it "clean").

Member

But suppose you replace !stateUpdaterEnabled with false (so state updater enabled), then this condition will always be false, and we never reset the fetchDeadline. So I think the condition didn't do what was intended. But I agree that we probably shouldn't remove the whole block if this logic you describe was intended.
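
The point raised in this thread can be checked in isolation. The following is a minimal sketch, not the actual StreamThread code: it assumes the condition consists only of the two futures named above (the real if statement may check more), and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch class; only the variable names come from the discussion above.
class FetchDeadlineConditionSketch {

    // Shape of the condition as quoted in the thread, reduced to the two named futures.
    static boolean originalCondition(final boolean stateUpdaterEnabled,
                                     final CompletableFuture<?> mainConsumerInstanceIdFuture,
                                     final CompletableFuture<?> restoreConsumerInstanceIdFuture) {
        return mainConsumerInstanceIdFuture.isDone()
            && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone());
    }

    // Shape of the condition once the restore-consumer line is dropped.
    static boolean conditionWithoutRestoreLine(final CompletableFuture<?> mainConsumerInstanceIdFuture) {
        return mainConsumerInstanceIdFuture.isDone();
    }

    public static void main(final String[] args) {
        final CompletableFuture<String> mainFuture = CompletableFuture.completedFuture("main");
        final CompletableFuture<String> restoreFuture = CompletableFuture.completedFuture("restore");

        // With the state updater enabled, the inner clause is false regardless of the futures.
        System.out.println(originalCondition(true, mainFuture, restoreFuture));   // false
        // With the state updater disabled, both futures decide the outcome.
        System.out.println(originalCondition(false, mainFuture, restoreFuture));  // true
        // Without the restore-consumer line, only the main consumer future matters.
        System.out.println(conditionWithoutRestoreLine(mainFuture));              // true
    }
}
```

Under these assumptions, the first call shows why the fetch deadline would never be reset once the state updater is always on, which is the case this PR makes permanent.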

Member

Oh yes... It's a smaller optimization, so not sure if we would need to go back and fix in older versions? I am fine either way.

Member

I'm not sure I fully understand the impact of the optimization, so I will let you judge what's best w.r.t. older versions.

@shashankhs11 Can you please bring back the if block, but without the line that refers to the restoreConsumer?

Contributor Author

Yes, of course! Done in 24a4ea1

long totalPunctuateLatency = 0L;
- if (state == State.RUNNING
- || (stateUpdaterEnabled && isStartingRunningOrPartitionAssigned())) {
+ if (state == State.RUNNING || isStartingRunningOrPartitionAssigned()) {

Member

Looks like the second condition already covers the first, so the state == State.RUNNING check is redundant.

Contributor Author

addressed in 6cff200
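
The redundancy noted in this thread can be illustrated with a small sketch. It is not the StreamThread implementation: the State enum below is an assumed subset, and the body of isStartingRunningOrPartitionAssigned is a guess based purely on the method's name.

```java
// Hypothetical sketch; the enum values and the helper body are assumptions.
class PunctuateConditionSketch {

    enum State { CREATED, STARTING, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN }

    // Assumed meaning of the helper, inferred from its name only.
    static boolean isStartingRunningOrPartitionAssigned(final State state) {
        return state == State.STARTING
            || state == State.RUNNING
            || state == State.PARTITIONS_ASSIGNED;
    }

    // Condition as it stood right after removing stateUpdaterEnabled.
    static boolean withExplicitRunningCheck(final State state) {
        return state == State.RUNNING || isStartingRunningOrPartitionAssigned(state);
    }

    // Simplified condition without the explicit RUNNING check.
    static boolean simplified(final State state) {
        return isStartingRunningOrPartitionAssigned(state);
    }

    // Returns true if both conditions agree for every state in the enum.
    static boolean equivalentForAllStates() {
        for (final State state : State.values()) {
            if (withExplicitRunningCheck(state) != simplified(state)) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(equivalentForAllStates()); // true
    }
}
```

If the helper really does include RUNNING, as its name suggests, the explicit check adds nothing and the two forms are interchangeable.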

@cluster(num_nodes=6)
@matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft])
def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
@matrix(from_version=[str(LATEST_3_7)], metadata_quorum=[quorum.combined_kraft])

Member

Hmm, why have you removed the upgrade parameter?
Shouldn't we also test versions 3.8, 3.9, 4.0, and 4.1 with state updater disabled via the internal config?

Contributor Author

Right! addressed in 33e3a8a

Member

@lucasbru lucasbru left a comment

LGTM

Copilot finished reviewing on behalf of lucasbru December 4, 2025 11:09

Contributor

Copilot AI left a comment

Pull request overview

This PR removes all legacy code paths related to the non-state-updater implementation in Kafka Streams. The state updater, which handles state restoration on a dedicated thread, is now mandatory and always enabled. The internal configuration option __state.updater.enabled__ has been removed, simplifying the codebase significantly.

Key Changes

  • Removed the __state.updater.enabled__ configuration option and all associated conditional logic
  • Renamed methods for clarity: allOwnedTasks() → allRunningTasks(), maybeCreateStateUpdater() → createStateUpdater(), handleTasksWithStateUpdater() → handleTasks()
  • Cleaned up constructor signatures across multiple classes by removing the now-unused ChangelogRegister and stateUpdaterEnabled parameters

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| tests/kafkatest/tests/streams/streams_upgrade_test.py | Updated upgrade/downgrade test to reflect that state updater is always enabled in DEV_VERSION (4.3+); added version matrix from 3.7 to 4.2 |
| streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java | Removed unused MockChangelogRegister and related parameters from task setup |
| streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java | Cleaned up test by removing StoreChangelogReader initialization and imports |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java | Updated test assertions to use allRunningTasks() instead of allOwnedTasks(); removed changelogPartitions() verification calls |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java | Removed STATE_UPDATER_ENABLED config from tests; updated method calls to allRunningTasks(); fixed grammar in test messages |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java | Removed MockChangelogReader usage; renamed tests for clarity; added verification for store reinitialization |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java | Removed changeLogReader parameter from task creator instantiation |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java | Removed replaceActiveWithStandby() interface method |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java | Removed replaceActiveWithStandby() implementation |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java | Removed all state-updater conditional checks; simplified task handling to always use state updater path; renamed allOwnedTasks() to allRunningTasks() |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java | Removed stateUpdaterEnabled field and all related conditional logic; createStateUpdater() now always creates a state updater; simplified polling and client instance ID logic |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java | Removed stateUpdaterEnabled field; always use normal poll time instead of conditional non-blocking poll |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java | Removed stateUpdaterEnabled parameter from startup task initialization |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java | Removed ChangelogReader and stateUpdaterEnabled parameters from constructor and task creation |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java | Removed ChangelogRegister and stateUpdaterEnabled parameters; removed changelog registration/unregistration logic |
| streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java | Removed ChangelogReader and stateUpdaterEnabled parameters from constructor and task creation |
| streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Removed STATE_UPDATER_ENABLED configuration constant and stateUpdaterEnabled() method |
| committer-tools/README.md | Updated example test output to reflect removal of stateUpdaterEnabled test parameter |

stateUpdater.standbyTasks().stream(),
standbyTasksInTaskRegistry
);

Copilot AI Dec 4, 2025

There is an unnecessary blank line before the closing brace. Consider removing it for consistent code formatting.

Suggested change

// other useful work while waiting for the join response
records = pollRequests(Duration.ZERO);
- } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED && stateUpdaterEnabled)) {
+ } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED)) {

Copilot AI Dec 4, 2025

The parentheses around (state == State.PARTITIONS_ASSIGNED) are unnecessary. The condition can be simplified to state == State.RUNNING || state == State.STARTING || state == State.PARTITIONS_ASSIGNED for consistency with the rest of the codebase.

Suggested change
- } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED)) {
+ } else if (state == State.RUNNING || state == State.STARTING || state == State.PARTITIONS_ASSIGNED) {

@matrix(from_version=[str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(LATEST_4_1), str(LATEST_4_2)],
upgrade=[True, False],
metadata_quorum=[quorum.combined_kraft])
def test_upgrade_downgrade_state_updater(self, from_version, metadata_quorum):

Copilot AI Dec 4, 2025

The method signature is missing the upgrade parameter that is declared in the @matrix decorator (line 178) and used in the method body (line 191). The signature should be def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):.

Suggested change
- def test_upgrade_downgrade_state_updater(self, from_version, metadata_quorum):
+ def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):

Contributor Author

Silly me

Member

@shashankhs11 Could you please run the system test using the instructions in the tests/README.md to make sure we are not breaking them?

Contributor Author

Yes, sure! I ran the tests and here is the test report:

Image 1

[Image: Test_Report_1]

Image 2 (continued)

[Image: Test_Report_2]

Here, we see the tests with version = LATEST_4_2 fail, which seems to be expected since version 4.2 is not officially released yet.

I am thinking that we can temporarily remove the LATEST_4_2 from the matrix for this PR and I can create a follow-up PR adding just LATEST_4_2 which can be merged once 4.2 is officially released. Please advise.

Comment on lines 489 to 490
// If we are updating only standby tasks, and are not using a separate thread, we should
// use a non-blocking poll to unblock the processing as soon as possible.

Copilot AI Dec 4, 2025

The comment is now outdated. Since the state updater is always enabled (running in a separate thread), the comment about "not using a separate thread" and "non-blocking poll" is no longer relevant. This comment should be updated or removed.

Suggested change
- // If we are updating only standby tasks, and are not using a separate thread, we should
- // use a non-blocking poll to unblock the processing as soon as possible.

activeRunningTaskStream(),
stateUpdater.tasks().stream().filter(Task::isActive)
);

Copilot AI Dec 4, 2025

There is an unnecessary blank line before the closing brace. Consider removing it for consistent code formatting.

Suggested change

final TaskId taskId = entry.getKey();
final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
- || (stateUpdater != null && stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
+ || (stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));

Copilot AI Dec 4, 2025

The comparison task.id() == taskId uses reference equality (==) instead of .equals() for comparing TaskId objects. This should be task.id().equals(taskId) to ensure correct object equality comparison.

Suggested change
- || (stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
+ || (stateUpdater.tasks().stream().anyMatch(task -> task.id().equals(taskId)));
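
The difference between the two comparisons can be shown with a self-contained sketch. SketchTaskId below is a hypothetical stand-in with value-based equals, not Kafka's TaskId, and the helper names are made up for illustration.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for an id class with value-based equality.
final class SketchTaskId {
    private final int subtopology;
    private final int partition;

    SketchTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SketchTaskId)) {
            return false;
        }
        final SketchTaskId that = (SketchTaskId) other;
        return subtopology == that.subtopology && partition == that.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtopology, partition);
    }
}

class TaskIdEqualitySketch {

    // Matches only if the very same object instance is in the list.
    static boolean containsByReference(final List<SketchTaskId> ids, final SketchTaskId target) {
        return ids.stream().anyMatch(id -> id == target);
    }

    // Matches any element whose fields are equal to the target's.
    static boolean containsByEquals(final List<SketchTaskId> ids, final SketchTaskId target) {
        return ids.stream().anyMatch(id -> id.equals(target));
    }

    public static void main(final String[] args) {
        final List<SketchTaskId> owned = List.of(new SketchTaskId(0, 1));
        final SketchTaskId lookup = new SketchTaskId(0, 1); // same value, different object

        System.out.println(containsByReference(owned, lookup)); // false
        System.out.println(containsByEquals(owned, lookup));    // true
    }
}
```

Reference comparison only happens to work when both sides hold the same instance, so equals() is the safer choice whenever ids can be constructed independently.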

Comment on lines 1723 to 1724
* Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With
* state updater enabled, this does not return any tasks currently owned by the state updater.

Copilot AI Dec 4, 2025

The documentation comment is now outdated. Since the state updater is always enabled, the comment should be updated to reflect the current behavior: "Returns tasks owned by the stream thread. This does not return any tasks currently owned by the state updater."

Suggested change
- * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With
- * state updater enabled, this does not return any tasks currently owned by the state updater.
+ * Returns tasks owned by the stream thread.
+ * This does not return any tasks currently owned by the state updater.

@lucasbru
Member

lucasbru commented Dec 4, 2025

Waiting for @mjsax to respond on #21059 (comment) before merging. Also check out the Copilot comments, @shashankhs11

@shashankhs11
Contributor Author

Also check out the Copilot comments

Addressed all copilot comments in 9a28455
