Skip to content

Commit

Permalink
GH-3483: Allow Overriding KafkaAdmin#createAdmin()
Browse files Browse the repository at this point in the history
Fixes: #3483 

#3483

- Refactor `KafkaAdmin` to use `Admin` interface
- Change `createAdmin()` method to protected visibility
- Update the return type to `org.apache.kafka.clients.admin.Admin`
- Modify `KafkaAdmin` to use `Admin` interface instead of `AdminClient` class
  • Loading branch information
anders-swanson authored Oct 4, 2024
1 parent a12edfb commit 8da1aa3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section

When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior.

[[x33-customize-admin-client-in-KafkaAdmin]]
=== Customize Admin client in KafkaAdmin

When extending `KafkaAdmin`, user applications may override the `createAdmin` method to customize Admin client creation.

[[x33-customize-kafka-streams-implementation]]
=== Customizing The Implementation of Kafka Streams

When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method.
When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand Down Expand Up @@ -68,14 +69,15 @@
import org.springframework.util.Assert;

/**
* An admin that delegates to an {@link AdminClient} to create topics defined
* An admin that delegates to an {@link Admin} to create topics defined
* in the application context.
*
* @author Gary Russell
* @author Artem Bilan
* @author Adrian Gygax
* @author Sanghyeok An
* @author Valentina Armenise
* @author Anders Swanson
*
* @since 1.3
*/
Expand Down Expand Up @@ -114,9 +116,9 @@ public class KafkaAdmin extends KafkaResourceFactory
private String clusterId;

/**
* Create an instance with an {@link AdminClient} based on the supplied
* Create an instance with an {@link Admin} based on the supplied
* configuration.
* @param config the configuration for the {@link AdminClient}.
* @param config the configuration for the {@link Admin}.
*/
public KafkaAdmin(Map<String, Object> config) {
this.configs = new HashMap<>(config);
Expand Down Expand Up @@ -251,7 +253,7 @@ public void afterSingletonsInstantiated() {
public final boolean initialize() {
Collection<NewTopic> newTopics = newTopics();
if (!newTopics.isEmpty()) {
AdminClient adminClient = null;
Admin adminClient = null;
try {
adminClient = createAdmin();
}
Expand Down Expand Up @@ -347,7 +349,7 @@ protected Collection<NewTopic> newTopics() {
@Nullable
public String clusterId() {
if (this.clusterId == null) {
try (AdminClient client = createAdmin()) {
try (Admin client = createAdmin()) {
this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
if (this.clusterId == null) {
this.clusterId = "null";
Expand All @@ -365,14 +367,14 @@ public String clusterId() {

@Override
public void createOrModifyTopics(NewTopic... topics) {
try (AdminClient client = createAdmin()) {
try (Admin client = createAdmin()) {
addOrModifyTopicsIfNeeded(client, Arrays.asList(topics));
}
}

@Override
public Map<String, TopicDescription> describeTopics(String... topicNames) {
try (AdminClient admin = createAdmin()) {
try (Admin admin = createAdmin()) {
Map<String, TopicDescription> results = new HashMap<>();
DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames));
try {
Expand All @@ -389,7 +391,13 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
}
}

AdminClient createAdmin() {
/**
* Creates a new {@link Admin} client instance using the {@link AdminClient} class.
* @return the new {@link Admin} client instance.
* @since 3.3.0
* @see AdminClient#create(Map)
*/
protected Admin createAdmin() {
return AdminClient.create(getAdminConfig());
}

Expand All @@ -409,7 +417,7 @@ protected Map<String, Object> getAdminConfig() {
return configs2;
}

private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
private void addOrModifyTopicsIfNeeded(Admin adminClient, Collection<NewTopic> topics) {
if (!topics.isEmpty()) {
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
Expand Down Expand Up @@ -439,7 +447,7 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
}

private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
AdminClient adminClient, Collection<NewTopic> topics) {
Admin adminClient, Collection<NewTopic> topics) {

List<ConfigResource> configResources = topics.stream()
.map(topic -> new ConfigResource(Type.TOPIC, topic.name()))
Expand Down Expand Up @@ -484,7 +492,7 @@ private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
}
}

private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic> topics,
private void adjustConfigMismatches(Admin adminClient, Collection<NewTopic> topics,
Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs) {
for (Map.Entry<ConfigResource, List<ConfigEntry>> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) {
ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey();
Expand Down Expand Up @@ -556,7 +564,7 @@ else if (topic.numPartitions() > topicDescription.partitions().size()) {
return topicsToModify;
}

private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
private void addTopics(Admin adminClient, List<NewTopic> topicsToAdd) {
CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd);
try {
topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS);
Expand All @@ -579,7 +587,7 @@ private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
}
}

private void createMissingPartitions(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
private void createMissingPartitions(Admin adminClient, Map<String, NewPartitions> topicsToModify) {
CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
try {
partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand Down Expand Up @@ -69,6 +70,7 @@
/**
* @author Gary Russell
* @author Adrian Gygax
* @author Anders Swanson
*
* @since 1.3
*/
Expand Down Expand Up @@ -286,7 +288,7 @@ void nullClusterId() {
KafkaAdmin admin = new KafkaAdmin(Map.of()) {

@Override
AdminClient createAdmin() {
protected Admin createAdmin() {
return mock;
}

Expand Down

0 comments on commit 8da1aa3

Please sign in to comment.