From a295acace5af5936ab0bae9630ba667b7e433494 Mon Sep 17 00:00:00 2001 From: Joris Date: Fri, 22 May 2026 18:00:17 +0200 Subject: [PATCH 1/2] fix(upsert): rewrite plain INSERT into UNIQUE-keyed tables as ON CONFLICT DO UPDATE Plain INSERT into shim-added UNIQUE-keyed tables (statistics_bandwidth, statistics_media, metadata_item_settings) raises unique_violation on PG; PMS does not catch and aborts with DB::Exception. Extend transform_insert to emit ON CONFLICT DO UPDATE for plain INSERTs whose target table has known conflict columns. DO UPDATE (not DO NOTHING) is intentional so the trailing RETURNING id clause in cached_write still returns the row id. Verified via cargo run --example crash_repro and live pilot pod. --- rust/plex-pg-core/src/upsert.rs | 39 +++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/rust/plex-pg-core/src/upsert.rs b/rust/plex-pg-core/src/upsert.rs index 86f770a..b284e0f 100644 --- a/rust/plex-pg-core/src/upsert.rs +++ b/rust/plex-pg-core/src/upsert.rs @@ -86,6 +86,45 @@ fn transform_insert(insert: &mut Insert) { action: OnConflictAction::DoNothing, })); insert.or = None; + } else if conflict_cols.is_some() { + // Plain INSERT into a table where the shim has added a UNIQUE + // constraint that SQLite did not have (see plex_schema.sql vs + // sqlite_schema.sql for statistics_bandwidth, statistics_media, + // metadata_item_settings, etc.). Without this branch, plain + // `INSERT INTO statistics_bandwidth (...) VALUES (...)` succeeds + // on SQLite (no UNIQUE) but raises `unique_violation` on PG when + // the row already exists. Plex's caller does not catch that + // and it propagates as `DB::Exception: std::exception`, crashing + // PMS — observed on startup roll-up for statistics_bandwidth / + // statistics_media on a freshly migrated PG database. + // + // Use ON CONFLICT DO UPDATE (not DO NOTHING). DO NOTHING returns + // zero rows from the trailing `RETURNING id` that + // `rust_step_cached_write_build_exec_sql` appends to every INSERT; + // Plex's SOCI wrapper expects a row back to populate + // `last_insert_rowid()` and throws DB::Exception on the empty + // result, leaving the server stuck in startup Maintenance with + // an "Uncaught exception running async task" log spam every 5s. + // DO UPDATE preserves the row id (RETURNING id returns it) and + // overwrites the non-key columns, which matches what Plex would + // have done after its SELECT-or-INSERT pattern anyway: store the + // latest measurement for this (account, device, timespan, at, + // lan) tuple. + let columns = insert.columns.clone(); + let conflict_col_names: Vec = conflict_cols + .as_ref() + .map(|cols| cols.iter().map(|c| c.to_lowercase()).collect()) + .unwrap_or_default(); + insert.on = Some(OnInsert::OnConflict(make_do_update( + columns, + conflict_target, + &conflict_col_names, + ))); + if should_add_returning_id(&conflict_cols) { + insert.returning = Some(vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident::new( + "id", + )))]); + } } } From e1a9d3ac6c4e4fc0ffda6414d52eb0f1f293ddf0 Mon Sep 17 00:00:00 2001 From: Joris Date: Fri, 22 May 2026 18:00:42 +0200 Subject: [PATCH 2/2] fix(last_insert_rowid): publish rowid from cached_write + fall back to real SQLite when pg_conn lookup misses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug #3a: Cached INSERT … RETURNING id never published the row id to Plex's sqlite3_last_insert_rowid view. cached_write now mirrors the non-cached write path: writes ec.last_insert_rowid and calls rust_set_global_last_insert_rowid after a successful exec. Bug #3b: last_insert_rowid_impl returned 0 when the registry lookup missed AND the global was 0 — never fell back to the real shadow SQLite. Added orig_sqlite3_last_insert_rowid(db) fallback so the shadow's view is the source of truth in that path. Required import added in db_interpose_metadata.rs. Verified live: EXCEPTION counter stays at 0 across the 5s stats roll-up cycle; `real_sqlite=N` log markers + `global=N` ticking with N>0. --- .../plex-pg-core/src/db_interpose_metadata.rs | 8 ++--- .../db_interpose_metadata/connection_state.rs | 31 ++++++++++++++++--- .../cached_write.rs | 13 ++++++++ 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/rust/plex-pg-core/src/db_interpose_metadata.rs b/rust/plex-pg-core/src/db_interpose_metadata.rs index d2b8231..ec7f249 100644 --- a/rust/plex-pg-core/src/db_interpose_metadata.rs +++ b/rust/plex-pg-core/src/db_interpose_metadata.rs @@ -7,10 +7,10 @@ use crate::db_interpose_common::{ get_orig_sqlite3_bind_parameter_name, get_orig_sqlite3_create_collation, get_orig_sqlite3_create_collation_v2, get_orig_sqlite3_db_handle, get_orig_sqlite3_errcode, get_orig_sqlite3_errmsg, get_orig_sqlite3_expanded_sql, get_orig_sqlite3_extended_errcode, - get_orig_sqlite3_free, get_orig_sqlite3_get_table, get_orig_sqlite3_malloc, - get_orig_sqlite3_sql, get_orig_sqlite3_stmt_busy, get_orig_sqlite3_stmt_readonly, - get_orig_sqlite3_stmt_status, get_shim_sqlite3_errcode, get_shim_sqlite3_errmsg, - tls_in_interpose_call_ptr, + get_orig_sqlite3_free, get_orig_sqlite3_get_table, get_orig_sqlite3_last_insert_rowid, + get_orig_sqlite3_malloc, get_orig_sqlite3_sql, get_orig_sqlite3_stmt_busy, + get_orig_sqlite3_stmt_readonly, get_orig_sqlite3_stmt_status, get_shim_sqlite3_errcode, + get_shim_sqlite3_errmsg, tls_in_interpose_call_ptr, }; use crate::db_interpose_conn_utils::{cstr_to_string_or, log_debug, PthreadMutexGuard}; use crate::ffi_types::{sqlite3, sqlite3_stmt, PgStmt}; diff --git a/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs b/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs index 9484f28..c65a9ca 100644 --- a/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs +++ b/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs @@ -48,12 +48,35 @@ pub(super) fn last_insert_rowid_impl(db: *mut sqlite3) -> i64 { let pg_conn = crate::pg_client::rust_pg_find_connection(db); if pg_conn.is_null() { let global_rowid = crate::pg_client::rust_get_global_last_insert_rowid(); + if global_rowid > 0 { + log_debug_lazy!( + "last_insert_rowid: CALLED db={:p} pg_conn=NULL (no exact match, global={})", + db, + global_rowid + ); + return global_rowid; + } + // Fall back to real SQLite's last_insert_rowid on this db handle. + // When the PG path didn't capture the rowid (e.g. the INSERT was + // prepared via the real-SQLite path because of stack-depth caution + // or because pg_conn lookup missed at prepare time), the shadow + // SQLite engine still ran the INSERT and knows the rowid. Returning + // 0 here is what makes Plex's SOCI wrapper throw `DB::Exception: + // std::exception` every 5s, keeping the server stuck in Maintenance. + if let Some(f) = get_orig_sqlite3_last_insert_rowid() { + let real_rowid = unsafe { f(db) }; + log_debug_lazy!( + "last_insert_rowid: CALLED db={:p} pg_conn=NULL global=0, real_sqlite={}", + db, + real_rowid + ); + return real_rowid; + } log_debug_lazy!( - "last_insert_rowid: CALLED db={:p} pg_conn=NULL (no exact match, global={})", - db, - global_rowid + "last_insert_rowid: CALLED db={:p} pg_conn=NULL global=0, no real_sqlite fn", + db ); - return if global_rowid > 0 { global_rowid } else { 0 }; + return 0; } log_debug_lazy!( diff --git a/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs b/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs index f70e4d0..bde6882 100644 --- a/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs +++ b/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs @@ -247,6 +247,19 @@ pub extern "C" fn rust_step_cached_write_execute_and_finalize( id_str = id_buf.as_ptr(); } if !id_str.is_null() && !CStr::from_ptr(id_str).to_bytes().is_empty() { + // Mirror the non-cached write path (write_exec.rs): publish + // the returned id as `last_insert_rowid` on both the + // executing connection and the global atomic. Without this, + // a cached INSERT with `RETURNING id` produces a row on + // PG but `sqlite3_last_insert_rowid()` returns 0, which + // Plex's SOCI wrapper interprets as "insert failed" and + // throws DB::Exception from every async-task tick. + let rowid = crate::db_interpose_helpers::rust_pg_text_to_int64(id_str); + if rowid > 0 { + let ec = &mut *exec_conn; + ec.last_insert_rowid = rowid; + crate::pg_client::rust_set_global_last_insert_rowid(rowid); + } let meta_id = crate::pg_statement::rust_extract_metadata_id(orig_sql); if meta_id > 0 { crate::pg_client::rust_set_global_metadata_id(meta_id);