diff --git a/Cargo.lock b/Cargo.lock index f43e17068..2a19cd0c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2120,6 +2120,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "datafusion-proto" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a387aaef949dc16bb6abc81bd1af850ec7449183aef011214f9724957495738" +dependencies = [ + "arrow", + "chrono", + "datafusion-catalog", + "datafusion-catalog-listing", + "datafusion-common", + "datafusion-datasource", + "datafusion-datasource-arrow", + "datafusion-datasource-csv", + "datafusion-datasource-json", + "datafusion-datasource-parquet", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-table", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "datafusion-proto-common", + "object_store", + "prost 0.14.1", + "rand 0.9.4", +] + +[[package]] +name = "datafusion-proto-common" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e614c7c53a9c304c6a850b821010bb492e57300311835f1180613f9d2c63d9" +dependencies = [ + "arrow", + "datafusion-common", + "prost 0.14.1", +] + [[package]] name = "datafusion-pruning" version = "53.1.0" @@ -3896,6 +3935,7 @@ dependencies = [ "crossterm", "dashmap", "datafusion", + "datafusion-proto", "derive_more 1.0.0", "erased-serde", "fs_extra", diff --git a/Cargo.toml b/Cargo.toml index 326ffae3f..14fe8b051 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ object_store = { version = "0.13.1", features = [ "azure", "gcp", ] } +datafusion-proto = "53.1.0" parquet = "58.0.0" # Web server and HTTP-related diff --git a/src/lib.rs b/src/lib.rs index de7ee4a5f..289494723 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,7 +58,12 @@ pub mod validator; use std::time::Duration; // Public re-exports of crates being used in enterprise +pub use arrow_array; +pub use arrow_flight; +pub use arrow_ipc; +pub use catalog as parseable_catalog; pub use datafusion; +pub use datafusion_proto; pub use handlers::http::modal::{ ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server, }; @@ -68,6 +73,7 @@ use parseable::PARSEABLE; use reqwest::{Client, ClientBuilder}; pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk}; pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; +pub use utils as parseable_utils; // It is very unlikely that panic will occur when dealing with locks. pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock"; diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 9c9ff86fa..4c61f2f7c 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -257,6 +257,18 @@ pub static TOTAL_QUERY_CALLS_BY_DATE: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_files_scanned_in_hottier_by_date", + "Total files scanned in hottier by date", + ) + .namespace(METRICS_NAMESPACE), + &["stream", "date", "tenant_id"], + ) + .expect("metric can be created") +}); + pub static TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE: Lazy = Lazy::new(|| { IntCounterVec::new( Opts::new( @@ -683,6 +695,17 @@ pub fn increment_files_scanned_in_query_by_date(count: u64, date: &str, tenant_i .inc_by(count); } +pub fn increment_files_scanned_in_hottier_by_date( + count: u64, + date: &str, + tenant_id: &str, + stream_name: &str, +) { + TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE + .with_label_values(&[stream_name, date, tenant_id]) + .inc_by(count); +} + pub fn increment_bytes_scanned_in_query_by_date(bytes: u64, date: &str, tenant_id: &str) { TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE .with_label_values(&[date, tenant_id]) diff --git a/src/query/mod.rs b/src/query/mod.rs index e1df94c1a..20ebd9dcc 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -25,6 +25,7 @@ use arrow_schema::SchemaRef; use chrono::NaiveDateTime; use chrono::{DateTime, Duration, Utc}; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::catalog::SchemaProvider; use datafusion::common::tree_node::Transformed; use datafusion::execution::disk_manager::DiskManager; use datafusion::execution::{ @@ -45,7 +46,7 @@ use datafusion::sql::sqlparser::dialect::PostgreSqlDialect; use futures::Stream; use futures::stream::select_all; use itertools::Itertools; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::ops::Bound; @@ -57,7 +58,6 @@ use sysinfo::System; use tokio::runtime::Runtime; use self::error::ExecuteError; -use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; use crate::alerts::alert_structs::Conditions; use crate::alerts::alerts_utils::get_filter_string; @@ -70,7 +70,8 @@ use crate::handlers::http::query::QueryError; use crate::metrics::increment_bytes_scanned_in_query_by_date; use crate::option::Mode; use crate::parseable::{DEFAULT_TENANT, PARSEABLE}; -use crate::storage::{ObjectStorageProvider, ObjectStoreFormat}; +use crate::query::stream_schema_provider::GlobalSchemaProvider; +use crate::storage::{ObjectStorage, ObjectStorageProvider, ObjectStoreFormat}; use crate::utils::time::TimeRange; /// Boxed record-batch stream used as the streaming half of query results. @@ -95,6 +96,13 @@ type QueryResult = Result<(Either, BoxedBatchStream>, Vec = // Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); +pub static SCHEMA_PROVIDER: OnceCell> = OnceCell::new(); + +/// Additional physical optimizer rules registered by enterprise/plugins. +/// Must be populated BEFORE `QUERY_SESSION_STATE` is first accessed. +pub static ADDITIONAL_PHYSICAL_OPTIMIZER_RULES: Lazy< + RwLock>>, +> = Lazy::new(|| RwLock::new(Vec::new())); pub static QUERY_SESSION_STATE: Lazy = Lazy::new(|| Query::create_session_state(PARSEABLE.storage())); @@ -110,6 +118,15 @@ pub static QUERY_SESSION: Lazy = Lazy::new(|| { } }); +/// Trait to enable implementation of SchemaProvider +pub trait ParseableSchemaProvider: Send + Sync { + fn new_provider( + &self, + storage: Option>, + tenant_id: &Option, + ) -> Box; +} + pub struct InMemorySessionContext { session_context: Arc>, } @@ -124,18 +141,23 @@ impl InMemorySessionContext { } pub fn add_schema(&self, tenant_id: &str) { + let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() { + provider.new_provider( + Some(PARSEABLE.storage().get_object_store()), + &Some(tenant_id.to_owned()), + ) + } else { + Box::new(GlobalSchemaProvider { + storage: PARSEABLE.storage().get_object_store(), + tenant_id: Some(tenant_id.to_owned()), + }) + }; self.session_context .write() .expect("SessionContext should be writeable") .catalog("datafusion") .expect("Default catalog should be available") - .register_schema( - tenant_id, - Arc::new(GlobalSchemaProvider { - storage: PARSEABLE.storage().get_object_store(), - tenant_id: Some(tenant_id.to_owned()), - }), - ) + .register_schema(tenant_id, schema_provider.into()) .expect("Should be able to register new schema"); } @@ -184,29 +206,41 @@ impl Query { // register multiple schemas if let Some(tenants) = PARSEABLE.list_tenants() { for t in tenants.iter() { - let schema_provider = Arc::new(GlobalSchemaProvider { - storage: storage.get_object_store(), - tenant_id: Some(t.clone()), - }); - let _ = catalog.register_schema(t, schema_provider); + let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() { + provider.new_provider( + Some(PARSEABLE.storage().get_object_store()), + &Some(t.to_owned()), + ) + } else { + Box::new(GlobalSchemaProvider { + storage: PARSEABLE.storage().get_object_store(), + tenant_id: Some(t.to_owned()), + }) + }; + let _ = catalog.register_schema(t, schema_provider.into()); } } } else { // register just one schema - let schema_provider = Arc::new(GlobalSchemaProvider { - storage: storage.get_object_store(), - tenant_id: None, - }); + let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() { + provider.new_provider(Some(PARSEABLE.storage().get_object_store()), &None) + } else { + Box::new(GlobalSchemaProvider { + storage: PARSEABLE.storage().get_object_store(), + tenant_id: None, + }) + }; + let _ = catalog.register_schema( &state.config_options().catalog.default_schema, - schema_provider, + schema_provider.into(), ); } SessionContext::new_with_state(state) } - fn create_session_state(storage: Arc) -> SessionState { + pub fn create_session_state(storage: Arc) -> SessionState { let runtime_config = storage .get_datafusion_runtime() .with_disk_manager_builder(DiskManager::builder()); @@ -252,11 +286,19 @@ impl Query { .parquet .schema_force_view_types = true; - SessionStateBuilder::new() + let mut builder = SessionStateBuilder::new() .with_default_features() .with_config(config) - .with_runtime_env(runtime) - .build() + .with_runtime_env(runtime); + + // Append any additional physical optimizer rules (e.g., enterprise partial agg pushdown) + if let Ok(rules) = ADDITIONAL_PHYSICAL_OPTIMIZER_RULES.read() { + for rule in rules.iter() { + builder = builder.with_physical_optimizer_rule(Arc::clone(rule)); + } + } + + builder.build() } /// this function returns the result of the query @@ -288,14 +330,12 @@ impl Query { return Ok((Either::Left(vec![]), fields)); } - let plan = QUERY_SESSION - .get_ctx() - .state() - .create_physical_plan(df.logical_plan()) - .await?; + let ctx = QUERY_SESSION.get_ctx(); + + let plan = ctx.state().create_physical_plan(df.logical_plan()).await?; let results = if !is_streaming { - let task_ctx = QUERY_SESSION.get_ctx().task_ctx(); + let task_ctx = ctx.task_ctx(); let batches = collect_partitioned(plan.clone(), task_ctx.clone()) .await? @@ -311,7 +351,7 @@ impl Query { Either::Left(batches) } else { - let task_ctx = QUERY_SESSION.get_ctx().task_ctx(); + let task_ctx = ctx.task_ctx(); let output_partitions = plan.output_partitioning().partition_count(); diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 6d4005972..59daa4a88 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -56,7 +56,10 @@ use crate::{ }, event::DEFAULT_TIMESTAMP_KEY, hottier::HotTierManager, - metrics::{QUERY_CACHE_HIT, increment_files_scanned_in_query_by_date}, + metrics::{ + QUERY_CACHE_HIT, increment_files_scanned_in_hottier_by_date, + increment_files_scanned_in_query_by_date, + }, option::Mode, parseable::{DEFAULT_TENANT, PARSEABLE, STREAM_EXISTS}, storage::{ObjectStorage, ObjectStoreFormat}, @@ -205,6 +208,13 @@ impl StandardTableProvider { .await .map_err(|err| DataFusionError::External(Box::new(err)))?; + increment_files_scanned_in_hottier_by_date( + hot_tier_files.len() as u64, + &chrono::Utc::now().date_naive().to_string(), + self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), + &self.stream, + ); + let hot_tier_files: Vec = hot_tier_files .into_iter() .map(|mut file| { @@ -352,101 +362,108 @@ impl StandardTableProvider { &self, manifest_files: Vec, ) -> (Vec>, datafusion::common::Statistics) { - let target_partition: usize = num_cpus::get(); - let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); - let mut column_statistics = HashMap::>::new(); - let mut count = 0; - let mut file_count = 0u64; - for (index, file) in manifest_files - .into_iter() - .enumerate() - .map(|(x, y)| (x % target_partition, y)) + partitioned_files(&self.schema, &self.tenant_id, manifest_files) + } +} + +#[inline(always)] +pub fn partitioned_files( + schema: &SchemaRef, + tenant_id: &Option, + manifest_files: Vec, +) -> (Vec>, datafusion::common::Statistics) { + let target_partition: usize = num_cpus::get(); + let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); + let mut column_statistics = HashMap::>::new(); + let mut count = 0; + let mut file_count = 0u64; + for (index, file) in manifest_files + .into_iter() + .enumerate() + .map(|(x, y)| (x % target_partition, y)) + { + #[allow(unused_mut)] + let File { + mut file_path, + num_rows, + columns, + .. + } = file; + + // Track billing metrics for files scanned in query + file_count += 1; + + // object_store::path::Path doesn't automatically deal with Windows path separators + // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem + // before sending the file path to PartitionedFile + // the github issue- https://github.com/parseablehq/parseable/issues/824 + // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution + // TODO: figure out an elegant solution to this + #[cfg(windows)] { - #[allow(unused_mut)] - let File { - mut file_path, - num_rows, - columns, - .. - } = file; - - // Track billing metrics for files scanned in query - file_count += 1; - - // object_store::path::Path doesn't automatically deal with Windows path separators - // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem - // before sending the file path to PartitionedFile - // the github issue- https://github.com/parseablehq/parseable/issues/824 - // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution - // TODO: figure out an elegant solution to this - #[cfg(windows)] - { - if PARSEABLE.storage.name() == "drive" { - file_path = object_store::path::Path::from_absolute_path(file_path) - .unwrap() - .to_string(); - } + if PARSEABLE.storage.name() == "drive" { + file_path = object_store::path::Path::from_absolute_path(file_path) + .unwrap() + .to_string(); } - let pf = PartitionedFile::new(file_path, file.file_size); - partitioned_files[index].push(pf); - - columns.into_iter().for_each(|col| { - column_statistics - .entry(col.name) - .and_modify(|x| { - if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) - { - // update() returns None on type mismatch (e.g. column - // historically written as both Utf8 and Timestamp(ms)). - // Dropping to None here makes the planner skip min/max - // pushdown for this column instead of crashing the worker. - *x = stats.update(col_stats); - } - }) - .or_insert_with(|| col.stats.as_ref().cloned()); - }); - count += num_rows; } - let statistics = self - .schema - .fields() - .iter() - .map(|field| { - column_statistics - .get(field.name()) - .and_then(|stats| stats.as_ref()) - .and_then(|stats| stats.clone().min_max_as_scalar(field.data_type())) - .map(|(min, max)| datafusion::common::ColumnStatistics { - null_count: Precision::Absent, - max_value: Precision::Exact(max), - min_value: Precision::Exact(min), - distinct_count: Precision::Absent, - sum_value: Precision::Absent, - byte_size: Precision::Absent, - }) - .unwrap_or_default() - }) - .collect(); + let pf = PartitionedFile::new(file_path, file.file_size); + partitioned_files[index].push(pf); + + columns.into_iter().for_each(|col| { + column_statistics + .entry(col.name) + .and_modify(|x| { + if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) { + // update() returns None on type mismatch (e.g. column + // historically written as both Utf8 and Timestamp(ms)). + // Dropping to None here makes the planner skip min/max + // pushdown for this column instead of crashing the worker. + *x = stats.update(col_stats); + } + }) + .or_insert_with(|| col.stats.as_ref().cloned()); + }); + count += num_rows; + } + let statistics = schema + .fields() + .iter() + .map(|field| { + column_statistics + .get(field.name()) + .and_then(|stats| stats.as_ref()) + .and_then(|stats| stats.clone().min_max_as_scalar(field.data_type())) + .map(|(min, max)| datafusion::common::ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Exact(max), + min_value: Precision::Exact(min), + distinct_count: Precision::Absent, + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }) + .unwrap_or_default() + }) + .collect(); - let statistics = datafusion::common::Statistics { - num_rows: Precision::Exact(count as usize), - total_byte_size: Precision::Absent, - column_statistics: statistics, - }; + let statistics = datafusion::common::Statistics { + num_rows: Precision::Exact(count as usize), + total_byte_size: Precision::Absent, + column_statistics: statistics, + }; - // Track billing metrics for query scan - let current_date = chrono::Utc::now().date_naive().to_string(); - increment_files_scanned_in_query_by_date( - file_count, - ¤t_date, - self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), - ); + // Track billing metrics for query scan + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_files_scanned_in_query_by_date( + file_count, + ¤t_date, + tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), + ); - (partitioned_files, statistics) - } + (partitioned_files, statistics) } -async fn collect_from_snapshot( +pub async fn collect_from_snapshot( snapshot: &Snapshot, time_filters: &[PartialTimeFilter], filters: &[Expr], @@ -683,7 +700,8 @@ impl TableProvider for StandardTableProvider { } } -fn reversed_mem_table( +#[inline(always)] +pub fn reversed_mem_table( mut records: Vec, schema: Arc, ) -> Result { @@ -702,7 +720,7 @@ pub enum PartialTimeFilter { } impl PartialTimeFilter { - fn try_from_expr(expr: &Expr, time_partition: &Option) -> Option { + pub fn try_from_expr(expr: &Expr, time_partition: &Option) -> Option { let Expr::BinaryExpr(binexpr) = expr else { return None; }; @@ -863,7 +881,7 @@ pub fn is_within_staging_window(time_filters: &[PartialTimeFilter]) -> bool { !has_upper_bound } -fn expr_in_boundary(filter: &Expr) -> bool { +pub fn expr_in_boundary(filter: &Expr) -> bool { let Expr::BinaryExpr(binexpr) = filter else { return false; }; @@ -881,7 +899,7 @@ fn expr_in_boundary(filter: &Expr) -> bool { ) } -fn extract_timestamp_bound( +pub fn extract_timestamp_bound( binexpr: &BinaryExpr, time_partition: &Option, ) -> Option<(Operator, NaiveDateTime)> { diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index 481f6c56a..787afae5a 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -143,6 +143,31 @@ fn lit_timestamp_milli(time: i64) -> Expr { Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None), None) } +/// Streaming variant of into_flight_data. Converts a DataFusion record batch +/// stream directly into Flight data without materializing all batches in memory. +pub fn into_flight_data_stream( + stream: datafusion::execution::SendableRecordBatchStream, +) -> Result, Box> { + let record_stream = stream.map_err(|e| { + arrow_flight::error::FlightError::Arrow(arrow_schema::ArrowError::ExternalError( + Box::new(e), + )) + }); + + let write_options = IpcWriteOptions::default() + .try_with_compression(Some(arrow_ipc::CompressionType(1))) + .map_err(|err| Status::failed_precondition(err.to_string()))?; + + let flight_data_stream = FlightDataEncoderBuilder::new() + .with_max_flight_data_size(usize::MAX) + .with_options(write_options) + .build(record_stream); + + let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string())); + + Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream)) +} + pub fn into_flight_data(records: Vec) -> Result, Box> { let input_stream = futures::stream::iter(records.into_iter().map(Ok)); let write_options = IpcWriteOptions::default()