Conversation

@0xffff-zhiyan
Contributor

Problem

https://issues.apache.org/jira/browse/KAFKA-19851
When upgrading from Kafka 3.x to 4.0, the metadata log may contain dynamic configurations that were removed in 4.0 (e.g., message.format.version per KIP-724). These removed configs cause an InvalidConfigurationException whenever users attempt to modify any configuration, because validation checks all existing configs, including the removed ones.

Implementation

Implement whitelist-based filtering to automatically remove invalid/removed configurations during metadata image building and replay:

  1. KafkaConfigSchema.validConfigNames() - Returns whitelist of valid non-internal config names per resource type (see the sketch after this list)
  2. Filter in ConfigurationsDelta - Filter invalid configs when replaying ConfigRecord and building final image
  3. Filter in ConfigurationControlManager - Filter invalid configs during replay to keep controller state clean
  4. Add GROUP and CLIENT_METRICS ConfigDefs to KafkaRaftServer for complete whitelist
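
The whitelist lookup could look roughly like the sketch below. This is a minimal sketch rather than the PR's exact code: the configDefs field name and the use of ConfigDef.ConfigKey's public name and internalConfig fields are assumptions based on Kafka's public ConfigDef API.

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource.Type;

public final class ConfigWhitelistSketch {
    // In the real KafkaConfigSchema this map would be built from the
    // per-resource ConfigDefs registered in KafkaRaftServer (including
    // GROUP and CLIENT_METRICS after this PR).
    private final Map<Type, ConfigDef> configDefs;

    public ConfigWhitelistSketch(Map<Type, ConfigDef> configDefs) {
        this.configDefs = configDefs;
    }

    // Whitelist of valid, non-internal config names for a resource type.
    // Unknown resource types yield an empty set.
    public Set<String> validConfigNames(Type resourceType) {
        ConfigDef configDef = configDefs.get(resourceType);
        if (configDef == null) {
            return Set.of();
        }
        return configDef.configKeys().values().stream()
                .filter(key -> !key.internalConfig)  // skip internal configs
                .map(key -> key.name)
                .collect(Collectors.toSet());
    }
}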

Changes

  • KafkaConfigSchema: Added validConfigNames() method
  • ConfigurationsDelta/ConfigurationDelta: Filter invalid configs in replay() and apply()
  • ConfigurationControlManager: Filter invalid configs in replay()
  • KafkaRaftServer: Added GROUP and CLIENT_METRICS ConfigDefs
  • Tests: Added unit tests verifying filtering behavior

Testing

  • Removed configs are filtered from base images and during replay
  • Explicitly setting a removed config still triggers InvalidConfigurationException

Contributor

@ahuang98 ahuang98 left a comment

partial review, thanks for working on this, Tony!

I was wondering if we considered changing the behavior of validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs) to log instead of throw when there is an illegal existing config?

* Get the set of valid configuration names for a given resource type.
* Returns empty set if configSchema is not initialized.
*/
static Set<String> getValidConfigNames(Type resourceType) {
Contributor

can we have this method handle UNKNOWN type to simplify logic elsewhere?
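
One way to do that, as a sketch that keeps the shape of the diff above (configSchemaSupplier and KafkaConfigSchema are the PR's existing names; the UNKNOWN check is the suggested addition):

static Set<String> getValidConfigNames(Type resourceType) {
    // UNKNOWN gets an empty whitelist up front, so callers need no special case.
    if (resourceType == Type.UNKNOWN) {
        return Set.of();
    }
    Supplier<KafkaConfigSchema> supplier = configSchemaSupplier;
    if (supplier == null) {
        return Set.of();
    }
    KafkaConfigSchema configSchema = supplier.get();
    if (configSchema == null) {
        return Set.of();
    }
    return configSchema.validConfigNames(resourceType);
}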

Comment on lines +55 to +63
Supplier<KafkaConfigSchema> supplier = configSchemaSupplier;
if (supplier == null) {
return Set.of();
}
KafkaConfigSchema configSchema = supplier.get();
if (configSchema == null) {
return Set.of();
}
return configSchema.validConfigNames(resourceType);
Contributor

what about something like

return Optional.ofNullable(configSchemaSupplier)
    .map(Supplier::get)
    .map(schema -> schema.validConfigNames(resourceType))
    .orElse(Set.of());

Contributor

@kevin-wu24 kevin-wu24 left a comment

Thanks for taking this on @0xffff-zhiyan. I just have some high-level comments for your and @ahuang98's consideration. After doing some thinking on this, the issue is more complex than it appears if we want to actually remove these records from the metadata partition.

> I was wondering if we considered changing the behavior of validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs) to log instead of throw when there is an illegal existing config?

I think the main question we want to answer is:

  1. Do we want Kafka to even remove these now-invalid configs from the cluster metadata partition as part of this PR?

What happens if the user wants to downgrade to a lower version? If we "clean up" config records, downgrading the software now results in lost metadata. In my opinion, the lossy downgrade case is enough to convince me that Kafka should not proactively clean up these configs.

Ultimately, what we want is the active controller to gracefully handle an ALTER_CONFIG after upgrading to a software version which may make some of its existing config state unknown/invalid to the new software version.

The simplest approach is to not validate dynamic configs that are not known to Kafka. This matches what we do for static configs: you can add unknown configs to the .properties file and they will not impact Kafka. However, because we persist this config state to disk via the metadata partition, it is a problem to allow arbitrary config updates.

If the controller is to keep returning an error like it does today in this state, it should return an error that lists all the now-invalid configs so it is straightforward for the user to clean them up. This error should also let the user know these configs are invalid because they are unrecognized by the current kafka software version. This means the user becomes aware that a downgrade would be lossy if they delete these configs.
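
For illustration, a validate() that follows this direction might be shaped like the sketch below. The names are hypothetical and the real method signature differs; the point is only that existing configs are never the source of the error:

import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.errors.InvalidConfigurationException;

public final class RequestOnlyValidationSketch {
    // Hypothetical: reject unknown names only in the incoming request;
    // existing configs written by an older software version are tolerated.
    public static void validate(Set<String> validNames,
                                Map<String, String> newConfigs,
                                Map<String, String> existingConfigs) {
        for (String name : newConfigs.keySet()) {
            if (!validNames.contains(name)) {
                throw new InvalidConfigurationException(
                    "Unknown config " + name + " for this software version.");
            }
        }
        // existingConfigs is intentionally not checked here.
    }
}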

* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationsDelta {
/**
Contributor

We shouldn't need to change this file at all. Remember that each ConfigurationDelta object will contain the current configuration image for a given ConfigResource, as well as its deltas. When we call ConfigurationDelta#apply, that is the only place we need to make changes.
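
A sketch of doing the filtering only when the delta is folded into the new image, per this comment. Field and parameter names here are illustrative, not the actual ConfigurationDelta code:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public final class ApplyFilterSketch {
    // Fold the per-resource changes into the image data, then drop any
    // names the current software version no longer recognizes.
    public static Map<String, String> apply(Map<String, String> imageData,
                                            Map<String, Optional<String>> changes,
                                            Set<String> validNames) {
        Map<String, String> newData = new HashMap<>(imageData);
        changes.forEach((name, value) -> {
            if (value.isPresent()) {
                newData.put(name, value.get());
            } else {
                newData.remove(name);
            }
        });
        newData.keySet().removeIf(name -> !validNames.contains(name));
        return newData;
    }
}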

*/
public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
// Filter out invalid configs
Contributor

I think there is a better way to implement this than trying to clean these configs up in every place where we replay a ConfigRecord. This approach is scattered across the codebase and not very maintainable.

@ahuang98
Contributor

ahuang98 commented Dec 5, 2025

I think I agree that it might have fewer unintended side effects to not delete the unknown configs. As for the downgrade scenario, I would assume that most cases where we introduce a new topic config also involve a new MV, in which case we don't guarantee lossless downgrades (and this is beside the point that MV downgrade isn't supported yet). We don't technically gate the actual topic configs on MV or major versions, so it is quite possible we lose information unexpectedly on a downgrade.

> The simplest approach is to not validate dynamic configs that are not known to Kafka.

Seems similar to my question about whether we can just avoid validating the existing configs and prevent new invalid configs from being added. I don't necessarily agree with allowing a user to add a bad config - this could become a vulnerability if we don't have a cap on the number of configs.

@0xffff-zhiyan
Contributor Author

0xffff-zhiyan commented Dec 5, 2025

> avoid validating the existing configs and prevent new invalid configs from being added

I agree that is a better way.

> If the controller is to keep returning an error like it does today in this state, it should return an error that lists all the now-invalid configs so it is straightforward for the user to clean them up.

That doesn't fix our current issue. The problem is that whenever users add or modify configurations, we throw an exception if there are any invalid configs. Users have to remove them all manually, which is tedious and exactly what we want to improve. Simply informing them about the invalid configs doesn't really simplify the process, because they still need to clean them up one by one, and in the end they lose those configs permanently.

So based on the discussion above, we should stop validating existing configs while still preventing users from adding invalid ones.

My only concern is: Is it really the right approach to let Kafka tolerate configurations that should no longer exist in the metadata? If we ever introduce new logic that handles existing configs in the future, we might have to keep adding code paths that explicitly ignore these existing but invalid configs. That seems like it could gradually accumulate technical debt. If we want the metadata to be clean without losing those configs permanently, could we introduce a new config called REMOVED_CONFIG and move all those configs there?
@ahuang98 @kevin-wu24

@kevin-wu24
Contributor

kevin-wu24 commented Dec 5, 2025

Thanks for the discussion @ahuang98 @0xffff-zhiyan:

> I don't necessarily agree with allowing a user to add a bad config - this could become a vulnerability if we don't have a cap on the number of configs

I think I misunderstood your original comment. I agree that if we ignore the existing config metadata state and only validate what is contained in the ALTER_CONFIG request, a given version of Kafka's dynamic config will always be valid. When moving between major versions, removing a config from the source code will not invalidate the existing dynamic config state on the new version of Kafka, and ALTER_CONFIG can still complete. This matches how the static .properties config is validated by Kafka.

> My only concern is: Is it really the right approach to let Kafka tolerate configurations that should no longer exist in the metadata? If we ever introduce new logic that handles existing configs in the future, we might have to keep adding code paths that explicitly ignore these existing but invalid configs

Dynamic configs that are not known by Kafka, just like static configs, shouldn't invalidate the entire config. In this case they do, because ALTER_CONFIG will fail. The argument here is that we should not have been validating the existing dynamic config in the first place, since what counts as a "valid" (dynamic OR static) configuration depends only on the software version of Kafka currently running. If I change software versions, fields in my static .properties file can go from valid to unknown, and loading those unknown configs into KafkaConfig does not throw an exception because they are ignored. We should apply this semantic to the dynamic configuration too.
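
As a small demonstration of that static-config semantic, Kafka's public AbstractConfig accepts keys its ConfigDef does not define (the config names below are made up for the example):

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public final class UnknownStaticConfigDemo {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("known.setting", Type.STRING, "default",
                    Importance.LOW, "A config this version knows about.");
        // "message.format.version" is not defined above, yet parsing does
        // not throw; the unknown key is simply ignored (and logged as unused).
        AbstractConfig config = new AbstractConfig(def, Map.of(
            "known.setting", "value",
            "message.format.version", "2.8"));
        System.out.println(config.getString("known.setting"));  // prints "value"
    }
}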
