Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rust/plex-pg-core/src/db_interpose_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use crate::db_interpose_common::{
get_orig_sqlite3_bind_parameter_name, get_orig_sqlite3_create_collation,
get_orig_sqlite3_create_collation_v2, get_orig_sqlite3_db_handle, get_orig_sqlite3_errcode,
get_orig_sqlite3_errmsg, get_orig_sqlite3_expanded_sql, get_orig_sqlite3_extended_errcode,
get_orig_sqlite3_free, get_orig_sqlite3_get_table, get_orig_sqlite3_malloc,
get_orig_sqlite3_sql, get_orig_sqlite3_stmt_busy, get_orig_sqlite3_stmt_readonly,
get_orig_sqlite3_stmt_status, get_shim_sqlite3_errcode, get_shim_sqlite3_errmsg,
tls_in_interpose_call_ptr,
get_orig_sqlite3_free, get_orig_sqlite3_get_table, get_orig_sqlite3_last_insert_rowid,
get_orig_sqlite3_malloc, get_orig_sqlite3_sql, get_orig_sqlite3_stmt_busy,
get_orig_sqlite3_stmt_readonly, get_orig_sqlite3_stmt_status, get_shim_sqlite3_errcode,
get_shim_sqlite3_errmsg, tls_in_interpose_call_ptr,
};
use crate::db_interpose_conn_utils::{cstr_to_string_or, log_debug, PthreadMutexGuard};
use crate::ffi_types::{sqlite3, sqlite3_stmt, PgStmt};
Expand Down
31 changes: 27 additions & 4 deletions rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,35 @@ pub(super) fn last_insert_rowid_impl(db: *mut sqlite3) -> i64 {
let pg_conn = crate::pg_client::rust_pg_find_connection(db);
if pg_conn.is_null() {
let global_rowid = crate::pg_client::rust_get_global_last_insert_rowid();
if global_rowid > 0 {
log_debug_lazy!(
"last_insert_rowid: CALLED db={:p} pg_conn=NULL (no exact match, global={})",
db,
global_rowid
);
return global_rowid;
}
// Fall back to real SQLite's last_insert_rowid on this db handle.
// When the PG path didn't capture the rowid (e.g. the INSERT was
// prepared via the real-SQLite path because of stack-depth caution
// or because pg_conn lookup missed at prepare time), the shadow
// SQLite engine still ran the INSERT and knows the rowid. Returning
// 0 here is what makes Plex's SOCI wrapper throw `DB::Exception:
// std::exception` every 5s, keeping the server stuck in Maintenance.
if let Some(f) = get_orig_sqlite3_last_insert_rowid() {
let real_rowid = unsafe { f(db) };
log_debug_lazy!(
"last_insert_rowid: CALLED db={:p} pg_conn=NULL global=0, real_sqlite={}",
db,
real_rowid
);
return real_rowid;
}
log_debug_lazy!(
"last_insert_rowid: CALLED db={:p} pg_conn=NULL (no exact match, global={})",
db,
global_rowid
"last_insert_rowid: CALLED db={:p} pg_conn=NULL global=0, no real_sqlite fn",
db
);
return if global_rowid > 0 { global_rowid } else { 0 };
return 0;
}

log_debug_lazy!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,19 @@ pub extern "C" fn rust_step_cached_write_execute_and_finalize(
id_str = id_buf.as_ptr();
}
if !id_str.is_null() && !CStr::from_ptr(id_str).to_bytes().is_empty() {
// Mirror the non-cached write path (write_exec.rs): publish
// the returned id as `last_insert_rowid` on both the
// executing connection and the global atomic. Without this,
// a cached INSERT with `RETURNING id` produces a row on
// PG but `sqlite3_last_insert_rowid()` returns 0, which
// Plex's SOCI wrapper interprets as "insert failed" and
// throws DB::Exception from every async-task tick.
let rowid = crate::db_interpose_helpers::rust_pg_text_to_int64(id_str);
if rowid > 0 {
let ec = &mut *exec_conn;
ec.last_insert_rowid = rowid;
crate::pg_client::rust_set_global_last_insert_rowid(rowid);
}
let meta_id = crate::pg_statement::rust_extract_metadata_id(orig_sql);
if meta_id > 0 {
crate::pg_client::rust_set_global_metadata_id(meta_id);
Expand Down
39 changes: 39 additions & 0 deletions rust/plex-pg-core/src/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,45 @@ fn transform_insert(insert: &mut Insert) {
action: OnConflictAction::DoNothing,
}));
insert.or = None;
} else if conflict_cols.is_some() {
// Plain INSERT into a table where the shim has added a UNIQUE
// constraint that SQLite did not have (see plex_schema.sql vs
// sqlite_schema.sql for statistics_bandwidth, statistics_media,
// metadata_item_settings, etc.). Without this branch, plain
// `INSERT INTO statistics_bandwidth (...) VALUES (...)` succeeds
// on SQLite (no UNIQUE) but raises `unique_violation` on PG when
// the row already exists. Plex's caller does not catch that
// and it propagates as `DB::Exception: std::exception`, crashing
// PMS — observed on startup roll-up for statistics_bandwidth /
// statistics_media on a freshly migrated PG database.
//
// Use ON CONFLICT DO UPDATE (not DO NOTHING). DO NOTHING returns
// zero rows from the trailing `RETURNING id` that
// `rust_step_cached_write_build_exec_sql` appends to every INSERT;
// Plex's SOCI wrapper expects a row back to populate
// `last_insert_rowid()` and throws DB::Exception on the empty
// result, leaving the server stuck in startup Maintenance with
// an "Uncaught exception running async task" log spam every 5s.
// DO UPDATE preserves the row id (RETURNING id returns it) and
// overwrites the non-key columns, which matches what Plex would
// have done after its SELECT-or-INSERT pattern anyway: store the
// latest measurement for this (account, device, timespan, at,
// lan) tuple.
let columns = insert.columns.clone();
let conflict_col_names: Vec<String> = conflict_cols
.as_ref()
.map(|cols| cols.iter().map(|c| c.to_lowercase()).collect())
.unwrap_or_default();
insert.on = Some(OnInsert::OnConflict(make_do_update(
columns,
conflict_target,
&conflict_col_names,
)));
if should_add_returning_id(&conflict_cols) {
insert.returning = Some(vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident::new(
"id",
)))]);
}
}
}

Expand Down