diff --git a/backend/src/data-source.ts b/backend/src/data-source.ts index 0b64bde..0d300af 100644 --- a/backend/src/data-source.ts +++ b/backend/src/data-source.ts @@ -29,6 +29,13 @@ import { OauthClient } from './modules/oauth-clients/oauth-client.entity'; import { BigBangBaselineMigration1748000000000 } from './migrations/1748000000000-BigBangBaselineMigration'; import { CatalogEtlSchemaMigration1748000000001 } from './migrations/1748000000001-CatalogEtlSchemaMigration'; import { AddNoStepsEtlStatus1748000000002 } from './migrations/1748000000002-AddNoStepsEtlStatus'; +import { AddDiscordAuthToUsers1779608598950 } from './migrations/1779608598950-1748100000000-AddDiscordAuthToUsers'; +import { AddPasswordExpiryToUsers1779642418093 } from './migrations/1779642418093-AddPasswordExpiryToUsers'; +import { AlterJumpPointsForSyntheticRows1779664556916 } from './migrations/1779664556916-AlterJumpPointsForSyntheticRows'; +import { FixCategoriesSectionTypeExpressionIndex1779700000000 } from './migrations/1779700000000-FixCategoriesSectionTypeExpressionIndex'; +import { AddStepNameToEtlRun1779710000000 } from './migrations/1779710000000-AddStepNameToEtlRun'; +import { AddUniqueUuidToStationItem1780010901444 } from './migrations/1780010901444-AddUniqueUuidToStationItem'; +import { MakeItemFksDeferrable1780020000000 } from './migrations/1780020000000-MakeItemFksDeferrable'; export const AppDataSource = new DataSource({ type: 'postgres', @@ -67,6 +74,13 @@ export const AppDataSource = new DataSource({ BigBangBaselineMigration1748000000000, CatalogEtlSchemaMigration1748000000001, AddNoStepsEtlStatus1748000000002, + AddDiscordAuthToUsers1779608598950, + AddPasswordExpiryToUsers1779642418093, + AlterJumpPointsForSyntheticRows1779664556916, + FixCategoriesSectionTypeExpressionIndex1779700000000, + AddStepNameToEtlRun1779710000000, + AddUniqueUuidToStationItem1780010901444, + MakeItemFksDeferrable1780020000000, ], synchronize: false, extra: { parseInt8: true }, diff --git 
a/backend/src/migrations/1780010901444-AddUniqueUuidToStationItem.ts b/backend/src/migrations/1780010901444-AddUniqueUuidToStationItem.ts
new file mode 100644
index 0000000..cadea0f
--- /dev/null
+++ b/backend/src/migrations/1780010901444-AddUniqueUuidToStationItem.ts
@@ -0,0 +1,55 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+/**
+ * Enforces uniqueness of `station_item.uuid` through a partial unique index,
+ * deleting pre-existing duplicate uuid rows first.
+ */
+export class AddUniqueUuidToStationItem1780010901444
+  implements MigrationInterface
+{
+  /**
+   * Deletes duplicate uuid rows, creates the partial unique index, then drops
+   * the non-unique `idx_items_uuid` index that it supersedes.
+   */
+  public async up(queryRunner: QueryRunner): Promise<void> {
+    // Remove duplicate uuid rows before adding the constraint, keeping the
+    // most recently synced row for each uuid (NULL uuids are never deduplicated
+    // by a UNIQUE constraint so they are left as-is).
+    // NULLS LAST is explicit because PostgreSQL sorts NULLs first under DESC;
+    // without it a row with a NULL synced_at would outrank a synced one.
+    await queryRunner.query(`
+      DELETE FROM station_item
+      WHERE id IN (
+        SELECT id FROM (
+          SELECT id,
+                 ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY synced_at DESC NULLS LAST, id DESC) AS rn
+          FROM station_item
+          WHERE uuid IS NOT NULL
+        ) ranked
+        WHERE rn > 1
+      )
+    `);
+
+    await queryRunner.query(
+      `CREATE UNIQUE INDEX "uq_station_item_uuid" ON "station_item" ("uuid") WHERE "uuid" IS NOT NULL`,
+    );
+
+    // Drop the baseline non-unique idx_items_uuid — it is fully superseded by
+    // the partial unique index above, which also serves as a lookup index.
+    // Keeping both would leave a non-unique index that contradicts the new
+    // uniqueness guarantee and could cause the reconciliation SELECT to return
+    // an arbitrary row if a duplicate somehow slipped through.
+    await queryRunner.query(`DROP INDEX IF EXISTS "idx_items_uuid"`);
+  }
+
+  /**
+   * Drops the unique index and restores the non-unique `idx_items_uuid`.
+   * Rows deleted by `up` are not restored.
+   */
+  public async down(queryRunner: QueryRunner): Promise<void> {
+    await queryRunner.query(`DROP INDEX IF EXISTS "uq_station_item_uuid"`);
+    await queryRunner.query(
+      `CREATE INDEX IF NOT EXISTS "idx_items_uuid" ON "station_item" ("uuid")`,
+    );
+  }
+}
diff --git a/backend/src/migrations/1780020000000-MakeItemFksDeferrable.ts b/backend/src/migrations/1780020000000-MakeItemFksDeferrable.ts
new file mode 100644
index 0000000..27357ae
--- /dev/null
+++ b/backend/src/migrations/1780020000000-MakeItemFksDeferrable.ts
@@ -0,0 +1,114 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+/**
+ * Recreates the foreign keys that reference `station_item.uex_id` as
+ * DEFERRABLE INITIALLY DEFERRED, and restores the non-deferrable form on
+ * revert.
+ */
+export class MakeItemFksDeferrable1780020000000 implements MigrationInterface {
+  /**
+   * Drops the existing FKs on `station_item_attribute.item_uex_id` and
+   * `station_item.parent_uex_id`, then re-adds them under explicit names as
+   * deferrable constraints.
+   */
+  public async up(queryRunner: QueryRunner): Promise<void> {
+    // Drop the auto-named inline FK from station_item_attribute.item_uex_id
+    // and recreate it as DEFERRABLE INITIALLY DEFERRED so the UUID-based
+    // uex_id reconciliation in ItemsSyncStep can re-key dependents and update
+    // the referenced uex_id within a single transaction without violating the
+    // FK mid-statement.
+    //
+    // The lookup is restricted to FKs whose constrained column is item_uex_id
+    // and drops every match, so an unrelated FK between the same two tables is
+    // never removed and no duplicate FK on the column is left behind.
+    await queryRunner.query(`
+      DO $$
+      DECLARE
+        con RECORD;
+      BEGIN
+        FOR con IN
+          SELECT c.conname
+          FROM pg_constraint c
+          JOIN pg_attribute a
+            ON a.attrelid = c.conrelid
+           AND a.attnum = ANY (c.conkey)
+          WHERE c.conrelid = 'station_item_attribute'::regclass
+            AND c.confrelid = 'station_item'::regclass
+            AND c.contype = 'f'
+            AND a.attname = 'item_uex_id'
+        LOOP
+          EXECUTE format('ALTER TABLE station_item_attribute DROP CONSTRAINT %I', con.conname);
+        END LOOP;
+      END $$;
+    `);
+
+    await queryRunner.query(`
+      ALTER TABLE station_item_attribute
+        ADD CONSTRAINT fk_item_attr_item_uex_id
+        FOREIGN KEY (item_uex_id)
+        REFERENCES station_item (uex_id)
+        ON DELETE CASCADE
+        DEFERRABLE INITIALLY DEFERRED
+    `);
+
+    // Drop and recreate the self-referential FK on station_item.parent_uex_id,
+    // matching on the constrained column for the same reason as above.
+    await queryRunner.query(`
+      DO $$
+      DECLARE
+        con RECORD;
+      BEGIN
+        FOR con IN
+          SELECT c.conname
+          FROM pg_constraint c
+          JOIN pg_attribute a
+            ON a.attrelid = c.conrelid
+           AND a.attnum = ANY (c.conkey)
+          WHERE c.conrelid = 'station_item'::regclass
+            AND c.confrelid = 'station_item'::regclass
+            AND c.contype = 'f'
+            AND a.attname = 'parent_uex_id'
+        LOOP
+          EXECUTE format('ALTER TABLE station_item DROP CONSTRAINT %I', con.conname);
+        END LOOP;
+      END $$;
+    `);
+
+    await queryRunner.query(`
+      ALTER TABLE station_item
+        ADD CONSTRAINT fk_item_parent_uex_id
+        FOREIGN KEY (parent_uex_id)
+        REFERENCES station_item (uex_id)
+        ON DELETE SET NULL
+        DEFERRABLE INITIALLY DEFERRED
+    `);
+  }
+
+  /**
+   * Re-adds both FKs under the same explicit names without the DEFERRABLE
+   * clause. The original auto-generated constraint names are not restored.
+   */
+  public async down(queryRunner: QueryRunner): Promise<void> {
+    await queryRunner.query(
+      `ALTER TABLE station_item_attribute DROP CONSTRAINT IF EXISTS fk_item_attr_item_uex_id`,
+    );
+    await queryRunner.query(`
+      ALTER TABLE station_item_attribute
+        ADD CONSTRAINT fk_item_attr_item_uex_id
+        FOREIGN KEY (item_uex_id)
+        REFERENCES station_item (uex_id)
+        ON DELETE CASCADE
+    `);
+
+    await queryRunner.query(
+      `ALTER TABLE station_item DROP CONSTRAINT IF EXISTS fk_item_parent_uex_id`,
+    );
+    await queryRunner.query(`
+      ALTER TABLE station_item
+        ADD CONSTRAINT fk_item_parent_uex_id
+        FOREIGN KEY (parent_uex_id)
+        REFERENCES station_item (uex_id)
+        ON DELETE SET NULL
+    `);
+  }
+}
diff --git a/backend/src/modules/catalog-etl/catalog-etl.module.ts b/backend/src/modules/catalog-etl/catalog-etl.module.ts
index 7e2ac6d..a369009 100644
--- a/backend/src/modules/catalog-etl/catalog-etl.module.ts
+++ b/backend/src/modules/catalog-etl/catalog-etl.module.ts
@@ -21,6 +21,7 @@ import { CategoriesSyncStep } from './steps/categories-sync.step';
 import { TerminalsSyncStep } from './steps/terminals-sync.step';
 import { TerminalDistancesSyncStep } from './steps/terminal-distances-sync.step';
 import { VehiclesSyncStep } from './steps/vehicles-sync.step';
+import { ItemsSyncStep } from './steps/items-sync.step';
 import { CommoditiesSyncStep } from './steps/commodities-sync.step';
 import { 
CatalogEtlScheduler } from './schedulers/catalog-etl.scheduler'; import { UexSyncModule } from '../uex-sync/uex-sync.module'; @@ -48,6 +49,7 @@ import { UexSyncModule } from '../uex-sync/uex-sync.module'; TerminalsSyncStep, TerminalDistancesSyncStep, VehiclesSyncStep, + ItemsSyncStep, CommoditiesSyncStep, ], exports: [CatalogEtlService], diff --git a/backend/src/modules/catalog-etl/catalog-etl.service.spec.ts b/backend/src/modules/catalog-etl/catalog-etl.service.spec.ts index 7c644de..cd0a680 100644 --- a/backend/src/modules/catalog-etl/catalog-etl.service.spec.ts +++ b/backend/src/modules/catalog-etl/catalog-etl.service.spec.ts @@ -24,6 +24,7 @@ import { CategoriesSyncStep } from './steps/categories-sync.step'; import { TerminalsSyncStep } from './steps/terminals-sync.step'; import { TerminalDistancesSyncStep } from './steps/terminal-distances-sync.step'; import { VehiclesSyncStep } from './steps/vehicles-sync.step'; +import { ItemsSyncStep } from './steps/items-sync.step'; import { CommoditiesSyncStep } from './steps/commodities-sync.step'; function buildMockRun(overrides: Partial = {}): EtlRun { @@ -159,6 +160,10 @@ describe('CatalogEtlService', () => { provide: VehiclesSyncStep, useValue: { name: 'vehicles-sync', execute: jest.fn() }, }, + { + provide: ItemsSyncStep, + useValue: { name: 'items-sync', execute: jest.fn() }, + }, { provide: CommoditiesSyncStep, useValue: { name: 'commodities-sync', execute: jest.fn() }, diff --git a/backend/src/modules/catalog-etl/catalog-etl.service.ts b/backend/src/modules/catalog-etl/catalog-etl.service.ts index 2569aba..85fc8c5 100644 --- a/backend/src/modules/catalog-etl/catalog-etl.service.ts +++ b/backend/src/modules/catalog-etl/catalog-etl.service.ts @@ -22,6 +22,7 @@ import { CategoriesSyncStep } from './steps/categories-sync.step'; import { TerminalsSyncStep } from './steps/terminals-sync.step'; import { TerminalDistancesSyncStep } from './steps/terminal-distances-sync.step'; import { VehiclesSyncStep } from 
'./steps/vehicles-sync.step';
+import { ItemsSyncStep } from './steps/items-sync.step';
 import { CommoditiesSyncStep } from './steps/commodities-sync.step';
 
 @Injectable()
@@ -53,6 +54,7 @@ export class CatalogEtlService {
     private readonly terminalsSyncStep: TerminalsSyncStep,
     private readonly terminalDistancesSyncStep: TerminalDistancesSyncStep,
     private readonly vehiclesSyncStep: VehiclesSyncStep,
+    private readonly itemsSyncStep: ItemsSyncStep,
     private readonly commoditiesSyncStep: CommoditiesSyncStep,
   ) {
     this.ETL_STEPS = [
@@ -72,6 +74,7 @@ export class CatalogEtlService {
       terminalsSyncStep,
       terminalDistancesSyncStep,
       vehiclesSyncStep,
+      itemsSyncStep,
       commoditiesSyncStep,
     ];
   }
diff --git a/backend/src/modules/catalog-etl/steps/items-sync.step.spec.ts b/backend/src/modules/catalog-etl/steps/items-sync.step.spec.ts
new file mode 100644
index 0000000..f35c765
--- /dev/null
+++ b/backend/src/modules/catalog-etl/steps/items-sync.step.spec.ts
@@ -0,0 +1,922 @@
+import { ItemsSyncStep } from './items-sync.step';
+
+/** Minimal step context passed to `execute` in every test. */
+const CTX = { runId: 'test-run-id' };
+
+/** Builds a logger stub whose four level methods are jest mocks. */
+function makeLogger() {
+  return {
+    info: jest.fn(),
+    warn: jest.fn(),
+    error: jest.fn(),
+    debug: jest.fn(),
+  };
+}
+
+/**
+ * Builds one attribute entry for an item payload.
+ *
+ * @param overrides - Fields that replace or extend the defaults.
+ */
+function makeAttr(overrides: Record<string, unknown> = {}) {
+  return {
+    id: 101,
+    id_category: 5,
+    id_category_attribute: 42,
+    value: '150',
+    unit: 'damage',
+    date_added: null,
+    date_modified: null,
+    ...overrides,
+  };
+}
+
+/**
+ * Builds an item payload record carrying one default attribute.
+ *
+ * @param overrides - Fields that replace or extend the defaults.
+ */
+function makeItem(overrides: Record<string, unknown> = {}) {
+  return {
+    id: 1,
+    uuid: 'sc-uuid-item-1',
+    id_parent: null,
+    id_category: 5,
+    id_company: 10,
+    id_vehicle: null,
+    name: 'Laser Repeater S1',
+    slug: 'laser-repeater-s1',
+    size: 'S1',
+    color: null,
+    color2: null,
+    quality: null,
+    url_store: null,
+    is_exclusive_pledge: 0,
+    is_exclusive_subscriber: 0,
+    is_exclusive_concierge: 0,
+    is_commodity: 0,
+    is_harvestable: 0,
+    screenshot: null,
+    notification: null,
+    game_version: '3.22',
+    date_added: 1700000000,
+    date_modified: 1710000000,
+    attributes: [makeAttr()],
+    ...overrides,
+  };
+}
+
+/**
+ * Builds a `query` mock that answers the three reference-table lookups
+ * (company, vehicle, category attribute) with the given uex_ids and resolves
+ * every other statement to an empty array.
+ *
+ * @param knownCompanyIds - uex_ids returned for the station_company lookup.
+ * @param knownVehicleIds - uex_ids returned for the station_vehicle lookup.
+ * @param knownCategoryAttrIds - uex_ids returned for the
+ *   station_category_attribute lookup.
+ */
+function buildDsQuery(
+  knownCompanyIds: number[] = [10],
+  knownVehicleIds: number[] = [],
+  knownCategoryAttrIds: number[] = [42],
+): jest.Mock {
+  return jest.fn().mockImplementation((sql: string) => {
+    if (sql.includes('SELECT uex_id FROM station_company')) {
+      return Promise.resolve(knownCompanyIds.map((id) => ({ uex_id: id })));
+    }
+    if (sql.includes('SELECT uex_id FROM station_vehicle')) {
+      return Promise.resolve(knownVehicleIds.map((id) => ({ uex_id: id })));
+    }
+    if (sql.includes('SELECT uex_id FROM station_category_attribute')) {
+      return Promise.resolve(
+        knownCategoryAttrIds.map((id) => ({ uex_id: id })),
+      );
+    }
+    return Promise.resolve([]);
+  });
+}
+
+/**
+ * Constructs an ItemsSyncStep wired to mocks. `transaction` invokes its
+ * callback with an entity manager that shares `dsQuery`, so statements issued
+ * inside and outside a transaction are recorded on the same mock.
+ *
+ * @param uexGet - Mock exposed as `get` on the first constructor argument.
+ * @param dsQuery - Mock used for `ds.query` and for the transactional `query`.
+ * @param repoCreate - Mock exposed as `create` on the third argument.
+ * @param repoSave - Mock exposed as `save` on the third argument.
+ */
+function buildStep(
+  uexGet: jest.Mock,
+  dsQuery: jest.Mock,
+  repoCreate: jest.Mock,
+  repoSave: jest.Mock,
+) {
+  const ds = {
+    query: dsQuery,
+    transaction: jest
+      .fn()
+      .mockImplementation((cb: (em: { query: jest.Mock }) => Promise<unknown>) =>
+        cb({ query: dsQuery }),
+      ),
+  };
+  return new ItemsSyncStep(
+    { get: uexGet } as never,
+    ds as never,
+    { create: repoCreate, save: repoSave } as never,
+    makeLogger() as never,
+  );
+}
+
+describe('ItemsSyncStep', () => {
+  let uexGet: jest.Mock;
+  let repoCreate: jest.Mock;
+  let repoSave: jest.Mock;
+
+  beforeEach(() => {
+    uexGet = jest.fn();
+    repoCreate = jest.fn().mockImplementation((dto) => dto);
+    repoSave = jest.fn().mockResolvedValue({});
+  });
+
+  afterEach(() => jest.clearAllMocks());
+
+  describe('item upsert', () => {
+    it('inserts with parent_uex_id=NULL literal in pass 1a', async () => {
+      const dsQuery = buildDsQuery();
+      const step = buildStep(uexGet, dsQuery, repoCreate, repoSave);
+      uexGet.mockResolvedValueOnce([makeItem()]);
+
+      await step.execute(CTX);
+
+      const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) =>
+        sql.includes('INSERT INTO station_item'),
+      );
+      expect(itemInsert[0]).toMatch(/VALUES\s*\(\s*\$1,NULL/);
+    });
+
+    it('has ON CONFLICT (uex_id) DO UPDATE SET; parent_uex_id not 
overwritten on conflict', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem()]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[0]).toContain('ON CONFLICT (uex_id) DO UPDATE SET'); + // parent_uex_id must NOT be in the DO UPDATE list — pass 1b/1c handle it + expect(itemInsert[0]).not.toContain( + 'parent_uex_id=EXCLUDED.parent_uex_id', + ); + }); + + it('params array has exactly 23 entries matching $1..$23', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem()]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1]).toHaveLength(23); + // Last two are date timestamps ($22, $23) + expect(itemInsert[1][21]).toBeInstanceOf(Date); // uex_date_added + expect(itemInsert[1][22]).toBeInstanceOf(Date); // uex_date_modified + expect(itemInsert[0]).toMatch(/\$23,NOW\(\)/); + }); + + it('stores category_uex_id, company_uex_id, vehicle_uex_id correctly', async () => { + const dsQuery = buildDsQuery([3], [99]); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ id_category: 7, id_company: 3, id_vehicle: 99 }), + ]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][1]).toBe(7); // $2 category_uex_id + expect(itemInsert[1][2]).toBe(3); // $3 company_uex_id + expect(itemInsert[1][3]).toBe(99); // $4 vehicle_uex_id + }); + + it('stores uuid at $7 (index [6])', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + 
uexGet.mockResolvedValueOnce([makeItem({ uuid: 'my-sc-uuid' })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][6]).toBe('my-sc-uuid'); + }); + + it('null uuid stored as null', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ uuid: null })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][6]).toBeNull(); + }); + + it('skips item with no name and emits warn', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ name: '' })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert).toBeUndefined(); + expect(repoSave).toHaveBeenCalledWith( + expect.objectContaining({ severity: 'warn' }), + ); + }); + + it('empty item list produces no inserts', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([]); + + await step.execute(CTX); + + const anyInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT'), + ); + expect(anyInsert).toBeUndefined(); + }); + + it('item upsert executes inside transaction (transaction() called per item)', async () => { + const dsQuery = buildDsQuery(); + const ds = { + query: dsQuery, + transaction: jest + .fn() + .mockImplementation( + (cb: (em: { query: jest.Mock }) => Promise) => + cb({ query: dsQuery }), + ), + }; + const step = new ItemsSyncStep( + { get: uexGet } as never, + ds as never, + { create: repoCreate, save: repoSave } as never, + makeLogger() as never, + ); + 
uexGet.mockResolvedValueOnce([ + makeItem(), + makeItem({ id: 2, uuid: 'sc-uuid-2' }), + ]); + + await step.execute(CTX); + + // One transaction per item + expect(ds.transaction).toHaveBeenCalledTimes(2); + // Item upsert must be inside the transaction, not in the outer dsQuery + const outerItemInsert = + // transaction calls go through dsQuery too (mock wires them together), so + // verify at least one INSERT INTO station_item call was made overall + (dsQuery.mock.calls as [string, unknown[]][]).filter(([sql]) => + sql.includes('INSERT INTO station_item'), + ).length; + expect(outerItemInsert).toBeGreaterThan(0); + }); + }); + + describe('UUID-based reconciliation (UEX ID instability)', () => { + it('issues uuid lookup SELECT for items that have a uuid', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + const uuidLookup = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2'), + ); + expect(uuidLookup).toBeDefined(); + expect(uuidLookup[1][0]).toBe('stable-uuid'); // $1 uuid + expect(uuidLookup[1][1]).toBe(1); // $2 uex_id + }); + + it('does NOT issue uuid lookup for items with null uuid', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ uuid: null })]); + + await step.execute(CTX); + + const uuidLookup = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2'), + ); + expect(uuidLookup).toBeUndefined(); + }); + + it('when uuid match found: re-keys station_item_attribute.item_uex_id old → new', async () => { + // dsQuery returns an existing row with old uex_id=99 for the uuid lookup + const dsQuery = 
jest.fn().mockImplementation((sql: string) => { + if (sql.includes('SELECT uex_id FROM station_company')) + return Promise.resolve([{ uex_id: 10 }]); + if (sql.includes('SELECT uex_id FROM station_vehicle')) + return Promise.resolve([]); + if (sql.includes('SELECT uex_id FROM station_category_attribute')) + return Promise.resolve([{ uex_id: 42 }]); + // uuid lookup — returns the old row + if ( + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2') + ) + return Promise.resolve([{ uex_id: 99 }]); + return Promise.resolve([]); + }); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id: 1, uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + const reKeyAttr = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item_attribute') && + sql.includes('item_uex_id = $1'), + ); + expect(reKeyAttr).toBeDefined(); + expect(reKeyAttr[1][0]).toBe(1); // new uex_id + expect(reKeyAttr[1][1]).toBe(99); // old uex_id + }); + + it('when uuid match found: re-keys station_item.parent_uex_id old → new', async () => { + const dsQuery = jest.fn().mockImplementation((sql: string) => { + if (sql.includes('SELECT uex_id FROM station_company')) + return Promise.resolve([{ uex_id: 10 }]); + if (sql.includes('SELECT uex_id FROM station_vehicle')) + return Promise.resolve([]); + if (sql.includes('SELECT uex_id FROM station_category_attribute')) + return Promise.resolve([{ uex_id: 42 }]); + if ( + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2') + ) + return Promise.resolve([{ uex_id: 99 }]); + return Promise.resolve([]); + }); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id: 1, uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + const reKeyParent = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item') && + 
sql.includes('parent_uex_id = $1') && + sql.includes('parent_uex_id = $2'), + ); + expect(reKeyParent).toBeDefined(); + expect(reKeyParent[1][0]).toBe(1); // new uex_id + expect(reKeyParent[1][1]).toBe(99); // old uex_id + }); + + it('when uuid match found: canonical row updated before dependents are re-keyed', async () => { + const dsQuery = jest.fn().mockImplementation((sql: string) => { + if (sql.includes('SELECT uex_id FROM station_company')) + return Promise.resolve([{ uex_id: 10 }]); + if (sql.includes('SELECT uex_id FROM station_vehicle')) + return Promise.resolve([]); + if (sql.includes('SELECT uex_id FROM station_category_attribute')) + return Promise.resolve([{ uex_id: 42 }]); + if ( + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2') + ) + return Promise.resolve([{ uex_id: 99 }]); + return Promise.resolve([]); + }); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id: 1, uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + const sqls = dsQuery.mock.calls.map(([sql]: [string]) => sql.trim()); + const canonicalIdx = sqls.findIndex( + (s) => + s.startsWith('UPDATE station_item SET') && s.includes('uex_id=$1'), + ); + const reKeyAttrIdx = sqls.findIndex( + (s) => + s.startsWith('UPDATE station_item_attribute') && + s.includes('item_uex_id = $1'), + ); + const reKeyParentIdx = sqls.findIndex( + (s) => + s.startsWith('UPDATE station_item') && + s.includes('parent_uex_id = $1') && + s.includes('parent_uex_id = $2'), + ); + expect(canonicalIdx).toBeGreaterThanOrEqual(0); + expect(reKeyAttrIdx).toBeGreaterThan(canonicalIdx); + expect(reKeyParentIdx).toBeGreaterThan(canonicalIdx); + }); + + it('when uuid match found: updates canonical row with uex_id=$24 (old uex_id) and new columns', async () => { + const dsQuery = jest.fn().mockImplementation((sql: string) => { + if (sql.includes('SELECT uex_id FROM station_company')) + return Promise.resolve([{ 
uex_id: 10 }]); + if (sql.includes('SELECT uex_id FROM station_vehicle')) + return Promise.resolve([]); + if (sql.includes('SELECT uex_id FROM station_category_attribute')) + return Promise.resolve([{ uex_id: 42 }]); + if ( + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2') + ) + return Promise.resolve([{ uex_id: 99 }]); + return Promise.resolve([]); + }); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id: 1, uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + const canonicalUpdate = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item SET') && + sql.includes('uex_id=$1') && + sql.includes('WHERE uex_id = $24'), + ); + expect(canonicalUpdate).toBeDefined(); + expect(canonicalUpdate[1][0]).toBe(1); // $1 = new uex_id + expect(canonicalUpdate[1][23]).toBe(99); // $24 = old uex_id (WHERE clause) + }); + + it('when uuid match found: skips INSERT INTO station_item (no phantom row)', async () => { + const dsQuery = jest.fn().mockImplementation((sql: string) => { + if (sql.includes('SELECT uex_id FROM station_company')) + return Promise.resolve([{ uex_id: 10 }]); + if (sql.includes('SELECT uex_id FROM station_vehicle')) + return Promise.resolve([]); + if (sql.includes('SELECT uex_id FROM station_category_attribute')) + return Promise.resolve([{ uex_id: 42 }]); + if ( + sql.includes('FROM station_item') && + sql.includes('uuid = $1') && + sql.includes('uex_id != $2') + ) + return Promise.resolve([{ uex_id: 99 }]); + return Promise.resolve([]); + }); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id: 1, uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + // Use trim().startsWith to avoid matching INSERT INTO station_item_attribute + const insertCall = dsQuery.mock.calls.find(([sql]: [string]) => + sql.trim().startsWith('INSERT INTO station_item '), + ); + 
expect(insertCall).toBeUndefined(); + }); + + it('when no uuid match: INSERT runs normally (stable item path)', async () => { + // dsQuery returns empty for uuid lookup → no match → INSERT path + const dsQuery = buildDsQuery(); // returns [] for uuid lookup too + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ uuid: 'stable-uuid' })]); + + await step.execute(CTX); + + const insertCall = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(insertCall).toBeDefined(); + }); + }); + + describe('company / vehicle FK', () => { + it('unknown id_company coerced to null and emits warn', async () => { + const dsQuery = buildDsQuery([]); // no known companies + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id_company: 99 })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][2]).toBeNull(); // $3 company_uex_id + expect(repoSave).toHaveBeenCalledWith( + expect.objectContaining({ + severity: 'warn', + message: expect.stringContaining('unknown company uex_id=99'), + }), + ); + }); + + it('null id_company stored as null', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id_company: null })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][2]).toBeNull(); + }); + + it('unknown id_vehicle coerced to null and emits warn', async () => { + const dsQuery = buildDsQuery([10], []); // no known vehicles + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id_vehicle: 55 })]); + + await step.execute(CTX); + + const itemInsert = 
dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][3]).toBeNull(); // $4 vehicle_uex_id + expect(repoSave).toHaveBeenCalledWith( + expect.objectContaining({ + severity: 'warn', + message: expect.stringContaining('unknown vehicle uex_id=55'), + }), + ); + }); + + it('null id_vehicle stored as null', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id_vehicle: null })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(itemInsert[1][3]).toBeNull(); + }); + }); + + describe('parent_uex_id two-pass', () => { + it('pass 1b issues UPDATE to set parent_uex_id when both item and parent are in payload', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + // Parent (id=1) and child (id=2) both in payload + uexGet.mockResolvedValueOnce([ + makeItem({ id: 1 }), + makeItem({ + id: 2, + uuid: 'sc-uuid-2', + name: 'Child Item', + id_parent: 1, + }), + ]); + + await step.execute(CTX); + + const updateCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item') && + sql.includes('parent_uex_id = $1'), + ); + expect(updateCall).toBeDefined(); + expect(updateCall[1]).toEqual([1, 2]); // [id_parent, uex_id] + }); + + it('skips UPDATE and emits warn when id_parent references an unknown uex_id', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + // Item references parent 999 which is not in the current payload + uexGet.mockResolvedValueOnce([makeItem({ id: 2, id_parent: 999 })]); + + await step.execute(CTX); + + const updateCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item') && + sql.includes('parent_uex_id = $1'), + 
); + expect(updateCall).toBeUndefined(); + expect(repoSave).toHaveBeenCalledWith( + expect.objectContaining({ severity: 'warn' }), + ); + }); + + it('no parent UPDATE issued when id_parent is null', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id_parent: null })]); + + await step.execute(CTX); + + const updateCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item') && + sql.includes('parent_uex_id = $1'), + ); + expect(updateCall).toBeUndefined(); + }); + + it('pass 1c clears parent_uex_id for de-parented items', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + // Item has no id_parent — pass 1c should null it out + uexGet.mockResolvedValueOnce([makeItem({ id: 1, id_parent: null })]); + + await step.execute(CTX); + + const clearCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_item') && + sql.includes('parent_uex_id = NULL'), + ); + expect(clearCall).toBeDefined(); + expect(clearCall[1][0]).toContain(1); + }); + }); + + describe('attributes_summary JSONB', () => { + it('builds summary keyed by category_attribute_uex_id with attribute value', async () => { + const dsQuery = buildDsQuery([10], [], [42, 43]); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [ + makeAttr({ id_category_attribute: 42, value: '150' }), + makeAttr({ id: 102, id_category_attribute: 43, value: '900' }), + ], + }), + ]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + const summary = JSON.parse(itemInsert[1][19] as string); // $20 attributes_summary + expect(summary).toEqual({ '42': '150', '43': '900' }); + }); + + it('excludes attributes absent from 
station_category_attribute from summary', async () => { + const dsQuery = buildDsQuery([10], [], [42]); // only 42 is known; 43 is not + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [ + makeAttr({ id_category_attribute: 42, value: 'included' }), + makeAttr({ id: 102, id_category_attribute: 43, value: 'excluded' }), + ], + }), + ]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + const summary = JSON.parse(itemInsert[1][19] as string); + expect(summary).toEqual({ '42': 'included' }); + expect(summary['43']).toBeUndefined(); + }); + + it('produces empty object when item has no attributes', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ attributes: [] })]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + expect(JSON.parse(itemInsert[1][19] as string)).toEqual({}); + }); + + it('excludes attributes with falsy id_category_attribute from summary', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [ + makeAttr({ id_category_attribute: 0, value: 'should-be-excluded' }), + makeAttr({ id: 102, id_category_attribute: 42, value: 'included' }), + ], + }), + ]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + const summary = JSON.parse(itemInsert[1][19] as string); + expect(summary).toEqual({ '42': 'included' }); + expect(summary['0']).toBeUndefined(); + }); + + it('stores null attribute value as null in summary', async () => { + const dsQuery = buildDsQuery(); + const step = 
buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [makeAttr({ id_category_attribute: 42, value: null })], + }), + ]); + + await step.execute(CTX); + + const itemInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item'), + ); + const summary = JSON.parse(itemInsert[1][19] as string); + expect(summary['42']).toBeNull(); + }); + }); + + describe('item attributes upsert', () => { + it('upserts each attribute with correct params', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [ + makeAttr({ + id: 101, + id_category_attribute: 42, + value: '150', + unit: 'dps', + }), + ], + }), + ]); + + await step.execute(CTX); + + const attrInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + expect(attrInsert).toBeDefined(); + expect(attrInsert[1][0]).toBe(101); // $1 uex_id + expect(attrInsert[1][1]).toBe(1); // $2 item_uex_id + expect(attrInsert[1][3]).toBe(42); // $4 category_attribute_uex_id + expect(attrInsert[1][4]).toBe('150'); // $5 value + expect(attrInsert[1][5]).toBe('dps'); // $6 unit + }); + + it('skips attribute with no category_attribute_uex_id and emits warn', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [makeAttr({ id_category_attribute: 0 })], + }), + ]); + + await step.execute(CTX); + + const attrInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + expect(attrInsert).toBeUndefined(); + expect(repoSave).toHaveBeenCalledWith( + expect.objectContaining({ severity: 'warn' }), + ); + }); + + it('skips attribute when id_category_attribute is not in station_category_attribute and emits warn', async () => 
{ + const dsQuery = buildDsQuery([10], [], []); // no known category attributes + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ attributes: [makeAttr({ id_category_attribute: 42 })] }), + ]); + + await step.execute(CTX); + + const attrInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + expect(attrInsert).toBeUndefined(); + expect(repoSave).toHaveBeenCalledWith( + expect.objectContaining({ + severity: 'warn', + message: expect.stringContaining( + 'unknown category_attribute uex_id=42', + ), + }), + ); + }); + + it('upserts multiple attributes for a single item', async () => { + const dsQuery = buildDsQuery([10], [], [42, 43]); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([ + makeItem({ + attributes: [ + makeAttr({ id: 101, id_category_attribute: 42 }), + makeAttr({ id: 102, id_category_attribute: 43 }), + ], + }), + ]); + + await step.execute(CTX); + + const attrInserts = dsQuery.mock.calls.filter(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + expect(attrInserts).toHaveLength(2); + }); + + it('deletes stale attributes for upserted items before inserting', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ id: 1 })]); + + await step.execute(CTX); + + const deleteCall = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('DELETE FROM station_item_attribute'), + ); + expect(deleteCall).toBeDefined(); + expect(deleteCall[1][0]).toBe(1); // scoped to item uex_id + + const deleteIdx = dsQuery.mock.calls.findIndex(([sql]: [string]) => + sql.includes('DELETE FROM station_item_attribute'), + ); + const attrInsertIdx = dsQuery.mock.calls.findIndex(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + 
expect(deleteIdx).toBeLessThan(attrInsertIdx); + }); + + it('items with no attributes produce no attribute inserts', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem({ attributes: [] })]); + + await step.execute(CTX); + + const attrInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + expect(attrInsert).toBeUndefined(); + }); + + it('has ON CONFLICT (uex_id) DO UPDATE on attribute insert', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem()]); + + await step.execute(CTX); + + const attrInsert = dsQuery.mock.calls.find(([sql]: [string]) => + sql.includes('INSERT INTO station_item_attribute'), + ); + expect(attrInsert[0]).toContain('ON CONFLICT (uex_id) DO UPDATE SET'); + }); + }); + + describe('ordering', () => { + it('item inserts happen before attribute inserts', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + uexGet.mockResolvedValueOnce([makeItem()]); + + await step.execute(CTX); + + const sqls = dsQuery.mock.calls.map(([sql]: [string]) => sql.trim()); + const itemInsertIdx = sqls.findIndex((s) => + s.startsWith('INSERT INTO station_item '), + ); + const attrInsertIdx = sqls.findIndex((s) => + s.startsWith('INSERT INTO station_item_attribute'), + ); + expect(itemInsertIdx).toBeGreaterThanOrEqual(0); + expect(attrInsertIdx).toBeGreaterThan(itemInsertIdx); + }); + }); +}); diff --git a/backend/src/modules/catalog-etl/steps/items-sync.step.ts b/backend/src/modules/catalog-etl/steps/items-sync.step.ts new file mode 100644 index 0000000..d214031 --- /dev/null +++ b/backend/src/modules/catalog-etl/steps/items-sync.step.ts @@ -0,0 +1,467 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; 
/** One attribute entry nested under an item in the UEX `/items` payload. */
interface UexItemAttribute {
  id: number;
  id_category: number | null;
  id_category_attribute: number | null;
  value: string | null;
  unit: string | null;
  date_added: number | null;
  date_modified: number | null;
}

