diff --git a/uploader/app/crossmatch/submit.py b/uploader/app/crossmatch/submit.py
new file mode 100644
index 0000000..cca4a5c
--- /dev/null
+++ b/uploader/app/crossmatch/submit.py
@@ -0,0 +1,119 @@
+from collections.abc import Callable
+
+from psycopg import sql
+
+import uploader.app.report as report
+from uploader.app.storage import PgStorage
+from uploader.app.upload import handle_call
+from uploader.clients.gen.client import adminapi
+from uploader.clients.gen.client.adminapi.api.default import assign_record_pgcs
+from uploader.clients.gen.client.adminapi.models.assign_record_pgcs_request import AssignRecordPgcsRequest
+
+# One page of eligible record ids in ascending id order (keyset pagination).
+# Parameters: table id, last id already seen, page size.
+ELIGIBLE_QUERY = sql.SQL("""
+    SELECT r.id
+    FROM layer0.records r
+    JOIN layer0.crossmatch c ON c.record_id = r.id
+    WHERE r.table_id = %s
+      AND r.id > %s
+      AND r.pgc IS NULL
+      AND c.triage_status = 'resolved'
+      AND NOT (c.metadata::jsonb ? 'possible_matches')
+    ORDER BY r.id ASC
+    LIMIT %s
+""")
+
+# Total number of eligible records; the filter must stay in sync with ELIGIBLE_QUERY.
+# Parameters: table id.
+ELIGIBLE_COUNT_QUERY = """
+    SELECT COUNT(*) AS cnt
+    FROM layer0.records r
+    JOIN layer0.crossmatch c ON c.record_id = r.id
+    WHERE r.table_id = %s
+      AND r.pgc IS NULL
+      AND c.triage_status = 'resolved'
+      AND NOT (c.metadata::jsonb ? 'possible_matches')
+"""
+
+
+def run_submit_crossmatch(
+    storage: PgStorage,
+    table_name: str,
+    batch_size: int,
+    client: adminapi.AuthenticatedClient,
+    report_func: Callable[[report.Event], None],
+    *,
+    write: bool = False,
+) -> None:
+    """Submit the eligible records of ``table_name`` to the admin API in batches.
+
+    Eligible records are selected by ``ELIGIBLE_QUERY``: no PGC yet, crossmatch
+    triage status ``resolved`` and no ``possible_matches`` key in the metadata.
+
+    Args:
+        storage: Database access used for the table lookup and the record queries.
+        table_name: Name of the table to look up in ``layer0.tables``.
+        batch_size: Maximum number of record ids per batch; must be at least 1.
+        client: Authenticated admin API client.
+        report_func: Callback that receives log, progress and done events.
+        write: When false (dry run) nothing is sent to the API; batches are
+            only counted and reported.
+
+    Raises:
+        ValueError: If ``batch_size`` is smaller than 1.
+        RuntimeError: If no table named ``table_name`` exists.
+    """
+    if batch_size < 1:
+        # LIMIT 0 would end the loop immediately and report a silent "success".
+        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
+
+    table_rows = storage.query(
+        "SELECT id FROM layer0.tables WHERE table_name = %s",
+        (table_name,),
+    )
+    if not table_rows:
+        raise RuntimeError(f"Table not found: {table_name}")
+    table_id = table_rows[0]["id"]
+
+    eligible_total = int(storage.query(ELIGIBLE_COUNT_QUERY, (table_id,))[0]["cnt"])
+    report_func(
+        report.LogEvent(
+            message=f"Submitting crossmatch for {table_name}: {eligible_total} eligible records (write={write}).",
+        )
+    )
+
+    # Dry runs must not claim that anything was sent to the API.
+    batch_verb = "submitted" if write else "counted (dry run)"
+    submitted = 0
+    # Keyset cursor; the empty string is used as the lower bound for the first page.
+    last_id = ""
+    while True:
+        rows = storage.query(ELIGIBLE_QUERY, (table_id, last_id, batch_size))
+        if not rows:
+            break
+        record_ids = [r["id"] for r in rows]
+        last_id = record_ids[-1]
+        if write:
+            handle_call(
+                assign_record_pgcs.sync_detailed(
+                    client=client,
+                    body=AssignRecordPgcsRequest(record_ids=record_ids),
+                )
+            )
+        submitted += len(record_ids)
+        report_func(
+            report.LogEvent(
+                message=f"Batch {batch_verb}: {len(record_ids)} records ({submitted}/{eligible_total}).",
+            )
+        )
+        # The total is counted once up front; clamp so progress never exceeds 100 if it is stale.
+        progress = 100.0 if eligible_total == 0 else (100.0 * submitted / eligible_total)
+        report_func(report.ProgressEvent(percent=min(progress, 100.0)))
+
+    report_func(report.ProgressEvent(percent=100))
+    report_func(
+        report.DoneEvent(
+            message=f"Submitted {submitted}/{eligible_total} records (write={write}).",
+        )
+    )
diff --git a/uploader/forms/submit_crossmatch.py b/uploader/forms/submit_crossmatch.py
new file mode 100644
index 0000000..38f0dc7
--- /dev/null
+++ b/uploader/forms/submit_crossmatch.py
@@ -0,0 +1,62 @@
+from collections.abc import Callable
+from typing import Literal, cast
+from urllib.parse import quote_plus
+
+from psycopg import connect
+from pydantic import BaseModel, Field
+
+import uploader.app.report as report
+from uploader.app.crossmatch.submit import run_submit_crossmatch
+from uploader.app.endpoints import db_dsn_map, env_map
+from uploader.app.storage import PgStorage
+from uploader.clients.gen.client import adminapi
+from uploader.credentials import load_credentials, load_token
+
+
+class SubmitCrossmatchForm(BaseModel):
+    """Form parameters of the "Submit crossmatch" task."""
+
+    endpoint: Literal["dev", "test", "prod"] = Field(default="prod", title="API endpoint")
+    table_name: str = Field(..., title="Table name")
+    batch_size: int = Field(default=1000, title="Batch size", ge=1, le=10000)
+    write: bool = Field(
+        default=False,
+        title="Write to API",
+        description="If disabled, only counts records that would be submitted.",
+    )
+
+
+def handle_submit_crossmatch(
+    form: BaseModel,
+    report_func: Callable[[report.Event], None],
+) -> None:
+    """Run the crossmatch submission described by ``form``.
+
+    Opens a database connection and an authenticated admin API client for the
+    selected endpoint, then delegates to ``run_submit_crossmatch``.
+
+    Args:
+        form: Expected to be a ``SubmitCrossmatchForm``; it is only cast, not validated.
+        report_func: Callback that receives the events emitted by the run.
+    """
+    f = cast(SubmitCrossmatchForm, form)
+    db_user, db_password = load_credentials()
+    # Credentials are quote_plus-escaped before substitution (presumably a URL-style DSN; confirm).
+    dsn = db_dsn_map[f.endpoint].format(
+        user=quote_plus(db_user),
+        password=quote_plus(db_password),
+    )
+    client = adminapi.AuthenticatedClient(
+        base_url=env_map[f.endpoint],
+        token=load_token(),
+    )
+    with connect(dsn) as conn:
+        storage = PgStorage(conn)
+        run_submit_crossmatch(
+            storage,
+            f.table_name.strip(),
+            f.batch_size,
+            client,
+            report_func=report_func,
+            write=f.write,
+        )
diff --git a/uploader/task_registry.py b/uploader/task_registry.py
index 41defc3..fc6670a 100644
--- a/uploader/task_registry.py
+++ b/uploader/task_registry.py
@@ -17,6 +17,7 @@
     handle_structured_photometry_hyperleda,
 )
 from uploader.forms.structured_redshift import StructuredRedshiftForm, handle_structured_redshift
+from uploader.forms.submit_crossmatch import SubmitCrossmatchForm, handle_submit_crossmatch
 from uploader.forms.upload_csv import UploadCsvForm, handle_upload_csv
 from uploader.forms.upload_fits import UploadFitsForm, handle_upload_fits
 from uploader.forms.upload_vizier import UploadVizierForm, handle_upload_vizier
@@ -148,3 +149,15 @@ def register_all_tasks() -> None:
             group="Crossmatch",
         ),
     )
+    register_task(
+        TaskDefinition(
+            id="submit-crossmatch",
+            title="Submit crossmatch",
+            description=(
+                "Submit resolved crossmatch results to the backend, assigning PGCs and promoting records to layer 2."
+            ),
+            form_model=SubmitCrossmatchForm,
+            handler=handle_submit_crossmatch,
+            group="Crossmatch",
+        ),
+    )