Skip to content

Commit

Permalink
Return error instead of panicing.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidblewett committed Jan 19, 2024
1 parent 2a7b528 commit cae9423
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,20 @@ where
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self> {
let native_config = config.create_native_config()?;
let poll_interval = {
let millis: u64 = native_config
.get("max.poll.interval.ms")?
.parse()
.expect("librdkafka validated config value is valid u64");
Duration::from_millis(millis)
let millis = native_config.get("max.poll.interval.ms")?;
match millis.parse() {
Ok(millis) => Duration::from_millis(millis),
Err(e) => {
println!("Config string: '{}'", millis);
println!("Error: '{}'", e);
return Err(KafkaError::ClientConfig(
RDKafkaConfRes::RD_KAFKA_CONF_INVALID,
"max.poll.interval.ms".to_string(),
format!("Invalid integer: {}", e),
millis,
));
}
}
};

let base = Arc::new(BaseConsumer::new(config, native_config, context)?);
Expand Down

0 comments on commit cae9423

Please sign in to comment.