Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 77 additions & 55 deletions mozetl/bhr_collection/bhr_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,11 @@ def get_data(sc, sql_context, config, date, end_date=None):

sql = f"""
SELECT
environment,
application,
payload,
client_info,
ping_info,
metrics,
FROM
`moz-fx-data-shared-prod.telemetry_stable.bhr_v4`
`moz-fx-data-shared-prod.firefox_desktop_stable.hang_report_v1`
WHERE
-- Use document_id to sample
ABS(MOD(FARM_FINGERPRINT(document_id), {max_sample_slices})) < {sample_slices}
Expand All @@ -549,29 +549,28 @@ def get_data(sc, sql_context, config, date, end_date=None):
.option("query", sql)
.option("viewsEnabled", True)
# sql results need to be saved to a table
.option("materializationProject", "moz-fx-data-shared-prod")
.option("materializationProject", "mozdata")
.option("materializationDataset", "tmp")
.option("billingProject", config["billing_project"])
.load()
)

if config["exclude_modules"]:
properties = [
"environment/system/os/name",
"environment/system/os/version",
"application/architecture",
"application/build_id",
"payload/hangs",
"payload/time_since_last_ping",
"client_info/os",
"client_info/os_version",
"client_info/architecture",
"client_info/app_build",
"metrics/object/hangs_reports",
]
else:
properties = [
"environment/system/os/name",
"environment/system/os/version",
"application/architecture",
"application/build_id",
"payload/modules",
"payload/hangs",
"payload/time_since_last_ping",
"client_info/os",
"client_info/os_version",
"client_info/architecture",
"client_info/app_build",
"metrics/object/hangs_modules",
"metrics/object/hangs_reports",
]

print("%d results total" % pings_df.rdd.count())
Expand All @@ -581,8 +580,8 @@ def get_data(sc, sql_context, config, date, end_date=None):

try:
result = mapped.filter(
lambda p: p["application/build_id"][:8] >= date_str
and p["application/build_id"][:8] <= end_date_str
lambda p: p["client_info/app_build"][:8] >= date_str
and p["client_info/app_build"][:8] <= end_date_str
)
print("%d results after first filter" % result.count())
return result
Expand All @@ -591,13 +590,13 @@ def get_data(sc, sql_context, config, date, end_date=None):


def ping_is_valid(ping):
    """Return True when a Glean hang-report ping has the fields the pipeline needs.

    Checks the flattened ping dict produced by get_data(): OS name/version and
    the build id must be strings, and the hang reports object metric must be
    present (it may still be empty).
    """
    # client_info fields come from the Glean ping envelope; reject pings where
    # they are missing or of an unexpected type.
    if not isinstance(ping["client_info/os_version"], str):
        return False
    if not isinstance(ping["client_info/os"], str):
        return False
    if not isinstance(ping["client_info/app_build"], str):
        return False
    # Without any hang reports there is nothing to process downstream.
    if ping["metrics/object/hangs_reports"] is None:
        return False

    return True
Expand All @@ -619,14 +618,32 @@ def string_to_module(string_module):


def process_frame(frame, modules):
    """Normalize one stack frame into ``((debug_name, breakpad_id), offset)``.

    Handles three frame encodings:

    * Glean format: ``{"frame": <offset>, "module": <index-or-None>}``
    * Legacy telemetry format: ``[module_index, offset]``
    * Anything else is treated as a pseudo (label) frame.

    ``modules`` is a sequence of ``(debug_name, breakpad_id)`` pairs; an
    out-of-range or missing module index yields a ``None`` module key so the
    frame can still be counted without symbolication.
    """
    # Glean format: {"frame": "...", "module": 0}
    if isinstance(frame, dict):
        module_index = frame.get("module")
        offset = frame.get("frame")

        # No module at all -> pseudo-stack label frame.
        if module_index is None:
            return (("pseudo", None), offset)

        # Guard against indices the modules list cannot resolve.
        if module_index < 0 or module_index >= len(modules):
            return (None, offset)

        debug_name, breakpad_id = modules[module_index]
        return ((debug_name, breakpad_id), offset)

    # Legacy telemetry format: [module_index, offset]
    if isinstance(frame, list):
        module_index, offset = frame
        if module_index is None or module_index < 0 or module_index >= len(modules):
            return (None, offset)
        debug_name, breakpad_id = modules[module_index]
        return ((debug_name, breakpad_id), offset)

    # Pseudo frame fallback: the frame itself is the label.
    return (("pseudo", None), frame)


def filter_hang(hang, config):
Expand All @@ -638,18 +655,25 @@ def filter_hang(hang, config):


def process_hang(hang):
    """Convert a single hang entry to a plain dict.

    Spark SQL rows expose ``asDict``; convert them recursively so nested
    structs become plain dicts too. Hangs that are already plain Python
    objects (e.g. decoded from a JSON string upstream) pass through unchanged.
    """
    if hasattr(hang, "asDict"):
        return hang.asDict(recursive=True)
    return hang


def process_hangs(ping, config):
build_date = ping["application/build_id"][:8] # "YYYYMMDD" : 8 characters
build_date = ping["client_info/app_build"][:8] # "YYYYMMDD" : 8 characters

platform = "{}".format(ping["client_info/os"])

platform = "{}".format(ping["environment/system/os/name"])
modules = ping["metrics/object/hangs_modules"]
if isinstance(modules, str):
modules = json.loads(modules)

modules = ping["payload/modules"]
hangs = [process_hang(h) for h in ping["payload/hangs"]]
raw_hangs = ping["metrics/object/hangs_reports"]
if isinstance(raw_hangs, str):
raw_hangs = json.loads(raw_hangs)

hangs = [process_hang(h) for h in raw_hangs]
if hangs is None:
return []

Expand All @@ -664,7 +688,7 @@ def process_hangs(ping, config):
h["thread"],
"",
h["process"],
h["annotations"],
h.get("annotations") or [],
build_date,
platform,
)
Expand Down Expand Up @@ -803,10 +827,9 @@ def process_hang_key(key, processed_modules):


def process_hang_value(key, val, usage_hours_by_date):
    """Return the aggregated (hang_time, hang_count) pair for one stack group.

    Values are no longer normalized by usage hours here (Glean pings do not
    carry ``time_since_last_ping``); ``key`` and ``usage_hours_by_date`` are
    kept in the signature for caller compatibility.
    """
    return (
        val[0],
        val[1],
    )


Expand Down Expand Up @@ -896,20 +919,6 @@ def get_grouped_sums_and_counts(hangs, usage_hours_by_date, config):
return [k + v for k, v in items if k is not None]


def get_usage_hours(ping):
    """Map one ping to a ``(build_date, usage_hours)`` pair.

    ``build_date`` is the "YYYYMMDD" prefix (8 characters) of the build id;
    usage hours are derived from ``payload/time_since_last_ping`` given in
    milliseconds.
    """
    build_date = ping["application/build_id"][:8]  # "YYYYMMDD" : 8 characters
    # ms -> hours (1000 ms * 3600 s).
    usage_hours = float(ping["payload/time_since_last_ping"]) / 3600000.0
    return (build_date, usage_hours)


def merge_usage_hours(a, b):
    """Combine two usage-hour totals for the same build date."""
    combined = a + b
    return combined


def get_usage_hours_by_date(pings):
    """Sum usage hours per build date across an RDD of pings.

    Returns a plain ``{build_date: total_hours}`` dict collected to the driver.
    """
    per_ping = pings.map(get_usage_hours)
    totals = per_ping.reduceByKey(merge_usage_hours)
    return totals.collectAsMap()


def make_sym_map(data, url=None):
public_symbols = {}
func_symbols = {}
Expand Down Expand Up @@ -1060,9 +1069,7 @@ def transform_pings(_, pings, config):

hangs = symbolicate_hang_keys(hangs, processed_modules)

usage_hours_by_date = time_code(
"Getting usage hours", lambda: get_usage_hours_by_date(filtered)
)
usage_hours_by_date = {}

result = time_code(
"Grouping stacks",
Expand Down Expand Up @@ -1188,6 +1195,7 @@ def write_file(name, stuff, config):
"exclude_modules": False,
"uuid": uuid.uuid4().hex,
"thread_filter": "Gecko",
"billing_project": "mozdata",
}


Expand Down Expand Up @@ -1249,6 +1257,11 @@ def etl_job_daily(sc, sql_context, config=None):


@click.command()
@click.option(
"--billing-project",
default="mozdata",
help="GCP project to use for BigQuery billing.",
)
@click.option("--date", type=datetime.fromisoformat, required=True)
@click.option(
"--sample-size",
Expand All @@ -1273,7 +1286,15 @@ def etl_job_daily(sc, sql_context, config=None):
help="If present, add to spark.jars config. Used for local testing, "
"e.g. --bq-connector-jar=spark-bigquery-latest.jar",
)
def start_job(date, sample_size, use_gcs, thread_filter, output_tag, bq_connector_jar):
def start_job(
billing_project,
date,
sample_size,
use_gcs,
thread_filter,
output_tag,
bq_connector_jar,
):
print(f"Running for {date}")
print(f"Using sample size {sample_size}")

Expand All @@ -1298,6 +1319,7 @@ def start_job(date, sample_size, use_gcs, thread_filter, output_tag, bq_connecto
"hang_upper_bound": 65536,
"sample_size": sample_size,
"use_gcs": use_gcs,
"billing_project": billing_project,
},
)

Expand Down