From 2a7b528f69dbf7c293241d413bffe56c440b49b1 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 19 Jan 2024 14:45:18 -0500 Subject: [PATCH 01/10] Use from_bytes_until_nul to avoid problems with multiple nuls. --- src/config.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 296d9f867..50cbbe00f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -150,7 +150,11 @@ impl NativeClientConfig { } // Convert the C string to a Rust string. - Ok(String::from_utf8_lossy(&buf).to_string()) + let cstr = CStr::from_bytes_until_nul(&buf) + .unwrap() + .to_string_lossy() + .into(); + Ok(cstr) } } From cae942346745b3c726513e19eecc9c71a9fade1f Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 19 Jan 2024 14:45:32 -0500 Subject: [PATCH 02/10] Return error instead of panicing. --- src/consumer/stream_consumer.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 5a7f60552..96e9c7cd4 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -207,11 +207,20 @@ where fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult { let native_config = config.create_native_config()?; let poll_interval = { - let millis: u64 = native_config - .get("max.poll.interval.ms")? - .parse() - .expect("librdkafka validated config value is valid u64"); - Duration::from_millis(millis) + let millis = native_config.get("max.poll.interval.ms")?; + match millis.parse() { + Ok(millis) => Duration::from_millis(millis), + Err(e) => { + println!("Config string: '{}'", millis); + println!("Error: '{}'", e); + return Err(KafkaError::ClientConfig( + RDKafkaConfRes::RD_KAFKA_CONF_INVALID, + "max.poll.interval.ms".to_string(), + format!("Invalid integer: {}", e), + millis, + )); + } + } }; let base = Arc::new(BaseConsumer::new(config, native_config, context)?); From d23a97e80504933dad8458f27cd79483343f9737 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 19 Jan 2024 14:58:02 -0500 Subject: [PATCH 03/10] Bump MSRV to 1.69.0 to pick up cargo index improvement and `CStr::from_bytes_until_nul` --- .github/workflows/ci.yml | 2 +- Cargo.toml | 2 +- Dockerfile | 1 + README.md | 2 +- changelog.md | 1 + rdkafka-sys/Cargo.toml | 2 +- rdkafka-sys/build.rs | 2 +- rdkafka-sys/changelog.md | 1 + src/lib.rs | 2 +- 9 files changed, 9 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 96708961d..06df67deb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ on: branches: [master] env: - rust_version: 1.61.0 + rust_version: 1.69.0 jobs: lint: diff --git a/Cargo.toml b/Cargo.toml index e190bd18e..064c7ec08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["kafka", "rdkafka"] categories = ["api-bindings"] edition = "2018" exclude = ["Cargo.lock"] -rust-version = "1.61" +rust-version = "1.69" [workspace] members = ["rdkafka-sys"] diff --git a/Dockerfile b/Dockerfile index 04086b51c..ace214610 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17 ENV PATH /root/.cargo/bin/:$PATH +ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse # # Create dummy project for rdkafka # COPY Cargo.toml /rdkafka/ diff --git a/README.md b/README.md index 6070ef6a7..cb44723f3 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,7 @@ re-exported as rdkafka features. ### Minimum supported Rust version (MSRV) -The current minimum supported Rust version (MSRV) is 1.61.0. Note that +The current minimum supported Rust version (MSRV) is 1.69.0. Note that bumping the MSRV is not considered a breaking change. Any release of rust-rdkafka may bump the MSRV. diff --git a/changelog.md b/changelog.md index 1806d5807..9aeaff9c8 100644 --- a/changelog.md +++ b/changelog.md @@ -3,6 +3,7 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). ## Unreleased +* Bump MSRV to 1.69 to pick up cargo index improvement and `CStr::from_bytes_until_nul` ## 0.36.2 (2024-01-16) diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml index 4870e15b3..ea937216e 100644 --- a/rdkafka-sys/Cargo.toml +++ b/rdkafka-sys/Cargo.toml @@ -10,7 +10,7 @@ description = "Native bindings to the librdkafka library" keywords = ["kafka", "rdkafka"] categories = ["external-ffi-bindings"] edition = "2018" -rust-version = "1.61" +rust-version = "1.69" [dependencies] num_enum = "0.5.0" diff --git a/rdkafka-sys/build.rs b/rdkafka-sys/build.rs index d615744f3..b7c3e42ca 100644 --- a/rdkafka-sys/build.rs +++ b/rdkafka-sys/build.rs @@ -77,7 +77,7 @@ fn main() { // Ensure that we are in the right directory let rdkafkasys_root = Path::new("rdkafka-sys"); if rdkafkasys_root.exists() { - assert!(env::set_current_dir(&rdkafkasys_root).is_ok()); + assert!(env::set_current_dir(rdkafkasys_root).is_ok()); } if !Path::new("librdkafka/LICENSE").exists() { eprintln!("Setting up submodules"); diff --git a/rdkafka-sys/changelog.md b/rdkafka-sys/changelog.md index 44451c738..f07565790 100644 --- a/rdkafka-sys/changelog.md +++ b/rdkafka-sys/changelog.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +* Bump MSRV to sync with rdkafka crate ## v4.7.0+2.2.0 (2023-11-07) diff --git a/src/lib.rs b/src/lib.rs index 79a8d113f..8854adc92 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,7 +176,7 @@ //! //! ### Minimum supported Rust version (MSRV) //! -//! The current minimum supported Rust version (MSRV) is 1.61.0. Note that +//! The current minimum supported Rust version (MSRV) is 1.69.0. Note that //! bumping the MSRV is not considered a breaking change. Any release of //! rust-rdkafka may bump the MSRV. //! From 33a028fd5794b8f7dc6c6b89472f9138625ebeab Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 19 Jan 2024 15:07:29 -0500 Subject: [PATCH 04/10] Make random suffix longer. --- tests/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils.rs b/tests/utils.rs index 447213672..3be24ce77 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -20,7 +20,7 @@ use rdkafka::TopicPartitionList; pub fn rand_test_topic(test_name: &str) -> String { let id = rand::thread_rng() .gen_ascii_chars() - .take(10) + .take(20) .collect::(); format!("__{}_{}", test_name, id) } From 88bbbe0cdce5f4e79c9d264c7de34588de74a3cf Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 19 Jan 2024 15:31:15 -0500 Subject: [PATCH 05/10] Address updated lints. --- src/consumer/mod.rs | 4 ++-- src/message.rs | 8 ++++---- src/producer/base_producer.rs | 2 ++ src/producer/future_producer.rs | 1 + src/topic_partition_list.rs | 6 +++--- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 95e91ffb0..5ce8b05b1 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -100,12 +100,12 @@ pub trait ConsumerContext: ClientContext + Sized { /// Pre-rebalance callback. This method will run before the rebalance and /// should terminate its execution quickly. #[allow(unused_variables)] - fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'a>) {} + fn pre_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) {} /// Post-rebalance callback. This method will run after the rebalance and /// should terminate its execution quickly. #[allow(unused_variables)] - fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'a>) {} + fn post_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) {} // TODO: convert pointer to structure /// Post commit callback. This method will run after a group of offsets was diff --git a/src/message.rs b/src/message.rs index 76bac9c39..7a422608e 100644 --- a/src/message.rs +++ b/src/message.rs @@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> { type Headers = BorrowedHeaders; fn key(&self) -> Option<&[u8]> { - unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) } + unsafe { util::ptr_to_opt_slice(self.ptr.key, self.ptr.key_len) } } fn payload(&self) -> Option<&[u8]> { - unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) } + unsafe { util::ptr_to_opt_slice(self.ptr.payload, self.ptr.len) } } unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> { - util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len) + util::ptr_to_opt_mut_slice(self.ptr.payload, self.ptr.len) } fn topic(&self) -> &str { unsafe { - CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt)) + CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt)) .to_str() .expect("Topic name is not valid UTF-8") } diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 1cc6e05ce..7623869f2 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -425,6 +425,7 @@ where /// Note that this method will never block. // Simplifying the return type requires generic associated types, which are // unstable. + #[allow(clippy::result_large_err)] pub fn send<'a, K, P>( &self, mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>, @@ -701,6 +702,7 @@ where /// See the documentation for [`BaseProducer::send`] for details. // Simplifying the return type requires generic associated types, which are // unstable. + #[allow(clippy::result_large_err)] pub fn send<'a, K, P>( &self, record: BaseRecord<'a, K, P, C::DeliveryOpaque>, diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index 0769a16a8..baae2cc15 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -346,6 +346,7 @@ where /// Like [`FutureProducer::send`], but if enqueuing fails, an error will be /// returned immediately, alongside the [`FutureRecord`] provided. + #[allow(clippy::result_large_err)] pub fn send_result<'a, K, P>( &self, record: FutureRecord<'a, K, P>, diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 1d8e77ce9..16063bfbc 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -317,7 +317,7 @@ impl TopicPartitionList { /// Sets all partitions in the list to the specified offset. pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) }; for elem_ptr in slice { let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr); elem.set_offset(offset)?; @@ -327,7 +327,7 @@ impl TopicPartitionList { /// Returns all the elements of the list. pub fn elements(&self) -> Vec> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) }; let mut vec = Vec::with_capacity(slice.len()); for elem_ptr in slice { vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr)); @@ -337,7 +337,7 @@ impl TopicPartitionList { /// Returns all the elements of the list that belong to the specified topic. pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) }; let mut vec = Vec::with_capacity(slice.len()); for elem_ptr in slice { let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr); From 53df310b70f7f33819109c7c82bda0af63fae16b Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 22 Jan 2024 10:53:16 -0500 Subject: [PATCH 06/10] Test if sparse index is triggering clippy compiler error --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index ace214610..6f9036d32 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17 ENV PATH /root/.cargo/bin/:$PATH -ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse +#ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse # # Create dummy project for rdkafka # COPY Cargo.toml /rdkafka/ From e86a46aaecf2890987b6421e3bfb731ac3e32279 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 22 Jan 2024 13:09:54 -0500 Subject: [PATCH 07/10] Bump to 1.70 to see if compiler panic is fixed. --- .github/workflows/ci.yml | 2 +- Cargo.toml | 2 +- Dockerfile | 2 +- README.md | 2 +- changelog.md | 2 +- rdkafka-sys/Cargo.toml | 2 +- src/lib.rs | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 06df67deb..d2988e35e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ on: branches: [master] env: - rust_version: 1.69.0 + rust_version: 1.70.0 jobs: lint: diff --git a/Cargo.toml b/Cargo.toml index 064c7ec08..b9045ce58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["kafka", "rdkafka"] categories = ["api-bindings"] edition = "2018" exclude = ["Cargo.lock"] -rust-version = "1.69" +rust-version = "1.70" [workspace] members = ["rdkafka-sys"] diff --git a/Dockerfile b/Dockerfile index 6f9036d32..ace214610 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17 ENV PATH /root/.cargo/bin/:$PATH -#ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse +ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse # # Create dummy project for rdkafka # COPY Cargo.toml /rdkafka/ diff --git a/README.md b/README.md index cb44723f3..333fb5140 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,7 @@ re-exported as rdkafka features. ### Minimum supported Rust version (MSRV) -The current minimum supported Rust version (MSRV) is 1.69.0. Note that +The current minimum supported Rust version (MSRV) is 1.70.0. Note that bumping the MSRV is not considered a breaking change. Any release of rust-rdkafka may bump the MSRV. diff --git a/changelog.md b/changelog.md index 9aeaff9c8..b354c0f97 100644 --- a/changelog.md +++ b/changelog.md @@ -3,7 +3,7 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). ## Unreleased -* Bump MSRV to 1.69 to pick up cargo index improvement and `CStr::from_bytes_until_nul` +* Bump MSRV to 1.70 to pick up cargo index improvement and `CStr::from_bytes_until_nul` ## 0.36.2 (2024-01-16) diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml index ea937216e..7ea379ef8 100644 --- a/rdkafka-sys/Cargo.toml +++ b/rdkafka-sys/Cargo.toml @@ -10,7 +10,7 @@ description = "Native bindings to the librdkafka library" keywords = ["kafka", "rdkafka"] categories = ["external-ffi-bindings"] edition = "2018" -rust-version = "1.69" +rust-version = "1.70" [dependencies] num_enum = "0.5.0" diff --git a/src/lib.rs b/src/lib.rs index 8854adc92..46709c5a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,7 +176,7 @@ //! //! ### Minimum supported Rust version (MSRV) //! -//! The current minimum supported Rust version (MSRV) is 1.69.0. Note that +//! The current minimum supported Rust version (MSRV) is 1.70.0. Note that //! bumping the MSRV is not considered a breaking change. Any release of //! rust-rdkafka may bump the MSRV. //! From fe1fb6543514825a944bef25ed735f5c54cf8337 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 22 Jan 2024 15:08:37 -0500 Subject: [PATCH 08/10] Use tokio sleep instead of synchronous thread sleep. --- tests/test_low_consumers.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index e6642b688..25bde2a80 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -401,7 +401,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() { assert!(consumer.poll(Duration::from_secs(0)).is_none()); // Expect no wakeups for 1s. - thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(wakeups.load(Ordering::SeqCst), 0); // Verify there are no messages waiting. @@ -418,7 +418,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() { // Add more messages to the topic. Expect no additional wakeups, as the // queue is not fully drained, for 1s. populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await; - thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(wakeups.load(Ordering::SeqCst), 1); // Drain the queue. @@ -427,7 +427,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() { assert!(queue.poll(None).is_some()); // Expect no additional wakeups for 1s. - thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(wakeups.load(Ordering::SeqCst), 1); // Add another message, and expect a wakeup. @@ -435,7 +435,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() { wait_for_wakeups(2); // Expect no additional wakeups for 1s. - thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(wakeups.load(Ordering::SeqCst), 2); // Disable the queue and add another message. @@ -443,7 +443,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() { populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await; // Expect no additional wakeups for 1s. - thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(wakeups.load(Ordering::SeqCst), 2); } From 15ccb4d54c7aa502fa00f4e5cfb5e9f82ae754e8 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 22 Jan 2024 15:54:25 -0500 Subject: [PATCH 09/10] Test to see if we might sometimes get a spurious initial callback. --- tests/test_low_consumers.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index 25bde2a80..ff866e372 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -401,8 +401,8 @@ async fn test_produce_consume_message_queue_nonempty_callback() { assert!(consumer.poll(Duration::from_secs(0)).is_none()); // Expect no wakeups for 1s. - tokio::time::sleep(Duration::from_secs(1)).await; - assert_eq!(wakeups.load(Ordering::SeqCst), 0); + //tokio::time::sleep(Duration::from_secs(1)).await; + //assert_eq!(wakeups.load(Ordering::SeqCst), 0); // Verify there are no messages waiting. assert!(consumer.poll(Duration::from_secs(0)).is_none()); From fea66c1a155be8add7532e64617116e7a452cd84 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 22 Jan 2024 16:24:56 -0500 Subject: [PATCH 10/10] restore initial test, move queue creation before assign and only clone once --- tests/test_low_consumers.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index ff866e372..aacf23364 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -365,6 +365,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() { .create_with_context(ConsumerTestContext { _n: 64 }) .expect("Consumer creation failed"); let consumer = Arc::new(consumer); + let mut queue = consumer.split_partition_queue(&topic_name, 0).unwrap(); let mut tpl = TopicPartitionList::new(); tpl.add_partition_offset(&topic_name, 0, Offset::Beginning) @@ -372,11 +373,10 @@ async fn test_produce_consume_message_queue_nonempty_callback() { consumer.assign(&tpl).unwrap(); let wakeups = Arc::new(AtomicUsize::new(0)); - let mut queue = consumer.split_partition_queue(&topic_name, 0).unwrap(); + let cb_wakeups = wakeups.clone(); queue.set_nonempty_callback({ - let wakeups = wakeups.clone(); move || { - wakeups.fetch_add(1, Ordering::SeqCst); + cb_wakeups.fetch_add(1, Ordering::SeqCst); } }); @@ -401,8 +401,8 @@ async fn test_produce_consume_message_queue_nonempty_callback() { assert!(consumer.poll(Duration::from_secs(0)).is_none()); // Expect no wakeups for 1s. - //tokio::time::sleep(Duration::from_secs(1)).await; - //assert_eq!(wakeups.load(Ordering::SeqCst), 0); + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!(wakeups.load(Ordering::SeqCst), 0); // Verify there are no messages waiting. assert!(consumer.poll(Duration::from_secs(0)).is_none());