Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions rust/plex-pg-core/src/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,45 @@ fn transform_insert(insert: &mut Insert) {
action: OnConflictAction::DoNothing,
}));
insert.or = None;
} else if conflict_cols.is_some() {
// Plain INSERT into a table where the shim has added a UNIQUE
// constraint that SQLite did not have (see plex_schema.sql vs
// sqlite_schema.sql for statistics_bandwidth, statistics_media,
// metadata_item_settings, etc.). Without this branch, plain
// `INSERT INTO statistics_bandwidth (...) VALUES (...)` succeeds
// on SQLite (no UNIQUE) but raises `unique_violation` on PG when
// the row already exists. Plex's caller does not catch that
// and it propagates as `DB::Exception: std::exception`, crashing
// PMS — observed on startup roll-up for statistics_bandwidth /
// statistics_media on a freshly migrated PG database.
//
// Use ON CONFLICT DO UPDATE (not DO NOTHING). DO NOTHING returns
// zero rows from the trailing `RETURNING id` that
// `rust_step_cached_write_build_exec_sql` appends to every INSERT;
// Plex's SOCI wrapper expects a row back to populate
// `last_insert_rowid()` and throws DB::Exception on the empty
// result, leaving the server stuck in startup Maintenance with
// an "Uncaught exception running async task" log spam every 5s.
// DO UPDATE preserves the row id (RETURNING id returns it) and
// overwrites the non-key columns, which matches what Plex would
// have done after its SELECT-or-INSERT pattern anyway: store the
// latest measurement for this (account, device, timespan, at,
// lan) tuple.
let columns = insert.columns.clone();
let conflict_col_names: Vec<String> = conflict_cols
.as_ref()
.map(|cols| cols.iter().map(|c| c.to_lowercase()).collect())
.unwrap_or_default();
insert.on = Some(OnInsert::OnConflict(make_do_update(
columns,
conflict_target,
&conflict_col_names,
)));
if should_add_returning_id(&conflict_cols) {
insert.returning = Some(vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident::new(
"id",
)))]);
}
}
}

Expand Down