diff --git a/uploader/app/action_description.py b/uploader/app/action_description.py new file mode 100644 index 0000000..da450b6 --- /dev/null +++ b/uploader/app/action_description.py @@ -0,0 +1,33 @@ +import json +from contextvars import ContextVar, Token + +import attrs + +_action_description: ContextVar[str | None] = ContextVar("action_description", default=None) + + +def build(task_id: str, run_id: str, parameters: dict[str, object]) -> str: + return json.dumps( + {"task_id": task_id, "run_id": run_id, "parameters": parameters}, + sort_keys=True, + separators=(",", ":"), + ) + + +def set_current(description: str) -> Token[str | None]: + return _action_description.set(description) + + +def reset_current(token: Token[str | None]) -> None: + _action_description.reset(token) + + +def current() -> str | None: + return _action_description.get() + + +def apply[T](body: T) -> T: + desc = current() + if desc is None: + return body + return attrs.evolve(body, action_description=desc) diff --git a/uploader/app/crossmatch/engine.py b/uploader/app/crossmatch/engine.py index a9aeacd..7c2e450 100644 --- a/uploader/app/crossmatch/engine.py +++ b/uploader/app/crossmatch/engine.py @@ -8,6 +8,7 @@ import matplotlib.pyplot as plt from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app import log from uploader.app.crossmatch.models import ( @@ -361,11 +362,13 @@ def _write_crossmatch_results( handle_call( set_crossmatch_results.sync_detailed( client=client, - body=SetCrossmatchResultsRequest( - statuses=StatusesPayload( - new=new_pl if new_pl is not None else UNSET, - existing=existing_pl if existing_pl is not None else UNSET, - collided=collided_pl if collided_pl is not None else UNSET, + body=action_description.apply( + SetCrossmatchResultsRequest( + statuses=StatusesPayload( + new=new_pl if new_pl is not None else UNSET, + existing=existing_pl if existing_pl is not None else UNSET, + collided=collided_pl if collided_pl is not None else UNSET, + ), ), ), ) diff --git a/uploader/app/crossmatch/submit.py b/uploader/app/crossmatch/submit.py index cca4a5c..e04a33c 100644 --- a/uploader/app/crossmatch/submit.py +++ b/uploader/app/crossmatch/submit.py @@ -2,6 +2,7 @@ from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app.storage import PgStorage from uploader.app.upload import handle_call @@ -69,7 +70,9 @@ def run_submit_crossmatch( handle_call( assign_record_pgcs.sync_detailed( client=client, - body=AssignRecordPgcsRequest(record_ids=record_ids), + body=action_description.apply( + AssignRecordPgcsRequest(record_ids=record_ids), + ), ) ) submitted += len(record_ids) diff --git a/uploader/app/structured/designations/upload.py b/uploader/app/structured/designations/upload.py index c65643e..34a55a4 100644 --- a/uploader/app/structured/designations/upload.py +++ b/uploader/app/structured/designations/upload.py @@ -3,6 +3,7 @@ import matplotlib.pyplot as plt from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app import log from uploader.app.display import format_table @@ -162,11 +163,13 @@ def upload_designations( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="designation", - columns=["design"], - ids=batch_ids, - data=batch_names, + body=action_description.apply( + SaveStructuredDataRequest( + catalog="designation", + columns=["design"], + ids=batch_ids, + data=batch_names, + ), ), ) ) diff --git a/uploader/app/structured/geometry/upload.py b/uploader/app/structured/geometry/upload.py index 8910aaf..f7e39a9 100644 --- a/uploader/app/structured/geometry/upload.py +++ b/uploader/app/structured/geometry/upload.py @@ -5,6 +5,7 @@ import numpy as np from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app.display import format_table from uploader.app.lib.expression import Expression, parse @@ -255,12 +256,14 @@ def upload_geometry_isophotal( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="geometry", - columns=geometry_columns, - ids=batch_ids, - data=batch_data, - units=geometry_units, + body=action_description.apply( + SaveStructuredDataRequest( + catalog="geometry", + columns=geometry_columns, + ids=batch_ids, + data=batch_data, + units=geometry_units, + ), ), ), ) diff --git a/uploader/app/structured/icrs/upload.py b/uploader/app/structured/icrs/upload.py index 6995f3d..db98acc 100644 --- a/uploader/app/structured/icrs/upload.py +++ b/uploader/app/structured/icrs/upload.py @@ -5,6 +5,7 @@ import numpy as np from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app.display import format_table from uploader.app.lib.expression import Expression, parse @@ -193,12 +194,14 @@ def upload_icrs( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="icrs", - columns=ICRS_COLUMNS, - ids=batch_ids, - data=batch_data, - units=units, + body=action_description.apply( + SaveStructuredDataRequest( + catalog="icrs", + columns=ICRS_COLUMNS, + ids=batch_ids, + data=batch_data, + units=units, + ), ), ) ) diff --git a/uploader/app/structured/nature/upload.py b/uploader/app/structured/nature/upload.py index abc7a8f..1771bf9 100644 --- a/uploader/app/structured/nature/upload.py +++ b/uploader/app/structured/nature/upload.py @@ -4,6 +4,7 @@ import matplotlib.pyplot as plt from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app.display import format_table from uploader.app.lib.rawdata import rawdata_batches @@ -95,11 +96,13 @@ def upload_nature( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="nature", - columns=NATURE_COLUMNS, - ids=batch_ids, - data=batch_data, + body=action_description.apply( + SaveStructuredDataRequest( + catalog="nature", + columns=NATURE_COLUMNS, + ids=batch_ids, + data=batch_data, + ), ), ) ) diff --git a/uploader/app/structured/note.py b/uploader/app/structured/note.py index 4864f65..4a322b5 100644 --- a/uploader/app/structured/note.py +++ b/uploader/app/structured/note.py @@ -1,5 +1,6 @@ from collections.abc import Callable +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app.upload import handle_call from uploader.clients.gen.client import adminapi @@ -20,11 +21,13 @@ def upload_note( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="note", - columns=NOTE_COLUMNS, - ids=[record_id], - data=[[note]], + body=action_description.apply( + SaveStructuredDataRequest( + catalog="note", + columns=NOTE_COLUMNS, + ids=[record_id], + data=[[note]], + ), ), ) ) diff --git a/uploader/app/structured/photometry/upload.py b/uploader/app/structured/photometry/upload.py index 416e08c..96d0f94 100644 --- a/uploader/app/structured/photometry/upload.py +++ b/uploader/app/structured/photometry/upload.py @@ -2,6 +2,7 @@ from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app import log from uploader.app.display import format_table @@ -78,12 +79,14 @@ def upload_photometry_hyperleda( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="photometry", - columns=PHOTOMETRY_COLUMNS, - ids=batch_ids, - data=batch_data, - units=PHOTOMETRY_UNITS, + body=action_description.apply( + SaveStructuredDataRequest( + catalog="photometry", + columns=PHOTOMETRY_COLUMNS, + ids=batch_ids, + data=batch_data, + units=PHOTOMETRY_UNITS, + ), ), ) ) diff --git a/uploader/app/structured/redshift/upload.py b/uploader/app/structured/redshift/upload.py index a55c95d..4d2e4e4 100644 --- a/uploader/app/structured/redshift/upload.py +++ b/uploader/app/structured/redshift/upload.py @@ -4,6 +4,7 @@ import numpy as np from psycopg import sql +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app.display import format_table from uploader.app.lib.rawdata import rawdata_batches @@ -125,12 +126,14 @@ def upload_redshift( handle_call( save_structured_data.sync_detailed( client=client, - body=SaveStructuredDataRequest( - catalog="redshift", - columns=REDSHIFT_COLUMNS, - ids=batch_ids, - data=batch_data, - units=REDSHIFT_UNITS, + body=action_description.apply( + SaveStructuredDataRequest( + catalog="redshift", + columns=REDSHIFT_COLUMNS, + ids=batch_ids, + data=batch_data, + units=REDSHIFT_UNITS, + ), ), ) ) diff --git a/uploader/app/upload.py b/uploader/app/upload.py index c5e725b..cd844e0 100644 --- a/uploader/app/upload.py +++ b/uploader/app/upload.py @@ -6,6 +6,7 @@ import pandas as pd +import uploader.app.action_description as action_description import uploader.app.report as report from uploader.app import interface, log from uploader.app.display import format_table @@ -103,10 +104,12 @@ def _upload( resp = handle_call( create_source.sync_detailed( client=client, - body=models.CreateSourceRequest( - title=pub_name, - authors=pub_authors, - year=pub_year, + body=action_description.apply( + models.CreateSourceRequest( + title=pub_name, + authors=pub_authors, + year=pub_year, + ), ), ) ) @@ -116,12 +119,14 @@ def _upload( resp = handle_call( create_table.sync_detailed( client=client, - body=models.CreateTableRequest( - table_name=table_name, - columns=schema, - bibcode=bibcode, - datatype=models.DataType[table_type], - description=table_description, + body=action_description.apply( + models.CreateTableRequest( + table_name=table_name, + columns=schema, + bibcode=bibcode, + datatype=models.DataType[table_type], + description=table_description, + ), ), ) ) @@ -163,9 +168,11 @@ def process_chunk(data: pd.DataFrame) -> None: _ = handle_call( add_data.sync_detailed( client=client, - body=models.AddDataRequest( - table_name=table_name, - data=request_data, + body=action_description.apply( + models.AddDataRequest( + table_name=table_name, + data=request_data, + ), ), ) ) diff --git a/uploader/tasks.py b/uploader/tasks.py index 9ec5c6b..ecba270 100644 --- a/uploader/tasks.py +++ b/uploader/tasks.py @@ -9,6 +9,7 @@ from pydantic import BaseModel +import uploader.app.action_description as action_description import uploader.app.report as report from uploader import history from uploader.app.log import logger @@ -127,6 +128,9 @@ def report_func(event: report.Event) -> None: def worker() -> None: nonlocal final_status, final_message + token = action_description.set_current( + action_description.build(task_id, run_id, form.model_dump(mode="json")), + ) try: defn.handler(form, report_func) except TaskCancelledError: @@ -137,6 +141,7 @@ def worker() -> None: message = f"{e}\n\n{traceback.format_exc()}" append_report_event(report.ErrorEvent(message=message)) finally: + action_description.reset_current(token) run.done.set() if defn.rerunnable and final_status is not None: history.append_entry(