Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object_store = { version = "0.13.1", features = [
"azure",
"gcp",
] }
datafusion-proto = "53.1.0"
parquet = "58.0.0"

# Web server and HTTP-related
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ pub mod validator;
use std::time::Duration;

// Public re-exports of crates being used in enterprise
pub use arrow_array;
pub use arrow_flight;
pub use arrow_ipc;
pub use catalog as parseable_catalog;
pub use datafusion;
pub use datafusion_proto;
pub use handlers::http::modal::{
ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server,
};
Expand All @@ -68,6 +73,7 @@ use parseable::PARSEABLE;
use reqwest::{Client, ClientBuilder};
pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk};
pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber};
pub use utils as parseable_utils;

// It is very unlikely that panic will occur when dealing with locks.
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
Expand Down
23 changes: 23 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,18 @@ pub static TOTAL_QUERY_CALLS_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("metric can be created")
});

/// Counter of files scanned in the hot tier, labeled by stream, date and tenant.
///
/// NOTE(review): this counter is declared and incremented (see
/// `increment_files_scanned_in_hottier_by_date`) but, per the review thread,
/// it is not yet registered with the custom metrics registry — it will not be
/// exposed on /metrics until `custom_metrics` registers it. TODO confirm.
pub static TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            "total_files_scanned_in_hottier_by_date",
            "Total files scanned in hottier by date",
        )
        .namespace(METRICS_NAMESPACE),
        // Label order matters: callers must pass [stream, date, tenant_id].
        &["stream", "date", "tenant_id"],
    )
    .expect("metric can be created")
});
Comment on lines +260 to +270
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/metrics/mod.rs"

echo "Occurrences of TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE:"
rg -nC2 'TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE' "$FILE"

echo
echo "Registration block in custom_metrics:"
sed -n '512,580p' "$FILE"

Repository: parseablehq/parseable

Length of output: 3289


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/metrics/mod.rs"

# Find the custom_metrics function definition and check its bounds
echo "=== custom_metrics function definition and scope ==="
sed -n '463,470p' "$FILE"

# Get the complete custom_metrics function (find where it ends)
echo
echo "=== End of custom_metrics function ==="
sed -n '575,585p' "$FILE"

# Search for any other registration attempts with the new metric
echo
echo "=== All occurrences of register and TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE in same context ==="
rg -n 'register.*TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE|TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.*register' "$FILE"

# Confirm the metric is not already registered elsewhere
echo
echo "=== Checking if the metric appears in any register calls ==="
rg -n '\.register\(' "$FILE" | grep -i 'hottier' || echo "No register calls found for hottier metric"

Repository: parseablehq/parseable

Length of output: 920


Register this counter in custom_metrics.

TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is declared at line 260 and incremented in the helper at lines 704-706, but it is never added to METRICS_REGISTRY inside custom_metrics. With the custom registry setup in this file, the metric will not be exposed on /metrics.

