From a295acace5af5936ab0bae9630ba667b7e433494 Mon Sep 17 00:00:00 2001 From: Joris Date: Fri, 22 May 2026 18:00:17 +0200 Subject: [PATCH] fix(upsert): rewrite plain INSERT into UNIQUE-keyed tables as ON CONFLICT DO UPDATE Plain INSERT into shim-added UNIQUE-keyed tables (statistics_bandwidth, statistics_media, metadata_item_settings) raises unique_violation on PG; PMS does not catch and aborts with DB::Exception. Extend transform_insert to emit ON CONFLICT DO UPDATE for plain INSERTs whose target table has known conflict columns. DO UPDATE (not DO NOTHING) is intentional so the trailing RETURNING id clause in cached_write still returns the row id. Verified via cargo run --example crash_repro and live pilot pod. --- rust/plex-pg-core/src/upsert.rs | 39 +++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/rust/plex-pg-core/src/upsert.rs b/rust/plex-pg-core/src/upsert.rs index 86f770a..b284e0f 100644 --- a/rust/plex-pg-core/src/upsert.rs +++ b/rust/plex-pg-core/src/upsert.rs @@ -86,6 +86,45 @@ fn transform_insert(insert: &mut Insert) { action: OnConflictAction::DoNothing, })); insert.or = None; + } else if conflict_cols.is_some() { + // Plain INSERT into a table where the shim has added a UNIQUE + // constraint that SQLite did not have (see plex_schema.sql vs + // sqlite_schema.sql for statistics_bandwidth, statistics_media, + // metadata_item_settings, etc.). Without this branch, plain + // `INSERT INTO statistics_bandwidth (...) VALUES (...)` succeeds + // on SQLite (no UNIQUE) but raises `unique_violation` on PG when + // the row already exists. Plex's caller does not catch that + // and it propagates as `DB::Exception: std::exception`, crashing + // PMS — observed on startup roll-up for statistics_bandwidth / + // statistics_media on a freshly migrated PG database. + // + // Use ON CONFLICT DO UPDATE (not DO NOTHING). DO NOTHING returns + // zero rows from the trailing `RETURNING id` that + // `rust_step_cached_write_build_exec_sql` appends to every INSERT; + // Plex's SOCI wrapper expects a row back to populate + // `last_insert_rowid()` and throws DB::Exception on the empty + // result, leaving the server stuck in startup Maintenance with + // an "Uncaught exception running async task" log spam every 5s. + // DO UPDATE preserves the row id (RETURNING id returns it) and + // overwrites the non-key columns, which matches what Plex would + // have done after its SELECT-or-INSERT pattern anyway: store the + // latest measurement for this (account, device, timespan, at, + // lan) tuple. + let columns = insert.columns.clone(); + let conflict_col_names: Vec = conflict_cols + .as_ref() + .map(|cols| cols.iter().map(|c| c.to_lowercase()).collect()) + .unwrap_or_default(); + insert.on = Some(OnInsert::OnConflict(make_do_update( + columns, + conflict_target, + &conflict_col_names, + ))); + if should_add_returning_id(&conflict_cols) { + insert.returning = Some(vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident::new( + "id", + )))]); + } } }