/** One item record as returned by the UEX `/items` endpoint. */
interface UexItem {
  id: number;
  uuid: string | null;
  id_parent: number | null;
  id_category: number | null;
  id_company: number | null;
  id_vehicle: number | null;
  name: string;
  slug: string | null;
  size: string | null;
  color: string | null;
  color2: string | null;
  quality: number | null;
  url_store: string | null;
  // The is_* flags arrive as numbers and are coerced with Boolean() on write.
  is_exclusive_pledge: number;
  is_exclusive_subscriber: number;
  is_exclusive_concierge: number;
  is_commodity: number;
  is_harvestable: number;
  screenshot: string | null;
  notification: unknown | null;
  game_version: string | null;
  date_added: number | null;
  date_modified: number | null;
  attributes?: UexItemAttribute[];
}

/**
 * Converts a Unix timestamp expressed in seconds into a `Date`.
 *
 * @param unixTs - Seconds since the epoch, or a nullish value.
 * @returns The corresponding `Date`, or `null` for any falsy input. Note that
 *   `0` is treated as "no date" rather than as 1970-01-01.
 */
function toDate(unixTs: number | null | undefined): Date | null {
  if (!unixTs) return null;
  return new Date(unixTs * 1000);
}

/**
 * Builds the `attributes_summary` object for one item: a map from
 * category-attribute uex_id (as a string key) to the attribute's value.
 *
 * Attributes are left out when their `id_category_attribute` is falsy or is
 * not contained in `knownCategoryAttributeUexIds`. If the same category
 * attribute occurs more than once, the last occurrence wins.
 *
 * @param attributes - The item's attributes; may be `undefined` or empty.
 * @param knownCategoryAttributeUexIds - Category-attribute ids accepted as valid.
 * @returns The summary map; an empty object when nothing qualifies.
 */
