Fix regression on MacOS #647

Open · wants to merge 10 commits into base: master

2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -7,7 +7,7 @@ on:
branches: [master]

env:
-rust_version: 1.61.0
+rust_version: 1.70.0

jobs:
lint:
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -10,7 +10,7 @@ keywords = ["kafka", "rdkafka"]
categories = ["api-bindings"]
edition = "2018"
exclude = ["Cargo.lock"]
rust-version = "1.61"
rust-version = "1.70"

[workspace]
members = ["rdkafka-sys"]
1 change: 1 addition & 0 deletions Dockerfile
@@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17
ENV PATH /root/.cargo/bin/:$PATH
+ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse

# # Create dummy project for rdkafka
# COPY Cargo.toml /rdkafka/
2 changes: 1 addition & 1 deletion README.md
@@ -184,7 +184,7 @@ re-exported as rdkafka features.

### Minimum supported Rust version (MSRV)

-The current minimum supported Rust version (MSRV) is 1.61.0. Note that
+The current minimum supported Rust version (MSRV) is 1.70.0. Note that
bumping the MSRV is not considered a breaking change. Any release of
rust-rdkafka may bump the MSRV.

1 change: 1 addition & 0 deletions changelog.md
@@ -3,6 +3,7 @@
See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

## Unreleased
+* Bump MSRV to 1.70 to pick up cargo index improvement and `CStr::from_bytes_until_nul`

## 0.36.2 (2024-01-16)

2 changes: 1 addition & 1 deletion rdkafka-sys/Cargo.toml
@@ -10,7 +10,7 @@ description = "Native bindings to the librdkafka library"
keywords = ["kafka", "rdkafka"]
categories = ["external-ffi-bindings"]
edition = "2018"
rust-version = "1.61"
rust-version = "1.70"

[dependencies]
num_enum = "0.5.0"
2 changes: 1 addition & 1 deletion rdkafka-sys/build.rs
@@ -77,7 +77,7 @@ fn main() {
// Ensure that we are in the right directory
let rdkafkasys_root = Path::new("rdkafka-sys");
if rdkafkasys_root.exists() {
-assert!(env::set_current_dir(&rdkafkasys_root).is_ok());
+assert!(env::set_current_dir(rdkafkasys_root).is_ok());
}
if !Path::new("librdkafka/LICENSE").exists() {
eprintln!("Setting up submodules");
1 change: 1 addition & 0 deletions rdkafka-sys/changelog.md
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
+* Bump MSRV to sync with rdkafka crate

## v4.7.0+2.2.0 (2023-11-07)

6 changes: 5 additions & 1 deletion src/config.rs
@@ -150,7 +150,11 @@ impl NativeClientConfig {
}

// Convert the C string to a Rust string.
-Ok(String::from_utf8_lossy(&buf).to_string())
+let cstr = CStr::from_bytes_until_nul(&buf)
+    .unwrap()
+    .to_string_lossy()
+    .into();
+Ok(cstr)
}
}
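This hunk looks like the core of the macOS fix: librdkafka writes a NUL-terminated C string into an oversized buffer, and `String::from_utf8_lossy` kept the padding bytes after the NUL, so numeric config values could fail to parse downstream. `CStr::from_bytes_until_nul` (stable since Rust 1.69, one reason for the MSRV bump) truncates at the first NUL instead. A self-contained sketch of the difference, with an illustrative buffer:

use std::ffi::CStr;

fn main() {
    // librdkafka fills a fixed-size buffer with a NUL-terminated C string;
    // anything after the NUL is padding that must not reach the caller.
    let buf: Vec<u8> = b"300000\0\0\0\0".to_vec();

    // Old conversion: keeps the bytes past the NUL, so a later
    // parse::<u64>() fails on the embedded NUL characters.
    let old = String::from_utf8_lossy(&buf).to_string();
    assert!(old.parse::<u64>().is_err());

    // New conversion: stops at the first NUL, C-string style.
    let new: String = CStr::from_bytes_until_nul(&buf)
        .unwrap()
        .to_string_lossy()
        .into();
    assert_eq!(new, "300000");
    assert_eq!(new.parse::<u64>(), Ok(300000));
}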

4 changes: 2 additions & 2 deletions src/consumer/mod.rs
@@ -100,12 +100,12 @@ pub trait ConsumerContext: ClientContext + Sized {
/// Pre-rebalance callback. This method will run before the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
-fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
+fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}

/// Post-rebalance callback. This method will run after the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
-fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
+fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}

// TODO: convert pointer to structure
/// Post commit callback. This method will run after a group of offsets was
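Both rebalance hooks change only in surface syntax: a named lifetime parameter used once is replaced by the anonymous `'_` lifetime, the idiomatic modern spelling; the signatures are otherwise identical. A toy demonstration (using a stand-in struct, not the crate's real `Rebalance` enum):

// Stand-in type; the crate's Rebalance is an enum, but the lifetime
// mechanics are the same.
struct Rebalance<'a> {
    topic: &'a str,
}

// Old style: a named lifetime parameter that is used exactly once.
fn pre_rebalance_old<'a>(rebalance: &Rebalance<'a>) {
    println!("old: {}", rebalance.topic);
}

// New style: the anonymous '_ lifetime; same meaning, less noise.
fn pre_rebalance_new(rebalance: &Rebalance<'_>) {
    println!("new: {}", rebalance.topic);
}

fn main() {
    let r = Rebalance { topic: "events" };
    pre_rebalance_old(&r);
    pre_rebalance_new(&r);
}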
19 changes: 14 additions & 5 deletions src/consumer/stream_consumer.rs
@@ -207,11 +207,20 @@ where
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self> {
let native_config = config.create_native_config()?;
let poll_interval = {
-let millis: u64 = native_config
-    .get("max.poll.interval.ms")?
-    .parse()
-    .expect("librdkafka validated config value is valid u64");
-Duration::from_millis(millis)
+let millis = native_config.get("max.poll.interval.ms")?;
+match millis.parse() {
+    Ok(millis) => Duration::from_millis(millis),
+    Err(e) => {
+        println!("Config string: '{}'", millis);
+        println!("Error: '{}'", e);
+        return Err(KafkaError::ClientConfig(
+            RDKafkaConfRes::RD_KAFKA_CONF_INVALID,
+            "max.poll.interval.ms".to_string(),
+            format!("Invalid integer: {}", e),
+            millis,
+        ));
+    }
+}
};

let base = Arc::new(BaseConsumer::new(config, native_config, context)?);
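The replaced `expect` is presumably where the macOS regression surfaced: with trailing NUL bytes in the config string (see the `src/config.rs` hunk above), `parse()` failed and the process panicked. The new code returns a `KafkaError::ClientConfig` instead. A standalone sketch of the same fallible-parse shape, with a simplified error type standing in for the crate's:

use std::time::Duration;

// Simplified stand-in for KafkaError::ClientConfig; the real variant also
// carries an RDKafkaConfRes code.
#[derive(Debug)]
struct ConfigError {
    key: String,
    message: String,
    value: String,
}

fn poll_interval(raw: &str) -> Result<Duration, ConfigError> {
    match raw.parse::<u64>() {
        Ok(ms) => Ok(Duration::from_millis(ms)),
        Err(e) => Err(ConfigError {
            key: "max.poll.interval.ms".to_string(),
            message: format!("Invalid integer: {}", e),
            value: raw.to_string(),
        }),
    }
}

fn main() {
    assert_eq!(poll_interval("300000").unwrap(), Duration::from_millis(300_000));
    // A value with a stray trailing NUL no longer panics; it errors cleanly.
    println!("{:?}", poll_interval("300000\0").unwrap_err());
}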
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -176,7 +176,7 @@
//!
//! ### Minimum supported Rust version (MSRV)
//!
-//! The current minimum supported Rust version (MSRV) is 1.61.0. Note that
+//! The current minimum supported Rust version (MSRV) is 1.70.0. Note that
//! bumping the MSRV is not considered a breaking change. Any release of
//! rust-rdkafka may bump the MSRV.
//!
8 changes: 4 additions & 4 deletions src/message.rs
@@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> {
type Headers = BorrowedHeaders;

fn key(&self) -> Option<&[u8]> {
-unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
+unsafe { util::ptr_to_opt_slice(self.ptr.key, self.ptr.key_len) }
}

fn payload(&self) -> Option<&[u8]> {
-unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
+unsafe { util::ptr_to_opt_slice(self.ptr.payload, self.ptr.len) }
}

unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
-util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
+util::ptr_to_opt_mut_slice(self.ptr.payload, self.ptr.len)
}

fn topic(&self) -> &str {
unsafe {
-CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
+CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt))
.to_str()
.expect("Topic name is not valid UTF-8")
}
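Dropping the explicit dereference, `(*self.ptr).key` becoming `self.ptr.key`, implies `self.ptr` is a smart-pointer wrapper that derefs to the underlying librdkafka struct rather than a bare raw pointer. A hypothetical sketch of that wrapper pattern (all names invented for illustration):

use std::ops::Deref;
use std::ptr::NonNull;

// Hypothetical C-side struct, standing in for rd_kafka_message_t.
#[repr(C)]
struct RawMessage {
    len: usize,
}

// A minimal pointer wrapper in the style the diff implies: Deref lets call
// sites write `self.ptr.len` instead of the unsafe `(*self.ptr).len`.
struct NativeMessage(NonNull<RawMessage>);

impl Deref for NativeMessage {
    type Target = RawMessage;
    fn deref(&self) -> &RawMessage {
        unsafe { self.0.as_ref() }
    }
}

fn main() {
    let mut raw = RawMessage { len: 42 };
    let msg = NativeMessage(NonNull::from(&mut raw));
    assert_eq!(msg.len, 42); // field access via Deref, no explicit unsafe here
}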
2 changes: 2 additions & 0 deletions src/producer/base_producer.rs
@@ -425,6 +425,7 @@ where
/// Note that this method will never block.
// Simplifying the return type requires generic associated types, which are
// unstable.
+#[allow(clippy::result_large_err)]
pub fn send<'a, K, P>(
&self,
mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
@@ -701,6 +702,7 @@
/// See the documentation for [`BaseProducer::send`] for details.
// Simplifying the return type requires generic associated types, which are
// unstable.
+#[allow(clippy::result_large_err)]
pub fn send<'a, K, P>(
&self,
record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
1 change: 1 addition & 0 deletions src/producer/future_producer.rs
@@ -346,6 +346,7 @@ where

/// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
/// returned immediately, alongside the [`FutureRecord`] provided.
+#[allow(clippy::result_large_err)]
pub fn send_result<'a, K, P>(
&self,
record: FutureRecord<'a, K, P>,
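`#[allow(clippy::result_large_err)]` appears three times above because these `send`/`send_result` methods return the rejected record inside `Err` so callers can retry without cloning; that makes the `Err` variant much bigger than clippy's default threshold, and the crate accepts the trade-off rather than boxing. A minimal sketch of the pattern, with invented types:

// Types invented for illustration; the real methods return the full
// BaseRecord/FutureRecord alongside a KafkaError.
struct BigRecord {
    payload: [u8; 256],
}

// clippy::result_large_err would fire here too: Err carries the whole record.
fn send(record: BigRecord, queue_full: bool) -> Result<(), (String, BigRecord)> {
    if queue_full {
        // Return ownership of the record so the caller can retry later.
        Err(("queue full".to_string(), record))
    } else {
        Ok(())
    }
}

fn main() {
    let record = BigRecord { payload: [0u8; 256] };
    match send(record, true) {
        Ok(()) => println!("enqueued"),
        Err((err, record)) => println!("retry later: {} ({} bytes)", err, record.payload.len()),
    }
}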
6 changes: 3 additions & 3 deletions src/topic_partition_list.rs
@@ -317,7 +317,7 @@ impl TopicPartitionList {

/// Sets all partitions in the list to the specified offset.
pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
-let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
for elem_ptr in slice {
let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
elem.set_offset(offset)?;
@@ -327,7 +327,7 @@

/// Returns all the elements of the list.
pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
-let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
@@ -337,7 +337,7 @@

/// Returns all the elements of the list that belong to the specified topic.
pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
-let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
16 changes: 8 additions & 8 deletions tests/test_low_consumers.rs
@@ -365,18 +365,18 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
.create_with_context(ConsumerTestContext { _n: 64 })
.expect("Consumer creation failed");
let consumer = Arc::new(consumer);
-let mut queue = consumer.split_partition_queue(&topic_name, 0).unwrap();

let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(&topic_name, 0, Offset::Beginning)
.unwrap();
consumer.assign(&tpl).unwrap();

let wakeups = Arc::new(AtomicUsize::new(0));
+let mut queue = consumer.split_partition_queue(&topic_name, 0).unwrap();
+let cb_wakeups = wakeups.clone();
queue.set_nonempty_callback({
-let wakeups = wakeups.clone();
move || {
-wakeups.fetch_add(1, Ordering::SeqCst);
+cb_wakeups.fetch_add(1, Ordering::SeqCst);
}
});

@@ -401,7 +401,7 @@
assert!(consumer.poll(Duration::from_secs(0)).is_none());

// Expect no wakeups for 1s.
-thread::sleep(Duration::from_secs(1));
+tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(wakeups.load(Ordering::SeqCst), 0);

// Verify there are no messages waiting.
@@ -418,7 +418,7 @@
// Add more messages to the topic. Expect no additional wakeups, as the
// queue is not fully drained, for 1s.
populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await;
-thread::sleep(Duration::from_secs(1));
+tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(wakeups.load(Ordering::SeqCst), 1);

// Drain the queue.
@@ -427,23 +427,23 @@
assert!(queue.poll(None).is_some());

// Expect no additional wakeups for 1s.
-thread::sleep(Duration::from_secs(1));
+tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(wakeups.load(Ordering::SeqCst), 1);

// Add another message, and expect a wakeup.
populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await;
wait_for_wakeups(2);

// Expect no additional wakeups for 1s.
-thread::sleep(Duration::from_secs(1));
+tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(wakeups.load(Ordering::SeqCst), 2);

// Disable the queue and add another message.
queue.set_nonempty_callback(|| ());
populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await;

// Expect no additional wakeups for 1s.
-thread::sleep(Duration::from_secs(1));
+tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(wakeups.load(Ordering::SeqCst), 2);
}
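The repeated swap of `thread::sleep` for `tokio::time::sleep(...).await` is the substantive test fix: inside an async test, `thread::sleep` blocks the runtime worker thread, which can stall the very callbacks the test is measuring, while the tokio sleep yields to the executor. A minimal contrast, assuming the `tokio` crate with its timer and macros features enabled:

use std::time::{Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = Instant::now();

    // A task that should make progress during the sleep below.
    let ticker = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        println!("ticker ran");
    });

    // std::thread::sleep(Duration::from_secs(1)) here would block the only
    // worker thread, so `ticker` could not run until the sleep finished.
    tokio::time::sleep(Duration::from_secs(1)).await; // yields instead

    ticker.await.unwrap();
    println!("elapsed: {:?}", start.elapsed());
}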

2 changes: 1 addition & 1 deletion tests/utils.rs
@@ -20,7 +20,7 @@ use rdkafka::TopicPartitionList;
pub fn rand_test_topic(test_name: &str) -> String {
let id = rand::thread_rng()
.gen_ascii_chars()
-.take(10)
+.take(20)
.collect::<String>();
format!("__{}_{}", test_name, id)
}