Skip to content

Commit

Permalink
GH-3466: Optimize KafkaAdmin creation in KafkaTemplate (#3471)
Browse files Browse the repository at this point in the history
* GH-3466: Optimize KafkaAdmin creation in KafkaTemplate

Fixes: #3466

#3466

Improve bootstrap-server config comparison to avoid unnecessary
KafkaAdmin recreation. This addresses inconsistencies between
List<String> and String configurations for bootstrap servers.

The change ensures that List versions of bootstrap-server configs are
converted to regular Strings by removing brackets. This allows for
consistent comparison between producer and admin configurations.

This optimization is particularly relevant for Spring Boot scenarios
where configs may be provided in different formats but represent the
same underlying values.

* Addressing PR review

(cherry picked from commit 1d4f7f9)
  • Loading branch information
sobychacko authored and spring-builds committed Aug 30, 2024
1 parent 64ce024 commit 552d456
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
Expand Down Expand Up @@ -485,8 +486,9 @@ public void afterSingletonsInstantiated() {
if (this.kafkaAdmin == null) {
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
if (this.kafkaAdmin != null) {
Object producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString();
producerServers = removeLeadingAndTrailingBrackets(producerServers);
String adminServers = getAdminBootstrapAddress();
if (!producerServers.equals(adminServers)) {
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
Expand All @@ -510,16 +512,14 @@ else if (this.micrometerEnabled) {
private String getAdminBootstrapAddress() {
// Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available
String adminServers = this.kafkaAdmin.getBootstrapServers();

// Fallback to configuration properties if bootstrap servers are not set
if (adminServers == null) {
adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
""
).toString();
}

return adminServers;
return removeLeadingAndTrailingBrackets(adminServers);
}

@Nullable
Expand Down Expand Up @@ -1007,6 +1007,10 @@ public void destroy() {
}
}

private static String removeLeadingAndTrailingBrackets(String str) {
return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['), ']');
}

@SuppressWarnings("serial")
private static final class SkipAbortException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.springframework.lang.Nullable;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.StringUtils;

import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -204,7 +205,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux"))
.hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0"));
assertThat(admin.getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString()));
// producer factory broker different to admin
KafkaAdmin pAdmin = KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class);
assertThat(pAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout());
Expand Down Expand Up @@ -287,6 +288,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
.hasMessage("obs5 error");
}

@Test
void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
@Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate<Integer, String> template,
@Autowired KafkaAdmin kafkaAdmin) {
// See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466
assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
}

@Configuration
@EnableKafka
public static class Config {
Expand All @@ -295,20 +304,30 @@ public static class Config {

@Bean
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString());
List<String> brokersAsList = Arrays.asList(brokers);
KafkaAdmin admin = new KafkaAdmin(
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList));
admin.setOperationTimeout(42);
return admin;
}

@Bean
@Primary
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ProducerFactory<Integer, String> customProducerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
Expand Down Expand Up @@ -340,6 +359,14 @@ KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String
return template;
}

@Bean
KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
@Qualifier("customProducerFactory") ProducerFactory<Integer, String> pf) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setObservationEnabled(true);
return template;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> cf) {
Expand Down

0 comments on commit 552d456

Please sign in to comment.