Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions uploader/app/crossmatch/submit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from collections.abc import Callable

from psycopg import sql

import uploader.app.report as report
from uploader.app.storage import PgStorage
from uploader.app.upload import handle_call
from uploader.clients.gen.client import adminapi
from uploader.clients.gen.client.adminapi.api.default import assign_record_pgcs
from uploader.clients.gen.client.adminapi.models.assign_record_pgcs_request import AssignRecordPgcsRequest

# One page of record ids that are eligible for submission.
#
# A record qualifies when it belongs to the given table, has no PGC yet
# (`r.pgc IS NULL`), its crossmatch row has triage_status 'resolved', and the
# crossmatch metadata (cast to jsonb) has no top-level 'possible_matches' key.
#
# Positional parameters, in order:
#   1. table id
#   2. exclusive lower bound on r.id (rows are returned in ascending id order,
#      so passing the last id of the previous page fetches the next page)
#   3. maximum number of rows to return
#
# NOTE(review): the filter must stay in sync with ELIGIBLE_COUNT_QUERY.
ELIGIBLE_QUERY = sql.SQL("""
SELECT r.id
FROM layer0.records r
JOIN layer0.crossmatch c ON c.record_id = r.id
WHERE r.table_id = %s
AND r.id > %s
AND r.pgc IS NULL
AND c.triage_status = 'resolved'
AND NOT (c.metadata::jsonb ? 'possible_matches')
ORDER BY r.id ASC
LIMIT %s
""")

# Total number of rows matching the eligibility filter for one table: no PGC
# assigned, crossmatch triage_status 'resolved', and no 'possible_matches' key
# in the crossmatch metadata.
#
# Takes a single positional parameter (the table id) and returns one row with
# the count in the `cnt` column.
#
# NOTE(review): the filter must stay in sync with ELIGIBLE_QUERY.
ELIGIBLE_COUNT_QUERY = """
SELECT COUNT(*) AS cnt
FROM layer0.records r
JOIN layer0.crossmatch c ON c.record_id = r.id
WHERE r.table_id = %s
AND r.pgc IS NULL
AND c.triage_status = 'resolved'
AND NOT (c.metadata::jsonb ? 'possible_matches')
"""


def run_submit_crossmatch(
    storage: PgStorage,
    table_name: str,
    batch_size: int,
    client: adminapi.AuthenticatedClient,
    report_func: Callable[[report.Event], None],
    *,
    write: bool = False,
) -> None:
    """Walk the eligible crossmatch records of one table in batches.

    The table id is looked up by name, the eligible records are counted once
    up front, and then pages of at most ``batch_size`` record ids are fetched
    in ascending id order, each page starting after the last id of the
    previous one. When ``write`` is true every page is sent to the
    ``assign_record_pgcs`` endpoint and the response is passed to
    ``handle_call``; when it is false the pages are only counted. Log,
    progress and completion events are emitted through ``report_func``.

    Args:
        storage: Object whose ``query(sql, params)`` returns a sequence of
            mapping-like rows.
        table_name: Name of the table in ``layer0.tables``.
        batch_size: Maximum number of record ids fetched per page.
        client: Authenticated admin API client used for the assign calls.
        report_func: Callback receiving every emitted report event.
        write: Whether to actually call the API for each page.

    Raises:
        RuntimeError: If the table lookup returns no rows.
    """
    found = storage.query(
        "SELECT id FROM layer0.tables WHERE table_name = %s",
        (table_name,),
    )
    if not found:
        raise RuntimeError(f"Table not found: {table_name}")
    table_id = found[0]["id"]

    count_rows = storage.query(ELIGIBLE_COUNT_QUERY, (table_id,))
    eligible_total = int(count_rows[0]["cnt"])
    start_message = f"Submitting crossmatch for {table_name}: {eligible_total} eligible records (write={write})."
    report_func(report.LogEvent(message=start_message))

    submitted = 0
    # Paging cursor: the empty string is the initial lower bound for the id
    # comparison in ELIGIBLE_QUERY.
    cursor_id = ""
    while page := storage.query(ELIGIBLE_QUERY, (table_id, cursor_id, batch_size)):
        batch_ids = [row["id"] for row in page]
        # Advance the cursor before the API call so the next page always
        # starts after this one.
        cursor_id = batch_ids[-1]

        if write:
            response = assign_record_pgcs.sync_detailed(
                client=client,
                body=AssignRecordPgcsRequest(record_ids=batch_ids),
            )
            handle_call(response)

        batch_len = len(batch_ids)
        submitted += batch_len
        batch_message = f"Batch submitted: {batch_len} records ({submitted}/{eligible_total})."
        report_func(report.LogEvent(message=batch_message))

        # The up-front count may be zero (or smaller than what paging finds),
        # so guard the division and cap the reported percentage.
        if eligible_total == 0:
            progress = 100.0
        else:
            progress = 100.0 * submitted / eligible_total
        report_func(report.ProgressEvent(percent=min(progress, 100.0)))

    report_func(report.ProgressEvent(percent=100))
    done_message = f"Submitted {submitted}/{eligible_total} records (write={write})."
    report_func(report.DoneEvent(message=done_message))