Proposed fix
     registry
         .register(Box::new(TOTAL_QUERY_CALLS_BY_DATE.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
+        .expect("metric can be registered");
     registry
         .register(Box::new(TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE.clone()))
         .expect("metric can be registered");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/metrics/mod.rs` around lines 260 - 270, The new IntCounterVec
TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is never registered with the custom
prometheus registry, so it won't be exposed; update the custom_metrics function
to call
METRICS_REGISTRY.register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
(or the equivalent register method used in this module) alongside the other
metrics registration, handling any register errors consistently with existing
patterns so the counter is available on /metrics.


pub static TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
Expand Down Expand Up @@ -683,6 +695,17 @@ pub fn increment_files_scanned_in_query_by_date(count: u64, date: &str, tenant_i
.inc_by(count);
}

/// Adds `count` to the hot-tier files-scanned counter for the given
/// stream / date / tenant label combination.
pub fn increment_files_scanned_in_hottier_by_date(
    count: u64,
    date: &str,
    tenant_id: &str,
    stream_name: &str,
) {
    // Label order must match the metric's declaration: [stream, date, tenant_id].
    let labels = [stream_name, date, tenant_id];
    TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE
        .with_label_values(&labels)
        .inc_by(count);
}

pub fn increment_bytes_scanned_in_query_by_date(bytes: u64, date: &str, tenant_id: &str) {
TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE
.with_label_values(&[date, tenant_id])
Expand Down
102 changes: 71 additions & 31 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow_schema::SchemaRef;
use chrono::NaiveDateTime;
use chrono::{DateTime, Duration, Utc};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::SchemaProvider;
use datafusion::common::tree_node::Transformed;
use datafusion::execution::disk_manager::DiskManager;
use datafusion::execution::{
Expand All @@ -45,7 +46,7 @@ use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use futures::Stream;
use futures::stream::select_all;
use itertools::Itertools;
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::ops::Bound;
Expand All @@ -57,7 +58,6 @@ use sysinfo::System;
use tokio::runtime::Runtime;

use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
pub use self::stream_schema_provider::PartialTimeFilter;
use crate::alerts::alert_structs::Conditions;
use crate::alerts::alerts_utils::get_filter_string;
Expand All @@ -70,7 +70,8 @@ use crate::handlers::http::query::QueryError;
use crate::metrics::increment_bytes_scanned_in_query_by_date;
use crate::option::Mode;
use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat};
use crate::query::stream_schema_provider::GlobalSchemaProvider;
use crate::storage::{ObjectStorage, ObjectStorageProvider, ObjectStoreFormat};
use crate::utils::time::TimeRange;

/// Boxed record-batch stream used as the streaming half of query results.
Expand All @@ -95,6 +96,13 @@ type QueryResult = Result<(Either<Vec<RecordBatch>, BoxedBatchStream>, Vec<Strin

// pub static QUERY_SESSION: Lazy<SessionContext> =
// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
/// Optional factory for tenant schema providers; set at most once
/// (presumably by enterprise code at startup — TODO confirm) and consulted
/// before falling back to the built-in `GlobalSchemaProvider`.
pub static SCHEMA_PROVIDER: OnceCell<Box<dyn ParseableSchemaProvider>> = OnceCell::new();

/// Additional physical optimizer rules registered by enterprise/plugins.
/// Must be populated BEFORE `QUERY_SESSION_STATE` is first accessed, since
/// the lazily-built session state reads this list exactly once.
pub static ADDITIONAL_PHYSICAL_OPTIMIZER_RULES: Lazy<
    RwLock<Vec<Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>>>,
> = Lazy::new(|| RwLock::new(Vec::new()));

pub static QUERY_SESSION_STATE: Lazy<SessionState> =
Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));
Expand All @@ -110,6 +118,15 @@ pub static QUERY_SESSION: Lazy<InMemorySessionContext> = Lazy::new(|| {
}
});

/// Trait to enable implementation of SchemaProvider
///
/// Extension hook allowing an alternative DataFusion [`SchemaProvider`]
/// to be supplied (installed globally via `SCHEMA_PROVIDER`).
pub trait ParseableSchemaProvider: Send + Sync {
    /// Builds a schema provider for one tenant.
    ///
    /// `storage`: object store backing the provider, when the caller has one;
    /// `tenant_id`: `None` selects the default (single-tenant) schema.
    fn new_provider(
        &self,
        storage: Option<Arc<dyn ObjectStorage>>,
        tenant_id: &Option<String>,
    ) -> Box<dyn SchemaProvider>;
}

pub struct InMemorySessionContext {
session_context: Arc<RwLock<SessionContext>>,
}
Expand All @@ -124,18 +141,23 @@ impl InMemorySessionContext {
}

/// Registers a schema named after `tenant_id` in the "datafusion" catalog
/// of the shared session context.
///
/// If a custom provider factory was installed via `SCHEMA_PROVIDER`, it is
/// used; otherwise the default `GlobalSchemaProvider` backed by the
/// configured object store is registered.
///
/// Panics if the session-context lock is poisoned, the default catalog is
/// missing, or schema registration fails.
pub fn add_schema(&self, tenant_id: &str) {
    // Plugin/enterprise override takes precedence over the built-in provider.
    let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
        provider.new_provider(
            Some(PARSEABLE.storage().get_object_store()),
            &Some(tenant_id.to_owned()),
        )
    } else {
        Box::new(GlobalSchemaProvider {
            storage: PARSEABLE.storage().get_object_store(),
            tenant_id: Some(tenant_id.to_owned()),
        })
    };
    self.session_context
        .write()
        .expect("SessionContext should be writeable")
        .catalog("datafusion")
        .expect("Default catalog should be available")
        // `.into()` converts Box<dyn SchemaProvider> to the Arc the API expects.
        .register_schema(tenant_id, schema_provider.into())
        .expect("Should be able to register new schema");
}

Expand Down Expand Up @@ -184,29 +206,41 @@ impl Query {
// register multiple schemas
if let Some(tenants) = PARSEABLE.list_tenants() {
for t in tenants.iter() {
let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
tenant_id: Some(t.clone()),
});
let _ = catalog.register_schema(t, schema_provider);
let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
provider.new_provider(
Some(PARSEABLE.storage().get_object_store()),
&Some(t.to_owned()),
)
} else {
Box::new(GlobalSchemaProvider {
storage: PARSEABLE.storage().get_object_store(),
tenant_id: Some(t.to_owned()),
})
};
let _ = catalog.register_schema(t, schema_provider.into());
}
}
} else {
// register just one schema
let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
tenant_id: None,
});
let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
provider.new_provider(Some(PARSEABLE.storage().get_object_store()), &None)
} else {
Box::new(GlobalSchemaProvider {
storage: PARSEABLE.storage().get_object_store(),
tenant_id: None,
})
};

let _ = catalog.register_schema(
&state.config_options().catalog.default_schema,
schema_provider,
schema_provider.into(),
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

SessionContext::new_with_state(state)
}

fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
pub fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
let runtime_config = storage
.get_datafusion_runtime()
.with_disk_manager_builder(DiskManager::builder());
Expand Down Expand Up @@ -252,11 +286,19 @@ impl Query {
.parquet
.schema_force_view_types = true;

SessionStateBuilder::new()
let mut builder = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(runtime)
.build()
.with_runtime_env(runtime);

// Append any additional physical optimizer rules (e.g., enterprise partial agg pushdown)
if let Ok(rules) = ADDITIONAL_PHYSICAL_OPTIMIZER_RULES.read() {
for rule in rules.iter() {
builder = builder.with_physical_optimizer_rule(Arc::clone(rule));
}
}

builder.build()
}

/// this function returns the result of the query
Expand Down Expand Up @@ -288,14 +330,12 @@ impl Query {
return Ok((Either::Left(vec![]), fields));
}

let plan = QUERY_SESSION
.get_ctx()
.state()
.create_physical_plan(df.logical_plan())
.await?;
let ctx = QUERY_SESSION.get_ctx();

let plan = ctx.state().create_physical_plan(df.logical_plan()).await?;

let results = if !is_streaming {
let task_ctx = QUERY_SESSION.get_ctx().task_ctx();
let task_ctx = ctx.task_ctx();

let batches = collect_partitioned(plan.clone(), task_ctx.clone())
.await?
Expand All @@ -311,7 +351,7 @@ impl Query {

Either::Left(batches)
} else {
let task_ctx = QUERY_SESSION.get_ctx().task_ctx();
let task_ctx = ctx.task_ctx();

let output_partitions = plan.output_partitioning().partition_count();

Expand Down
Loading
Loading