From 3d6b002029db90a827d638a9f86a23ddbb176a79 Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Fri, 17 Apr 2026 17:03:03 +0100 Subject: [PATCH 01/10] additional command args to 3 out of 4 functions --- src/consume.rs | 24 +++++++++++++++++++----- src/count.rs | 30 +++++++++++++++++++++++------- src/howl.rs | 21 +++++++++++++++++---- src/main.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- src/sniff.rs | 7 ++++++- 5 files changed, 106 insertions(+), 20 deletions(-) diff --git a/src/consume.rs b/src/consume.rs index e52eab1..1c88757 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -1,4 +1,5 @@ use crate::cli_utils::BrokerAndTopic; +use crate::KafkaOption; use isis_streaming_data_types::{deserialize_message, get_schema_id}; use log::{debug, error, info}; use rdkafka::consumer::{BaseConsumer, Consumer}; @@ -15,6 +16,7 @@ pub fn consume( offset: Option, last: Option, timestamp: Option, + kafka_config: Option>, ) { debug!( "Listening to topic: {} partition {:?} on broker {}:{}, filtering {}", @@ -24,11 +26,23 @@ pub fn consume( topic.port, filter.as_deref().unwrap_or("none") ); - let consumer: BaseConsumer = ClientConfig::new() - .set("group.id", Uuid::new_v4().to_string()) - .set("bootstrap.servers", topic.broker()) - .create() - .expect("Consumer creation failed"); + + let mut config = ClientConfig::new(); + config.set("group.id", Uuid::new_v4().to_string()); + config.set("bootstrap.servers", topic.broker()); + + if let Some(kafka_options) = kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let consumer: StreamConsumer = + config.create().expect("Consumer creation failed"); let start: Option; diff --git a/src/count.rs b/src/count.rs index b22d870..70c1ca8 100644 --- a/src/count.rs +++ b/src/count.rs @@ -1,3 +1,4 @@ +use crate::KafkaOption; use crate::cli_utils::BrokerAndTopic; use futures::stream::StreamExt; use log::error; 
@@ -6,12 +7,27 @@ use rdkafka::{ClientConfig, Message}; use tokio::time::{self, Duration}; use uuid::Uuid; -pub async fn count(topic: BrokerAndTopic, message_interval: u64) { - let consumer: StreamConsumer = ClientConfig::new() - .set("group.id", Uuid::new_v4().to_string()) - .set("bootstrap.servers", topic.broker()) - .create() - .expect("Consumer creation failed"); +pub async fn count( + topic: BrokerAndTopic, + message_interval: u64, + kafka_config: Option>, +) { + let mut config = ClientConfig::new(); + config.set("group.id", Uuid::new_v4().to_string()); + config.set("bootstrap.servers", topic.broker()); + + if let Some(kafka_options) = kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let consumer: StreamConsumer = + config.create().expect("Consumer creation failed"); consumer .subscribe(&[&topic.topic]) @@ -40,7 +56,7 @@ pub async fn count(topic: BrokerAndTopic, message_interval: u64) { } _ = interval.tick() => { println!("{:.5} Mbit/s (since program start: average {:.5} Mbit/s, {:.5} MB total)", - bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(), + bytes_this_second as f64/125000.0, total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), total_bytes as f64 / 1_000_000.0 ); diff --git a/src/howl.rs b/src/howl.rs index 453712d..999fdf4 100644 --- a/src/howl.rs +++ b/src/howl.rs @@ -1,3 +1,4 @@ +use crate::KafkaOption; use std::thread; use std::time::{Duration, SystemTime}; @@ -260,6 +261,7 @@ pub struct HowlConfig<'a> { pub frames_per_second: u32, pub frames_per_run: u32, pub event_message_config: &'a EventMessageConfig, + pub kafka_config: Option>, } pub fn howl(conf: &HowlConfig) { @@ -296,10 +298,21 @@ pub fn howl(conf: &HowlConfig) { println!("Each pu00 is {pu00_size} bytes"); println!("Each ev44 is {ev44_size} bytes"); - let producer: ThreadedProducer = ClientConfig::new() - 
.set("bootstrap.servers", conf.broker) - .create() - .expect("Producer creation error"); + let mut config: ClientConfig= ClientConfig::new(); + config.set("bootstrap.servers", conf.broker); + + if let Some(kafka_options) = &conf.kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let producer: ThreadedProducer = + config.create().expect("Producer creation error"); let mut current_job_id = Uuid::new_v4().to_string(); diff --git a/src/main.rs b/src/main.rs index c784954..8792dcc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use crate::howl::{EventMessageConfig, HowlConfig, howl}; use crate::sniff::sniff; use clap::{Parser, Subcommand}; use cli_utils::{BrokerAndTopic, parse_broker_spec, parse_broker_spec_optional_topic}; +use std::str::FromStr; #[derive(Parser, Debug)] struct Cli { @@ -19,6 +20,27 @@ struct Cli { verbosity: clap_verbosity_flag::Verbosity, } +#[derive(Debug, Clone)] +pub struct KafkaOption { + key: String, + value: String, +} + +impl FromStr for KafkaOption { + type Err = String; + + fn from_str(s: &str) -> Result { + let (key, value) = s + .split_once('=') + .ok_or_else(|| format!("expected KEY=VALUE, got '{}", s))?; + + Ok(KafkaOption { + key: key.to_string(), + value: value.to_string(), + }) + } +} + #[derive(Subcommand, Debug)] enum Commands { #[command(alias = "listen")] // for muscle memory's sake... @@ -44,12 +66,18 @@ enum Commands { /// Print last x messages on topic #[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])] last: Option, + // Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option>, }, /// Print broker metadata. Sniff { /// The broker to look at metadata. Optionally suffixed with a topic name to filter to that topic.
#[arg(value_parser = parse_broker_spec_optional_topic)] broker: BrokerAndOptionalTopic, + // Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option>, }, Howl { /// Kafka Broker URL, including port @@ -80,6 +108,9 @@ enum Commands { /// Maximum detector ID #[arg(long, default_value = "1000")] det_max: i32, + // Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option>, }, Count { /// topic name, including broker and port. format: broker:port/topic @@ -88,6 +119,9 @@ enum Commands { /// Data information print intervals (s) #[arg[long, default_value = "1"]] message_interval: u64, + /// Additional command line arguments + #[arg(short = 'X', long)] + kafka_config: Option>, }, } @@ -108,10 +142,11 @@ async fn main() { offset, last, timestamp, + kafka_config, } => consume::consume( - &topic, partition, &filter, messages, offset, last, timestamp, + &topic, partition, &filter, messages, offset, last, timestamp, kafka_config, ), - Commands::Sniff { broker } => sniff(&broker), + Commands::Sniff { broker, kafka_config } => sniff(&broker, kafka_config), Commands::Howl { broker, topic_prefix, @@ -123,6 +158,7 @@ async fn main() { tof_sigma, det_min, det_max, + kafka_config, } => howl(&HowlConfig { broker: &broker, event_topic: &format!("{topic_prefix}_rawEvents"), @@ -136,13 +172,15 @@ async fn main() { tof_sigma, det_min, det_max, + kafka_config, }, }), Commands::Count { topic, message_interval, + kafka_config, } => { - count(topic, message_interval).await; + count(topic, message_interval, kafka_config).await; } // Commands::Play {} => {} } } diff --git a/src/sniff.rs b/src/sniff.rs index f9e1ff6..b0f4989 100644 --- a/src/sniff.rs +++ b/src/sniff.rs @@ -1,9 +1,14 @@ use crate::cli_utils::BrokerAndOptionalTopic; +use crate::KafkaOption; use rdkafka::ClientConfig; use rdkafka::consumer::{BaseConsumer, Consumer}; use std::time::Duration; -pub fn sniff(broker: &BrokerAndOptionalTopic) { +pub fn sniff(broker:
&BrokerAndOptionalTopic, kafka_config: Option>) { + let consumer: BaseConsumer = ClientConfig::new() + .set("bootstrap.servers", broker.broker()) + .create() + .expect("Consumer creation failed"); let consumer: BaseConsumer = ClientConfig::new() .set("bootstrap.servers", broker.broker()) .create() From 8fb2fb3b123a840af4999e0817a80aded3f6ca6e Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Fri, 17 Apr 2026 17:06:55 +0100 Subject: [PATCH 02/10] cargo fmt --- src/consume.rs | 2 +- src/howl.rs | 2 +- src/main.rs | 14 ++++++++++++-- src/sniff.rs | 4 ++-- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/consume.rs b/src/consume.rs index 1c88757..35ca832 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -1,5 +1,5 @@ -use crate::cli_utils::BrokerAndTopic; use crate::KafkaOption; +use crate::cli_utils::BrokerAndTopic; use isis_streaming_data_types::{deserialize_message, get_schema_id}; use log::{debug, error, info}; use rdkafka::consumer::{BaseConsumer, Consumer}; diff --git a/src/howl.rs b/src/howl.rs index 999fdf4..1985e87 100644 --- a/src/howl.rs +++ b/src/howl.rs @@ -298,7 +298,7 @@ pub fn howl(conf: &HowlConfig) { println!("Each pu00 is {pu00_size} bytes"); println!("Each ev44 is {ev44_size} bytes"); - let mut config: ClientConfig= ClientConfig::new(); + let mut config: ClientConfig = ClientConfig::new(); config.set("bootstrap.servers", conf.broker); if let Some(kafka_options) = &conf.kafka_config { diff --git a/src/main.rs b/src/main.rs index 8792dcc..75346fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -144,9 +144,19 @@ async fn main() { timestamp, kafka_config, } => consume::consume( - &topic, partition, &filter, messages, offset, last, timestamp, kafka_config, + &topic, + partition, + &filter, + messages, + offset, + last, + timestamp, + kafka_config, ), - Commands::Sniff { broker, kafka_config } => sniff(&broker, kafka_config), + Commands::Sniff { + broker, + kafka_config, + } => sniff(&broker, kafka_config), Commands::Howl { broker, 
topic_prefix, diff --git a/src/sniff.rs b/src/sniff.rs index b0f4989..70ad2c1 100644 --- a/src/sniff.rs +++ b/src/sniff.rs @@ -1,11 +1,11 @@ -use crate::cli_utils::BrokerAndOptionalTopic; use crate::KafkaOption; +use crate::cli_utils::BrokerAndOptionalTopic; use rdkafka::ClientConfig; use rdkafka::consumer::{BaseConsumer, Consumer}; use std::time::Duration; pub fn sniff(broker: &BrokerAndOptionalTopic, kafka_config: Option>) { - let consumer: BaseConsumer = ClientConfig::new() + let consumer: BaseConsumer = ClientConfig::new() .set("bootstrap.servers", broker.broker()) .create() .expect("Consumer creation failed"); From 4f91f35aa351a7adeedf84159dc5ecc403be418e Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Mon, 20 Apr 2026 15:28:57 +0100 Subject: [PATCH 03/10] updated all saluki functions to allow extra arguments --- src/consume.rs | 5 ++--- src/main.rs | 2 +- src/sniff.rs | 22 ++++++++++++++-------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/consume.rs b/src/consume.rs index 35ca832..1fa3a9f 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -1,5 +1,6 @@ use crate::KafkaOption; use crate::cli_utils::BrokerAndTopic; + use isis_streaming_data_types::{deserialize_message, get_schema_id}; use log::{debug, error, info}; use rdkafka::consumer::{BaseConsumer, Consumer}; @@ -26,7 +27,6 @@ pub fn consume( topic.port, filter.as_deref().unwrap_or("none") ); - let mut config = ClientConfig::new(); config.set("group.id", Uuid::new_v4().to_string()); config.set("bootstrap.servers", topic.broker()); @@ -41,8 +41,7 @@ pub fn consume( } } - let consumer: StreamConsumer = - config.create().expect("Consumer creation failed"); + let consumer: BaseConsumer = config.create().expect("Base creation failed"); let start: Option; diff --git a/src/main.rs b/src/main.rs index 75346fd..a5b4708 100644 --- a/src/main.rs +++ b/src/main.rs @@ -170,6 +170,7 @@ async fn main() { det_max, kafka_config, } => howl(&HowlConfig { + kafka_config, broker: &broker, 
event_topic: &format!("{topic_prefix}_rawEvents"), run_info_topic: &format!("{topic_prefix}_runInfo"), @@ -182,7 +183,6 @@ async fn main() { tof_sigma, det_min, det_max, - kafka_config, }, }), Commands::Count { diff --git a/src/sniff.rs b/src/sniff.rs index 70ad2c1..2a42914 100644 --- a/src/sniff.rs +++ b/src/sniff.rs @@ -5,14 +5,20 @@ use rdkafka::consumer::{BaseConsumer, Consumer}; use std::time::Duration; pub fn sniff(broker: &BrokerAndOptionalTopic, kafka_config: Option>) { - let consumer: BaseConsumer = ClientConfig::new() - .set("bootstrap.servers", broker.broker()) - .create() - .expect("Consumer creation failed"); - let consumer: BaseConsumer = ClientConfig::new() - .set("bootstrap.servers", broker.broker()) - .create() - .expect("Consumer creation failed"); + let mut config = ClientConfig::new(); + config.set("bootstrap.servers", broker.broker()); + + if let Some(kafka_options) = kafka_config { + for option in kafka_options { + println!( + "Setting Kafka config option {}={}", + option.key, option.value + ); + config.set(&option.key, &option.value); + } + } + + let consumer: BaseConsumer = config.create().expect("Consumer creation failed"); let metadata = consumer .fetch_metadata(broker.topic.as_deref(), Duration::from_secs(1)) From d89ad3166bc95bcd75b2f2ee182b2e15267dd5be Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Mon, 20 Apr 2026 15:46:54 +0100 Subject: [PATCH 04/10] clippy #allow --- src/consume.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/consume.rs b/src/consume.rs index 1fa3a9f..7d4644c 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -9,6 +9,7 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use std::time::Duration; use uuid::Uuid; +#[allow(clippy::too_many_arguments)] pub fn consume( topic: &BrokerAndTopic, partition: Option, @@ -18,7 +19,7 @@ pub fn consume( last: Option, timestamp: Option, kafka_config: Option>, -) { +) { debug!( "Listening to topic: {} partition {:?} on broker {}:{}, 
filtering {}", topic.topic, From b43a0682122835d4f122b6f08bdcb29c2df9c7ab Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Mon, 20 Apr 2026 15:51:12 +0100 Subject: [PATCH 05/10] cargo fmt --- src/consume.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consume.rs b/src/consume.rs index 7d4644c..d512f9a 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -19,7 +19,7 @@ pub fn consume( last: Option, timestamp: Option, kafka_config: Option>, -) { +) { debug!( "Listening to topic: {} partition {:?} on broker {}:{}, filtering {}", topic.topic, From 55cd0f95fe8379e8492dd3f102a63621d6779d9c Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Thu, 23 Apr 2026 10:57:55 +0100 Subject: [PATCH 06/10] updated tests to check cases of incorrect input, moved kafkaoption to cli_utils --- src/cli_utils.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 23 +------------------- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/cli_utils.rs b/src/cli_utils.rs index c1baee8..b19b553 100644 --- a/src/cli_utils.rs +++ b/src/cli_utils.rs @@ -1,4 +1,5 @@ use anyhow::{Context, Result, bail}; +use std::str::FromStr; pub(crate) fn parse_broker_spec(s: &str) -> Result { let b = parse_broker_spec_optional_topic(s)?; @@ -61,8 +62,30 @@ impl BrokerAndOptionalTopic { } } +#[derive(Debug, Clone)] +pub struct KafkaOption { + pub key: String, + pub value: String, +} + +impl FromStr for KafkaOption { + type Err = String; + + fn from_str(s: &str) -> Result { + let (key, value) = s + .split_once('=') + .ok_or_else(|| format!("expected KEY=VALUE, got '{}'", s))?; + + Ok(KafkaOption { + key: key.to_string(), + value: value.to_string(), + }) + } +} + #[cfg(test)] mod tests { + use super::*; use rstest::*; @@ -122,4 +145,36 @@ mod tests { }; assert_eq!(b.broker(), "localhost:9092"); } + #[test] + fn test_parse_valid_kafka_option() { + let res = KafkaOption::from_str("key=value").unwrap(); + assert_eq!(res.key, "key"); + assert_eq!(res.value, 
"value"); + } + + #[test] + fn test_parse_valid_kafka_option_without_equals() { + let res = KafkaOption::from_str("key value"); + assert!( + res.is_err(), + "Expected error when parsing invalid Kafka option" + ); + assert_eq!( + res.unwrap_err().to_string(), + "expected KEY=VALUE, got 'key value'" + ); + } + + #[test] + fn test_parse_valid_kafka_option_without_space() { + let res = KafkaOption::from_str("keyvalue"); + assert!( + res.is_err(), + "Expected error when parsing invalid Kafka option" + ); + assert_eq!( + res.unwrap_err().to_string(), + "expected KEY=VALUE, got 'keyvalue'" + ); + } } diff --git a/src/main.rs b/src/main.rs index a5b4708..a79973c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,12 +5,12 @@ mod howl; mod sniff; use crate::cli_utils::BrokerAndOptionalTopic; +use crate::cli_utils::KafkaOption; use crate::count::count; use crate::howl::{EventMessageConfig, HowlConfig, howl}; use crate::sniff::sniff; use clap::{Parser, Subcommand}; use cli_utils::{BrokerAndTopic, parse_broker_spec, parse_broker_spec_optional_topic}; -use std::str::FromStr; #[derive(Parser, Debug)] struct Cli { @@ -20,27 +20,6 @@ struct Cli { verbosity: clap_verbosity_flag::Verbosity, } -#[derive(Debug, Clone)] -pub struct KafkaOption { - key: String, - value: String, -} - -impl FromStr for KafkaOption { - type Err = String; - - fn from_str(s: &str) -> Result { - let (key, value) = s - .split_once('=') - .ok_or_else(|| format!("expected KEY=VALUE, got '{}", s))?; - - Ok(KafkaOption { - key: key.to_string(), - value: value.to_string(), - }) - } -} - #[derive(Subcommand, Debug)] enum Commands { #[command(alias = "listen")] // for muscle memory's sake... 
From b2c8b42c49feaac872ead798cbf349a78b17f220 Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Thu, 23 Apr 2026 11:10:57 +0100 Subject: [PATCH 07/10] cargo fmt --- src/count.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/count.rs b/src/count.rs index 70c1ca8..a5021d6 100644 --- a/src/count.rs +++ b/src/count.rs @@ -43,12 +43,9 @@ pub async fn count( tokio::select! { _msg = stream.next() => { match _msg { - Some(Ok(msg)) => { - if msg.payload().is_some() { + Some(Ok(msg)) => if msg.payload().is_some() { bytes_this_second += msg.payload_len(); total_bytes += msg.payload_len(); - } - }, Some(Err(e)) => error!("Error reading from stream {:?}", e), None => {} From ae1d36966ff674972d08935e053a7fc0469cfca9 Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Thu, 23 Apr 2026 11:50:13 +0100 Subject: [PATCH 08/10] cargo fmt fix --- src/count.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/count.rs b/src/count.rs index a5021d6..8c94551 100644 --- a/src/count.rs +++ b/src/count.rs @@ -43,20 +43,20 @@ pub async fn count( tokio::select! 
{ _msg = stream.next() => { match _msg { - Some(Ok(msg)) => if msg.payload().is_some() { - bytes_this_second += msg.payload_len(); - total_bytes += msg.payload_len(); + Some(Ok(msg)) if msg.payload().is_some() => { + bytes_this_second += msg.payload_len(); + total_bytes += msg.payload_len(); }, Some(Err(e)) => error!("Error reading from stream {:?}", e), - None => {} + _ => {} } } _ = interval.tick() => { println!("{:.5} Mbit/s (since program start: average {:.5} Mbit/s, {:.5} MB total)", - bytes_this_second as f64/125000.0, - total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), - total_bytes as f64 / 1_000_000.0 - ); + bytes_this_second as f64/125000.0, + total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), + total_bytes as f64 / 1_000_000.0 + ); bytes_this_second = 0; } } From 3fb2510374fdba6ed0f9569934b92344e7087c52 Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Thu, 23 Apr 2026 13:23:12 +0100 Subject: [PATCH 09/10] readded missing unit calc --- src/count.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/count.rs b/src/count.rs index 8c94551..16eeabe 100644 --- a/src/count.rs +++ b/src/count.rs @@ -53,7 +53,7 @@ pub async fn count( } _ = interval.tick() => { println!("{:.5} Mbit/s (since program start: average {:.5} Mbit/s, {:.5} MB total)", - bytes_this_second as f64/125000.0, + bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(), total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), total_bytes as f64 / 1_000_000.0 ); From bae537cb6b5c2dc51b6cbd23e0ece4c80bb1f970 Mon Sep 17 00:00:00 2001 From: Daniel Maclaren Date: Thu, 23 Apr 2026 13:24:32 +0100 Subject: [PATCH 10/10] readded missing calc --- src/count.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/count.rs b/src/count.rs index 16eeabe..d707bea 100644 --- a/src/count.rs +++ b/src/count.rs @@ -53,7 +53,7 @@ pub async fn count( } _ = interval.tick() => { println!("{:.5} Mbit/s (since program start: average {:.5} 
Mbit/s, {:.5} MB total)", - bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(), + bytes_this_second as f64/125000.0 / interval.period().as_secs_f64(), total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(), total_bytes as f64 / 1_000_000.0 );