From 1884cfb0675387b6a3ca3cfb9a6e2663630f45b1 Mon Sep 17 00:00:00 2001 From: Kamlendra Date: Tue, 3 Mar 2026 16:14:46 -0800 Subject: [PATCH] SOLR-18062: Support arbitrary Kafka properties via env and sysprops --- .../apache/solr/crossdc/common/ConfUtil.java | 76 ++++++++++++++---- .../solr/crossdc/common/ConfUtilTest.java | 80 +++++++++++++++++++ .../pages/cross-dc-replication.adoc | 4 + 3 files changed, 144 insertions(+), 16 deletions(-) diff --git a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java index 924f2be802ba..ab8318450965 100644 --- a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java +++ b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.lang.invoke.MethodHandles; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Locale; import java.util.Map; import java.util.Properties; @@ -53,12 +54,7 @@ public static void fillProperties(SolrZkClient solrClient, Map p } } // fill in aux Kafka env with prefix - env.forEach( - (key, val) -> { - if (key.startsWith(KAFKA_ENV_PREFIX)) { - properties.put(normalizeKafkaEnvKey(key), val); - } - }); + addAdditionalKafkaProperties(properties, env, System.getProperties()); // fill in from system properties for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) { @@ -67,14 +63,6 @@ public static void fillProperties(SolrZkClient solrClient, Map p properties.put(configKey.getKey(), val); } } - // fill in aux Kafka system properties with prefix - System.getProperties() - .forEach( - (key, val) -> { - if (key.toString().startsWith(KAFKA_PROP_PREFIX)) { - properties.put(normalizeKafkaSysPropKey(key.toString()), val); - } - }); Properties zkProps = new Properties(); if (solrClient != null) { @@ -117,15 +105,56 @@ public static void fillProperties(SolrZkClient solrClient, Map p Set keys = new HashSet<>(properties.keySet()); keys.forEach( key -> { + Object value = properties.get(key); if (key.startsWith(KAFKA_ENV_PREFIX)) { - properties.put(normalizeKafkaEnvKey(key), properties.remove(key)); + properties.remove(key); + putIfNonBlankAndMissing(properties, normalizeKafkaEnvKey(key), value); } else if (key.startsWith(KAFKA_PROP_PREFIX)) { - properties.put(normalizeKafkaSysPropKey(key), properties.remove(key)); + properties.remove(key); + putIfNonBlankAndMissing(properties, normalizeKafkaSysPropKey(key), value); } }); } } + // System properties override environment variables for pass-through Kafka properties. + // Existing explicit keys in properties are preserved. + static void addAdditionalKafkaProperties( + Map properties, Map env, Properties sysProps) { + Set envDerivedKeys = new LinkedHashSet<>(); + env.forEach( + (key, val) -> { + if (!key.startsWith(KAFKA_ENV_PREFIX)) { + return; + } + String normalized = normalizeKafkaEnvKey(key); + if (!isValidAdditionalProperty(normalized, val)) { + return; + } + Object existingValue = properties.get(normalized); + if (isBlankValue(existingValue)) { + properties.put(normalized, val); + envDerivedKeys.add(normalized); + } + }); + + sysProps.forEach( + (key, val) -> { + String propKey = key.toString(); + if (!propKey.startsWith(KAFKA_PROP_PREFIX)) { + return; + } + String normalized = normalizeKafkaSysPropKey(propKey); + if (!isValidAdditionalProperty(normalized, val)) { + return; + } + Object existingValue = properties.get(normalized); + if (isBlankValue(existingValue) || envDerivedKeys.contains(normalized)) { + properties.put(normalized, val.toString()); + } + }); + } + public static String normalizeKafkaEnvKey(String key) { if (key.startsWith(KAFKA_ENV_PREFIX)) { return key.substring(KAFKA_ENV_PREFIX.length()).toLowerCase(Locale.ROOT).replace('_', '.'); @@ -142,6 +171,21 @@ public static String normalizeKafkaSysPropKey(String key) { } } + private static boolean isValidAdditionalProperty(String key, Object value) { + return key != null && !key.isBlank() && !isBlankValue(value); + } + + private static boolean isBlankValue(Object value) { + return value == null || (value instanceof String && ((String) value).isBlank()); + } + + private static void putIfNonBlankAndMissing( + Map properties, String key, Object value) { + if (isValidAdditionalProperty(key, value) && properties.get(key) == null) { + properties.put(key, value); + } + } + public static void verifyProperties(Map properties) { if (properties.get(BOOTSTRAP_SERVERS) == null) { log.error( diff --git a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java index 49de459b288a..89de2b7e1553 100644 --- a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java +++ b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.OutputStreamWriter; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -346,6 +347,85 @@ public void testFillProperties_ComplexScenario() throws Exception { assertEquals("zk-value", properties.get("zk.only.property")); } + @Test + public void testAddAdditionalKafkaProperties_MapsEnvKey() { + Map properties = new HashMap<>(); + Map env = Map.of("SOLR_CROSSDC_KAFKA_FOO_BAR", "env-value"); + + ConfUtil.addAdditionalKafkaProperties(properties, env, new Properties()); + + assertEquals("env-value", properties.get("foo.bar")); + } + + @Test + public void testAddAdditionalKafkaProperties_MapsSysPropKey() { + Map properties = new HashMap<>(); + Properties sysProps = new Properties(); + sysProps.setProperty("solr.crossdc.kafka.foo.bar", "sys-value"); + + ConfUtil.addAdditionalKafkaProperties(properties, Collections.emptyMap(), sysProps); + + assertEquals("sys-value", properties.get("foo.bar")); + } + + @Test + public void testAddAdditionalKafkaProperties_SysPropsOverrideEnv() { + Map properties = new HashMap<>(); + Map env = Map.of("SOLR_CROSSDC_KAFKA_FOO_BAR", "env-value"); + Properties sysProps = new Properties(); + sysProps.setProperty("solr.crossdc.kafka.foo.bar", "sys-value"); + + ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps); + + assertEquals("sys-value", properties.get("foo.bar")); + } + + @Test + public void testAddAdditionalKafkaProperties_DoesNotOverrideExplicitProperty() { + Map properties = new HashMap<>(); + properties.put("foo.bar", "explicit"); + Map env = Map.of("SOLR_CROSSDC_KAFKA_FOO_BAR", "env-value"); + Properties sysProps = new Properties(); + sysProps.setProperty("solr.crossdc.kafka.foo.bar", "sys-value"); + + ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps); + + assertEquals("explicit", properties.get("foo.bar")); + } + + @Test + public void testAddAdditionalKafkaProperties_IgnoresBlankValuesAndEmptyKeys() { + Map properties = new HashMap<>(); + Map env = new HashMap<>(); + env.put("SOLR_CROSSDC_KAFKA_", "ignored-empty-key"); + env.put("SOLR_CROSSDC_KAFKA_FOO_BAR", ""); + env.put("SOLR_CROSSDC_KAFKA_BAZ_QUX", " "); + Properties sysProps = new Properties(); + sysProps.setProperty("solr.crossdc.kafka.", "ignored-empty-key"); + sysProps.setProperty("solr.crossdc.kafka.foo.bar", ""); + sysProps.setProperty("solr.crossdc.kafka.baz.qux", " "); + + ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps); + + assertTrue(properties.isEmpty()); + } + + @Test + public void testFillProperties_PassThroughPreservedInKafkaCrossDcConfAdditionalProperties() { + Map properties = new HashMap<>(); + properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092"); + properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic"); + Map env = Map.of("SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS", "500"); + Properties sysProps = new Properties(); + sysProps.setProperty("solr.crossdc.kafka.compression.type", "lz4"); + + ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps); + KafkaCrossDcConf conf = new KafkaCrossDcConf(properties); + + assertEquals(500, conf.getAdditionalProperties().get("max.poll.records")); + assertEquals("lz4", conf.getAdditionalProperties().get("compression.type")); + } + // we can't easily modify envvars, test just the key conversion in properties @Test public void testUnderscoreToDotsConversion() { diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc index 07e1b52ecc11..af4ea4c6ec36 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc @@ -140,6 +140,10 @@ environment variables starting with `SOLR_CROSSDC_KAFKA_` or system properties s `solr.crossdc.kafka.`. For example, to adjust the `max.poll.records` property specific to the Kafka client library you can set either `SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS` environment variable or `solr.crossdc.kafka.max.poll.records` system property. +Property names from environment variables are normalized by stripping the prefix, converting to lowercase, and replacing `_` with `.`. +Values from `solr.crossdc.kafka.` system properties override values from `SOLR_CROSSDC_KAFKA_` environment variables when they map to the same Kafka key. +Already-populated explicit CrossDC settings are preserved and are not overridden by pass-through values. +Blank values and empty normalized keys are ignored. === CrossDC Manager