Skip to content

Commit

Permalink
feat: support producer access mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Jul 7, 2023
1 parent 766db9e commit afc6cb4
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 3 deletions.
15 changes: 13 additions & 2 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import _pulsar

from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode # noqa: F401

from pulsar.__about__ import __version__

Expand Down Expand Up @@ -523,7 +523,8 @@ def create_producer(self, topic,
properties=None,
batching_type=BatchingType.Default,
encryption_key=None,
crypto_key_reader=None
crypto_key_reader=None,
access_mode=ProducerAccessMode.Shared,
):
"""
Create a new producer on a given topic.
Expand Down Expand Up @@ -614,6 +615,15 @@ def create_producer(self, topic,
crypto_key_reader: CryptoKeyReader, optional
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
Set the type of access mode that the producer requires on the topic.
Supported modes:
* Shared: By default multiple producers can publish on a topic.
* Exclusive: Require exclusive access for producer.
Fail immediately if there's already a producer connected.
* WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
* ExclusiveWithFencing: Acquire exclusive access for the producer.
Any existing producer will be removed and invalidated immediately.
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
Expand Down Expand Up @@ -649,6 +659,7 @@ def create_producer(self, topic,
conf.batching_type(batching_type)
conf.chunking_enabled(chunking_enabled)
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
conf.access_mode(access_mode)
if producer_name:
conf.producer_name(producer_name)
if initial_sequence_id:
Expand Down
4 changes: 3 additions & 1 deletion src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ void export_config(py::module_& m) {
.def("batching_type", &ProducerConfiguration::setBatchingType, return_value_policy::reference)
.def("batching_type", &ProducerConfiguration::getBatchingType)
.def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_value_policy::reference)
.def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference);
.def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference)
.def("access_mode", &ProducerConfiguration::setAccessMode, return_value_policy::reference)
.def("access_mode", &ProducerConfiguration::getAccessMode, return_value_policy::copy);

class_<BatchReceivePolicy>(m, "BatchReceivePolicy")
.def(init<int, int, long>())
Expand Down
6 changes: 6 additions & 0 deletions src/enums.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ void export_enums(py::module_& m) {
.value("Default", ProducerConfiguration::DefaultBatching)
.value("KeyBased", ProducerConfiguration::KeyBasedBatching);

enum_<ProducerConfiguration::ProducerAccessMode>(m, "ProducerAccessMode", "Producer Access Mode")
.value("Shared", ProducerConfiguration::ProducerAccessMode::Shared)
.value("Exclusive", ProducerConfiguration::ProducerAccessMode::Exclusive)
.value("WaitForExclusive", ProducerConfiguration::ProducerAccessMode::WaitForExclusive)
.value("ExclusiveWithFencing", ProducerConfiguration::ProducerAccessMode::ExclusiveWithFencing);

enum_<Logger::Level>(m, "LoggerLevel")
.value("Debug", Logger::LEVEL_DEBUG)
.value("Info", Logger::LEVEL_INFO)
Expand Down
56 changes: 56 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
InitialPosition,
CryptoKeyReader,
ConsumerBatchReceivePolicy,
ProducerAccessMode,
)
from pulsar.schema import JsonSchema, Record, Integer

Expand Down Expand Up @@ -166,6 +167,61 @@ def test_producer_send(self):
self.assertEqual(msg_id, msg.message_id())
client.close()

def test_producer_access_mode_exclusive(self):
client = Client(self.serviceUrl)
topic_name = "test-access-mode-exclusive"
client.create_producer(topic_name, producer_name="p1", access_mode=pulsar.ProducerAccessMode.Exclusive)
with self.assertRaises(pulsar.ProducerFenced):
client.create_producer(topic_name, producer_name="p2", access_mode=pulsar.ProducerAccessMode.Exclusive)
client.close()

def test_producer_access_mode_wait_exclusive(self):
client = Client(self.serviceUrl)
topic_name = "test_producer_access_mode_wait_exclusive"
producer1 = client.create_producer(
topic=topic_name,
producer_name='p-1',
access_mode=pulsar.ProducerAccessMode.Exclusive
)
assert producer1.producer_name() == 'p-1'

# when p1 close, p2 success created.
producer1.close()
producer2 = client.create_producer(
topic=topic_name,
producer_name='p-2',
access_mode=pulsar.ProducerAccessMode.WaitForExclusive
)
assert producer2.producer_name() == 'p-2'

producer2.close()
client.close()

def test_producer_access_mode_exclusive_with_fencing(self):
client = Client(self.serviceUrl)
topic_name = 'test_producer_access_mode_exclusive_with_fencing'

producer1 = client.create_producer(
topic=topic_name,
producer_name='p-1',
access_mode=pulsar.ProducerAccessMode.Exclusive
)
assert producer1.producer_name() == 'p-1'

producer2 = client.create_producer(
topic=topic_name,
producer_name='p-2',
access_mode=pulsar.ProducerAccessMode.ExclusiveWithFencing
)
assert producer2.producer_name() == 'p-2'

# producer1 will be fenced.
with self.assertRaises(pulsar.ProducerFenced):
producer1.send('test-msg'.encode('utf-8'))

producer2.close()
client.close()

def test_producer_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_producer_is_connected"
Expand Down

0 comments on commit afc6cb4

Please sign in to comment.