4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,7 +29,9 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
import org.slf4j.Logger

@@ -189,5 +191,7 @@ object KafkaRaftServer {
val configSchema = new KafkaConfigSchema(Map(
ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef),
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,
ConfigResource.Type.GROUP -> GroupConfig.configDef(),
ConfigResource.Type.CLIENT_METRICS -> ClientMetricsConfigs.configDef(),
).asJava, ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS)
}

@@ -51,6 +51,7 @@
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
@@ -508,6 +509,16 @@ private List<String> getParts(String value, String key, ConfigResource configRes
*/
public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
// Filter out invalid configs
Review comment (Contributor): I think there is a better way to implement this than trying to clean these configs up in every place where we replay a ConfigRecord. This approach is disjoint and not very maintainable.

if (type != Type.UNKNOWN) {
Set<String> validConfigNames = configSchema.validConfigNames(type);
if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) {
// Ignore the record if it's a removed/invalid config
log.debug("Ignoring ConfigRecord for {} with invalid/removed config name: {}",
new ConfigResource(type, record.resourceName()), record.name());
return;
}
}
ConfigResource configResource = new ConfigResource(type, record.resourceName());
TimelineHashMap<String, String> configs = configData.get(configResource);
if (configs == null) {

@@ -96,6 +96,7 @@
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
@@ -1538,6 +1539,8 @@ private QuorumController(
setNodeId(nodeId).
setFeatureControl(featureControl).
build();
// Initialize the config schema supplier for ConfigurationsDelta to filter invalid configs
ConfigurationsDelta.setConfigSchemaSupplier(() -> configSchema);
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).

@@ -17,12 +17,14 @@

package org.apache.kafka.image;

import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;


/**
@@ -57,7 +59,17 @@ public void deleteAll() {

public ConfigurationImage apply() {
Map<String, String> newData = new HashMap<>(image.data().size());
Type resourceType = image.resource().type();
Set<String> validConfigNames = resourceType != Type.UNKNOWN ?
ConfigurationsDelta.getValidConfigNames(resourceType) : Set.of();

// Filter out invalid configs from the base image
for (Entry<String, String> entry : image.data().entrySet()) {
if (!validConfigNames.isEmpty() &&
!validConfigNames.contains(entry.getKey())) {
continue;
}

Optional<String> change = changes.get(entry.getKey());
if (change == null) {
newData.put(entry.getKey(), entry.getValue());

@@ -21,17 +21,48 @@
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Supplier;


/**
* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationsDelta {
Review comment (Contributor): We shouldn't need to change this file at all. Remember that each ConfigurationDelta object will contain the current configuration image for a given ConfigResource, as well as its deltas. When we call ConfigurationDelta#apply, that is the only place we need to make changes.
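A minimal sketch of what that centralized approach could look like, assuming ConfigurationDelta#apply receives the KafkaConfigSchema as a parameter instead of reading a static supplier; the parameter and the handling of delta-only keys are assumptions for illustration, not code from this PR:

// Hypothetical sketch: filtering happens only here, against a schema supplied by the caller,
// reusing ConfigurationDelta's existing image and changes fields.
public ConfigurationImage apply(KafkaConfigSchema configSchema) {
    Set<String> validNames = configSchema.validConfigNames(image.resource().type());
    Map<String, String> newData = new HashMap<>(image.data().size());
    for (Entry<String, String> entry : image.data().entrySet()) {
        if (!validNames.isEmpty() && !validNames.contains(entry.getKey())) {
            continue; // drop configs the current schema no longer defines
        }
        Optional<String> change = changes.get(entry.getKey());
        if (change == null) {
            newData.put(entry.getKey(), entry.getValue());
        } else {
            change.ifPresent(value -> newData.put(entry.getKey(), value));
        }
    }
    // Keys introduced only by the delta (assumed handling, mirroring the base-image filter).
    changes.forEach((key, change) -> {
        if (!image.data().containsKey(key) && (validNames.isEmpty() || validNames.contains(key))) {
            change.ifPresent(value -> newData.put(key, value));
        }
    });
    return new ConfigurationImage(image.resource(), newData);
}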

/**
 * Supplier for the KafkaConfigSchema, used to look up the whitelist of valid config names by resource type.
 */
private static volatile Supplier<KafkaConfigSchema> configSchemaSupplier = null;

/**
* Set the supplier for KafkaConfigSchema. This should be called during initialization.
*/
public static void setConfigSchemaSupplier(Supplier<KafkaConfigSchema> supplier) {
configSchemaSupplier = supplier;
}

/**
* Get the set of valid configuration names for a given resource type.
* Returns empty set if configSchema is not initialized.
*/
static Set<String> getValidConfigNames(Type resourceType) {
Review comment (Contributor): Can we have this method handle UNKNOWN type to simplify logic elsewhere?

Supplier<KafkaConfigSchema> supplier = configSchemaSupplier;
if (supplier == null) {
return Set.of();
}
KafkaConfigSchema configSchema = supplier.get();
if (configSchema == null) {
return Set.of();
}
return configSchema.validConfigNames(resourceType);
Review comment (Contributor) on lines +55 to +63: What about something like

return Optional.ofNullable(configSchemaSupplier)
    .map(Supplier::get)
    .map(schema -> schema.validConfigNames(resourceType))
    .orElse(Set.of());

}
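A combined sketch of the two suggestions above, handling Type.UNKNOWN inside the helper and collapsing the null checks with Optional; purely illustrative (it assumes java.util.Optional is imported), not code from this PR:

// Hypothetical variant: UNKNOWN resources map to an empty whitelist, so callers
// no longer need to special-case the resource type.
static Set<String> getValidConfigNames(Type resourceType) {
    if (resourceType == Type.UNKNOWN) {
        return Set.of();
    }
    return Optional.ofNullable(configSchemaSupplier)
        .map(Supplier::get)
        .map(schema -> schema.validConfigNames(resourceType))
        .orElse(Set.of());
}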

private final ConfigurationsImage image;
private final Map<ConfigResource, ConfigurationDelta> changes = new HashMap<>();

@@ -58,8 +89,18 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) {
}

public void replay(ConfigRecord record) {
// Filter out invalid configs when building the image
Type resourceType = Type.forId(record.resourceType());
if (resourceType != Type.UNKNOWN) {
Set<String> validConfigNames = getValidConfigNames(resourceType);
if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) {
// Ignore this record
return;
}
}

ConfigResource resource =
new ConfigResource(Type.forId(record.resourceType()), record.resourceName());
new ConfigResource(resourceType, record.resourceName());
ConfigurationImage configImage = image.resourceData().getOrDefault(resource,
new ConfigurationImage(resource, Map.of()));
ConfigurationDelta delta = changes.computeIfAbsent(resource,
@@ -84,7 +125,11 @@ public ConfigurationsImage apply() {
ConfigResource resource = entry.getKey();
ConfigurationDelta delta = changes.get(resource);
if (delta == null) {
newData.put(resource, entry.getValue());
// Filter invalid configs from the base image
ConfigurationImage filteredImage = filterBaseImage(entry.getValue());
if (!filteredImage.isEmpty()) {
newData.put(resource, filteredImage);
}
} else {
ConfigurationImage newImage = delta.apply();
if (!newImage.isEmpty()) {
@@ -103,6 +148,24 @@ public ConfigurationsImage apply() {
return new ConfigurationsImage(newData);
}

private ConfigurationImage filterBaseImage(ConfigurationImage baseImage) {
Type resourceType = baseImage.resource().type();
Set<String> validConfigNames = resourceType != Type.UNKNOWN ?
getValidConfigNames(resourceType) : Set.of();

if (validConfigNames.isEmpty()) {
return baseImage;
}

Map<String, String> filteredData = new HashMap<>();
for (Entry<String, String> entry : baseImage.data().entrySet()) {
if (validConfigNames.contains(entry.getKey())) {
filteredData.put(entry.getKey(), entry.getValue());
}
}
return new ConfigurationImage(baseImage.resource(), filteredData);
}

@Override
public String toString() {
return "ConfigurationsDelta(" +

@@ -28,9 +28,11 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
@@ -281,4 +283,26 @@ public int getStaticallyConfiguredMinInsyncReplicas(Map<String, ?> staticNodeCon
minInsyncReplicasString,
ConfigDef.Type.INT);
}

/**
* Get the set of valid dynamic configuration names for a given resource type.
* The whitelist for each resource type is taken from:
* - Topic: LogConfig.configDef
* - Broker: KafkaConfig.configDef
* - Group: GroupConfig.configDef
* - ClientMetrics: ClientMetricsConfigs.configDef
*
* @param type The resource type
* @return A set of valid configuration names for the given resource type
*/
public Set<String> validConfigNames(ConfigResource.Type type) {
ConfigDef configDef = configDefs.getOrDefault(type, EMPTY_CONFIG_DEF);
Set<String> validNames = new HashSet<>();
for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) {
if (!configKey.internalConfig) {
validNames.add(configKey.name);
}
}
return Collections.unmodifiableSet(validNames);
}
}
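A hypothetical usage sketch of the new helper, assuming a KafkaConfigSchema instance named schema is in scope; the config names are only examples ("message.format.version" matches the removed config used in the tests below):

// Illustrative only: whitelist membership tells us whether a dynamic config is still valid.
Set<String> topicConfigNames = schema.validConfigNames(ConfigResource.Type.TOPIC);
boolean stillSupported = topicConfigNames.contains(TopicConfig.RETENTION_MS_CONFIG); // a current topic config
boolean noLongerValid = !topicConfigNames.contains("message.format.version");        // true once the config is dropped from LogConfig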

@@ -44,6 +44,7 @@

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -153,6 +154,33 @@ public void testReplay() {
assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc").value());
}

@Test
public void testReplayFiltersRemovedConfigs() {
// Create a schema that doesn't include removed configs
Map<ConfigResource.Type, ConfigDef> configDefs = new HashMap<>();
ConfigDef topicConfigDef = new ConfigDef();
topicConfigDef.define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc");
configDefs.put(TOPIC, topicConfigDef);
KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap());

ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(testSchema).
build();

// Replay a removed config
manager.replay(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("removed.config").setValue("value"));
assertEquals(Map.of(), manager.getConfigs(MYTOPIC), "Removed config should not be in configData");

// Replay a valid config
manager.replay(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("x,y,z"));
assertEquals(toMap(entry("abc", "x,y,z")), manager.getConfigs(MYTOPIC),
"Valid config should be in configData");
}

@Test
public void testIncrementalAlterConfigs() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().

@@ -17,24 +17,31 @@

package org.apache.kafka.image;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;


@Timeout(value = 40)
@@ -137,4 +144,70 @@ private static List<ApiMessageAndVersion> getImageRecords(ConfigurationsImage im
image.write(writer);
return writer.records();
}

@Test
public void testRemovedConfigFiltered() {
// Create a schema that doesn't include removed configs
Map<ConfigResource.Type, ConfigDef> configDefs = new HashMap<>();
ConfigDef topicConfigDef = new ConfigDef();
topicConfigDef.define("valid.config", ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM, "doc");
configDefs.put(TOPIC, topicConfigDef);
KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap());
ConfigurationsDelta.setConfigSchemaSupplier(() -> testSchema);

try {
String testTopic = "test-topic";
String removedConfig = "message.format.version";
String validConfig = "valid.config";
String validConfigValue = "604800000";

// Test 1: Filter removed configs from base image
Map<ConfigResource, ConfigurationImage> initialData = new HashMap<>();
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put(removedConfig, "0.10.0");
topicConfigs.put(validConfig, validConfigValue);
initialData.put(new ConfigResource(TOPIC, testTopic),
new ConfigurationImage(new ConfigResource(TOPIC, testTopic), topicConfigs));

ConfigurationsImage initialImage = new ConfigurationsImage(initialData);
ConfigurationsDelta delta = new ConfigurationsDelta(initialImage);
ConfigurationsImage finalImage = delta.apply();

ConfigResource topicResource = new ConfigResource(TOPIC, testTopic);
ConfigurationImage topicConfig = finalImage.resourceData().get(topicResource);
assertNotNull(topicConfig);
assertFalse(topicConfig.data().containsKey(removedConfig), "Removed config should be filtered from base image");
assertTrue(topicConfig.data().containsKey(validConfig), "Valid config should be present");
assertEquals(validConfigValue, topicConfig.data().get(validConfig));

// Test 2: Filter removed configs when replaying records
ConfigurationsDelta delta2 = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(
new ConfigRecord()
.setResourceType(TOPIC.id())
.setResourceName(testTopic)
.setName(removedConfig)
.setValue("0.10.0"),
CONFIG_RECORD.highestSupportedVersion()));
records.add(new ApiMessageAndVersion(
new ConfigRecord()
.setResourceType(TOPIC.id())
.setResourceName(testTopic)
.setName(validConfig)
.setValue(validConfigValue),
CONFIG_RECORD.highestSupportedVersion()));

RecordTestUtils.replayAll(delta2, records);
ConfigurationsImage finalImage2 = delta2.apply();

ConfigurationImage topicConfig2 = finalImage2.resourceData().get(topicResource);
assertNotNull(topicConfig2);
assertFalse(topicConfig2.data().containsKey(removedConfig), "Removed config should be filtered on replay");
assertTrue(topicConfig2.data().containsKey(validConfig), "Valid config should be present");
assertEquals(1, topicConfig2.data().size(), "Only valid config should remain");
} finally {
ConfigurationsDelta.setConfigSchemaSupplier(null);
}
}
}