function buildAttributesSummary(
  attributes: UexItemAttribute[] | undefined,
  knownCategoryAttributeUexIds: Set<number>,
): Record<string, string | null> {
  if (!attributes?.length) return {};
  const summary: Record<string, string | null> = {};
  for (const attr of attributes) {
    if (!attr.id_category_attribute) continue;
    if (!knownCategoryAttributeUexIds.has(attr.id_category_attribute)) continue;
    summary[String(attr.id_category_attribute)] = attr.value ?? null;
  }
  return summary;
}
/**
 * ETL step that mirrors the UEX `/items` payload into `station_item` and
 * `station_item_attribute`.
 *
 * The step runs in three passes:
 *  - 1a: per item, upsert the row and rebuild its attribute rows inside one
 *        transaction (parent_uex_id is never written by this pass).
 *  - 1b: set parent_uex_id for items whose parent was written in this run.
 *  - 1c: clear parent_uex_id for items that no longer report a parent.
 *
 * Data-quality problems (missing name, unknown company / vehicle / category
 * attribute / parent) are persisted as `EtlWarning` rows rather than thrown.
 * Database errors are not caught and abort the step.
 */
@Injectable()
export class ItemsSyncStep implements EtlStep {
  readonly name = 'items-sync';

  constructor(
    private readonly uexApiClient: UexApiClient,
    private readonly dataSource: DataSource,
    @InjectRepository(EtlWarning)
    private readonly warningsRepo: Repository<EtlWarning>,
    @InjectPinoLogger(ItemsSyncStep.name)
    private readonly logger: PinoLogger,
  ) {}

