-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19851 Delete dynamic config that were removed by Kafka #21053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,17 +21,48 @@ | |
| import org.apache.kafka.common.config.ConfigResource.Type; | ||
| import org.apache.kafka.common.metadata.ConfigRecord; | ||
| import org.apache.kafka.common.metadata.RemoveTopicRecord; | ||
| import org.apache.kafka.metadata.KafkaConfigSchema; | ||
| import org.apache.kafka.server.common.MetadataVersion; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Map.Entry; | ||
| import java.util.Set; | ||
| import java.util.function.Supplier; | ||
|
|
||
|
|
||
| /** | ||
| * Represents changes to the configurations in the metadata image. | ||
| */ | ||
| public final class ConfigurationsDelta { | ||
| /** | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't need to change this file at all. Remember that each |
||
| * Supplier for KafkaConfigSchema to get valid config names whitelist by resource type. | ||
| */ | ||
| private static volatile Supplier<KafkaConfigSchema> configSchemaSupplier = null; | ||
|
|
||
| /** | ||
| * Set the supplier for KafkaConfigSchema. This should be called during initialization. | ||
| */ | ||
| public static void setConfigSchemaSupplier(Supplier<KafkaConfigSchema> supplier) { | ||
| configSchemaSupplier = supplier; | ||
| } | ||
|
|
||
| /** | ||
| * Get the set of valid configuration names for a given resource type. | ||
| * Returns empty set if configSchema is not initialized. | ||
| */ | ||
| static Set<String> getValidConfigNames(Type resourceType) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we have this method handle |
||
| Supplier<KafkaConfigSchema> supplier = configSchemaSupplier; | ||
| if (supplier == null) { | ||
| return Set.of(); | ||
| } | ||
| KafkaConfigSchema configSchema = supplier.get(); | ||
| if (configSchema == null) { | ||
| return Set.of(); | ||
| } | ||
| return configSchema.validConfigNames(resourceType); | ||
|
Comment on lines
+55
to
+63
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about something like |
||
| } | ||
|
|
||
| private final ConfigurationsImage image; | ||
| private final Map<ConfigResource, ConfigurationDelta> changes = new HashMap<>(); | ||
|
|
||
|
|
@@ -58,8 +89,18 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) { | |
| } | ||
|
|
||
| public void replay(ConfigRecord record) { | ||
| // Filter out invalid configs when building the image | ||
| Type resourceType = Type.forId(record.resourceType()); | ||
| if (resourceType != Type.UNKNOWN) { | ||
| Set<String> validConfigNames = getValidConfigNames(resourceType); | ||
| if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) { | ||
| // Ignore this record | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| ConfigResource resource = | ||
| new ConfigResource(Type.forId(record.resourceType()), record.resourceName()); | ||
| new ConfigResource(resourceType, record.resourceName()); | ||
| ConfigurationImage configImage = image.resourceData().getOrDefault(resource, | ||
| new ConfigurationImage(resource, Map.of())); | ||
| ConfigurationDelta delta = changes.computeIfAbsent(resource, | ||
|
|
@@ -84,7 +125,11 @@ public ConfigurationsImage apply() { | |
| ConfigResource resource = entry.getKey(); | ||
| ConfigurationDelta delta = changes.get(resource); | ||
| if (delta == null) { | ||
| newData.put(resource, entry.getValue()); | ||
| // Filter invalid configs from the base image | ||
| ConfigurationImage filteredImage = filterBaseImage(entry.getValue()); | ||
| if (!filteredImage.isEmpty()) { | ||
| newData.put(resource, filteredImage); | ||
| } | ||
| } else { | ||
| ConfigurationImage newImage = delta.apply(); | ||
| if (!newImage.isEmpty()) { | ||
|
|
@@ -103,6 +148,24 @@ public ConfigurationsImage apply() { | |
| return new ConfigurationsImage(newData); | ||
| } | ||
|
|
||
| private ConfigurationImage filterBaseImage(ConfigurationImage baseImage) { | ||
| Type resourceType = baseImage.resource().type(); | ||
| Set<String> validConfigNames = resourceType != Type.UNKNOWN ? | ||
| getValidConfigNames(resourceType) : Set.of(); | ||
|
|
||
| if (validConfigNames.isEmpty()) { | ||
| return baseImage; | ||
| } | ||
|
|
||
| Map<String, String> filteredData = new HashMap<>(); | ||
| for (Entry<String, String> entry : baseImage.data().entrySet()) { | ||
| if (validConfigNames.contains(entry.getKey())) { | ||
| filteredData.put(entry.getKey(), entry.getValue()); | ||
| } | ||
| } | ||
| return new ConfigurationImage(baseImage.resource(), filteredData); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "ConfigurationsDelta(" + | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a better way to implement this than trying to clean these configs up in every place where we replay a ConfigRecord. This approach is disjoint and not very maintainable.