From 865bc9d3d8a3f793c759acc67a60dcb0c947675b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 19 Feb 2024 10:08:18 +0800 Subject: [PATCH 1/2] Disable topic level policies to make tests work for latest Pulsar (#201) See https://lists.apache.org/thread/gjdvl1ys71k9mknwypxgkbsdyjwr31yt --- tests/test-conf/standalone-ssl.conf | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test-conf/standalone-ssl.conf b/tests/test-conf/standalone-ssl.conf index beed278..74a0b37 100644 --- a/tests/test-conf/standalone-ssl.conf +++ b/tests/test-conf/standalone-ssl.conf @@ -310,3 +310,8 @@ maxMessageSize=1024000 # Disable consistent hashing to fix flaky `KeySharedConsumerTest#testMultiTopics`. subscriptionKeySharedUseConsistentHashing=false + +# It's true by default since 2.11. After https://github.com/apache/pulsar/pull/21445, we must configure +# brokerClientAuthenticationPlugin and brokerClientAuthenticationParameters correctly when enabling topic +# level policies. Otherwise, no topic could be loaded. +topicLevelPoliciesEnabled=false From 48be1795a7993422cf4bce2785351f48e456dc26 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 19 Feb 2024 10:46:53 +0800 Subject: [PATCH 2/2] Fix incorrect logs when a message failed to be decoded with the writer schema (#200) ### Motivation See https://github.com/apache/pulsar-client-python/blob/f9b2d168ae85f289d6ee043cd81791d569ba8844/pulsar/schema/schema_avro.py#L92C32-L92C69 When `self._decode_bytes(msg.data(), writer_schema)` failed, the error log is still `Failed to get schema info`, which is confusing. ### Modifications Modify the error message. Even if it failed at `self._get_writer_schema(topic, version)`, there would still be error logs from the C++ client. --- pulsar/schema/schema_avro.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/schema/schema_avro.py b/pulsar/schema/schema_avro.py index 480afe9..09b341b 100644 --- a/pulsar/schema/schema_avro.py +++ b/pulsar/schema/schema_avro.py @@ -91,7 +91,8 @@ def decode_message(self, msg: _pulsar.Message): writer_schema = self._get_writer_schema(topic, version) return self._decode_bytes(msg.data(), writer_schema) except Exception as e: - self._logger.error(f'Failed to get schema info of {topic} version {version}: {e}') + msg_id = msg.message_id() + self._logger.warn(f'Failed to decode {msg_id} with schema {topic} version {version}: {e}') return self._decode_bytes(msg.data(), self._schema) def _get_writer_schema(self, topic: str, version: int) -> 'dict':