50 changes: 50 additions & 0 deletions uploader/forms/submit_crossmatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from collections.abc import Callable
from typing import Literal, cast
from urllib.parse import quote_plus

from psycopg import connect
from pydantic import BaseModel, Field

import uploader.app.report as report
from uploader.app.crossmatch.submit import run_submit_crossmatch
from uploader.app.endpoints import db_dsn_map, env_map
from uploader.app.storage import PgStorage
from uploader.clients.gen.client import adminapi
from uploader.credentials import load_credentials, load_token


# Input form for the "submit crossmatch" task. Documented with comments rather
# than a docstring so the generated model schema is left unchanged.
class SubmitCrossmatchForm(BaseModel):
    # Environment to run against; one of "dev", "test" or "prod".
    endpoint: Literal["dev", "test", "prod"] = Field(
        default="prod",
        title="API endpoint",
    )

    # Required: name of the table whose records are submitted.
    table_name: str = Field(
        ...,
        title="Table name",
    )

    # Number of records per batch, constrained to 1..10000.
    batch_size: int = Field(
        default=1000,
        title="Batch size",
        ge=1,
        le=10000,
    )

    # Off by default, so a run is a dry run unless explicitly enabled.
    write: bool = Field(
        default=False,
        title="Write to API",
        description="If disabled, only counts records that would be submitted.",
    )


def handle_submit_crossmatch(
    form: BaseModel,
    report_func: Callable[[report.Event], None],
) -> None:
    """Task handler: run the crossmatch submission described by ``form``.

    Builds the database DSN and the authenticated admin API client for the
    endpoint selected in the form, opens a database connection, and delegates
    to ``run_submit_crossmatch``.

    Args:
        form: The submitted form; treated as a ``SubmitCrossmatchForm``
            (static cast only, no runtime check).
        report_func: Callback receiving every emitted report event.
    """
    params = cast(SubmitCrossmatchForm, form)
    endpoint = params.endpoint

    user, password = load_credentials()
    # Credentials are URL-quoted before being substituted into the DSN
    # template.
    dsn_template = db_dsn_map[endpoint]
    dsn = dsn_template.format(
        user=quote_plus(user),
        password=quote_plus(password),
    )

    api_client = adminapi.AuthenticatedClient(
        base_url=env_map[endpoint],
        token=load_token(),
    )

    with connect(dsn) as conn:
        run_submit_crossmatch(
            PgStorage(conn),
            params.table_name.strip(),
            params.batch_size,
            api_client,
            report_func=report_func,
            write=params.write,
        )
13 changes: 13 additions & 0 deletions uploader/task_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
handle_structured_photometry_hyperleda,
)
from uploader.forms.structured_redshift import StructuredRedshiftForm, handle_structured_redshift
from uploader.forms.submit_crossmatch import SubmitCrossmatchForm, handle_submit_crossmatch
from uploader.forms.upload_csv import UploadCsvForm, handle_upload_csv
from uploader.forms.upload_fits import UploadFitsForm, handle_upload_fits
from uploader.forms.upload_vizier import UploadVizierForm, handle_upload_vizier
Expand Down Expand Up @@ -148,3 +149,15 @@ def register_all_tasks() -> None:
group="Crossmatch",
),
)
register_task(
TaskDefinition(
id="submit-crossmatch",
title="Submit crossmatch",
description=(
"Submit resolved crossmatch results to the backend, assigning PGCs and promoting records to layer 2."
),
form_model=SubmitCrossmatchForm,
handler=handle_submit_crossmatch,
group="Crossmatch",
),
)
Loading