  /**
   * Fetches all items from UEX and synchronises them into the database.
   *
   * @param ctx - Run context; `ctx.runId` is stamped on every warning and log line.
   * @returns Resolves once all three passes have completed.
   * @throws Whatever the UEX client, the warning repository or the database
   *   throws; nothing is swallowed here.
   */
  async execute(ctx: EtlStepContext): Promise<void> {
    const items = await this.uexApiClient.get<UexItem[]>('/items');

    this.logger.info(
      { runId: ctx.runId, count: items.length },
      'Fetched items from UEX',
    );

    // Preload valid FK sets to guard company_uex_id, vehicle_uex_id, and
    // category_attribute_uex_id before any writes.
    const [companyRows, vehicleRows, categoryAttrRows] = await Promise.all([
      this.dataSource.query<{ uex_id: number }[]>(
        `SELECT uex_id FROM station_company`,
      ),
      this.dataSource.query<{ uex_id: number }[]>(
        `SELECT uex_id FROM station_vehicle`,
      ),
      this.dataSource.query<{ uex_id: number }[]>(
        `SELECT uex_id FROM station_category_attribute`,
      ),
    ]);
    const knownCompanyUexIds = new Set<number>(
      companyRows.map((r) => r.uex_id),
    );
    const knownVehicleUexIds = new Set<number>(
      vehicleRows.map((r) => r.uex_id),
    );
    const knownCategoryAttributeUexIds = new Set<number>(
      categoryAttrRows.map((r) => r.uex_id),
    );

    // Pass 1a — for each item, upsert the item row and reconcile its attributes
    // atomically inside a single transaction. This keeps attributes_summary (on
    // station_item) and station_item_attribute rows consistent: if any attribute
    // INSERT fails the entire transaction rolls back, including the item upsert.
    //
    // UEX ID instability is handled before the INSERT: if a row with the same
    // uuid already exists under a different uex_id, its FK dependents are
    // re-keyed and the existing row is updated in place instead of inserting a
    // second row for the same uuid.
    //
    // parent_uex_id is written as NULL on first insert only (self-referential FK
    // safety). Passes 1b and 1c fill and clear it after all items are written.
    let upserted = 0;
    let skipped = 0;
    const upsertedUexIds = new Set<number>();
    let attrUpserted = 0;
    let attrSkipped = 0;

    for (const record of items) {
      if (!record.name) {
        await this.warn(
          ctx,
          `Item uex_id=${record.id} has no name — skipped`,
          { id: record.id },
        );
        skipped++;
        continue;
      }

      const attributesSummary = buildAttributesSummary(
        record.attributes,
        knownCategoryAttributeUexIds,
      );

      // Unknown company / vehicle references are nulled out (with a warning)
      // so the item itself can still be written.
      let companyUexId: number | null = record.id_company ?? null;
      if (companyUexId !== null && !knownCompanyUexIds.has(companyUexId)) {
        await this.warn(
          ctx,
          `Item uex_id=${record.id} references unknown company uex_id=${companyUexId} — company_uex_id set to NULL`,
          { id: record.id, id_company: companyUexId },
        );
        companyUexId = null;
      }

      let vehicleUexId: number | null = record.id_vehicle ?? null;
      if (vehicleUexId !== null && !knownVehicleUexIds.has(vehicleUexId)) {
        await this.warn(
          ctx,
          `Item uex_id=${record.id} references unknown vehicle uex_id=${vehicleUexId} — vehicle_uex_id set to NULL`,
          { id: record.id, id_vehicle: vehicleUexId },
        );
        vehicleUexId = null;
      }

      // Attributes without a usable category attribute are dropped (with a
      // warning); only the survivors are written in the transaction below.
      const validAttrs: UexItemAttribute[] = [];
      for (const attr of record.attributes ?? []) {
        if (!attr.id_category_attribute) {
          await this.warn(
            ctx,
            `Item attribute uex_id=${attr.id} has no category_attribute_uex_id — skipped`,
            { item_id: record.id, attr_id: attr.id },
          );
          attrSkipped++;
          continue;
        }

        if (!knownCategoryAttributeUexIds.has(attr.id_category_attribute)) {
          await this.warn(
            ctx,
            `Item attribute uex_id=${attr.id} references unknown category_attribute uex_id=${attr.id_category_attribute} — skipped`,
            {
              item_id: record.id,
              attr_id: attr.id,
              id_category_attribute: attr.id_category_attribute,
            },
          );
          attrSkipped++;
          continue;
        }

        validAttrs.push(attr);
      }

      // Column layout (parent_uex_id is a NULL literal — no placeholder):
      //  $1  uex_id               $2  category_uex_id          $3  company_uex_id
      //  $4  vehicle_uex_id       $5  name                     $6  slug
      //  $7  uuid                 $8  size                     $9  color
      //  $10 color2               $11 quality                  $12 url_store
      //  $13 is_exclusive_pledge  $14 is_exclusive_subscriber  $15 is_exclusive_concierge
      //  $16 is_commodity         $17 is_harvestable           $18 screenshot
      //  $19 notification         $20 attributes_summary       $21 game_version
      //  $22 uex_date_added       $23 uex_date_modified
      //  synced_at = NOW() literal
      const itemParams = [
        record.id, // $1 uex_id
        // parent_uex_id = NULL literal
        record.id_category ?? null, // $2 category_uex_id
        companyUexId, // $3 company_uex_id
        vehicleUexId, // $4 vehicle_uex_id
        record.name, // $5 name
        record.slug ?? null, // $6 slug
        record.uuid ?? null, // $7 uuid
        record.size ?? null, // $8 size
        record.color ?? null, // $9 color
        record.color2 ?? null, // $10 color2
        record.quality ?? null, // $11 quality
        record.url_store ?? null, // $12 url_store
        Boolean(record.is_exclusive_pledge), // $13
        Boolean(record.is_exclusive_subscriber), // $14
        Boolean(record.is_exclusive_concierge), // $15
        Boolean(record.is_commodity), // $16
        Boolean(record.is_harvestable), // $17
        record.screenshot ?? null, // $18 screenshot
        record.notification ?? null, // $19 notification
        JSON.stringify(attributesSummary), // $20 attributes_summary
        record.game_version ?? null, // $21 game_version
        toDate(record.date_added), // $22 uex_date_added
        toDate(record.date_modified), // $23 uex_date_modified
        // synced_at = NOW() literal
      ];

      await this.dataSource.transaction(async (em) => {
        // Pass 1a-uuid — UEX ID instability: check whether this uuid already
        // exists under a different uex_id before inserting. If it does, we are
        // looking at a canonical row whose uex_id was reassigned by UEX.
        //
        // Strategy (no phantom rows; FK-safe via DEFERRABLE INITIALLY DEFERRED
        // constraints added by migration 1780020000000):
        //   1. Update the canonical row's uex_id to the new value first.
        //      The FK constraints on dependents are deferred — PostgreSQL will
        //      not check them until COMMIT, so the dependent rows may
        //      temporarily reference a uex_id that no longer exists.
        //   2. Re-key station_item_attribute.item_uex_id old → new.
        //   3. Re-key station_item.parent_uex_id old → new for any children.
        //   4. Skip the INSERT below (no phantom is ever created).
        //
        // When no uuid match exists the INSERT path runs normally.
        let uuidReconciled = false;
        if (record.uuid) {
          const existing = await em.query<{ uex_id: number }[]>(
            `SELECT uex_id FROM station_item
              WHERE uuid = $1 AND uex_id != $2
              LIMIT 1`,
            [record.uuid, record.id],
          );
          if (existing.length > 0) {
            const oldUexId = existing[0].uex_id;
            // Update the canonical row first so the new uex_id is present in
            // station_item before dependents are re-keyed to it.
            await em.query(
              `UPDATE station_item SET
                 uex_id=$1,
                 category_uex_id=$2,
                 company_uex_id=$3,
                 vehicle_uex_id=$4,
                 name=$5,
                 slug=$6,
                 uuid=$7,
                 size=$8,
                 color=$9,
                 color2=$10,
                 quality=$11,
                 url_store=$12,
                 is_exclusive_pledge=$13,
                 is_exclusive_subscriber=$14,
                 is_exclusive_concierge=$15,
                 is_commodity=$16,
                 is_harvestable=$17,
                 screenshot=$18,
                 notification=$19,
                 attributes_summary=$20,
                 game_version=$21,
                 uex_date_added=$22,
                 uex_date_modified=$23,
                 synced_at=NOW()
               WHERE uex_id = $24`,
              [...itemParams, oldUexId],
            );
            // Re-key FK dependents now that the new uex_id exists in station_item.
            await em.query(
              `UPDATE station_item_attribute
                  SET item_uex_id = $1
                WHERE item_uex_id = $2`,
              [record.id, oldUexId],
            );
            await em.query(
              `UPDATE station_item
                  SET parent_uex_id = $1
                WHERE parent_uex_id = $2`,
              [record.id, oldUexId],
            );
            // The old uex_id no longer identifies any row. If an earlier record
            // of this same payload was written under it (two payload items
            // sharing one uuid), forget it so pass 1b cannot point a child at a
            // uex_id that has just been re-keyed away. A rollback needs no
            // undo: the error propagates and aborts the whole step.
            upsertedUexIds.delete(oldUexId);
            uuidReconciled = true;
          }
        }

        if (!uuidReconciled) {
          // Normal path: upsert by uex_id. parent_uex_id is NULL on a fresh
          // insert and is deliberately absent from the DO UPDATE list, so an
          // existing link survives; passes 1b/1c are the only writers.
          await em.query(
            `INSERT INTO station_item (
               uex_id, parent_uex_id,
               category_uex_id, company_uex_id, vehicle_uex_id,
               name, slug, uuid, size, color, color2, quality,
               url_store,
               is_exclusive_pledge, is_exclusive_subscriber, is_exclusive_concierge,
               is_commodity, is_harvestable,
               screenshot, notification, attributes_summary,
               game_version, uex_date_added, uex_date_modified, synced_at
             )
             VALUES (
               $1,NULL,
               $2,$3,$4,
               $5,$6,$7,$8,$9,$10,$11,
               $12,
               $13,$14,$15,
               $16,$17,
               $18,$19,$20,
               $21,$22,$23,NOW()
             )
             ON CONFLICT (uex_id) DO UPDATE SET
               category_uex_id=EXCLUDED.category_uex_id,
               company_uex_id=EXCLUDED.company_uex_id,
               vehicle_uex_id=EXCLUDED.vehicle_uex_id,
               name=EXCLUDED.name,
               slug=EXCLUDED.slug,
               uuid=EXCLUDED.uuid,
               size=EXCLUDED.size,
               color=EXCLUDED.color,
               color2=EXCLUDED.color2,
               quality=EXCLUDED.quality,
               url_store=EXCLUDED.url_store,
               is_exclusive_pledge=EXCLUDED.is_exclusive_pledge,
               is_exclusive_subscriber=EXCLUDED.is_exclusive_subscriber,
               is_exclusive_concierge=EXCLUDED.is_exclusive_concierge,
               is_commodity=EXCLUDED.is_commodity,
               is_harvestable=EXCLUDED.is_harvestable,
               screenshot=EXCLUDED.screenshot,
               notification=EXCLUDED.notification,
               attributes_summary=EXCLUDED.attributes_summary,
               game_version=EXCLUDED.game_version,
               uex_date_added=EXCLUDED.uex_date_added,
               uex_date_modified=EXCLUDED.uex_date_modified,
               synced_at=NOW()`,
            itemParams,
          );
        }

        // Reconcile attributes: wipe stale rows then re-insert current set.
        // Both operations are inside this transaction so an attribute INSERT
        // failure rolls back the DELETE and the item upsert together.
        await em.query(
          `DELETE FROM station_item_attribute WHERE item_uex_id = $1`,
          [record.id],
        );

        for (const attr of validAttrs) {
          await em.query(
            `INSERT INTO station_item_attribute
               (uex_id, item_uex_id, category_uex_id, category_attribute_uex_id,
                value, unit, uex_date_added, uex_date_modified, synced_at)
             VALUES ($1,$2,$3,$4,$5,$6,$7,$8,NOW())
             ON CONFLICT (uex_id) DO UPDATE SET
               item_uex_id=EXCLUDED.item_uex_id,
               category_uex_id=EXCLUDED.category_uex_id,
               category_attribute_uex_id=EXCLUDED.category_attribute_uex_id,
               value=EXCLUDED.value,
               unit=EXCLUDED.unit,
               uex_date_added=EXCLUDED.uex_date_added,
               uex_date_modified=EXCLUDED.uex_date_modified,
               synced_at=NOW()`,
            [
              attr.id, // $1 uex_id
              record.id, // $2 item_uex_id
              attr.id_category ?? null, // $3 category_uex_id
              attr.id_category_attribute, // $4 category_attribute_uex_id
              attr.value ?? null, // $5 value
              attr.unit ?? null, // $6 unit
              toDate(attr.date_added), // $7 uex_date_added
              toDate(attr.date_modified), // $8 uex_date_modified
            ],
          );
          attrUpserted++;
        }
      });

      upsertedUexIds.add(record.id);
      upserted++;
    }

    this.logger.info({ runId: ctx.runId, upserted, skipped }, 'items upserted');

    // Pass 1b — set parent_uex_id for items whose parent was also upserted.
    // ON CONFLICT in pass 1a does not touch parent_uex_id, so existing links are
    // preserved across runs; only explicit updates (here and in pass 1c) change it.
    for (const record of items) {
      if (!record.name || !record.id_parent) continue;
      if (!upsertedUexIds.has(record.id_parent)) {
        await this.warn(
          ctx,
          `Item uex_id=${record.id} references unknown parent uex_id=${record.id_parent} — parent_uex_id not set`,
          { id: record.id, id_parent: record.id_parent },
        );
        continue;
      }
      // IS DISTINCT FROM skips the write when the link is already correct.
      await this.dataSource.query(
        `UPDATE station_item
            SET parent_uex_id = $1
          WHERE uex_id = $2
            AND parent_uex_id IS DISTINCT FROM $1`,
        [record.id_parent, record.id],
      );
    }

    // Pass 1c — clear parent_uex_id for items that UEX has de-parented.
    if (upsertedUexIds.size > 0) {
      const deParentedIds = items
        .filter((i) => i.name && !i.id_parent)
        .map((i) => i.id);
      if (deParentedIds.length > 0) {
        await this.dataSource.query(
          `UPDATE station_item
              SET parent_uex_id = NULL
            WHERE uex_id = ANY($1)
              AND parent_uex_id IS NOT NULL`,
          [deParentedIds],
        );
      }
    }

    this.logger.info(
      { runId: ctx.runId, attrUpserted, attrSkipped },
      'item attributes upserted',
    );
  }

