From 9ab8411ed4eae94e2f7b6868ed816e31afaef483 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 27 Aug 2023 18:56:40 +0800 Subject: [PATCH] Optimize authorization by caching authorization results (#1999) ### Motivation To follow Kafka's behavior, KoP also performs authorization for each PRODUCE or FETCH request. If the custom authorization provider is slow to authorize produce or consume permissions, the performance will be impacted. ### Modifications Introduce caches for authorization: - PRODUCE: (topic, role) -> result - FETCH: (topic, role, group) -> result; Add `SlowAuthorizationTest` to verify the producer and consumer won't be affected significantly by slow authorization. Introduce two configs to configure the cache policy so that revoke permission can work: - kopAuthorizationCacheRefreshMs: the refresh timeout - kopAuthorizationCacheMaxCountPerConnection: the max cache size --- docs/configuration.md | 2 + kafka-impl/pom.xml | 6 + .../kop/KafkaServiceConfiguration.java | 30 ++++ .../security/auth/SimpleAclAuthorizer.java | 32 ++++- .../kop/KafkaServiceConfigurationTest.java | 38 +++++ .../auth/KafkaAuthorizationMockTest.java | 121 ++-------------- .../auth/KafkaAuthorizationMockTestBase.java | 136 ++++++++++++++++++ .../security/auth/SlowAuthorizationTest.java | 112 +++++++++++++++ .../oauth/SaslOAuthKopHandlersTest.java | 1 + ...slOAuthKopHandlersWithMultiTenantTest.java | 1 + 10 files changed, 364 insertions(+), 115 deletions(-) create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java diff --git a/docs/configuration.md b/docs/configuration.md index 74fdfafffe..33a4390335 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -181,6 +181,8 @@ This section lists configurations about the authorization. | Name | Description | Range | Default | |-------------------------------------------|--------------------------------------------------------------------------------------------------------|-------------|---------| | kafkaEnableAuthorizationForceGroupIdCheck | Whether to enable authorization force group ID check. Note: It only support for OAuth2 authentication. | true, false | false | +| kopAuthorizationCacheRefreshMs | If it's configured with a positive value N, each connection will cache the authorization results of PRODUCE and FETCH requests for at least N ms.
It could help improve the performance when authorization is enabled, but the permission revoke will also take N ms to take effect. | 1 .. 2147483647 | 30000 | +| kopAuthorizationCacheMaxCountPerConnection | If it's configured with a positive value N, each connection will cache at most N entries for PRODUCE or FETCH requests.
If it's non-positive, the cache size will be the default value. | 1 .. 2147483647 | 100 | ## SSL encryption diff --git a/kafka-impl/pom.xml b/kafka-impl/pom.xml index 694dacf2ff..eaef9656d5 100644 --- a/kafka-impl/pom.xml +++ b/kafka-impl/pom.xml @@ -117,6 +117,12 @@ test-listener test + + + org.awaitility + awaitility + test + diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 25fe5f3be0..30931431d0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -13,12 +13,14 @@ */ package io.streamnative.pulsar.handlers.kop; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.Properties; @@ -564,6 +566,24 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private boolean skipMessagesWithoutIndex = false; + @FieldContext( + category = CATEGORY_KOP, + doc = "If it's configured with a positive value N, each connection will cache the authorization results " + + "of PRODUCE and FETCH requests for at least N ms.\n" + + "It could help improve the performance when authorization is enabled, but the permission revoke " + + "will also take N ms to take effect.\nDefault: 30000 (30 seconds)" + ) + private int kopAuthorizationCacheRefreshMs = 30000; + + @FieldContext( + category = CATEGORY_KOP, + doc = "If it's configured with a positive value N, each connection will cache at most N " + + "entries for PRODUCE or FETCH requests.\n" + + "Default: 100\n" + + "If it's non-positive, the cache size will be the default value." + ) + private int kopAuthorizationCacheMaxCountPerConnection = 100; + private String checkAdvertisedListeners(String advertisedListeners) { StringBuilder listenersReBuilder = new StringBuilder(); for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) { @@ -629,4 +649,14 @@ public String getListeners() { return kopAllowedNamespaces; } + public Caffeine getAuthorizationCacheBuilder() { + if (kopAuthorizationCacheRefreshMs <= 0) { + return Caffeine.newBuilder().maximumSize(0); + } else { + int maximumSize = (kopAuthorizationCacheMaxCountPerConnection >= 0) + ? kopAuthorizationCacheMaxCountPerConnection : 100; + return Caffeine.newBuilder().maximumSize(maximumSize) + .expireAfterWrite(Duration.ofMillis(kopAuthorizationCacheRefreshMs)); + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index cf2337e9ab..eaeb61f2e4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -13,12 +13,15 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; +import com.github.benmanes.caffeine.cache.Cache; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal; import java.util.Objects; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.common.naming.NamespaceName; @@ -39,11 +42,18 @@ public class SimpleAclAuthorizer implements Authorizer { private final AuthorizationService authorizationService; private final boolean forceCheckGroupId; + // Cache the authorization results to avoid authorizing PRODUCE or FETCH requests each time. + // key is (topic, role) + private final Cache, Boolean> produceCache; + // key is (topic, role, group) + private final Cache, Boolean> fetchCache; public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) { this.pulsarService = pulsarService; this.authorizationService = pulsarService.getBrokerService().getAuthorizationService(); this.forceCheckGroupId = config.isKafkaEnableAuthorizationForceGroupIdCheck(); + this.produceCache = config.getAuthorizationCacheBuilder().build(); + this.fetchCache = config.getAuthorizationCacheBuilder().build(); } protected PulsarService getPulsarService() { @@ -151,7 +161,16 @@ public CompletableFuture canGetTopicList(KafkaPrincipal principal, Reso public CompletableFuture canProduceAsync(KafkaPrincipal principal, Resource resource) { checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); - return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()); + final Pair key = Pair.of(topicName, principal.getName()); + final Boolean authorized = produceCache.getIfPresent(key); + if (authorized != null) { + return CompletableFuture.completedFuture(authorized); + } + return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()) + .thenApply(__ -> { + produceCache.put(key, __); + return __; + }); } @Override @@ -161,8 +180,17 @@ public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Reso if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) { return CompletableFuture.completedFuture(false); } + final Triple key = Triple.of(topicName, principal.getName(), principal.getGroupId()); + final Boolean authorized = fetchCache.getIfPresent(key); + if (authorized != null) { + return CompletableFuture.completedFuture(authorized); + } return authorizationService.canConsumeAsync( - topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()); + topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()) + .thenApply(__ -> { + fetchCache.put(key, __); + return __; + }); } @Override diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index 55af1c9ea6..989846e9e8 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -19,8 +19,10 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; import java.io.File; @@ -31,10 +33,13 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.PulsarService; @@ -42,6 +47,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; +import org.awaitility.Awaitility; import org.testng.annotations.Test; /** @@ -283,4 +289,36 @@ public void testKopMigrationServiceConfiguration() { assertTrue(configuration.isKopMigrationEnable()); assertEquals(port, configuration.getKopMigrationServicePort()); } + + @Test(timeOut = 10000) + public void testKopAuthorizationCache() throws InterruptedException { + KafkaServiceConfiguration configuration = new KafkaServiceConfiguration(); + configuration.setKopAuthorizationCacheRefreshMs(500); + configuration.setKopAuthorizationCacheMaxCountPerConnection(5); + Cache cache = configuration.getAuthorizationCacheBuilder().build(); + for (int i = 0; i < 5; i++) { + assertNull(cache.getIfPresent(1)); + } + for (int i = 0; i < 10; i++) { + cache.put(i, i + 100); + } + Awaitility.await().atMost(Duration.ofMillis(100)).pollInterval(Duration.ofMillis(1)) + .until(() -> IntStream.range(0, 10).mapToObj(cache::getIfPresent) + .filter(Objects::nonNull).count() <= 5); + IntStream.range(0, 10).mapToObj(cache::getIfPresent).filter(Objects::nonNull).map(i -> i - 100).forEach(key -> + assertEquals(cache.getIfPresent(key), Integer.valueOf(key + 100))); + + Thread.sleep(600); // wait until the cache expired + for (int i = 0; i < 10; i++) { + assertNull(cache.getIfPresent(i)); + } + + configuration.setKopAuthorizationCacheRefreshMs(0); + Cache cache2 = configuration.getAuthorizationCacheBuilder().build(); + for (int i = 0; i < 5; i++) { + cache2.put(i, i); + } + Awaitility.await().atMost(Duration.ofMillis(10)).pollInterval(Duration.ofMillis(1)) + .until(() -> IntStream.range(0, 5).mapToObj(cache2::getIfPresent).noneMatch(Objects::nonNull)); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java index edeada2ce6..e6aa506721 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java @@ -13,130 +13,25 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; -import static org.mockito.Mockito.spy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import com.google.common.collect.Sets; -import io.jsonwebtoken.SignatureAlgorithm; -import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import javax.crypto.SecretKey; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.PartitionInfo; -import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -/** - * Unit test for Authorization with `entryFormat=pulsar`. - */ -public class KafkaAuthorizationMockTest extends KopProtocolHandlerTestBase { - - protected static final String TENANT = "KafkaAuthorizationTest"; - protected static final String NAMESPACE = "ns1"; - private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); - - protected static final String ADMIN_USER = "pass.pass"; +public class KafkaAuthorizationMockTest extends KafkaAuthorizationMockTestBase { @BeforeClass - @Override - protected void setup() throws Exception { - Properties properties = new Properties(); - properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); - - String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); - - conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); - conf.setKafkaMetadataTenant("internal"); - conf.setKafkaMetadataNamespace("__kafka"); - conf.setKafkaTenant(TENANT); - conf.setKafkaNamespace(NAMESPACE); - - conf.setClusterName(super.configClusterName); - conf.setAuthorizationEnabled(true); - conf.setAuthenticationEnabled(true); - conf.setAuthorizationAllowWildcardsMatching(true); - conf.setAuthorizationProvider(KafkaMockAuthorizationProvider.class.getName()); - conf.setAuthenticationProviders( - Sets.newHashSet(AuthenticationProviderToken.class.getName())); - conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); - conf.setBrokerClientAuthenticationParameters("token:" + adminToken); - conf.setProperties(properties); - - super.internalSetup(); + public void setup() throws Exception { + super.setup(); } - @AfterClass - @Override - protected void cleanup() throws Exception { - super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(this.conf.getBrokerClientAuthenticationPlugin(), - this.conf.getBrokerClientAuthenticationParameters()).build()); + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + super.cleanup(); } - @Override - protected void createAdmin() throws Exception { - super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(this.conf.getBrokerClientAuthenticationPlugin(), - this.conf.getBrokerClientAuthenticationParameters()).build()); - } - - - @Test(timeOut = 30 * 1000) + @Test(timeOut = 30000) public void testSuperUserProduceAndConsume() throws PulsarAdminException { - String superUserToken = AuthTokenUtils.createToken(secretKey, "pass.pass", Optional.empty()); - String topic = "testSuperUserProduceAndConsumeTopic"; - String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; - KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), - TENANT + "/" + NAMESPACE, "token:" + superUserToken); - int totalMsgs = 10; - String messageStrPrefix = topic + "_message_"; - - for (int i = 0; i < totalMsgs; i++) { - String messageStr = messageStrPrefix + i; - kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); - } - KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, - TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); - kConsumer.getConsumer().subscribe(Collections.singleton(topic)); - - int i = 0; - while (i < totalMsgs) { - ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); - for (ConsumerRecord record : records) { - Integer key = record.key(); - assertEquals(messageStrPrefix + key.toString(), record.value()); - i++; - } - } - assertEquals(i, totalMsgs); - - // no more records - ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); - assertTrue(records.isEmpty()); - - // ensure that we can list the topic - Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); - assertEquals(result.size(), 1); - assertTrue(result.containsKey(topic), - "list of topics " + result.keySet() + " does not contains " + topic); - - // Cleanup - kProducer.close(); - kConsumer.close(); - admin.topics().deletePartitionedTopic(fullNewTopicName); + super.testSuperUserProduceAndConsume(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java new file mode 100644 index 0000000000..032c538e77 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java @@ -0,0 +1,136 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import javax.crypto.SecretKey; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.PartitionInfo; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; + +/** + * Unit test for Authorization with `entryFormat=pulsar`. + */ +public class KafkaAuthorizationMockTestBase extends KopProtocolHandlerTestBase { + + protected static final String TENANT = "KafkaAuthorizationTest"; + protected static final String NAMESPACE = "ns1"; + protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + protected static final String ADMIN_USER = "pass.pass"; + protected String authorizationProviderClassName = KafkaMockAuthorizationProvider.class.getName(); + + @Override + protected void setup() throws Exception { + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + + String adminToken = AuthTokenUtils.createToken(SECRET_KEY, ADMIN_USER, Optional.empty()); + + conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); + conf.setKafkaMetadataTenant("internal"); + conf.setKafkaMetadataNamespace("__kafka"); + conf.setKafkaTenant(TENANT); + conf.setKafkaNamespace(NAMESPACE); + + conf.setClusterName(super.configClusterName); + conf.setAuthorizationEnabled(true); + conf.setAuthenticationEnabled(true); + conf.setAuthorizationAllowWildcardsMatching(true); + conf.setAuthorizationProvider(authorizationProviderClassName); + conf.setAuthenticationProviders( + Sets.newHashSet(AuthenticationProviderToken.class.getName())); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + adminToken); + conf.setProperties(properties); + + super.internalSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + @Override + protected void createAdmin() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + public void testSuperUserProduceAndConsume() throws PulsarAdminException { + String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "pass.pass", Optional.empty()); + String topic = "testSuperUserProduceAndConsumeTopic"; + String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; + KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + int totalMsgs = 10; + String messageStrPrefix = topic + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); + } + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + + int i = 0; + while (i < totalMsgs) { + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + Integer key = record.key(); + assertEquals(messageStrPrefix + key.toString(), record.value()); + i++; + } + } + assertEquals(i, totalMsgs); + + // no more records + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); + assertTrue(records.isEmpty()); + + // ensure that we can list the topic + Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); + assertEquals(result.size(), 1); + assertTrue(result.containsKey(topic), + "list of topics " + result.keySet() + " does not contains " + topic); + + // Cleanup + kProducer.close(); + kConsumer.close(); + admin.topics().deletePartitionedTopic(fullNewTopicName); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java new file mode 100644 index 0000000000..c1b1f9202f --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class SlowAuthorizationTest extends KafkaAuthorizationMockTestBase { + + @BeforeClass + public void setup() throws Exception { + super.authorizationProviderClassName = SlowMockAuthorizationProvider.class.getName(); + super.setup(); + } + + @AfterClass + public void cleanup() throws Exception { + super.cleanup(); + } + + @Test(timeOut = 60000) + public void testManyMessages() throws Exception { + String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "normal-user", Optional.empty()); + final String topic = "test-many-messages"; + @Cleanup + final KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + long start = System.currentTimeMillis(); + log.info("Before send"); + for (int i = 0; i < 1000; i++) { + kProducer.getProducer().send(new ProducerRecord(topic, "msg-" + i)).get(); + } + log.info("After send ({} ms)", System.currentTimeMillis() - start); + @Cleanup + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + int i = 0; + start = System.currentTimeMillis(); + log.info("Before poll"); + while (i < 1000) { + final ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + i += records.count(); + } + log.info("After poll ({} ms)", System.currentTimeMillis() - start); + } + + public static class SlowMockAuthorizationProvider extends KafkaMockAuthorizationProvider { + + @Override + public CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) { + return CompletableFuture.completedFuture(role.equals("pass.pass")); + } + + @Override + public CompletableFuture isSuperUser(String role, AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + return CompletableFuture.completedFuture(role.equals("pass.pass")); + } + + @Override + public CompletableFuture canProduceAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + return authorizeSlowly(); + } + + @Override + public CompletableFuture canConsumeAsync( + TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) { + return authorizeSlowly(); + } + + private static CompletableFuture authorizeSlowly() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(true); + } + + @Override + CompletableFuture roleAuthorizedAsync(String role) { + return CompletableFuture.completedFuture(true); + } + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java index d261d9c8fb..a0293e5264 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java @@ -103,6 +103,7 @@ protected void setup() throws Exception { conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER")); conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName()); conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties"); + conf.setKopAuthorizationCacheRefreshMs(0); super.internalSetup(); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java index f8632a712c..ae6a92e94d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java @@ -75,6 +75,7 @@ protected void setup() throws Exception { conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER")); conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName()); conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties"); + conf.setKopAuthorizationCacheRefreshMs(0); super.internalSetup(); }