From 2f860fbe044cc49d5c600f9c438be7793441f438 Mon Sep 17 00:00:00 2001 From: kraysent Date: Tue, 26 May 2026 23:11:00 +0100 Subject: [PATCH 1/2] remove old crossmatch tasks & make icrs errors expressions --- tests/test_crossmatch_resolver.py | 386 ------------------------- uploader/app/crossmatch/models.py | 9 - uploader/app/crossmatch/resolver.py | 304 ------------------- uploader/app/structured/icrs/upload.py | 100 ++++--- uploader/forms/crossmatch_default.py | 60 ---- uploader/forms/crossmatch_two_radii.py | 63 ---- uploader/forms/structured_icrs.py | 15 +- uploader/task_registry.py | 28 +- 8 files changed, 75 insertions(+), 890 deletions(-) delete mode 100644 tests/test_crossmatch_resolver.py delete mode 100644 uploader/forms/crossmatch_default.py delete mode 100644 uploader/forms/crossmatch_two_radii.py diff --git a/tests/test_crossmatch_resolver.py b/tests/test_crossmatch_resolver.py deleted file mode 100644 index e15c57e..0000000 --- a/tests/test_crossmatch_resolver.py +++ /dev/null @@ -1,386 +0,0 @@ -from uploader.app.crossmatch.models import ( - CrossmatchResult, - CrossmatchStatus, - Neighbor, - PendingReason, - RecordEvidence, - TriageStatus, -) -from uploader.app.crossmatch.resolver import ( - _apply_redshift_check, - _resolve_by_radius_coordinate, - resolve, - resolve_by_radius, -) - - -def test_resolve_zero_neighbors() -> None: - evidence = RecordEvidence(neighbors=[]) - result = resolve(evidence) - assert result.status == CrossmatchStatus.NEW - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc is None - - -def test_resolve_one_neighbor() -> None: - evidence = RecordEvidence( - neighbors=[Neighbor(pgc=42, ra=10.0, dec=20.0, distance_deg=0.001)], - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 42 - - -def test_resolve_multiple_neighbors() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=1, ra=10.0, dec=20.0, distance_deg=0.001), - Neighbor(pgc=2, ra=10.01, dec=20.01, distance_deg=0.002), - ], - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc is None - assert result.colliding_pgcs == [1, 2] - assert result.pending_reason == PendingReason.MULTIPLE_OBJECTS_MATCHED - - -def test_resolve_name_match_in_circle() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=1, ra=10.0, dec=20.0, distance_deg=0.001, design="NGC 123"), - Neighbor(pgc=2, ra=10.01, dec=20.01, distance_deg=0.002, design="PGC 456"), - ], - record_designation="NGC 123", - same_name_pgcs=[1], - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 1 - - -def test_resolve_name_match_outside_circle() -> None: - evidence = RecordEvidence( - neighbors=[], - record_designation="NGC 999", - same_name_pgcs=[100], - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 100 - assert result.pending_reason == PendingReason.MATCHED_NAME_OUTSIDE_CIRCLE - - -def test_resolve_name_match_in_circle_ambiguous_two_matching() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=1, ra=10.0, dec=20.0, distance_deg=0.001, design="NGC 123"), - Neighbor(pgc=2, ra=10.01, dec=20.01, distance_deg=0.002, design="NGC 123"), - ], - record_designation="NGC 123", - same_name_pgcs=[1, 2], - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc is None - assert result.colliding_pgcs == [1, 2] - - -def test_resolve_name_match_outside_circle_ambiguous_multiple_pgcs() -> None: - evidence = RecordEvidence( - neighbors=[], - record_designation="NGC 999", - same_name_pgcs=[100, 101], - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.NEW - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc is None - - -def test_resolve_one_neighbor_matching_pgc() -> None: - evidence = RecordEvidence( - neighbors=[Neighbor(pgc=42, ra=10.0, dec=20.0, distance_deg=0.001)], - record_pgc=42, - claimed_pgc_exists_in_layer2=True, - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 42 - - -def test_resolve_one_neighbor_different_pgc() -> None: - evidence = RecordEvidence( - neighbors=[Neighbor(pgc=100, ra=10.0, dec=20.0, distance_deg=0.001)], - record_pgc=42, - claimed_pgc_exists_in_layer2=True, - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 100 - assert result.pending_reason == PendingReason.PGC_MISMATCH - - -def test_resolve_no_neighbors_claimed_pgc_exists() -> None: - evidence = RecordEvidence( - neighbors=[], - record_pgc=42, - claimed_pgc_exists_in_layer2=True, - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 42 - assert result.pending_reason == PendingReason.MATCHED_PGC_OUTSIDE_CIRCLE - - -def test_resolve_one_neighbor_name_match_pgc_mismatch() -> None: - evidence = RecordEvidence( - neighbors=[Neighbor(pgc=100, ra=10.0, dec=20.0, distance_deg=0.001, design="NGC 123")], - record_designation="NGC 123", - record_pgc=42, - claimed_pgc_exists_in_layer2=True, - ) - result = resolve(evidence) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 100 - assert result.pending_reason == PendingReason.PGC_MISMATCH - - -R1, R2 = 0.0005, 0.002 -TOL = 0.0003 - - -def test_resolve_by_radius_no_neighbors_new_resolved() -> None: - evidence = RecordEvidence(neighbors=[]) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.NEW - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc is None - - -def test_resolve_by_radius_multiple_in_inner_colliding() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=1, ra=10.0, dec=20.0, distance_deg=0.0003), - Neighbor(pgc=2, ra=10.0, dec=20.0, distance_deg=0.0004), - ], - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.colliding_pgcs == [1, 2] - assert result.pending_reason == PendingReason.MULTIPLE_IN_INNER_RADIUS - - -def test_resolve_by_radius_single_in_inner_none_in_outer_existing_resolved() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.0003), - ], - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 10 - - -def test_resolve_by_radius_single_in_inner_none_in_outer_redshift_not_close_pending() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.0003, redshift=0.1), - ], - record_redshift=0.5, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 10 - assert result.pending_reason == PendingReason.REDSHIFT_MISMATCH - - -def test_resolve_by_radius_single_in_inner_none_in_outer_redshift_close_stays_resolved() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.0003, redshift=0.05), - ], - record_redshift=0.0502, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 10 - - -def test_resolve_by_radius_single_in_outer_only_no_redshift_proxy_pending() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001), - ], - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 10 - assert result.pending_reason == PendingReason.SINGLE_IN_OUTER_RADIUS_ONLY - - -def test_resolve_by_radius_single_in_outer_only_record_no_redshift_proxy() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - ], - record_redshift=None, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.pending_reason == PendingReason.SINGLE_IN_OUTER_RADIUS_ONLY - - -def test_resolve_by_radius_single_in_outer_only_redshift_close_resolved() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - ], - record_redshift=0.0501, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 10 - - -def test_resolve_by_radius_single_in_outer_only_redshift_not_close_pending() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - ], - record_redshift=0.2, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 10 - assert result.pending_reason == PendingReason.REDSHIFT_MISMATCH - - -def test_resolve_by_radius_multiple_in_outer_any_no_redshift_colliding_proxy() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - Neighbor(pgc=20, ra=10.01, dec=20.01, distance_deg=0.0015, redshift=None), - ], - record_redshift=0.05, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.colliding_pgcs == [10, 20] - assert result.pending_reason == PendingReason.MULTIPLE_IN_OUTER_RADIUS - - -def test_resolve_by_radius_multiple_in_outer_one_close_redshift_resolved() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - Neighbor(pgc=20, ra=10.01, dec=20.01, distance_deg=0.0015, redshift=0.2), - ], - record_redshift=0.0502, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.RESOLVED - assert result.matched_pgc == 10 - - -def test_resolve_by_radius_multiple_in_outer_multiple_close_redshift_colliding() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - Neighbor(pgc=20, ra=10.01, dec=20.01, distance_deg=0.0015, redshift=0.0501), - ], - record_redshift=0.05, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.colliding_pgcs == [10, 20] - assert result.pending_reason == PendingReason.MULTIPLE_IN_OUTER_RADIUS - - -def test_resolve_by_radius_multiple_in_outer_zero_close_redshift_colliding() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001, redshift=0.05), - Neighbor(pgc=20, ra=10.01, dec=20.01, distance_deg=0.0015, redshift=0.1), - ], - record_redshift=0.5, - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.colliding_pgcs == [10, 20] - - -def test_resolve_by_radius_single_in_inner_with_outer_colliding() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=1, ra=10.0, dec=20.0, distance_deg=0.0003), - Neighbor(pgc=2, ra=10.01, dec=20.01, distance_deg=0.001), - ], - ) - result = resolve_by_radius(evidence, R1, R2, TOL) - assert result.status == CrossmatchStatus.COLLIDING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 1 - assert result.pending_reason == PendingReason.SINGLE_IN_INNER_WITH_OUTER_NEIGHBORS - - -def test_apply_redshift_check_new_proxy() -> None: - coord = CrossmatchResult( - status=CrossmatchStatus.NEW, - triage_status=TriageStatus.RESOLVED, - matched_pgc=None, - ) - evidence = RecordEvidence(neighbors=[], record_redshift=0.05) - result = _apply_redshift_check(coord, evidence, TOL) - assert result.status == CrossmatchStatus.NEW - assert result.triage_status == TriageStatus.RESOLVED - - -def test_apply_redshift_check_no_record_redshift_proxy() -> None: - coord = CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.PENDING, - matched_pgc=10, - pending_reason=PendingReason.SINGLE_IN_OUTER_RADIUS_ONLY, - ) - evidence = RecordEvidence( - neighbors=[Neighbor(pgc=10, ra=0, dec=0, distance_deg=0.001, redshift=0.05)], - record_redshift=None, - ) - result = _apply_redshift_check(coord, evidence, TOL) - assert result.status == CrossmatchStatus.EXISTING - assert result.pending_reason == PendingReason.SINGLE_IN_OUTER_RADIUS_ONLY - - -def test_resolve_by_radius_coordinate_single_in_outer_only() -> None: - evidence = RecordEvidence( - neighbors=[ - Neighbor(pgc=10, ra=10.0, dec=20.0, distance_deg=0.001), - ], - ) - result = _resolve_by_radius_coordinate(evidence, R1, R2) - assert result.status == CrossmatchStatus.EXISTING - assert result.triage_status == TriageStatus.PENDING - assert result.matched_pgc == 10 - assert result.pending_reason == PendingReason.SINGLE_IN_OUTER_RADIUS_ONLY diff --git a/uploader/app/crossmatch/models.py b/uploader/app/crossmatch/models.py index 81de5d1..e38f88d 100644 --- a/uploader/app/crossmatch/models.py +++ b/uploader/app/crossmatch/models.py @@ -36,19 +36,10 @@ class TriageStatus(enum.Enum): class PendingReason(enum.Enum): - # default - PGC_MISMATCH = "PGC_MISMATCH" MULTIPLE_OBJECTS_MATCHED = "MULTIPLE_OBJECTS_MATCHED" MATCHED_NAME_OUTSIDE_CIRCLE = "MATCHED_NAME_OUTSIDE_CIRCLE" MATCHED_PGC_OUTSIDE_CIRCLE = "MATCHED_PGC_OUTSIDE_CIRCLE" - SINGLE_NEIGHBOR_NO_IDENTITY_MATCH = "SINGLE_NEIGHBOR_NO_IDENTITY_MATCH" UNKNOWN_PGC = "UNKNOWN_PGC" - - # two radii - MULTIPLE_IN_INNER_RADIUS = "MULTIPLE_IN_INNER_RADIUS" - MULTIPLE_IN_OUTER_RADIUS = "MULTIPLE_IN_OUTER_RADIUS" - SINGLE_IN_INNER_WITH_OUTER_NEIGHBORS = "SINGLE_IN_INNER_WITH_OUTER_NEIGHBORS" - SINGLE_IN_OUTER_RADIUS_ONLY = "SINGLE_IN_OUTER_RADIUS_ONLY" REDSHIFT_MISMATCH = "REDSHIFT_MISMATCH" TYPE_MISMATCH = "TYPE_MISMATCH" diff --git a/uploader/app/crossmatch/resolver.py b/uploader/app/crossmatch/resolver.py index 5abc5d5..fc2de5c 100644 --- a/uploader/app/crossmatch/resolver.py +++ b/uploader/app/crossmatch/resolver.py @@ -1,271 +1,12 @@ -"""Crossmatch decision tree. - -Overview: - - - "Neighbors" = layer2 objects within the search radius (after angular-distance - post-filter). "Preferred" = neighbor matches by PGC (record_pgc == neighbor.pgc) - or by designation (record_designation matches neighbor.design). - - - ONE NEIGHBOR - - PGC mismatch (record has claimed PGC and it differs from neighbor) - → EXISTING, PENDING (PGC_MISMATCH). - - Preferred (PGC or name match) or no PGC column - → EXISTING, RESOLVED. - - Else (one neighbor, no identity match, PGC column used) - → EXISTING, PENDING (SINGLE_NEIGHBOR_NO_IDENTITY_MATCH). - - - MULTIPLE NEIGHBORS - - Exactly one preferred neighbor - - PGC match or no PGC column → EXISTING, RESOLVED. - - PGC mismatch → EXISTING, PENDING (PGC_MISMATCH). - - Zero or more-than-one preferred - → COLLIDING, PENDING (MULTIPLE_OBJECTS_MATCHED). - - - ZERO NEIGHBORS - - Exactly one PGC elsewhere (by name and/or claimed PGC in layer2) - → EXISTING, PENDING (MATCHED_NAME_OUTSIDE_CIRCLE or - MATCHED_PGC_OUTSIDE_CIRCLE). - - Else - → NEW, RESOLVED. -""" - from typing import Protocol from uploader.app.crossmatch import layered from uploader.app.crossmatch.models import ( CrossmatchResult, - CrossmatchStatus, - Neighbor, - PendingReason, RecordEvidence, - TriageStatus, ) -def _designations_match(a: str | None, b: str | None) -> bool: - if a is None or b is None: - return False - return a.strip().upper() == b.strip().upper() - - -def _preferred_neighbor( - evidence: RecordEvidence, - neighbor: Neighbor, -) -> bool: - if evidence.record_pgc is not None and neighbor.pgc == evidence.record_pgc: - return True - return _designations_match(evidence.record_designation, neighbor.design) - - -def resolve(evidence: RecordEvidence) -> CrossmatchResult: - neighbors = evidence.neighbors - record_pgc = evidence.record_pgc - claimed_pgc_exists = evidence.claimed_pgc_exists_in_layer2 - global_pgcs = evidence.same_name_pgcs or [] - - if len(neighbors) == 1: - n = neighbors[0] - - if record_pgc is not None and n.pgc != record_pgc: - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.PENDING, - matched_pgc=n.pgc, - pending_reason=PendingReason.PGC_MISMATCH, - ) - - if _preferred_neighbor(evidence, n) or record_pgc is None: - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.RESOLVED, - matched_pgc=n.pgc, - ) - - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.PENDING, - matched_pgc=n.pgc, - pending_reason=PendingReason.SINGLE_NEIGHBOR_NO_IDENTITY_MATCH, - ) - - if len(neighbors) > 1: - preferred = [n for n in neighbors if _preferred_neighbor(evidence, n)] - - if len(preferred) == 1: - p = preferred[0] - triage = TriageStatus.RESOLVED if record_pgc is None or p.pgc == record_pgc else TriageStatus.PENDING - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=triage, - matched_pgc=p.pgc, - pending_reason=PendingReason.PGC_MISMATCH if triage == TriageStatus.PENDING else None, - ) - - return CrossmatchResult( - status=CrossmatchStatus.COLLIDING, - triage_status=TriageStatus.PENDING, - matched_pgc=None, - colliding_pgcs=[n.pgc for n in neighbors], - pending_reason=PendingReason.MULTIPLE_OBJECTS_MATCHED, - ) - - pgcs_elsewhere = set(global_pgcs) - - if record_pgc is not None and claimed_pgc_exists: - pgcs_elsewhere.add(record_pgc) - - if len(pgcs_elsewhere) == 1: - matched_pgc = next(iter(pgcs_elsewhere)) - reason = ( - PendingReason.MATCHED_NAME_OUTSIDE_CIRCLE - if matched_pgc in global_pgcs - else PendingReason.MATCHED_PGC_OUTSIDE_CIRCLE - ) - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.PENDING, - matched_pgc=matched_pgc, - pending_reason=reason, - ) - - return CrossmatchResult( - status=CrossmatchStatus.NEW, - triage_status=TriageStatus.RESOLVED, - matched_pgc=None, - ) - - -def _resolve_by_radius_coordinate( - evidence: RecordEvidence, - r1_deg: float, - r2_deg: float, -) -> CrossmatchResult: - """Coordinate-only two-radii check. No redshift. Same rules as doc below.""" - inner = [n for n in evidence.neighbors if n.distance_deg <= r1_deg] - outer = [n for n in evidence.neighbors if r1_deg < n.distance_deg <= r2_deg] - - if len(inner) > 1: - return CrossmatchResult( - status=CrossmatchStatus.COLLIDING, - triage_status=TriageStatus.PENDING, - matched_pgc=None, - colliding_pgcs=[n.pgc for n in inner], - pending_reason=PendingReason.MULTIPLE_IN_INNER_RADIUS, - ) - - if len(inner) == 1 and len(outer) >= 1: - return CrossmatchResult( - status=CrossmatchStatus.COLLIDING, - triage_status=TriageStatus.PENDING, - matched_pgc=inner[0].pgc, - pending_reason=PendingReason.SINGLE_IN_INNER_WITH_OUTER_NEIGHBORS, - ) - - if len(inner) == 1 and len(outer) == 0: - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.RESOLVED, - matched_pgc=inner[0].pgc, - ) - - if len(inner) == 0 and len(outer) == 1: - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.PENDING, - matched_pgc=outer[0].pgc, - pending_reason=PendingReason.SINGLE_IN_OUTER_RADIUS_ONLY, - ) - - if len(inner) == 0 and len(outer) > 1: - return CrossmatchResult( - status=CrossmatchStatus.COLLIDING, - triage_status=TriageStatus.PENDING, - matched_pgc=None, - colliding_pgcs=[n.pgc for n in outer], - pending_reason=PendingReason.MULTIPLE_IN_OUTER_RADIUS, - ) - - return CrossmatchResult( - status=CrossmatchStatus.NEW, - triage_status=TriageStatus.RESOLVED, - matched_pgc=None, - ) - - -def _redshift_close(record_z: float, neighbor_z: float | None, tolerance: float) -> bool: - if neighbor_z is None: - return False - return abs(neighbor_z - record_z) < tolerance - - -def _apply_redshift_check( - coord_result: CrossmatchResult, - evidence: RecordEvidence, - redshift_tolerance: float, -) -> CrossmatchResult: - """Refine coordinate result using redshift when record and involved neighbors have redshift.""" - record_z = evidence.record_redshift - - if record_z is None: - return coord_result - - if coord_result.status == CrossmatchStatus.NEW: - return coord_result - - if coord_result.status == CrossmatchStatus.EXISTING: - matched_pgc = coord_result.matched_pgc - if matched_pgc is None: - return coord_result - - neighbor = next((n for n in evidence.neighbors if n.pgc == matched_pgc), None) - if neighbor is None or neighbor.redshift is None: - return coord_result - - if _redshift_close(record_z, neighbor.redshift, redshift_tolerance): - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.RESOLVED, - matched_pgc=matched_pgc, - ) - - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.PENDING, - matched_pgc=matched_pgc, - pending_reason=PendingReason.REDSHIFT_MISMATCH, - ) - - if coord_result.status == CrossmatchStatus.COLLIDING: - involved_pgcs = ( - coord_result.colliding_pgcs if coord_result.colliding_pgcs else [n.pgc for n in evidence.neighbors] - ) - neighbors_involved = [n for n in evidence.neighbors if n.pgc in involved_pgcs] - - if any(n.redshift is None for n in neighbors_involved): - return coord_result - - close = [n for n in neighbors_involved if _redshift_close(record_z, n.redshift, redshift_tolerance)] - if len(close) == 1: - return CrossmatchResult( - status=CrossmatchStatus.EXISTING, - triage_status=TriageStatus.RESOLVED, - matched_pgc=close[0].pgc, - ) - return coord_result - - return coord_result - - -def resolve_by_radius( - evidence: RecordEvidence, - r1_deg: float, - r2_deg: float, - redshift_tolerance: float, -) -> CrossmatchResult: - coord_result = _resolve_by_radius_coordinate(evidence, r1_deg, r2_deg) - return _apply_redshift_check(coord_result, evidence, redshift_tolerance) - - class Resolver(Protocol): @property def search_radius_deg(self) -> float: ... @@ -276,49 +17,4 @@ def pgc_column(self) -> str | None: ... def resolve(self, evidence: RecordEvidence) -> CrossmatchResult: ... -class DefaultResolver: - def __init__(self, radius_deg: float, pgc_column: str | None = None) -> None: - self._radius_deg = radius_deg - self._pgc_column = pgc_column - - @property - def search_radius_deg(self) -> float: - return self._radius_deg - - @property - def pgc_column(self) -> str | None: - return self._pgc_column - - def resolve(self, evidence: RecordEvidence) -> CrossmatchResult: - return resolve(evidence) - - -class TwoRadiiResolver: - def __init__( - self, - r1_deg: float, - r2_deg: float, - redshift_tolerance: float = 0.0003, - ) -> None: - self._r1_deg = r1_deg - self._r2_deg = r2_deg - self._redshift_tolerance = redshift_tolerance - - @property - def search_radius_deg(self) -> float: - return self._r2_deg - - @property - def pgc_column(self) -> str | None: - return None - - def resolve(self, evidence: RecordEvidence) -> CrossmatchResult: - return resolve_by_radius( - evidence, - self._r1_deg, - self._r2_deg, - self._redshift_tolerance, - ) - - LayeredResolver = layered.LayeredResolver diff --git a/uploader/app/structured/icrs/upload.py b/uploader/app/structured/icrs/upload.py index 9b011d4..6995f3d 100644 --- a/uploader/app/structured/icrs/upload.py +++ b/uploader/app/structured/icrs/upload.py @@ -1,11 +1,13 @@ from collections.abc import Callable +import astropy.units as u import matplotlib.pyplot as plt import numpy as np from psycopg import sql import uploader.app.report as report from uploader.app.display import format_table +from uploader.app.lib.expression import Expression, parse from uploader.app.lib.rawdata import rawdata_batches from uploader.app.storage import PgStorage from uploader.app.upload import handle_call @@ -66,29 +68,38 @@ def emit_image( report_func(report.image_event_from_figure(fig, caption=caption)) -def _fetch_units( +TARGET_ERROR_UNITS = { + "e_ra": "arcsec", + "e_dec": "arcsec", +} + + +def _parse_expressions(expressions: dict[str, str]) -> dict[str, Expression]: + return {field: parse(source) for field, source in expressions.items()} + + +def _evaluate_error_field( + expr: Expression, + values: dict[str, float], + column_units: dict[str, str], + field: str, +) -> float: + quantity = expr.evaluate(values, column_units).to(u.Unit(TARGET_ERROR_UNITS[field])) + return float(quantity.value) + + +def _fetch_column_units( client: adminapi.AuthenticatedClient, table_name: str, - ra_column: str, - dec_column: str, - ra_error_unit: str, - dec_error_unit: str, -) -> SaveStructuredDataRequestUnits: +) -> tuple[set[str], dict[str, str]]: resp = handle_call(get_table.sync_detailed(client=client, table_name=table_name)) + column_names: set[str] = set() column_units: dict[str, str] = {} for col in resp.data.column_info: + column_names.add(col.name) if isinstance(col.unit, str): column_units[col.name] = col.unit - missing = [c for c in (ra_column, dec_column) if c not in column_units] - if missing: - raise RuntimeError(f"Table {table_name} has no unit for column(s): {missing}") - units_dict = { - "ra": column_units[ra_column], - "dec": column_units[dec_column], - "e_ra": ra_error_unit, - "e_dec": dec_error_unit, - } - return SaveStructuredDataRequestUnits.from_dict(units_dict) + return column_names, column_units def upload_icrs( @@ -96,24 +107,34 @@ def upload_icrs( table_name: str, ra_column: str, dec_column: str, + expressions: dict[str, str], batch_size: int, client: adminapi.AuthenticatedClient, *, write: bool = False, - ra_error: float, - ra_error_unit: str, - dec_error: float, - dec_error_unit: str, report_func: Callable[[report.Event], None], ) -> int: - units = _fetch_units( - client, - table_name, - ra_column, - dec_column, - ra_error_unit, - dec_error_unit, + parsed = _parse_expressions(expressions) + column_names, column_units = _fetch_column_units(client, table_name) + + error_cols = set().union(*(expr.referenced_columns for expr in parsed.values())) + all_needed_cols = {ra_column, dec_column} | error_cols + missing = sorted(col for col in all_needed_cols if col not in column_names) + if missing: + raise RuntimeError(f"Table {table_name} has no column(s): {missing}") + + missing_units = [c for c in (ra_column, dec_column) if c not in column_units] + if missing_units: + raise RuntimeError(f"Table {table_name} has no unit for column(s): {missing_units}") + + units = SaveStructuredDataRequestUnits.from_dict( + { + "ra": column_units[ra_column], + "dec": column_units[dec_column], + **TARGET_ERROR_UNITS, + } ) + uploaded = 0 skipped = 0 ra_min = float("inf") @@ -122,7 +143,6 @@ def upload_icrs( dec_max = float("-inf") ra_sum = 0.0 dec_sum = 0.0 - total_count = 0 cnt = storage.query( sql.SQL("SELECT COUNT(*) AS cnt FROM rawdata.{}").format(sql.Identifier(table_name)), (), @@ -131,22 +151,32 @@ def upload_icrs( processed_rows = 0 sky = _SkyCoverageAccumulator() - for rows in rawdata_batches(storage, table_name, [ra_column, dec_column], batch_size): + fetch_columns = sorted(all_needed_cols) + for rows in rawdata_batches(storage, table_name, fetch_columns, batch_size): batch_ids: list[str] = [] batch_data: list[list[float]] = [] batch_ra: list[float] = [] batch_dec: list[float] = [] for row in rows: - ra_val = row[ra_column] - dec_val = row[dec_column] - if ra_val is None or dec_val is None: + if any(row[col] is None for col in all_needed_cols): skipped += 1 continue - ra_f = float(ra_val) - dec_f = float(dec_val) + + ra_f = float(row[ra_column]) + dec_f = float(row[dec_column]) + + values = {col: float(row[col]) for col in error_cols} + try: + e_ra_val = _evaluate_error_field(parsed["e_ra"], values, column_units, "e_ra") + e_dec_val = _evaluate_error_field(parsed["e_dec"], values, column_units, "e_dec") + except (ValueError, u.UnitConversionError, u.UnitTypeError) as e: + raise RuntimeError( + f"failed to evaluate expressions for row {row['hyperleda_internal_id']}: {e}", + ) from e + batch_ids.append(row["hyperleda_internal_id"]) - batch_data.append([ra_f, dec_f, float(ra_error), float(dec_error)]) + batch_data.append([ra_f, dec_f, e_ra_val, e_dec_val]) uploaded += 1 ra_min = min(ra_min, ra_f) ra_max = max(ra_max, ra_f) diff --git a/uploader/forms/crossmatch_default.py b/uploader/forms/crossmatch_default.py deleted file mode 100644 index db038bc..0000000 --- a/uploader/forms/crossmatch_default.py +++ /dev/null @@ -1,60 +0,0 @@ -from collections.abc import Callable -from typing import Literal, cast -from urllib.parse import quote_plus - -from psycopg import connect -from pydantic import BaseModel, Field - -import uploader.app.report as report -from uploader.app.crossmatch import run_crossmatch as run_crossmatch_cmd -from uploader.app.crossmatch.resolver import DefaultResolver -from uploader.app.endpoints import db_dsn_map, env_map -from uploader.app.storage import PgStorage -from uploader.clients.gen.client import adminapi -from uploader.credentials import load_credentials, load_token - - -class CrossmatchDefaultForm(BaseModel): - endpoint: Literal["dev", "test", "prod"] = Field(default="prod", title="API endpoint") - table_name: str = Field(..., title="Layer 0 table name") - radius: float = Field(..., title="Search radius (arcsec)", gt=0) - pgc_column: str = Field( - default="", - title="PGC column", - description="Column containing claimed PGC in raw table; leave empty to disable.", - ) - batch_size: int = Field(default=10000, title="Batch size", ge=1, le=500_000) - print_pending: bool = Field(default=False, title="Log pending cases") - write: bool = Field(default=False, title="Write to API") - - -def handle_crossmatch_default( - form: BaseModel, - report_func: Callable[[report.Event], None], -) -> None: - f = cast(CrossmatchDefaultForm, form) - db_user, db_password = load_credentials() - dsn = db_dsn_map[f.endpoint].format( - user=quote_plus(db_user), - password=quote_plus(db_password), - ) - client = adminapi.AuthenticatedClient( - base_url=env_map[f.endpoint], - token=load_token(), - ) - resolver = DefaultResolver( - radius_deg=f.radius / 3600.0, - pgc_column=f.pgc_column.strip() or None, - ) - with connect(dsn) as conn: - storage = PgStorage(conn) - run_crossmatch_cmd( - storage, - f.table_name.strip(), - f.batch_size, - client, - resolver=resolver, - print_pending=f.print_pending, - write=f.write, - report_func=report_func, - ) diff --git a/uploader/forms/crossmatch_two_radii.py b/uploader/forms/crossmatch_two_radii.py deleted file mode 100644 index 3a8218c..0000000 --- a/uploader/forms/crossmatch_two_radii.py +++ /dev/null @@ -1,63 +0,0 @@ -from collections.abc import Callable -from typing import Literal, cast -from urllib.parse import quote_plus - -from psycopg import connect -from pydantic import BaseModel, Field - -import uploader.app.report as report -from uploader.app.crossmatch import run_crossmatch as run_crossmatch_cmd -from uploader.app.crossmatch.resolver import TwoRadiiResolver -from uploader.app.endpoints import db_dsn_map, env_map -from uploader.app.storage import PgStorage -from uploader.clients.gen.client import adminapi -from uploader.credentials import load_credentials, load_token - - -class CrossmatchTwoRadiiForm(BaseModel): - endpoint: Literal["dev", "test", "prod"] = Field(default="prod", title="API endpoint") - table_name: str = Field(..., title="Layer 0 table name") - r1: float = Field(..., title="Inner radius (arcsec)", gt=0) - r2: float = Field(..., title="Outer radius (arcsec)", gt=0) - redshift_tolerance: float = Field( - default=0.0003, - title="Redshift tolerance", - description="Tolerance in z used for redshift-based disambiguation.", - ge=0, - ) - batch_size: int = Field(default=10000, title="Batch size", ge=1, le=500_000) - print_pending: bool = Field(default=False, title="Log pending cases") - write: bool = Field(default=False, title="Write to API") - - -def handle_crossmatch_two_radii( - form: BaseModel, - report_func: Callable[[report.Event], None], -) -> None: - f = cast(CrossmatchTwoRadiiForm, form) - db_user, db_password = load_credentials() - dsn = db_dsn_map[f.endpoint].format( - user=quote_plus(db_user), - password=quote_plus(db_password), - ) - client = adminapi.AuthenticatedClient( - base_url=env_map[f.endpoint], - token=load_token(), - ) - resolver = TwoRadiiResolver( - r1_deg=f.r1 / 3600.0, - r2_deg=f.r2 / 3600.0, - redshift_tolerance=f.redshift_tolerance, - ) - with connect(dsn) as conn: - storage = PgStorage(conn) - run_crossmatch_cmd( - storage, - f.table_name.strip(), - f.batch_size, - client, - resolver=resolver, - print_pending=f.print_pending, - write=f.write, - report_func=report_func, - ) diff --git a/uploader/forms/structured_icrs.py b/uploader/forms/structured_icrs.py index 9a72f0b..722250c 100644 --- a/uploader/forms/structured_icrs.py +++ b/uploader/forms/structured_icrs.py @@ -22,10 +22,8 @@ class StructuredIcrsForm(BaseModel): table_name: str = Field(..., title="Rawdata table name") ra_column: str = Field(..., title="RA column", description="Column containing right ascension.") dec_column: str = Field(..., title="Dec column", description="Column containing declination.") - ra_error: float = Field(..., title="RA error", description="Positional error for RA (all rows).") - ra_error_unit: str = Field(..., title="RA error unit", description="e.g. arcsec") - dec_error: float = Field(..., title="Dec error", description="Positional error for Dec (all rows).") - dec_error_unit: str = Field(..., title="Dec error unit", description="e.g. arcsec") + e_ra: str = Field(..., title="e_ra", description="Expression. Positional error for RA.") + e_dec: str = Field(..., title="e_dec", description="Expression. Positional error for Dec.") write: bool = Field( default=False, title="Write to API", @@ -52,6 +50,10 @@ def handle_structured_icrs( base_url=env_map[advanced.endpoint], token=load_token(), ) + expressions: dict[str, str] = { + "e_ra": f.e_ra.strip(), + "e_dec": f.e_dec.strip(), + } with connect(dsn) as conn: storage = PgStorage(conn) run_upload_icrs( @@ -59,12 +61,9 @@ def handle_structured_icrs( f.table_name.strip(), f.ra_column.strip(), f.dec_column.strip(), + expressions, advanced.batch_size, client, write=f.write, - ra_error=f.ra_error, - ra_error_unit=f.ra_error_unit.strip(), - dec_error=f.dec_error, - dec_error_unit=f.dec_error_unit.strip(), report_func=report_func, ) diff --git a/uploader/task_registry.py b/uploader/task_registry.py index d8490ae..41defc3 100644 --- a/uploader/task_registry.py +++ b/uploader/task_registry.py @@ -1,8 +1,6 @@ from uploader.app.lib.expression import expression_syntax_help from uploader.forms.authenticate import AuthenticateForm, handle_authenticate -from uploader.forms.crossmatch_default import CrossmatchDefaultForm, handle_crossmatch_default from uploader.forms.crossmatch_layered import CrossmatchLayeredForm, handle_crossmatch_layered -from uploader.forms.crossmatch_two_radii import CrossmatchTwoRadiiForm, handle_crossmatch_two_radii from uploader.forms.structured_designation import ( StructuredDesignationForm, handle_structured_designation, @@ -91,7 +89,7 @@ def register_all_tasks() -> None: TaskDefinition( id="upload-structured-icrs", title="ICRS", - description="Upload ICRS coordinates to the database.", + description=(f"Upload ICRS coordinates to the database.\n\n{expression_syntax_help()}"), form_model=StructuredIcrsForm, handler=handle_structured_icrs, group="Catalogs", @@ -140,31 +138,11 @@ def register_all_tasks() -> None: group="Catalogs", ), ) - register_task( - TaskDefinition( - id="crossmatch-default", - title="Crossmatch (default)", - description="Cross-identify objects using a single search radius.", - form_model=CrossmatchDefaultForm, - handler=handle_crossmatch_default, - group="Crossmatch", - ), - ) - register_task( - TaskDefinition( - id="crossmatch-two-radii", - title="Crossmatch (two-radii)", - description="Cross-identify objects using inner and outer search radii.", - form_model=CrossmatchTwoRadiiForm, - handler=handle_crossmatch_two_radii, - group="Crossmatch", - ), - ) register_task( TaskDefinition( id="crossmatch-layered", - title="Crossmatch (layered)", - description="Cross-identify objects using layered ICRS then name resolution.", + title="Crossmatch", + description="Cross-identify objects", form_model=CrossmatchLayeredForm, handler=handle_crossmatch_layered, group="Crossmatch", From 9923a149cf0a4209311a34992b18882d16487882 Mon Sep 17 00:00:00 2001 From: kraysent Date: Wed, 27 May 2026 21:04:46 +0100 Subject: [PATCH 2/2] report object for pending manual check --- uploader/app/crossmatch/engine.py | 46 +++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/uploader/app/crossmatch/engine.py b/uploader/app/crossmatch/engine.py index bfd24da..a9aeacd 100644 --- a/uploader/app/crossmatch/engine.py +++ b/uploader/app/crossmatch/engine.py @@ -5,6 +5,7 @@ from collections.abc import Callable from typing import cast +import matplotlib.pyplot as plt from psycopg import sql import uploader.app.report as report @@ -35,6 +36,41 @@ C_M_S = 299792458 +CHART_FIGSIZE = (8, 6) + + +def _emit_status_distribution_image( + report_func: Callable[[report.Event], None], + counts: dict[tuple[CrossmatchStatus, TriageStatus, PendingReason | None], int], + *, + caption: str, +) -> None: + triage_counts: dict[str, int] = defaultdict(int) + for (status, triage, reason), count in counts.items(): + if triage == TriageStatus.RESOLVED: + if status == CrossmatchStatus.NEW: + triage_counts["new"] += count + elif status == CrossmatchStatus.EXISTING: + triage_counts["existing"] += count + elif reason is not None: + triage_counts[reason.value] += count + + if not triage_counts: + return + + sorted_items = sorted(triage_counts.items(), key=lambda kv: (-kv[1], kv[0])) + labels = [name for name, _ in sorted_items] + values = [count for _, count in sorted_items] + fig, ax = plt.subplots(figsize=CHART_FIGSIZE) + ax.barh(labels, values) + ax.invert_yaxis() + ax.set_xscale("log") + ax.set_xlabel("Count") + ax.set_title("Crossmatch triage distribution") + fig.tight_layout() + report_func(report.image_event_from_figure(fig, caption=caption)) + + BATCH_QUERY = sql.SQL(""" WITH batch AS ( SELECT rec.id @@ -209,6 +245,7 @@ def _resolve_batch( design_to_pgcs: dict[str, list[int]], resolver: Resolver, print_pending: bool, + report_func: Callable[[report.Event], None], ) -> list[tuple[str, CrossmatchResult]]: results: list[tuple[str, CrossmatchResult]] = [] radius_deg = resolver.search_radius_deg @@ -260,6 +297,12 @@ def _resolve_batch( link=f"https://leda.sao.ru/records/{record_id}/crossmatch", evidence=json.dumps(_evidence_to_dict(evidence)), ) + reason = result.pending_reason.value if result.pending_reason is not None else "unknown" + report_func( + report.LogEvent( + message=f"Pending crossmatch: record {record_id}, reason: {reason}", + ) + ) return results @@ -380,6 +423,7 @@ def run_crossmatch( design_to_pgcs, resolver, print_pending, + report_func, ) batch_processed = len(batch_results) batch_pending = sum( @@ -406,6 +450,7 @@ def run_crossmatch( ) progress = 100.0 if total_records == 0 else (100.0 * total / total_records) report_func(report.ProgressEvent(percent=min(progress, 100.0))) + _emit_status_distribution_image(report_func, counts, caption=f"{total} records crossmatched") finally: def pct(n: int) -> float: @@ -432,4 +477,5 @@ def pct(n: int) -> float: ) report_func(report.ProgressEvent(percent=100)) + _emit_status_distribution_image(report_func, counts, caption=f"Final: {total} records") report_func(report.DoneEvent(message=summary))