diff --git a/docs/configuration.md b/docs/configuration.md index 74fdfafffe..33a4390335 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -181,6 +181,8 @@ This section lists configurations about the authorization. | Name | Description | Range | Default | |-------------------------------------------|--------------------------------------------------------------------------------------------------------|-------------|---------| | kafkaEnableAuthorizationForceGroupIdCheck | Whether to enable authorization force group ID check. Note: It only support for OAuth2 authentication. | true, false | false | +| kopAuthorizationCacheRefreshMs | If it's configured with a positive value N, each connection will cache the authorization results of PRODUCE and FETCH requests for at least N ms.
It could help improve the performance when authorization is enabled, but the permission revoke will also take N ms to take effect. | 1 .. 2147483647 | 30000 | +| kopAuthorizationCacheMaxCountPerConnection | If it's configured with a positive value N, each connection will cache at most N entries for PRODUCE or FETCH requests.
If it's non-positive, the cache size will be the default value. | 1 .. 2147483647 | 100 | ## SSL encryption diff --git a/kafka-impl/pom.xml b/kafka-impl/pom.xml index 694dacf2ff..eaef9656d5 100644 --- a/kafka-impl/pom.xml +++ b/kafka-impl/pom.xml @@ -117,6 +117,12 @@ test-listener test + + + org.awaitility + awaitility + test + diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 25fe5f3be0..30931431d0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -13,12 +13,14 @@ */ package io.streamnative.pulsar.handlers.kop; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.Properties; @@ -564,6 +566,24 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private boolean skipMessagesWithoutIndex = false; + @FieldContext( + category = CATEGORY_KOP, + doc = "If it's configured with a positive value N, each connection will cache the authorization results " + + "of PRODUCE and FETCH requests for at least N ms.\n" + + "It could help improve the performance when authorization is enabled, but the permission revoke " + + "will also take N ms to take effect.\nDefault: 30000 (30 seconds)" + ) + private int kopAuthorizationCacheRefreshMs = 30000; + + @FieldContext( + category = CATEGORY_KOP, + doc = "If it's configured with a positive value N, each connection will cache at most N " + + "entries for PRODUCE or FETCH requests.\n" + + "Default: 100\n" + + "If it's non-positive, the cache size will be the default value." + ) + private int kopAuthorizationCacheMaxCountPerConnection = 100; + private String checkAdvertisedListeners(String advertisedListeners) { StringBuilder listenersReBuilder = new StringBuilder(); for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) { @@ -629,4 +649,14 @@ public String getListeners() { return kopAllowedNamespaces; } + public Caffeine getAuthorizationCacheBuilder() { + if (kopAuthorizationCacheRefreshMs <= 0) { + return Caffeine.newBuilder().maximumSize(0); + } else { + int maximumSize = (kopAuthorizationCacheMaxCountPerConnection >= 0) + ? kopAuthorizationCacheMaxCountPerConnection : 100; + return Caffeine.newBuilder().maximumSize(maximumSize) + .expireAfterWrite(Duration.ofMillis(kopAuthorizationCacheRefreshMs)); + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index cf2337e9ab..eaeb61f2e4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -13,12 +13,15 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; +import com.github.benmanes.caffeine.cache.Cache; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal; import java.util.Objects; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.common.naming.NamespaceName; @@ -39,11 +42,18 @@ public class SimpleAclAuthorizer implements Authorizer { private final AuthorizationService authorizationService; private final boolean forceCheckGroupId; + // Cache the authorization results to avoid authorizing PRODUCE or FETCH requests each time. + // key is (topic, role) + private final Cache, Boolean> produceCache; + // key is (topic, role, group) + private final Cache, Boolean> fetchCache; public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) { this.pulsarService = pulsarService; this.authorizationService = pulsarService.getBrokerService().getAuthorizationService(); this.forceCheckGroupId = config.isKafkaEnableAuthorizationForceGroupIdCheck(); + this.produceCache = config.getAuthorizationCacheBuilder().build(); + this.fetchCache = config.getAuthorizationCacheBuilder().build(); } protected PulsarService getPulsarService() { @@ -151,7 +161,16 @@ public CompletableFuture canGetTopicList(KafkaPrincipal principal, Reso public CompletableFuture canProduceAsync(KafkaPrincipal principal, Resource resource) { checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); - return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()); + final Pair key = Pair.of(topicName, principal.getName()); + final Boolean authorized = produceCache.getIfPresent(key); + if (authorized != null) { + return CompletableFuture.completedFuture(authorized); + } + return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()) + .thenApply(__ -> { + produceCache.put(key, __); + return __; + }); } @Override @@ -161,8 +180,17 @@ public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Reso if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) { return CompletableFuture.completedFuture(false); } + final Triple key = Triple.of(topicName, principal.getName(), principal.getGroupId()); + final Boolean authorized = fetchCache.getIfPresent(key); + if (authorized != null) { + return CompletableFuture.completedFuture(authorized); + } return authorizationService.canConsumeAsync( - topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()); + topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()) + .thenApply(__ -> { + fetchCache.put(key, __); + return __; + }); } @Override diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index 55af1c9ea6..989846e9e8 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -19,8 +19,10 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; import java.io.File; @@ -31,10 +33,13 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.PulsarService; @@ -42,6 +47,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; +import org.awaitility.Awaitility; import org.testng.annotations.Test; /** @@ -283,4 +289,36 @@ public void testKopMigrationServiceConfiguration() { assertTrue(configuration.isKopMigrationEnable()); assertEquals(port, configuration.getKopMigrationServicePort()); } + + @Test(timeOut = 10000) + public void testKopAuthorizationCache() throws InterruptedException { + KafkaServiceConfiguration configuration = new KafkaServiceConfiguration(); + configuration.setKopAuthorizationCacheRefreshMs(500); + configuration.setKopAuthorizationCacheMaxCountPerConnection(5); + Cache cache = configuration.getAuthorizationCacheBuilder().build(); + for (int i = 0; i < 5; i++) { + assertNull(cache.getIfPresent(1)); + } + for (int i = 0; i < 10; i++) { + cache.put(i, i + 100); + } + Awaitility.await().atMost(Duration.ofMillis(100)).pollInterval(Duration.ofMillis(1)) + .until(() -> IntStream.range(0, 10).mapToObj(cache::getIfPresent) + .filter(Objects::nonNull).count() <= 5); + IntStream.range(0, 10).mapToObj(cache::getIfPresent).filter(Objects::nonNull).map(i -> i - 100).forEach(key -> + assertEquals(cache.getIfPresent(key), Integer.valueOf(key + 100))); + + Thread.sleep(600); // wait until the cache expired + for (int i = 0; i < 10; i++) { + assertNull(cache.getIfPresent(i)); + } + + configuration.setKopAuthorizationCacheRefreshMs(0); + Cache cache2 = configuration.getAuthorizationCacheBuilder().build(); + for (int i = 0; i < 5; i++) { + cache2.put(i, i); + } + Awaitility.await().atMost(Duration.ofMillis(10)).pollInterval(Duration.ofMillis(1)) + .until(() -> IntStream.range(0, 5).mapToObj(cache2::getIfPresent).noneMatch(Objects::nonNull)); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java index edeada2ce6..e6aa506721 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java @@ -13,130 +13,25 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; -import static org.mockito.Mockito.spy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import com.google.common.collect.Sets; -import io.jsonwebtoken.SignatureAlgorithm; -import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import javax.crypto.SecretKey; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.PartitionInfo; -import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -/** - * Unit test for Authorization with `entryFormat=pulsar`. - */ -public class KafkaAuthorizationMockTest extends KopProtocolHandlerTestBase { - - protected static final String TENANT = "KafkaAuthorizationTest"; - protected static final String NAMESPACE = "ns1"; - private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); - - protected static final String ADMIN_USER = "pass.pass"; +public class KafkaAuthorizationMockTest extends KafkaAuthorizationMockTestBase { @BeforeClass - @Override - protected void setup() throws Exception { - Properties properties = new Properties(); - properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); - - String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); - - conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); - conf.setKafkaMetadataTenant("internal"); - conf.setKafkaMetadataNamespace("__kafka"); - conf.setKafkaTenant(TENANT); - conf.setKafkaNamespace(NAMESPACE); - - conf.setClusterName(super.configClusterName); - conf.setAuthorizationEnabled(true); - conf.setAuthenticationEnabled(true); - conf.setAuthorizationAllowWildcardsMatching(true); - conf.setAuthorizationProvider(KafkaMockAuthorizationProvider.class.getName()); - conf.setAuthenticationProviders( - Sets.newHashSet(AuthenticationProviderToken.class.getName())); - conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); - conf.setBrokerClientAuthenticationParameters("token:" + adminToken); - conf.setProperties(properties); - - super.internalSetup(); + public void setup() throws Exception { + super.setup(); } - @AfterClass - @Override - protected void cleanup() throws Exception { - super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(this.conf.getBrokerClientAuthenticationPlugin(), - this.conf.getBrokerClientAuthenticationParameters()).build()); + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + super.cleanup(); } - @Override - protected void createAdmin() throws Exception { - super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(this.conf.getBrokerClientAuthenticationPlugin(), - this.conf.getBrokerClientAuthenticationParameters()).build()); - } - - - @Test(timeOut = 30 * 1000) + @Test(timeOut = 30000) public void testSuperUserProduceAndConsume() throws PulsarAdminException { - String superUserToken = AuthTokenUtils.createToken(secretKey, "pass.pass", Optional.empty()); - String topic = "testSuperUserProduceAndConsumeTopic"; - String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; - KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), - TENANT + "/" + NAMESPACE, "token:" + superUserToken); - int totalMsgs = 10; - String messageStrPrefix = topic + "_message_"; - - for (int i = 0; i < totalMsgs; i++) { - String messageStr = messageStrPrefix + i; - kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); - } - KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, - TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); - kConsumer.getConsumer().subscribe(Collections.singleton(topic)); - - int i = 0; - while (i < totalMsgs) { - ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); - for (ConsumerRecord record : records) { - Integer key = record.key(); - assertEquals(messageStrPrefix + key.toString(), record.value()); - i++; - } - } - assertEquals(i, totalMsgs); - - // no more records - ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); - assertTrue(records.isEmpty()); - - // ensure that we can list the topic - Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); - assertEquals(result.size(), 1); - assertTrue(result.containsKey(topic), - "list of topics " + result.keySet() + " does not contains " + topic); - - // Cleanup - kProducer.close(); - kConsumer.close(); - admin.topics().deletePartitionedTopic(fullNewTopicName); + super.testSuperUserProduceAndConsume(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java new file mode 100644 index 0000000000..032c538e77 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java @@ -0,0 +1,136 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import javax.crypto.SecretKey; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.PartitionInfo; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; + +/** + * Unit test for Authorization with `entryFormat=pulsar`. + */ +public class KafkaAuthorizationMockTestBase extends KopProtocolHandlerTestBase { + + protected static final String TENANT = "KafkaAuthorizationTest"; + protected static final String NAMESPACE = "ns1"; + protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + protected static final String ADMIN_USER = "pass.pass"; + protected String authorizationProviderClassName = KafkaMockAuthorizationProvider.class.getName(); + + @Override + protected void setup() throws Exception { + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + + String adminToken = AuthTokenUtils.createToken(SECRET_KEY, ADMIN_USER, Optional.empty()); + + conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); + conf.setKafkaMetadataTenant("internal"); + conf.setKafkaMetadataNamespace("__kafka"); + conf.setKafkaTenant(TENANT); + conf.setKafkaNamespace(NAMESPACE); + + conf.setClusterName(super.configClusterName); + conf.setAuthorizationEnabled(true); + conf.setAuthenticationEnabled(true); + conf.setAuthorizationAllowWildcardsMatching(true); + conf.setAuthorizationProvider(authorizationProviderClassName); + conf.setAuthenticationProviders( + Sets.newHashSet(AuthenticationProviderToken.class.getName())); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + adminToken); + conf.setProperties(properties); + + super.internalSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + @Override + protected void createAdmin() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + public void testSuperUserProduceAndConsume() throws PulsarAdminException { + String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "pass.pass", Optional.empty()); + String topic = "testSuperUserProduceAndConsumeTopic"; + String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; + KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + int totalMsgs = 10; + String messageStrPrefix = topic + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); + } + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + + int i = 0; + while (i < totalMsgs) { + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + Integer key = record.key(); + assertEquals(messageStrPrefix + key.toString(), record.value()); + i++; + } + } + assertEquals(i, totalMsgs); + + // no more records + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); + assertTrue(records.isEmpty()); + + // ensure that we can list the topic + Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); + assertEquals(result.size(), 1); + assertTrue(result.containsKey(topic), + "list of topics " + result.keySet() + " does not contains " + topic); + + // Cleanup + kProducer.close(); + kConsumer.close(); + admin.topics().deletePartitionedTopic(fullNewTopicName); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java new file mode 100644 index 0000000000..c1b1f9202f --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class SlowAuthorizationTest extends KafkaAuthorizationMockTestBase { + + @BeforeClass + public void setup() throws Exception { + super.authorizationProviderClassName = SlowMockAuthorizationProvider.class.getName(); + super.setup(); + } + + @AfterClass + public void cleanup() throws Exception { + super.cleanup(); + } + + @Test(timeOut = 60000) + public void testManyMessages() throws Exception { + String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "normal-user", Optional.empty()); + final String topic = "test-many-messages"; + @Cleanup + final KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + long start = System.currentTimeMillis(); + log.info("Before send"); + for (int i = 0; i < 1000; i++) { + kProducer.getProducer().send(new ProducerRecord(topic, "msg-" + i)).get(); + } + log.info("After send ({} ms)", System.currentTimeMillis() - start); + @Cleanup + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + int i = 0; + start = System.currentTimeMillis(); + log.info("Before poll"); + while (i < 1000) { + final ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + i += records.count(); + } + log.info("After poll ({} ms)", System.currentTimeMillis() - start); + } + + public static class SlowMockAuthorizationProvider extends KafkaMockAuthorizationProvider { + + @Override + public CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) { + return CompletableFuture.completedFuture(role.equals("pass.pass")); + } + + @Override + public CompletableFuture isSuperUser(String role, AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + return CompletableFuture.completedFuture(role.equals("pass.pass")); + } + + @Override + public CompletableFuture canProduceAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + return authorizeSlowly(); + } + + @Override + public CompletableFuture canConsumeAsync( + TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) { + return authorizeSlowly(); + } + + private static CompletableFuture authorizeSlowly() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(true); + } + + @Override + CompletableFuture roleAuthorizedAsync(String role) { + return CompletableFuture.completedFuture(true); + } + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java index d261d9c8fb..a0293e5264 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java @@ -103,6 +103,7 @@ protected void setup() throws Exception { conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER")); conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName()); conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties"); + conf.setKopAuthorizationCacheRefreshMs(0); super.internalSetup(); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java index f8632a712c..ae6a92e94d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java @@ -75,6 +75,7 @@ protected void setup() throws Exception { conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER")); conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName()); conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties"); + conf.setKopAuthorizationCacheRefreshMs(0); super.internalSetup(); }