Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -53,12 +54,7 @@ public static void fillProperties(SolrZkClient solrClient, Map<String, Object> p
}
}
// fill in aux Kafka env with prefix
env.forEach(
(key, val) -> {
if (key.startsWith(KAFKA_ENV_PREFIX)) {
properties.put(normalizeKafkaEnvKey(key), val);
}
});
addAdditionalKafkaProperties(properties, env, System.getProperties());

// fill in from system properties
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
Expand All @@ -67,14 +63,6 @@ public static void fillProperties(SolrZkClient solrClient, Map<String, Object> p
properties.put(configKey.getKey(), val);
}
}
// fill in aux Kafka system properties with prefix
System.getProperties()
.forEach(
(key, val) -> {
if (key.toString().startsWith(KAFKA_PROP_PREFIX)) {
properties.put(normalizeKafkaSysPropKey(key.toString()), val);
}
});

Properties zkProps = new Properties();
if (solrClient != null) {
Expand Down Expand Up @@ -117,15 +105,56 @@ public static void fillProperties(SolrZkClient solrClient, Map<String, Object> p
Set<String> keys = new HashSet<>(properties.keySet());
keys.forEach(
key -> {
Object value = properties.get(key);
if (key.startsWith(KAFKA_ENV_PREFIX)) {
properties.put(normalizeKafkaEnvKey(key), properties.remove(key));
properties.remove(key);
putIfNonBlankAndMissing(properties, normalizeKafkaEnvKey(key), value);
} else if (key.startsWith(KAFKA_PROP_PREFIX)) {
properties.put(normalizeKafkaSysPropKey(key), properties.remove(key));
properties.remove(key);
putIfNonBlankAndMissing(properties, normalizeKafkaSysPropKey(key), value);
}
});
}
}

// System properties override environment variables for pass-through Kafka properties.
// Existing explicit keys in properties are preserved.
static void addAdditionalKafkaProperties(
Map<String, Object> properties, Map<String, String> env, Properties sysProps) {
Set<String> envDerivedKeys = new LinkedHashSet<>();
env.forEach(
(key, val) -> {
if (!key.startsWith(KAFKA_ENV_PREFIX)) {
return;
}
String normalized = normalizeKafkaEnvKey(key);
if (!isValidAdditionalProperty(normalized, val)) {
return;
}
Object existingValue = properties.get(normalized);
if (isBlankValue(existingValue)) {
properties.put(normalized, val);
envDerivedKeys.add(normalized);
}
});

sysProps.forEach(
(key, val) -> {
String propKey = key.toString();
if (!propKey.startsWith(KAFKA_PROP_PREFIX)) {
return;
}
String normalized = normalizeKafkaSysPropKey(propKey);
if (!isValidAdditionalProperty(normalized, val)) {
return;
}
Object existingValue = properties.get(normalized);
if (isBlankValue(existingValue) || envDerivedKeys.contains(normalized)) {
properties.put(normalized, val.toString());
}
});
}

