From e89f23a57da1b4b3c542f7838c255391c2d9ffa9 Mon Sep 17 00:00:00 2001 From: Sky Ning Date: Mon, 4 May 2026 16:52:22 -0400 Subject: [PATCH 1/4] Use mozdata as BigQuery billing project. --- mozetl/bhr_collection/bhr_collection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mozetl/bhr_collection/bhr_collection.py b/mozetl/bhr_collection/bhr_collection.py index 5549b4d1..7701fe52 100644 --- a/mozetl/bhr_collection/bhr_collection.py +++ b/mozetl/bhr_collection/bhr_collection.py @@ -549,8 +549,9 @@ def get_data(sc, sql_context, config, date, end_date=None): .option("query", sql) .option("viewsEnabled", True) # sql results need to be saved to a table - .option("materializationProject", "moz-fx-data-shared-prod") + .option("materializationProject", "mozdata") .option("materializationDataset", "tmp") + .option("billingProject", "mozdata") .load() ) From 2323017ca4d77954323ebf2fa29e3f5231535232 Mon Sep 17 00:00:00 2001 From: Sky Ning Date: Tue, 5 May 2026 09:08:54 -0400 Subject: [PATCH 2/4] Remove the usage of payload/time_since_last_ping as it is not used by the front end. 
--- mozetl/bhr_collection/bhr_collection.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mozetl/bhr_collection/bhr_collection.py b/mozetl/bhr_collection/bhr_collection.py index 7701fe52..008e718a 100644 --- a/mozetl/bhr_collection/bhr_collection.py +++ b/mozetl/bhr_collection/bhr_collection.py @@ -562,7 +562,6 @@ def get_data(sc, sql_context, config, date, end_date=None): "application/architecture", "application/build_id", "payload/hangs", - "payload/time_since_last_ping", ] else: properties = [ @@ -572,7 +571,6 @@ def get_data(sc, sql_context, config, date, end_date=None): "application/build_id", "payload/modules", "payload/hangs", - "payload/time_since_last_ping", ] print("%d results total" % pings_df.rdd.count()) @@ -598,8 +596,6 @@ def ping_is_valid(ping): return False if not isinstance(ping["application/build_id"], str): return False - if not isinstance(ping["payload/time_since_last_ping"], int): - return False return True From 914dde2385ea8ff3e08dd437324bf003fad07abe Mon Sep 17 00:00:00 2001 From: Sky Ning Date: Tue, 5 May 2026 10:24:55 -0400 Subject: [PATCH 3/4] Read BHR hang reports from Glean Update bhr_collection.py to read from firefox_desktop_stable.hang_report_v1 instead of telemetry_stable.bhr_v4. Map the Glean client_info and metrics fields into the existing processing shape, and parse the Glean hang report and module object metrics before processing. Refs #409. 
--- mozetl/bhr_collection/bhr_collection.py | 110 +++++++++++++----------- 1 file changed, 60 insertions(+), 50 deletions(-) diff --git a/mozetl/bhr_collection/bhr_collection.py b/mozetl/bhr_collection/bhr_collection.py index 008e718a..a7bfb926 100644 --- a/mozetl/bhr_collection/bhr_collection.py +++ b/mozetl/bhr_collection/bhr_collection.py @@ -533,11 +533,11 @@ def get_data(sc, sql_context, config, date, end_date=None): sql = f""" SELECT - environment, - application, - payload, + client_info, + ping_info, + metrics, FROM - `moz-fx-data-shared-prod.telemetry_stable.bhr_v4` + `moz-fx-data-shared-prod.firefox_desktop_stable.hang_report_v1` WHERE -- Use document_id to sample ABS(MOD(FARM_FINGERPRINT(document_id), {max_sample_slices})) < {sample_slices} @@ -557,20 +557,20 @@ def get_data(sc, sql_context, config, date, end_date=None): if config["exclude_modules"]: properties = [ - "environment/system/os/name", - "environment/system/os/version", - "application/architecture", - "application/build_id", - "payload/hangs", + "client_info/os", + "client_info/os_version", + "client_info/architecture", + "client_info/app_build", + "metrics/object/hangs_reports", ] else: properties = [ - "environment/system/os/name", - "environment/system/os/version", - "application/architecture", - "application/build_id", - "payload/modules", - "payload/hangs", + "client_info/os", + "client_info/os_version", + "client_info/architecture", + "client_info/app_build", + "metrics/object/hangs_modules", + "metrics/object/hangs_reports", ] print("%d results total" % pings_df.rdd.count()) @@ -580,8 +580,8 @@ def get_data(sc, sql_context, config, date, end_date=None): try: result = mapped.filter( - lambda p: p["application/build_id"][:8] >= date_str - and p["application/build_id"][:8] <= end_date_str + lambda p: p["client_info/app_build"][:8] >= date_str + and p["client_info/app_build"][:8] <= end_date_str ) print("%d results after first filter" % result.count()) return result @@ -590,11 +590,13 @@ def 
get_data(sc, sql_context, config, date, end_date=None): def ping_is_valid(ping): - if not isinstance(ping["environment/system/os/version"], str): + if not isinstance(ping["client_info/os_version"], str): return False - if not isinstance(ping["environment/system/os/name"], str): + if not isinstance(ping["client_info/os"], str): return False - if not isinstance(ping["application/build_id"], str): + if not isinstance(ping["client_info/app_build"], str): + return False + if ping["metrics/object/hangs_reports"] is None: return False return True @@ -616,14 +618,32 @@ def string_to_module(string_module): def process_frame(frame, modules): + # Glean format: + # {"frame": "...", "module": 0} + if isinstance(frame, dict): + module_index = frame.get("module") + offset = frame.get("frame") + + if module_index is None: + return (("pseudo", None), offset) + + if module_index < 0 or module_index >= len(modules): + return (None, offset) + + debug_name, breakpad_id = modules[module_index] + return ((debug_name, breakpad_id), offset) + + # Legacy telemetry format: + # [module_index, offset] if isinstance(frame, list): module_index, offset = frame if module_index is None or module_index < 0 or module_index >= len(modules): return (None, offset) debug_name, breakpad_id = modules[module_index] return ((debug_name, breakpad_id), offset) - else: - return (("pseudo", None), frame) + + # Pseudo frame fallback. 
+ return (("pseudo", None), frame) def filter_hang(hang, config): @@ -635,18 +655,25 @@ def filter_hang(hang, config): def process_hang(hang): - result = hang.asDict() - result["stack"] = json.loads(hang["stack"]) - return result + if hasattr(hang, "asDict"): + return hang.asDict(recursive=True) + return hang def process_hangs(ping, config): - build_date = ping["application/build_id"][:8] # "YYYYMMDD" : 8 characters + build_date = ping["client_info/app_build"][:8] # "YYYYMMDD" : 8 characters + + platform = "{}".format(ping["client_info/os"]) + + modules = ping["metrics/object/hangs_modules"] + if isinstance(modules, str): + modules = json.loads(modules) - platform = "{}".format(ping["environment/system/os/name"]) + raw_hangs = ping["metrics/object/hangs_reports"] + if isinstance(raw_hangs, str): + raw_hangs = json.loads(raw_hangs) - modules = ping["payload/modules"] - hangs = [process_hang(h) for h in ping["payload/hangs"]] + hangs = [process_hang(h) for h in raw_hangs] if hangs is None: return [] @@ -661,7 +688,7 @@ def process_hangs(ping, config): h["thread"], "", h["process"], - h["annotations"], + h.get("annotations") or [], build_date, platform, ) @@ -800,10 +827,9 @@ def process_hang_key(key, processed_modules): def process_hang_value(key, val, usage_hours_by_date): - stack, runnable_name, thread, build_date, annotations, platform = key return ( - val[0] / usage_hours_by_date[build_date], - val[1] / usage_hours_by_date[build_date], + val[0], + val[1], ) @@ -893,20 +919,6 @@ def get_grouped_sums_and_counts(hangs, usage_hours_by_date, config): return [k + v for k, v in items if k is not None] -def get_usage_hours(ping): - build_date = ping["application/build_id"][:8] # "YYYYMMDD" : 8 characters - usage_hours = float(ping["payload/time_since_last_ping"]) / 3600000.0 - return (build_date, usage_hours) - - -def merge_usage_hours(a, b): - return a + b - - -def get_usage_hours_by_date(pings): - return 
pings.map(get_usage_hours).reduceByKey(merge_usage_hours).collectAsMap() - - def make_sym_map(data, url=None): public_symbols = {} func_symbols = {} @@ -1057,9 +1069,7 @@ def transform_pings(_, pings, config): hangs = symbolicate_hang_keys(hangs, processed_modules) - usage_hours_by_date = time_code( - "Getting usage hours", lambda: get_usage_hours_by_date(filtered) - ) + usage_hours_by_date = {} result = time_code( "Grouping stacks", From 174d2d1e6fd18a69f8adfb1e74adab67972ab7cd Mon Sep 17 00:00:00 2001 From: Sky Ning Date: Wed, 6 May 2026 15:22:38 -0400 Subject: [PATCH 4/4] Follow up on Issue 409 - Make BHR billing project configurable Add a --billing-project CLI option and pass it through the job config to the BigQuery connector. Default to mozdata so local runs keep the current behavior, while scheduled jobs can override the billing project. --- mozetl/bhr_collection/bhr_collection.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/mozetl/bhr_collection/bhr_collection.py b/mozetl/bhr_collection/bhr_collection.py index a7bfb926..e8d66b6f 100644 --- a/mozetl/bhr_collection/bhr_collection.py +++ b/mozetl/bhr_collection/bhr_collection.py @@ -551,7 +551,7 @@ def get_data(sc, sql_context, config, date, end_date=None): # sql results need to be saved to a table .option("materializationProject", "mozdata") .option("materializationDataset", "tmp") - .option("billingProject", "mozdata") + .option("billingProject", config["billing_project"]) .load() ) @@ -1195,6 +1195,7 @@ def write_file(name, stuff, config): "exclude_modules": False, "uuid": uuid.uuid4().hex, "thread_filter": "Gecko", + "billing_project": "mozdata", } @@ -1256,6 +1257,11 @@ def etl_job_daily(sc, sql_context, config=None): @click.command() +@click.option( + "--billing-project", + default="mozdata", + help="GCP project to use for BigQuery billing.", +) @click.option("--date", type=datetime.fromisoformat, required=True) @click.option( "--sample-size", @@ -1280,7 +1286,15 
@@ def etl_job_daily(sc, sql_context, config=None): help="If present, add to spark.jars config. Used for local testing, " "e.g. --bq-connector-jar=spark-bigquery-latest.jar", ) -def start_job(date, sample_size, use_gcs, thread_filter, output_tag, bq_connector_jar): +def start_job( + billing_project, + date, + sample_size, + use_gcs, + thread_filter, + output_tag, + bq_connector_jar, +): print(f"Running for {date}") print(f"Using sample size {sample_size}") @@ -1305,6 +1319,7 @@ def start_job(date, sample_size, use_gcs, thread_filter, output_tag, bq_connecto "hang_upper_bound": 65536, "sample_size": sample_size, "use_gcs": use_gcs, + "billing_project": billing_project, }, )