diff --git a/src/cli_utils.rs b/src/cli_utils.rs index c1baee8..b19b553 100644 --- a/src/cli_utils.rs +++ b/src/cli_utils.rs @@ -1,4 +1,5 @@ use anyhow::{Context, Result, bail}; +use std::str::FromStr; pub(crate) fn parse_broker_spec(s: &str) -> Result { let b = parse_broker_spec_optional_topic(s)?; @@ -61,8 +62,30 @@ impl BrokerAndOptionalTopic { } } +#[derive(Debug, Clone)] +pub struct KafkaOption { + pub key: String, + pub value: String, +} + +impl FromStr for KafkaOption { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + let (key, value) = s + .split_once('=') + .ok_or_else(|| format!("expected KEY=VALUE, got '{}'", s))?; + + Ok(KafkaOption { + key: key.to_string(), + value: value.to_string(), + }) + } +} + #[cfg(test)] mod tests { + use super::*; use rstest::*; @@ -122,4 +145,36 @@ mod tests { }; assert_eq!(b.broker(), "localhost:9092"); } + #[test] + fn test_parse_valid_kafka_option() { + let res = KafkaOption::from_str("key=value").unwrap(); + assert_eq!(res.key, "key"); + assert_eq!(res.value, "value"); + } + + #[test] + fn test_parse_invalid_kafka_option_without_equals() { + let res = KafkaOption::from_str("key value"); + assert!( + res.is_err(), + "Expected error when parsing invalid Kafka option" + ); + assert_eq!( + res.unwrap_err().to_string(), + "expected KEY=VALUE, got 'key value'" + ); + } + + #[test] + fn test_parse_invalid_kafka_option_without_separator() { + let res = KafkaOption::from_str("keyvalue"); + assert!( + res.is_err(), + "Expected error when parsing invalid Kafka option" + ); + assert_eq!( + res.unwrap_err().to_string(), + "expected KEY=VALUE, got 'keyvalue'" + ); + } } diff --git a/src/consume.rs b/src/consume.rs index e52eab1..d512f9a 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -1,4 +1,6 @@ +use crate::KafkaOption; use crate::cli_utils::BrokerAndTopic; + use isis_streaming_data_types::{deserialize_message, get_schema_id}; use log::{debug, error, info}; use rdkafka::consumer::{BaseConsumer, Consumer}; @@ -7,6 +9,7 @@ use 
rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use std::time::Duration; use uuid::Uuid; +#[allow(clippy::too_many_arguments)] pub fn consume( topic: &BrokerAndTopic, partition: Option, @@ -15,6 +18,7 @@ pub fn consume( offset: Option, last: Option, timestamp: Option, + kafka_config: Option<Vec<KafkaOption>>, ) { debug!( "Listening to topic: {} partition {:?} on broker {}:{}, filtering {}", @@ -24,11 +28,21 @@ topic.port, filter.as_deref().unwrap_or("none") ); - let consumer: BaseConsumer = ClientConfig::new() - .set("group.id", Uuid::new_v4().to_string()) - .set("bootstrap.servers", topic.broker()) - .create() - .expect("Consumer creation failed"); + let mut config = ClientConfig::new(); + config.set("group.id", Uuid::new_v4().to_string()); + config.set("bootstrap.servers", topic.broker()); + + if let Some(kafka_options) = kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let consumer: BaseConsumer = config.create().expect("Consumer creation failed"); let start: Option; diff --git a/src/count.rs b/src/count.rs index 796a600..d707bea 100644 --- a/src/count.rs +++ b/src/count.rs @@ -1,3 +1,4 @@ +use crate::KafkaOption; use crate::cli_utils::BrokerAndTopic; use futures::stream::StreamExt; use log::error; @@ -6,12 +7,27 @@ use rdkafka::{ClientConfig, Message}; use tokio::time::{self, Duration}; use uuid::Uuid; -pub async fn count(topic: BrokerAndTopic, message_interval: u64) { - let consumer: StreamConsumer = ClientConfig::new() - .set("group.id", Uuid::new_v4().to_string()) - .set("bootstrap.servers", topic.broker()) - .create() - .expect("Consumer creation failed"); +pub async fn count( + topic: BrokerAndTopic, + message_interval: u64, + kafka_config: Option<Vec<KafkaOption>>, +) { + let mut config = ClientConfig::new(); + config.set("group.id", Uuid::new_v4().to_string()); + config.set("bootstrap.servers", topic.broker()); + + if let 
Some(kafka_options) = kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let consumer: StreamConsumer = + config.create().expect("Consumer creation failed"); consumer .subscribe(&[&topic.topic]) @@ -27,21 +43,20 @@ tokio::select! { _msg = stream.next() => { match _msg { - Some(Ok(msg)) - if msg.payload().is_some() => { - bytes_this_second += msg.payload_len(); - total_bytes += msg.payload_len(); - }, + Some(Ok(msg)) if msg.payload().is_some() => { + bytes_this_second += msg.payload_len(); + total_bytes += msg.payload_len(); + }, Some(Err(e)) => error!("Error reading from stream {:?}", e), _ => {} } } _ = interval.tick() => { println!("{:.5} Mbit/s (since program start: average {:.5} Mbit/s, {:.5} MB total)", - bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(), - total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), - total_bytes as f64 / 1_000_000.0 - ); + bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(), + total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), + total_bytes as f64 / 1_000_000.0 + ); bytes_this_second = 0; } } diff --git a/src/howl.rs b/src/howl.rs index 453712d..1985e87 100644 --- a/src/howl.rs +++ b/src/howl.rs @@ -1,3 +1,4 @@ +use crate::KafkaOption; use std::thread; use std::time::{Duration, SystemTime}; @@ -260,6 +261,7 @@ pub struct HowlConfig<'a> { pub frames_per_second: u32, pub frames_per_run: u32, pub event_message_config: &'a EventMessageConfig, + pub kafka_config: Option<Vec<KafkaOption>>, } pub fn howl(conf: &HowlConfig) { @@ -296,10 +298,21 @@ pub fn howl(conf: &HowlConfig) { println!("Each pu00 is {pu00_size} bytes"); println!("Each ev44 is {ev44_size} bytes"); - let producer: ThreadedProducer = ClientConfig::new() - .set("bootstrap.servers", conf.broker) - .create() - .expect("Producer creation 
error"); + let mut config: ClientConfig = ClientConfig::new(); + config.set("bootstrap.servers", conf.broker); + + if let Some(kafka_options) = &conf.kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let producer: ThreadedProducer = + config.create().expect("Producer creation error"); let mut current_job_id = Uuid::new_v4().to_string(); diff --git a/src/main.rs b/src/main.rs index c784954..a79973c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod howl; mod sniff; use crate::cli_utils::BrokerAndOptionalTopic; +use crate::cli_utils::KafkaOption; use crate::count::count; use crate::howl::{EventMessageConfig, HowlConfig, howl}; use crate::sniff::sniff; @@ -44,12 +45,18 @@ enum Commands { /// Print last x messages on topic #[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])] last: Option, + /// Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option<Vec<KafkaOption>>, }, /// Print broker metadata. Sniff { /// The broker to look at metadata. Optionally suffixed with a topic name to filter to that topic. #[arg(value_parser = parse_broker_spec_optional_topic)] broker: BrokerAndOptionalTopic, + /// Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option<Vec<KafkaOption>>, }, Howl { /// Kafka Broker URL, including port @@ -80,6 +87,9 @@ enum Commands { /// Maximum detector ID #[arg(long, default_value = "1000")] det_max: i32, + /// Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option<Vec<KafkaOption>>, }, Count { /// topic name, including broker and port. 
format: broker:port/topic @@ -88,6 +98,9 @@ /// Data information print intervals (s) #[arg[long, default_value = "1"]] message_interval: u64, + /// Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option<Vec<KafkaOption>>, }, } @@ -108,10 +121,21 @@ async fn main() { offset, last, timestamp, + kafka_config, } => consume::consume( - &topic, partition, &filter, messages, offset, last, timestamp, + &topic, + partition, + &filter, + messages, + offset, + last, + timestamp, + kafka_config, ), - Commands::Sniff { broker } => sniff(&broker), + Commands::Sniff { + broker, + kafka_config, + } => sniff(&broker, kafka_config), Commands::Howl { broker, topic_prefix, @@ -123,7 +147,9 @@ tof_sigma, det_min, det_max, + kafka_config, } => howl(&HowlConfig { + kafka_config, broker: &broker, event_topic: &format!("{topic_prefix}_rawEvents"), run_info_topic: &format!("{topic_prefix}_runInfo"), @@ -141,8 +167,9 @@ Commands::Count { topic, message_interval, + kafka_config, } => { - count(topic, message_interval).await; + count(topic, message_interval, kafka_config).await; } // Commands::Play {} => {} } } diff --git a/src/sniff.rs b/src/sniff.rs index f9e1ff6..2a42914 100644 --- a/src/sniff.rs +++ b/src/sniff.rs @@ -1,13 +1,24 @@ +use crate::KafkaOption; use crate::cli_utils::BrokerAndOptionalTopic; use rdkafka::ClientConfig; use rdkafka::consumer::{BaseConsumer, Consumer}; use std::time::Duration; -pub fn sniff(broker: &BrokerAndOptionalTopic) { - let consumer: BaseConsumer = ClientConfig::new() - .set("bootstrap.servers", broker.broker()) - .create() - .expect("Consumer creation failed"); +pub fn sniff(broker: &BrokerAndOptionalTopic, kafka_config: Option<Vec<KafkaOption>>) { + let mut config = ClientConfig::new(); + config.set("bootstrap.servers", broker.broker()); + + if let Some(kafka_options) = kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + 
config.set(&option.key, &option.value); + } + } + + let consumer: BaseConsumer = config.create().expect("Consumer creation failed"); let metadata = consumer .fetch_metadata(broker.topic.as_deref(), Duration::from_secs(1))