Skip to content
55 changes: 55 additions & 0 deletions src/cli_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{Context, Result, bail};
use std::str::FromStr;

pub(crate) fn parse_broker_spec(s: &str) -> Result<BrokerAndTopic> {
let b = parse_broker_spec_optional_topic(s)?;
Expand Down Expand Up @@ -61,8 +62,30 @@ impl BrokerAndOptionalTopic {
}
}

/// A single `KEY=VALUE` Kafka configuration option supplied on the
/// command line (e.g. `-X security.protocol=SSL`), passed through to
/// the rdkafka `ClientConfig`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KafkaOption {
    /// Configuration key, e.g. `security.protocol`.
    pub key: String,
    /// Configuration value, e.g. `SSL`.
    pub value: String,
}

impl FromStr for KafkaOption {
    type Err = String;

    /// Parses a `KEY=VALUE` string into a [`KafkaOption`].
    ///
    /// Splits on the *first* `=` only, so values may themselves
    /// contain `=` characters (e.g. `sasl.jaas.config=a=b`).
    ///
    /// # Errors
    ///
    /// Returns `expected KEY=VALUE, got '<input>'` when the input
    /// contains no `=` separator.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (key, value) = s
            .split_once('=')
            .ok_or_else(|| format!("expected KEY=VALUE, got '{}'", s))?;

        Ok(KafkaOption {
            key: key.to_string(),
            value: value.to_string(),
        })
    }
}

#[cfg(test)]
mod tests {

use super::*;
use rstest::*;

Expand Down Expand Up @@ -122,4 +145,36 @@ mod tests {
};
assert_eq!(b.broker(), "localhost:9092");
}
#[test]
fn test_parse_valid_kafka_option() {
    // A well-formed KEY=VALUE string parses into its two parts.
    let parsed = KafkaOption::from_str("key=value").expect("should parse");
    assert_eq!(parsed.key, "key");
    assert_eq!(parsed.value, "value");
}

#[test]
// Renamed from `..._valid_...`: this case asserts a parse *failure*,
// so the name must say "invalid". Input has a space but no '='.
fn test_parse_invalid_kafka_option_without_equals() {
    let res = KafkaOption::from_str("key value");
    assert!(
        res.is_err(),
        "Expected error when parsing invalid Kafka option"
    );
    // The error type is already `String`; no `.to_string()` needed.
    assert_eq!(res.unwrap_err(), "expected KEY=VALUE, got 'key value'");
}

#[test]
// Renamed from `..._valid_...`: this case asserts a parse *failure*,
// so the name must say "invalid". Input has neither space nor '='.
fn test_parse_invalid_kafka_option_without_space() {
    let res = KafkaOption::from_str("keyvalue");
    assert!(
        res.is_err(),
        "Expected error when parsing invalid Kafka option"
    );
    // The error type is already `String`; no `.to_string()` needed.
    assert_eq!(res.unwrap_err(), "expected KEY=VALUE, got 'keyvalue'");
}
}
24 changes: 19 additions & 5 deletions src/consume.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::KafkaOption;
use crate::cli_utils::BrokerAndTopic;

use isis_streaming_data_types::{deserialize_message, get_schema_id};
use log::{debug, error, info};
use rdkafka::consumer::{BaseConsumer, Consumer};
Expand All @@ -7,6 +9,7 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use std::time::Duration;
use uuid::Uuid;

