diff --git a/mozetl/bhr_collection/bhr_collection.py b/mozetl/bhr_collection/bhr_collection.py
index 5549b4d1..e8d66b6f 100644
--- a/mozetl/bhr_collection/bhr_collection.py
+++ b/mozetl/bhr_collection/bhr_collection.py
@@ -533,11 +533,11 @@ def get_data(sc, sql_context, config, date, end_date=None):
 
     sql = f"""
     SELECT
-        environment,
-        application,
-        payload,
+        client_info,
+        ping_info,
+        metrics,
     FROM
-        `moz-fx-data-shared-prod.telemetry_stable.bhr_v4`
+        `moz-fx-data-shared-prod.firefox_desktop_stable.hang_report_v1`
     WHERE
         -- Use document_id to sample
         ABS(MOD(FARM_FINGERPRINT(document_id), {max_sample_slices})) < {sample_slices}
@@ -549,29 +549,28 @@
         .option("query", sql)
         .option("viewsEnabled", True)
         # sql results need to be saved to a table
-        .option("materializationProject", "moz-fx-data-shared-prod")
+        .option("materializationProject", "mozdata")
         .option("materializationDataset", "tmp")
+        .option("billingProject", config["billing_project"])
         .load()
     )
 
     if config["exclude_modules"]:
         properties = [
-            "environment/system/os/name",
-            "environment/system/os/version",
-            "application/architecture",
-            "application/build_id",
-            "payload/hangs",
-            "payload/time_since_last_ping",
+            "client_info/os",
+            "client_info/os_version",
+            "client_info/architecture",
+            "client_info/app_build",
+            "metrics/object/hangs_reports",
         ]
     else:
         properties = [
-            "environment/system/os/name",
-            "environment/system/os/version",
-            "application/architecture",
-            "application/build_id",
-            "payload/modules",
-            "payload/hangs",
-            "payload/time_since_last_ping",
+            "client_info/os",
+            "client_info/os_version",
+            "client_info/architecture",
+            "client_info/app_build",
+            "metrics/object/hangs_modules",
+            "metrics/object/hangs_reports",
         ]
 
     print("%d results total" % pings_df.rdd.count())
@@ -581,8 +580,8 @@
 
     try:
         result = mapped.filter(
-            lambda p: p["application/build_id"][:8] >= date_str
-            and p["application/build_id"][:8] <= end_date_str
+            lambda p: p["client_info/app_build"][:8] >= date_str
+            and p["client_info/app_build"][:8] <= end_date_str
        )
        print("%d results after first filter" % result.count())
        return result
@@ -591,13 +590,13 @@
 
 
 def ping_is_valid(ping):
-    if not isinstance(ping["environment/system/os/version"], str):
+    if not isinstance(ping["client_info/os_version"], str):
         return False
-    if not isinstance(ping["environment/system/os/name"], str):
+    if not isinstance(ping["client_info/os"], str):
         return False
-    if not isinstance(ping["application/build_id"], str):
+    if not isinstance(ping["client_info/app_build"], str):
         return False
-    if not isinstance(ping["payload/time_since_last_ping"], int):
+    if ping["metrics/object/hangs_reports"] is None:
         return False
     return True
 
@@ -619,14 +618,32 @@ def string_to_module(string_module):
 
 
 def process_frame(frame, modules):
+    # Glean format:
+    # {"frame": "...", "module": 0}
+    if isinstance(frame, dict):
+        module_index = frame.get("module")
+        offset = frame.get("frame")
+
+        if module_index is None:
+            return (("pseudo", None), offset)
+
+        if module_index < 0 or module_index >= len(modules):
+            return (None, offset)
+
+        debug_name, breakpad_id = modules[module_index]
+        return ((debug_name, breakpad_id), offset)
+
+    # Legacy telemetry format:
+    # [module_index, offset]
     if isinstance(frame, list):
         module_index, offset = frame
         if module_index is None or module_index < 0 or module_index >= len(modules):
             return (None, offset)
         debug_name, breakpad_id = modules[module_index]
         return ((debug_name, breakpad_id), offset)
-    else:
-        return (("pseudo", None), frame)
+
+    # Pseudo frame fallback.
+    return (("pseudo", None), frame)
 
 
 def filter_hang(hang, config):
@@ -638,18 +655,25 @@
 
 
 def process_hang(hang):
-    result = hang.asDict()
-    result["stack"] = json.loads(hang["stack"])
-    return result
+    if hasattr(hang, "asDict"):
+        return hang.asDict(recursive=True)
+    return hang
 
 
 def process_hangs(ping, config):
-    build_date = ping["application/build_id"][:8]  # "YYYYMMDD" : 8 characters
+    build_date = ping["client_info/app_build"][:8]  # "YYYYMMDD" : 8 characters
+
+    platform = "{}".format(ping["client_info/os"])
 
-    platform = "{}".format(ping["environment/system/os/name"])
+    modules = ping["metrics/object/hangs_modules"]
+    if isinstance(modules, str):
+        modules = json.loads(modules)
 
-    modules = ping["payload/modules"]
-    hangs = [process_hang(h) for h in ping["payload/hangs"]]
+    raw_hangs = ping["metrics/object/hangs_reports"]
+    if isinstance(raw_hangs, str):
+        raw_hangs = json.loads(raw_hangs)
+
+    hangs = [process_hang(h) for h in raw_hangs]
 
     if hangs is None:
         return []
@@ -664,7 +688,7 @@ def process_hangs(ping, config):
             h["thread"],
             "",
             h["process"],
-            h["annotations"],
+            h.get("annotations") or [],
             build_date,
             platform,
         )
@@ -803,10 +827,9 @@ def process_hang_key(key, processed_modules):
 
 
 def process_hang_value(key, val, usage_hours_by_date):
-    stack, runnable_name, thread, build_date, annotations, platform = key
     return (
-        val[0] / usage_hours_by_date[build_date],
-        val[1] / usage_hours_by_date[build_date],
+        val[0],
+        val[1],
     )
 
 
@@ -896,20 +919,6 @@ def get_grouped_sums_and_counts(hangs, usage_hours_by_date, config):
     return [k + v for k, v in items if k is not None]
 
 
-def get_usage_hours(ping):
-    build_date = ping["application/build_id"][:8]  # "YYYYMMDD" : 8 characters
-    usage_hours = float(ping["payload/time_since_last_ping"]) / 3600000.0
-    return (build_date, usage_hours)
-
-
-def merge_usage_hours(a, b):
-    return a + b
-
-
-def get_usage_hours_by_date(pings):
-    return pings.map(get_usage_hours).reduceByKey(merge_usage_hours).collectAsMap()
-
-
 def make_sym_map(data, url=None):
     public_symbols = {}
     func_symbols = {}
@@ -1060,9 +1069,7 @@ def transform_pings(_, pings, config):
 
     hangs = symbolicate_hang_keys(hangs, processed_modules)
 
-    usage_hours_by_date = time_code(
-        "Getting usage hours", lambda: get_usage_hours_by_date(filtered)
-    )
+    usage_hours_by_date = {}
 
     result = time_code(
         "Grouping stacks",
@@ -1188,6 +1195,7 @@ def write_file(name, stuff, config):
         "exclude_modules": False,
         "uuid": uuid.uuid4().hex,
         "thread_filter": "Gecko",
+        "billing_project": "mozdata",
     }
 
 
@@ -1249,6 +1257,11 @@ def etl_job_daily(sc, sql_context, config=None):
 
 
 @click.command()
+@click.option(
+    "--billing-project",
+    default="mozdata",
+    help="GCP project to use for BigQuery billing.",
+)
 @click.option("--date", type=datetime.fromisoformat, required=True)
 @click.option(
     "--sample-size",
@@ -1273,7 +1286,15 @@ def etl_job_daily(sc, sql_context, config=None):
     help="If present, add to spark.jars config. Used for local testing, "
     "e.g. --bq-connector-jar=spark-bigquery-latest.jar",
 )
-def start_job(date, sample_size, use_gcs, thread_filter, output_tag, bq_connector_jar):
+def start_job(
+    billing_project,
+    date,
+    sample_size,
+    use_gcs,
+    thread_filter,
+    output_tag,
+    bq_connector_jar,
+):
     print(f"Running for {date}")
     print(f"Using sample size {sample_size}")
 
@@ -1298,6 +1319,7 @@ def start_job(date, sample_size, use_gcs, thread_filter, output_tag, bq_connecto
             "hang_upper_bound": 65536,
             "sample_size": sample_size,
             "use_gcs": use_gcs,
+            "billing_project": billing_project,
         },
     )
 