Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
import time as ttime
import os

from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from tiled.client import from_profile
from tiled.client import from_uri
from dotenv import load_dotenv


def get_api_key_from_env():
with open("/srv/container.secret", "r") as secrets:
load_dotenv(stream=secrets)
api_key = os.environ["TILED_API_KEY"]
return api_key


@task(retries=2, retry_delay_seconds=10)
def get_run(uid, api_key=None):
if not api_key:
api_key = get_api_key_from_env()
cl = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
run = cl["iss/raw"][uid]
return run


@task(retries=2, retry_delay_seconds=10)
def read_all_streams(uid, beamline_acronym="iss"):
def read_stream(run, stream):
return run[stream].read()


@flow
def data_validation(uid, api_key=None):
logger = get_run_logger()
api_key = Secret.load(f"tiled-{beamline_acronym}-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)
run = tiled_client[beamline_acronym]["raw"][uid]
run = get_run(uid, api_key=api_key)
logger.info(f"Validating uid {run.metadata['start']['uid']}")
start_time = ttime.monotonic()
for stream in run:
logger.info(f"{stream}:")
stream_start_time = ttime.monotonic()
stream_data = run[stream].read()
stream_data = read_stream(run, stream)
stream_elapsed_time = ttime.monotonic() - stream_start_time
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
elapsed_time = ttime.monotonic() - start_time
logger.info(f"{elapsed_time = }")


@flow(log_prints=True)
def data_validation(uid, beamline_acronym="iss"):
read_all_streams(uid, beamline_acronym=beamline_acronym)
4 changes: 2 additions & 2 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def log_completion():


@flow(log_prints=True)
def end_of_run_workflow(stop_doc):
def end_of_run_workflow(stop_doc, api_key=None):
uid = stop_doc["run_start"]
data_validation(uid)
data_validation(uid, api_key=api_key)
log_completion()
Loading
Loading