#[allow(clippy::too_many_arguments)]
pub fn consume(
topic: &BrokerAndTopic,
partition: Option<i32>,
Expand All @@ -15,6 +18,7 @@ pub fn consume(
offset: Option<i64>,
last: Option<i64>,
timestamp: Option<i64>,
kafka_config: Option<Vec<KafkaOption>>,
) {
debug!(
"Listening to topic: {} partition {:?} on broker {}:{}, filtering {}",
Expand All @@ -24,11 +28,21 @@ pub fn consume(
topic.port,
filter.as_deref().unwrap_or("none")
);
let consumer: BaseConsumer = ClientConfig::new()
.set("group.id", Uuid::new_v4().to_string())
.set("bootstrap.servers", topic.broker())
.create()
.expect("Consumer creation failed");
let mut config = ClientConfig::new();
config.set("group.id", Uuid::new_v4().to_string());
config.set("bootstrap.servers", topic.broker());

if let Some(kafka_options) = kafka_config {
for option in kafka_options {
println!(
"Setting Kafka config option {}={}",
option.key, option.value
);
config.set(&option.key, &option.value);
}
}

let consumer: BaseConsumer = config.create().expect("Base creation failed");

let start: Option<Offset>;

Expand Down
45 changes: 30 additions & 15 deletions src/count.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::KafkaOption;
use crate::cli_utils::BrokerAndTopic;
use futures::stream::StreamExt;
use log::error;
Expand All @@ -6,12 +7,27 @@ use rdkafka::{ClientConfig, Message};
use tokio::time::{self, Duration};
use uuid::Uuid;

pub async fn count(topic: BrokerAndTopic, message_interval: u64) {
let consumer: StreamConsumer<DefaultConsumerContext> = ClientConfig::new()
.set("group.id", Uuid::new_v4().to_string())
.set("bootstrap.servers", topic.broker())
.create()
.expect("Consumer creation failed");
pub async fn count(
topic: BrokerAndTopic,
message_interval: u64,
kafka_config: Option<Vec<KafkaOption>>,
) {
let mut config = ClientConfig::new();
config.set("group.id", Uuid::new_v4().to_string());
config.set("bootstrap.servers", topic.broker());

if let Some(kafka_options) = kafka_config {
for option in kafka_options {
println!(
"Setting Kafka config option {}={}",
option.key, option.value
);
config.set(&option.key, &option.value);
}
}

let consumer: StreamConsumer<DefaultConsumerContext> =
config.create().expect("Consumer creation failed");

consumer
.subscribe(&[&topic.topic])
Expand All @@ -27,21 +43,20 @@ pub async fn count(topic: BrokerAndTopic, message_interval: u64) {
tokio::select! {
_msg = stream.next() => {
match _msg {
Some(Ok(msg))
if msg.payload().is_some() => {
bytes_this_second += msg.payload_len();
total_bytes += msg.payload_len();
},
Some(Ok(msg)) if msg.payload().is_some() => {
bytes_this_second += msg.payload_len();
total_bytes += msg.payload_len();
},
Some(Err(e)) => error!("Error reading from stream {:?}", e),
_ => {}
}
}
_ = interval.tick() => {
println!("{:.5} Mbit/s (since program start: average {:.5} Mbit/s, {:.5} MB total)",
bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this may be a mistake- looks like the / interval.period().as_secs_f64() has been taken out

total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(),
total_bytes as f64 / 1_000_000.0
);
bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(),
total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(),
total_bytes as f64 / 1_000_000.0
);
bytes_this_second = 0;
}
}
Expand Down
21 changes: 17 additions & 4 deletions src/howl.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::KafkaOption;
use std::thread;
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -260,6 +261,7 @@ pub struct HowlConfig<'a> {
pub frames_per_second: u32,
pub frames_per_run: u32,
pub event_message_config: &'a EventMessageConfig,
pub kafka_config: Option<Vec<KafkaOption>>,
}

pub fn howl(conf: &HowlConfig) {
Expand Down Expand Up @@ -296,10 +298,21 @@ pub fn howl(conf: &HowlConfig) {
println!("Each pu00 is {pu00_size} bytes");
println!("Each ev44 is {ev44_size} bytes");

let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
.set("bootstrap.servers", conf.broker)
.create()
.expect("Producer creation error");
let mut config: ClientConfig = ClientConfig::new();
config.set("bootstrap.servers", conf.broker);

if let Some(kafka_options) = &conf.kafka_config {
for option in kafka_options {
println!(
"Setting Kafka config option {}={}",
option.key, option.value
);
config.set(&option.key, &option.value);
}
}

let producer: ThreadedProducer<DefaultProducerContext> =
config.create().expect("Producer creation error");

let mut current_job_id = Uuid::new_v4().to_string();

Expand Down
33 changes: 30 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod howl;
mod sniff;

use crate::cli_utils::BrokerAndOptionalTopic;
use crate::cli_utils::KafkaOption;
use crate::count::count;
use crate::howl::{EventMessageConfig, HowlConfig, howl};
use crate::sniff::sniff;
Expand Down Expand Up @@ -44,12 +45,18 @@ enum Commands {
/// Print last x messages on topic
#[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])]
last: Option<i64>,
// Additional command line arguments
#[arg(short = 'X', long)]
kafka_config: Option<Vec<KafkaOption>>,
},
/// Print broker metadata.
Sniff {
/// The broker to look at metadata. Optionally suffixed with a topic name to filter to that topic.
#[arg(value_parser = parse_broker_spec_optional_topic)]
broker: BrokerAndOptionalTopic,
// Additional command line arguments
#[arg(short = 'X', long)]
kafka_config: Option<Vec<KafkaOption>>,
},
Howl {
/// Kafka Broker URL, including port
Expand Down Expand Up @@ -80,6 +87,9 @@ enum Commands {
/// Maximum detector ID
#[arg(long, default_value = "1000")]
det_max: i32,
// Additional command line arguments
#[arg(short = 'X', long)]
kafka_config: Option<Vec<KafkaOption>>,
},
Count {
/// topic name, including broker and port. format: broker:port/topic
Expand All @@ -88,6 +98,9 @@ enum Commands {
/// Data information print intervals (s)
#[arg[long, default_value = "1"]]
message_interval: u64,
/// Additional command line arguments
#[arg(short = 'X', long)]
kafka_config: Option<Vec<KafkaOption>>,
},
}

Expand All @@ -108,10 +121,21 @@ async fn main() {
offset,
last,
timestamp,
kafka_config,
} => consume::consume(
&topic, partition, &filter, messages, offset, last, timestamp,
&topic,
partition,
&filter,
messages,
offset,
last,
timestamp,
kafka_config,
),
Commands::Sniff { broker } => sniff(&broker),
Commands::Sniff {
broker,
kafka_config,
} => sniff(&broker, kafka_config),
Commands::Howl {
broker,
topic_prefix,
Expand All @@ -123,7 +147,9 @@ async fn main() {
tof_sigma,
det_min,
det_max,
kafka_config,
} => howl(&HowlConfig {
kafka_config,
broker: &broker,
event_topic: &format!("{topic_prefix}_rawEvents"),
run_info_topic: &format!("{topic_prefix}_runInfo"),
Expand All @@ -141,8 +167,9 @@ async fn main() {
Commands::Count {
topic,
message_interval,
kafka_config,
} => {
count(topic, message_interval).await;
count(topic, message_interval, kafka_config).await;
} // Commands::Play {} => {}
}
}
21 changes: 16 additions & 5 deletions src/sniff.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
use crate::KafkaOption;
use crate::cli_utils::BrokerAndOptionalTopic;
use rdkafka::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;

pub fn sniff(broker: &BrokerAndOptionalTopic) {
let consumer: BaseConsumer = ClientConfig::new()
.set("bootstrap.servers", broker.broker())
.create()
.expect("Consumer creation failed");
pub fn sniff(broker: &BrokerAndOptionalTopic, kafka_config: Option<Vec<KafkaOption>>) {
let mut config = ClientConfig::new();
config.set("bootstrap.servers", broker.broker());

if let Some(kafka_options) = kafka_config {
for option in kafka_options {
println!(
"Setting Kafka config option {}={}",
option.key, option.value
);
config.set(&option.key, &option.value);
}
}

let consumer: BaseConsumer = config.create().expect("Consumer creation failed");

let metadata = consumer
.fetch_metadata(broker.topic.as_deref(), Duration::from_secs(1))
Expand Down