KAFKA-19851 Delete dynamic config that were removed by Kafka #21053
base: trunk
Conversation
ahuang98 left a comment
partial review, thanks for working on this Tony!
I was wondering if we considered changing the behavior of `validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs)` to log vs throw when there is an illegal existing config?
```java
 * Get the set of valid configuration names for a given resource type.
 * Returns empty set if configSchema is not initialized.
 */
static Set<String> getValidConfigNames(Type resourceType) {
```
can we have this method handle UNKNOWN type to simplify logic elsewhere?
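A minimal sketch of that suggestion, using a simplified stand-in for `ConfigResource.Type` and the schema lookup (the enum values and `SCHEMA` map here are illustrative, not the real Kafka types): handling `UNKNOWN` inside the helper means callers never need their own special case.

```java
import java.util.Map;
import java.util.Set;

public class ValidConfigNamesSketch {
    // Hypothetical stand-in for ConfigResource.Type.
    enum Type { TOPIC, BROKER, UNKNOWN }

    // Hypothetical stand-in for the config schema's per-resource whitelist.
    static final Map<Type, Set<String>> SCHEMA = Map.of(
        Type.TOPIC, Set.of("retention.ms", "cleanup.policy"),
        Type.BROKER, Set.of("log.retention.hours")
    );

    // Handle UNKNOWN here so call sites can use the result unconditionally.
    static Set<String> getValidConfigNames(Type resourceType) {
        if (resourceType == Type.UNKNOWN) {
            return Set.of();
        }
        return SCHEMA.getOrDefault(resourceType, Set.of());
    }

    public static void main(String[] args) {
        System.out.println(getValidConfigNames(Type.UNKNOWN)); // empty set
        System.out.println(getValidConfigNames(Type.TOPIC));
    }
}
```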
```java
Supplier<KafkaConfigSchema> supplier = configSchemaSupplier;
if (supplier == null) {
    return Set.of();
}
KafkaConfigSchema configSchema = supplier.get();
if (configSchema == null) {
    return Set.of();
}
return configSchema.validConfigNames(resourceType);
```
what about something like

```java
return Optional.ofNullable(configSchemaSupplier)
    .map(Supplier::get)
    .map(schema -> schema.validConfigNames(resourceType))
    .orElse(Set.of());
```
Thanks for taking this on @0xffff-zhiyan. I just have some high level comments for your and @ahuang98's consideration. After doing some thinking on this, the issue is more complex than it appears if we want to actually remove these records from the metadata partition.
> I was wondering if we considered changing the behavior of `validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs)` to log vs throw when there is an illegal existing config?
I think the main question we want to answer is:
- Do we want kafka to even remove these now-invalid configs from the cluster metadata partition as part of this PR?
What happens if the user wants to downgrade to a lower version? If we "clean up" config records, downgrading the software now results in lost metadata. In my opinion, the lossy downgrade case is enough to convince me kafka should not proactively clean up these configs.
Ultimately, what we want is the active controller to gracefully handle an ALTER_CONFIG after upgrading to a software version which may make some of its existing config state unknown/invalid to the new software version.
The simplest approach is not to validate dynamic configs that are unknown to kafka. This matches what we do for static configs: you can add unknown configs to the .properties file and it will not impact kafka. However, because we persist this config state to disk via the metadata partition, it is a problem to allow arbitrary config updates.
If the controller is to keep returning an error like it does today in this state, it should return an error that lists all the now-invalid configs so it is straightforward for the user to clean them up. This error should also let the user know these configs are invalid because they are unrecognized by the current kafka software version. This means the user becomes aware that a downgrade would be lossy if they delete these configs.
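The error suggested above could be built along these lines. This is only a sketch; the method name and message wording are illustrative, not the actual controller code:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class UnknownConfigError {
    // Build an error message that names every existing config the current
    // software version does not recognize, so the user can clean them up in
    // one pass and knows that deleting them makes a downgrade lossy.
    static String describeUnknownConfigs(Set<String> existingConfigNames,
                                         Set<String> validConfigNames) {
        List<String> unknown = existingConfigNames.stream()
            .filter(name -> !validConfigNames.contains(name))
            .sorted()
            .collect(Collectors.toList());
        if (unknown.isEmpty()) {
            return "";
        }
        return "These configs are not recognized by the current software version "
            + "and must be removed before other configs can be altered "
            + "(removing them makes a software downgrade lossy): " + unknown;
    }

    public static void main(String[] args) {
        System.out.println(describeUnknownConfigs(
            Set.of("message.format.version", "retention.ms"),
            Set.of("retention.ms")));
    }
}
```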
```java
 * Represents changes to the configurations in the metadata image.
 */
public final class ConfigurationsDelta {
    /**
```
We shouldn't need to change this file at all. Remember that each ConfigurationDelta object will contain the current configuration image for a given ConfigResource, as well as its deltas. When we call ConfigurationDelta#apply, that is the only place we need to make changes.
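A sketch of what centralizing the filtering in the apply step could look like. The method shape here is illustrative and simplified (the real `ConfigurationDelta#apply` has a different signature); it only shows the idea of dropping non-whitelisted keys in one place:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ConfigurationDeltaApplySketch {
    // Apply pending changes to the current image for one ConfigResource,
    // filtering out keys that are not in the valid-config whitelist.
    // A null value in `changes` marks a deletion.
    static Map<String, String> apply(Map<String, String> image,
                                     Map<String, String> changes,
                                     Set<String> validNames) {
        Map<String, String> result = new HashMap<>();
        image.forEach((k, v) -> {
            if (validNames.contains(k)) result.put(k, v); // drop stale keys
        });
        changes.forEach((k, v) -> {
            if (v == null) result.remove(k);
            else if (validNames.contains(k)) result.put(k, v);
        });
        return result;
    }
}
```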
```java
 */
public void replay(ConfigRecord record) {
    Type type = Type.forId(record.resourceType());
    // Filter out invalid configs
```
I think there is a better way to implement this than trying to clean these configs up in every place where we replay a ConfigRecord. This approach is disjoint and not very maintainable.
I think I agree that it might have fewer unintended side effects to not delete the unknown configs. About the downgrade scenario, I would assume that in most cases where we introduce a new topic config, it involves a new MV, in which case we don't guarantee lossless downgrades (and this is besides the point that MV downgrade isn't supported yet). We don't technically gate the actual topic config on MV or major versions, so it is quite possible we lose information unexpectedly on a downgrade.
This seems similar to my question about whether we can just avoid validating the existing configs and prevent new invalid configs from being added. I don't necessarily agree with allowing a user to add a bad config; this could become a vulnerability if we don't have a cap on the number of configs.
I agree that is a better way.
That way doesn't fix our current issue, though. The problem is that whenever users add or modify configurations, we throw an exception if there are any invalid configs. Users have to manually remove them all, which is tedious and exactly what we want to improve. Simply informing them about the invalid configs doesn't really simplify the process, because they still need to clean them up one by one, and in the end they lose those configs permanently. So based on the discussion above, we'd better stop validating existing configs and prevent users from adding invalid configs.

My only concern is: is it really the right approach to let Kafka tolerate configurations that should no longer exist in the metadata? If we ever introduce new logic that handles existing configs in the future, we might have to keep adding code paths that explicitly ignore these existing-but-invalid configs. That seems like it could gradually accumulate technical debt. If we want the metadata to be clean without losing those configs permanently, is it possible we introduce a new config called …
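The "stop validating existing configs" behavior discussed above can be sketched as follows. The method and names are illustrative, not the actual `ConfigurationValidator` API; the point is simply that only the configs the user is setting are checked:

```java
import java.util.Map;
import java.util.Set;

public class ValidateNewConfigsOnly {
    // Validate only the configs the user is trying to set. Pre-existing
    // unknown configs in the metadata image are intentionally not checked,
    // so stale entries left by an older software version don't block updates.
    static void validate(Map<String, String> newConfigs, Set<String> validNames) {
        for (String name : newConfigs.keySet()) {
            if (!validNames.contains(name)) {
                throw new IllegalArgumentException("Unknown config: " + name);
            }
        }
    }

    public static void main(String[] args) {
        // Succeeds even if the image still contains removed configs,
        // because only newConfigs is inspected.
        validate(Map.of("retention.ms", "100"), Set.of("retention.ms"));
    }
}
```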
Thanks for the discussion @ahuang98 @0xffff-zhiyan:
I think I misunderstood your original comment. I agree that if we ignore the existing config metadata state and only validate what is contained in the
Dynamic configs that are not known by kafka, just like static configs, shouldn't invalidate the entire config. In this case, they are because
Problem
https://issues.apache.org/jira/browse/KAFKA-19851
When upgrading from Kafka 3.x to 4.0, the metadata log may contain dynamic configurations that were removed in 4.0 (e.g., `message.format.version` per KIP-724). These removed configs cause `InvalidConfigurationException` when users attempt to modify any configuration, because validation checks all existing configs, including the removed ones.
Implementation
Implement whitelist-based filtering to automatically remove invalid/removed configurations during metadata image building and replay:
KafkaConfigSchema.validConfigNames()- Returns whitelist of valid non-internal config names per resource typeConfigurationsDelta- Filter invalid configs when replayingConfigRecordand building final imageConfigurationControlManager- Filter invalid configs during replay to keep controller state cleanGROUPandCLIENT_METRICSConfigDefs toKafkaRaftServerfor complete whitelistChanges
- `KafkaConfigSchema`: added `validConfigNames()` method
- `ConfigurationsDelta`/`ConfigurationDelta`: filter invalid configs in `replay()` and `apply()`
- `ConfigurationControlManager`: filter invalid configs in `replay()`
- `KafkaRaftServer`: added `GROUP` and `CLIENT_METRICS` ConfigDefs
Testing
`InvalidConfigurationException`