diff --git a/rust/plex-pg-core/src/db_interpose_metadata.rs b/rust/plex-pg-core/src/db_interpose_metadata.rs index d2b8231..ec7f249 100644 --- a/rust/plex-pg-core/src/db_interpose_metadata.rs +++ b/rust/plex-pg-core/src/db_interpose_metadata.rs @@ -7,10 +7,10 @@ use crate::db_interpose_common::{ get_orig_sqlite3_bind_parameter_name, get_orig_sqlite3_create_collation, get_orig_sqlite3_create_collation_v2, get_orig_sqlite3_db_handle, get_orig_sqlite3_errcode, get_orig_sqlite3_errmsg, get_orig_sqlite3_expanded_sql, get_orig_sqlite3_extended_errcode, - get_orig_sqlite3_free, get_orig_sqlite3_get_table, get_orig_sqlite3_malloc, - get_orig_sqlite3_sql, get_orig_sqlite3_stmt_busy, get_orig_sqlite3_stmt_readonly, - get_orig_sqlite3_stmt_status, get_shim_sqlite3_errcode, get_shim_sqlite3_errmsg, - tls_in_interpose_call_ptr, + get_orig_sqlite3_free, get_orig_sqlite3_get_table, get_orig_sqlite3_last_insert_rowid, + get_orig_sqlite3_malloc, get_orig_sqlite3_sql, get_orig_sqlite3_stmt_busy, + get_orig_sqlite3_stmt_readonly, get_orig_sqlite3_stmt_status, get_shim_sqlite3_errcode, + get_shim_sqlite3_errmsg, tls_in_interpose_call_ptr, }; use crate::db_interpose_conn_utils::{cstr_to_string_or, log_debug, PthreadMutexGuard}; use crate::ffi_types::{sqlite3, sqlite3_stmt, PgStmt}; diff --git a/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs b/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs index 9484f28..c65a9ca 100644 --- a/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs +++ b/rust/plex-pg-core/src/db_interpose_metadata/connection_state.rs @@ -48,12 +48,35 @@ pub(super) fn last_insert_rowid_impl(db: *mut sqlite3) -> i64 { let pg_conn = crate::pg_client::rust_pg_find_connection(db); if pg_conn.is_null() { let global_rowid = crate::pg_client::rust_get_global_last_insert_rowid(); + if global_rowid > 0 { + log_debug_lazy!( + "last_insert_rowid: CALLED db={:p} pg_conn=NULL (no exact match, global={})", + db, + global_rowid + ); + return global_rowid; + } + // Fall back to real SQLite's last_insert_rowid on this db handle. + // When the PG path didn't capture the rowid (e.g. the INSERT was + // prepared via the real-SQLite path because of stack-depth caution + // or because pg_conn lookup missed at prepare time), the shadow + // SQLite engine still ran the INSERT and knows the rowid. Returning + // 0 here is what makes Plex's SOCI wrapper throw `DB::Exception: + // std::exception` every 5s, keeping the server stuck in Maintenance. + if let Some(f) = get_orig_sqlite3_last_insert_rowid() { + let real_rowid = unsafe { f(db) }; + log_debug_lazy!( + "last_insert_rowid: CALLED db={:p} pg_conn=NULL global=0, real_sqlite={}", + db, + real_rowid + ); + return real_rowid; + } log_debug_lazy!( - "last_insert_rowid: CALLED db={:p} pg_conn=NULL (no exact match, global={})", - db, - global_rowid + "last_insert_rowid: CALLED db={:p} pg_conn=NULL global=0, no real_sqlite fn", + db ); - return if global_rowid > 0 { global_rowid } else { 0 }; + return 0; } log_debug_lazy!( diff --git a/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs b/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs index f70e4d0..bde6882 100644 --- a/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs +++ b/rust/plex-pg-core/src/db_interpose_step_write_utils/cached_write.rs @@ -247,6 +247,19 @@ pub extern "C" fn rust_step_cached_write_execute_and_finalize( id_str = id_buf.as_ptr(); } if !id_str.is_null() && !CStr::from_ptr(id_str).to_bytes().is_empty() { + // Mirror the non-cached write path (write_exec.rs): publish + // the returned id as `last_insert_rowid` on both the + // executing connection and the global atomic. Without this, + // a cached INSERT with `RETURNING id` produces a row on + // PG but `sqlite3_last_insert_rowid()` returns 0, which + // Plex's SOCI wrapper interprets as "insert failed" and + // throws DB::Exception from every async-task tick. + let rowid = crate::db_interpose_helpers::rust_pg_text_to_int64(id_str); + if rowid > 0 { + let ec = &mut *exec_conn; + ec.last_insert_rowid = rowid; + crate::pg_client::rust_set_global_last_insert_rowid(rowid); + } let meta_id = crate::pg_statement::rust_extract_metadata_id(orig_sql); if meta_id > 0 { crate::pg_client::rust_set_global_metadata_id(meta_id); diff --git a/rust/plex-pg-core/src/upsert.rs b/rust/plex-pg-core/src/upsert.rs index 86f770a..b284e0f 100644 --- a/rust/plex-pg-core/src/upsert.rs +++ b/rust/plex-pg-core/src/upsert.rs @@ -86,6 +86,45 @@ fn transform_insert(insert: &mut Insert) { action: OnConflictAction::DoNothing, })); insert.or = None; + } else if conflict_cols.is_some() { + // Plain INSERT into a table where the shim has added a UNIQUE + // constraint that SQLite did not have (see plex_schema.sql vs + // sqlite_schema.sql for statistics_bandwidth, statistics_media, + // metadata_item_settings, etc.). Without this branch, plain + // `INSERT INTO statistics_bandwidth (...) VALUES (...)` succeeds + // on SQLite (no UNIQUE) but raises `unique_violation` on PG when + // the row already exists. Plex's caller does not catch that + // and it propagates as `DB::Exception: std::exception`, crashing + // PMS — observed on startup roll-up for statistics_bandwidth / + // statistics_media on a freshly migrated PG database. + // + // Use ON CONFLICT DO UPDATE (not DO NOTHING). DO NOTHING returns + // zero rows from the trailing `RETURNING id` that + // `rust_step_cached_write_build_exec_sql` appends to every INSERT; + // Plex's SOCI wrapper expects a row back to populate + // `last_insert_rowid()` and throws DB::Exception on the empty + // result, leaving the server stuck in startup Maintenance with + // an "Uncaught exception running async task" log spam every 5s. + // DO UPDATE preserves the row id (RETURNING id returns it) and + // overwrites the non-key columns, which matches what Plex would + // have done after its SELECT-or-INSERT pattern anyway: store the + // latest measurement for this (account, device, timespan, at, + // lan) tuple. + let columns = insert.columns.clone(); + let conflict_col_names: Vec = conflict_cols + .as_ref() + .map(|cols| cols.iter().map(|c| c.to_lowercase()).collect()) + .unwrap_or_default(); + insert.on = Some(OnInsert::OnConflict(make_do_update( + columns, + conflict_target, + &conflict_col_names, + ))); + if should_add_returning_id(&conflict_cols) { + insert.returning = Some(vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident::new( + "id", + )))]); + } } }