diff --git a/rust/plex-pg-core/src/upsert.rs b/rust/plex-pg-core/src/upsert.rs index 86f770a..b284e0f 100644 --- a/rust/plex-pg-core/src/upsert.rs +++ b/rust/plex-pg-core/src/upsert.rs @@ -86,6 +86,45 @@ fn transform_insert(insert: &mut Insert) { action: OnConflictAction::DoNothing, })); insert.or = None; + } else if conflict_cols.is_some() { + // Plain INSERT into a table where the shim has added a UNIQUE + // constraint that SQLite did not have (see plex_schema.sql vs + // sqlite_schema.sql for statistics_bandwidth, statistics_media, + // metadata_item_settings, etc.). Without this branch, plain + // `INSERT INTO statistics_bandwidth (...) VALUES (...)` succeeds + // on SQLite (no UNIQUE) but raises `unique_violation` on PG when + // the row already exists. Plex's caller does not catch that + // and it propagates as `DB::Exception: std::exception`, crashing + // PMS — observed on startup roll-up for statistics_bandwidth / + // statistics_media on a freshly migrated PG database. + // + // Use ON CONFLICT DO UPDATE (not DO NOTHING). DO NOTHING returns + // zero rows from the trailing `RETURNING id` that + // `rust_step_cached_write_build_exec_sql` appends to every INSERT; + // Plex's SOCI wrapper expects a row back to populate + // `last_insert_rowid()` and throws DB::Exception on the empty + // result, leaving the server stuck in startup Maintenance with + // an "Uncaught exception running async task" log spam every 5s. + // DO UPDATE preserves the row id (RETURNING id returns it) and + // overwrites the non-key columns, which matches what Plex would + // have done after its SELECT-or-INSERT pattern anyway: store the + // latest measurement for this (account, device, timespan, at, + // lan) tuple. + let columns = insert.columns.clone(); + let conflict_col_names: Vec = conflict_cols + .as_ref() + .map(|cols| cols.iter().map(|c| c.to_lowercase()).collect()) + .unwrap_or_default(); + insert.on = Some(OnInsert::OnConflict(make_do_update( + columns, + conflict_target, + &conflict_col_names, + ))); + if should_add_returning_id(&conflict_cols) { + insert.returning = Some(vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident::new( + "id", + )))]); + } } }