  /** Persists one `warn`-severity EtlWarning row attributed to this step and run. */
  private async warn(
    ctx: EtlStepContext,
    message: string,
    rawPayload: Record<string, unknown>,
  ): Promise<void> {
    await this.warningsRepo.save(
      this.warningsRepo.create({
        runId: ctx.runId,
        stepName: this.name,
        severity: 'warn',
        message,
        rawPayload,
      }),
    );
  }
}
expect.objectContaining({ severity: 'warn' }), + expect.objectContaining({ + severity: 'warn', + message: expect.stringContaining('unknown company uex_id=99'), + }), ); }); }); @@ -334,7 +337,7 @@ describe('VehiclesSyncStep', () => { const updateCall = dsQuery.mock.calls.find( ([sql]: [string]) => sql.includes('UPDATE station_vehicle') && - sql.includes('parent_uex_id'), + sql.includes('parent_uex_id = $1'), ); expect(updateCall).toBeDefined(); expect(updateCall[1]).toEqual([1, 2]); // [id_parent, uex_id] @@ -350,8 +353,10 @@ describe('VehiclesSyncStep', () => { await step.execute(CTX); - const updateCall = dsQuery.mock.calls.find(([sql]: [string]) => - sql.includes('UPDATE station_vehicle'), + const updateCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_vehicle') && + sql.includes('parent_uex_id = $1'), ); expect(updateCall).toBeUndefined(); expect(repoSave).toHaveBeenCalledWith( @@ -368,11 +373,32 @@ describe('VehiclesSyncStep', () => { await step.execute(CTX); - const updateCall = dsQuery.mock.calls.find(([sql]: [string]) => - sql.includes('UPDATE station_vehicle'), + const updateCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_vehicle') && + sql.includes('parent_uex_id = $1'), ); expect(updateCall).toBeUndefined(); }); + + it('pass 1c clears parent_uex_id for de-parented vehicles', async () => { + const dsQuery = buildDsQuery(); + const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); + // Vehicle has no id_parent — pass 1c should null it out + uexGet + .mockResolvedValueOnce([makeVehicle({ id: 1, id_parent: null })]) + .mockResolvedValueOnce([]); + + await step.execute(CTX); + + const clearCall = dsQuery.mock.calls.find( + ([sql]: [string]) => + sql.includes('UPDATE station_vehicle') && + sql.includes('parent_uex_id = NULL'), + ); + expect(clearCall).toBeDefined(); + expect(clearCall[1][0]).toContain(1); + }); }); describe('vehicle loaners two-pass', () => { @@ -541,7 +567,7 @@ 
describe('VehiclesSyncStep', () => { expect(anyInsert).toBeUndefined(); }); - it('is idempotent — ON CONFLICT (uex_id) DO UPDATE SET present, parent_uex_id cleared', async () => { + it('ON CONFLICT (uex_id) DO UPDATE SET present; parent_uex_id not overwritten on conflict', async () => { const dsQuery = buildDsQuery(); const step = buildStep(uexGet, dsQuery, repoCreate, repoSave); uexGet.mockResolvedValueOnce([makeVehicle()]).mockResolvedValueOnce([]); @@ -552,7 +578,8 @@ describe('VehiclesSyncStep', () => { sql.includes('INSERT INTO station_vehicle'), ); expect(vehicleInsert[0]).toContain('ON CONFLICT (uex_id) DO UPDATE SET'); - expect(vehicleInsert[0]).toContain( + // parent_uex_id must NOT be in the DO UPDATE list — pass 1b/1c handle it + expect(vehicleInsert[0]).not.toContain( 'parent_uex_id=EXCLUDED.parent_uex_id', ); }); diff --git a/backend/src/modules/catalog-etl/steps/vehicles-sync.step.ts b/backend/src/modules/catalog-etl/steps/vehicles-sync.step.ts index 5769b3f..4e83b32 100644 --- a/backend/src/modules/catalog-etl/steps/vehicles-sync.step.ts +++ b/backend/src/modules/catalog-etl/steps/vehicles-sync.step.ts @@ -207,7 +207,6 @@ export class VehiclesSyncStep implements EtlStep { ON CONFLICT (uex_id) DO UPDATE SET uuid=EXCLUDED.uuid, company_uex_id=EXCLUDED.company_uex_id, - parent_uex_id=EXCLUDED.parent_uex_id, name=EXCLUDED.name, name_full=EXCLUDED.name_full, slug=EXCLUDED.slug, @@ -339,9 +338,9 @@ export class VehiclesSyncStep implements EtlStep { upserted++; } - // Pass 1b — back-fill parent_uex_id now that all rows exist. - // The ON CONFLICT clause sets parent_uex_id=EXCLUDED.parent_uex_id (NULL) on - // re-insert, so this pass is only needed to set non-null parents. + // Pass 1b — set parent_uex_id for vehicles whose parent was also upserted. + // ON CONFLICT in pass 1a does not touch parent_uex_id, so existing links are + // preserved across runs; only explicit updates (here and in pass 1c) change it. 
// Skip rows whose id_parent is not in the current payload to avoid FK violations. for (const record of vehicles) { if (!record.name || !record.id_parent) continue; @@ -366,6 +365,23 @@ export class VehiclesSyncStep implements EtlStep { ); } + // Pass 1c — clear parent_uex_id for vehicles that UEX has de-parented. + // Only runs for upserted rows with no id_parent in the current payload. + if (upsertedUexIds.size > 0) { + const deParentedIds = vehicles + .filter((v) => v.name && !v.id_parent) + .map((v) => v.id); + if (deParentedIds.length > 0) { + await this.dataSource.query( + `UPDATE station_vehicle + SET parent_uex_id = NULL + WHERE uex_id = ANY($1) + AND parent_uex_id IS NOT NULL`, + [deParentedIds], + ); + } + } + this.logger.info( { runId: ctx.runId, upserted, skipped }, 'vehicles upserted',