public static String normalizeKafkaEnvKey(String key) {
if (key.startsWith(KAFKA_ENV_PREFIX)) {
return key.substring(KAFKA_ENV_PREFIX.length()).toLowerCase(Locale.ROOT).replace('_', '.');
Expand All @@ -142,6 +171,21 @@ public static String normalizeKafkaSysPropKey(String key) {
}
}

private static boolean isValidAdditionalProperty(String key, Object value) {
return key != null && !key.isBlank() && !isBlankValue(value);
}

private static boolean isBlankValue(Object value) {
return value == null || (value instanceof String && ((String) value).isBlank());
}

private static void putIfNonBlankAndMissing(
Map<String, Object> properties, String key, Object value) {
if (isValidAdditionalProperty(key, value) && properties.get(key) == null) {
properties.put(key, value);
}
}

public static void verifyProperties(Map<String, Object> properties) {
if (properties.get(BOOTSTRAP_SERVERS) == null) {
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -346,6 +347,85 @@ public void testFillProperties_ComplexScenario() throws Exception {
assertEquals("zk-value", properties.get("zk.only.property"));
}

@Test
public void testAddAdditionalKafkaProperties_MapsEnvKey() {
Map<String, Object> properties = new HashMap<>();
Map<String, String> env = Map.of("SOLR_CROSSDC_KAFKA_FOO_BAR", "env-value");

ConfUtil.addAdditionalKafkaProperties(properties, env, new Properties());

assertEquals("env-value", properties.get("foo.bar"));
}

@Test
public void testAddAdditionalKafkaProperties_MapsSysPropKey() {
Map<String, Object> properties = new HashMap<>();
Properties sysProps = new Properties();
sysProps.setProperty("solr.crossdc.kafka.foo.bar", "sys-value");

ConfUtil.addAdditionalKafkaProperties(properties, Collections.emptyMap(), sysProps);

assertEquals("sys-value", properties.get("foo.bar"));
}

@Test
public void testAddAdditionalKafkaProperties_SysPropsOverrideEnv() {
Map<String, Object> properties = new HashMap<>();
Map<String, String> env = Map.of("SOLR_CROSSDC_KAFKA_FOO_BAR", "env-value");
Properties sysProps = new Properties();
sysProps.setProperty("solr.crossdc.kafka.foo.bar", "sys-value");

ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps);

assertEquals("sys-value", properties.get("foo.bar"));
}

@Test
public void testAddAdditionalKafkaProperties_DoesNotOverrideExplicitProperty() {
Map<String, Object> properties = new HashMap<>();
properties.put("foo.bar", "explicit");
Map<String, String> env = Map.of("SOLR_CROSSDC_KAFKA_FOO_BAR", "env-value");
Properties sysProps = new Properties();
sysProps.setProperty("solr.crossdc.kafka.foo.bar", "sys-value");

ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps);

assertEquals("explicit", properties.get("foo.bar"));
}

@Test
public void testAddAdditionalKafkaProperties_IgnoresBlankValuesAndEmptyKeys() {
Map<String, Object> properties = new HashMap<>();
Map<String, String> env = new HashMap<>();
env.put("SOLR_CROSSDC_KAFKA_", "ignored-empty-key");
env.put("SOLR_CROSSDC_KAFKA_FOO_BAR", "");
env.put("SOLR_CROSSDC_KAFKA_BAZ_QUX", " ");
Properties sysProps = new Properties();
sysProps.setProperty("solr.crossdc.kafka.", "ignored-empty-key");
sysProps.setProperty("solr.crossdc.kafka.foo.bar", "");
sysProps.setProperty("solr.crossdc.kafka.baz.qux", " ");

ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps);

assertTrue(properties.isEmpty());
}

@Test
public void testFillProperties_PassThroughPreservedInKafkaCrossDcConfAdditionalProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic");
Map<String, String> env = Map.of("SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS", "500");
Properties sysProps = new Properties();
sysProps.setProperty("solr.crossdc.kafka.compression.type", "lz4");

ConfUtil.addAdditionalKafkaProperties(properties, env, sysProps);
KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);

assertEquals(500, conf.getAdditionalProperties().get("max.poll.records"));
assertEquals("lz4", conf.getAdditionalProperties().get("compression.type"));
}

// we can't easily modify envvars, test just the key conversion in properties
@Test
public void testUnderscoreToDotsConversion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ environment variables starting with `SOLR_CROSSDC_KAFKA_` or system properties s
`solr.crossdc.kafka.`. For example, to adjust the `max.poll.records` property specific to the Kafka client
library you can set either `SOLR_CROSSDC_KAFKA_MAX_POLL_RECORDS` environment variable or `solr.crossdc.kafka.max.poll.records`
system property.
Property names from environment variables are normalized by stripping the prefix, converting to lowercase, and replacing `_` with `.`.
Values from `solr.crossdc.kafka.` system properties override values from `SOLR_CROSSDC_KAFKA_` environment variables when they map to the same Kafka key.
Already-populated explicit CrossDC settings are preserved and are not overridden by pass-through values.
Blank values and empty normalized keys are ignored.

=== CrossDC Manager

Expand Down