diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 5a7f60552..96e9c7cd4 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -207,11 +207,20 @@ where fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult { let native_config = config.create_native_config()?; let poll_interval = { - let millis: u64 = native_config - .get("max.poll.interval.ms")? - .parse() - .expect("librdkafka validated config value is valid u64"); - Duration::from_millis(millis) + let millis = native_config.get("max.poll.interval.ms")?; + match millis.parse() { + Ok(millis) => Duration::from_millis(millis), + Err(e) => { + println!("Config string: '{}'", millis); + println!("Error: '{}'", e); + return Err(KafkaError::ClientConfig( + RDKafkaConfRes::RD_KAFKA_CONF_INVALID, + "max.poll.interval.ms".to_string(), + format!("Invalid integer: {}", e), + millis, + )); + } + } }; let base = Arc::new(BaseConsumer::new(config, native_config, context)?);