diff --git a/workers/arax/worker.py b/workers/arax/worker.py index 44647bb..39d110c 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -31,8 +31,10 @@ async def arax(task, logger: logging.Logger): logger.info(f"Get the message from db {message}") headers = {"Content-Type": "application/json"} - with httpx.Client(timeout=270) as client: - response = client.post(settings.arax_url, json=message, headers=headers) + async with httpx.AsyncClient(timeout=270) as client: + response = await client.post( + settings.arax_url, json=message, headers=headers + ) logger.info(f"Status Code from ARAX response: {response.status_code}") result = response.json() diff --git a/workers/monitor/alerts.py b/workers/monitor/alerts.py index 08b5c05..6ad874b 100644 --- a/workers/monitor/alerts.py +++ b/workers/monitor/alerts.py @@ -306,8 +306,14 @@ async def _dispatch_slack(event: Dict[str, Any]) -> None: "warning": ":warning:", "critical": ":rotating_light:", }.get(event.get("severity", "warning"), ":warning:") + # Environment context lets one Slack channel receive alerts from multiple + # deployments (dev / staging / production) without ambiguity about which + # one fired. + server_url = settings.server_url or "unknown" + maturity = settings.server_maturity or "unknown" text = ( f"{emoji} *Shepherd alert* `{event['rule']}` ({event['severity']})\n" + f"*Environment:* {maturity} | *URL:* <{server_url}|{server_url}>\n" f"{event['message']}" ) try: diff --git a/workers/monitor/static/app.js b/workers/monitor/static/app.js index c01a44c..0ae660a 100644 --- a/workers/monitor/static/app.js +++ b/workers/monitor/static/app.js @@ -22,6 +22,10 @@ let xlenChart, araChart; let xlenHistory = {}; // stream -> [{x, y}, ...] const HISTORY_POINTS = 60; // rolling window in the chart + // Tracks chart legend items the user clicked to hide. Datasets are rebuilt + // on every snapshot tick, which wipes Chart.js's internal visibility state, + // so we mirror it here (keyed by series label) and re-apply on each rebuild. + const xlenHiddenSeries = new Set(); function fmt(n) { if (n === null || n === undefined) return "-"; @@ -56,7 +60,18 @@ animation: false, parsing: false, plugins: { - legend: { labels: { color: "#8b949e", boxWidth: 10 } }, + legend: { + labels: { color: "#8b949e", boxWidth: 10 }, + onClick: (e, legendItem, legend) => { + Chart.defaults.plugins.legend.onClick(e, legendItem, legend); + const label = legendItem.text; + if (legend.chart.isDatasetVisible(legendItem.datasetIndex)) { + xlenHiddenSeries.delete(label); + } else { + xlenHiddenSeries.add(label); + } + }, + }, title: { display: true, text: "Queue depth (XLEN)", color: "#e6edf3" }, }, scales: { @@ -231,6 +246,7 @@ borderWidth: 1.5, pointRadius: 0, tension: 0.2, + hidden: xlenHiddenSeries.has(n), })); xlenChart.update("none"); } diff --git a/workers/monitor/storage.py b/workers/monitor/storage.py index a0b56b4..6077db5 100644 --- a/workers/monitor/storage.py +++ b/workers/monitor/storage.py @@ -395,22 +395,28 @@ async def query_summary(since: float, until: float) -> Dict[str, Any]: row = await cur.fetchone() summary["queries_started"] = int(row[0] or 0) - # Event counts. + # Event counts. Crashes are not their own row type -- the poller + # records them as scale_down events with ``payload.kind = "crashed"`` + # (a clean scale-down uses kind="scaled_down"), so we have to dig + # into the JSON payload to separate the two. cur = await conn.execute( - "SELECT type, COUNT(*) FROM monitor_events " + "SELECT type, COUNT(*) FILTER (WHERE payload->>'kind' = 'crashed'), " + "COUNT(*) FROM monitor_events " "WHERE ts >= to_timestamp(%s) AND ts < to_timestamp(%s) " "GROUP BY type", (since, until), ) - for etype, count in await cur.fetchall(): - if etype == "crash": - summary["crashes"] = int(count) - elif etype in ("scale_up", "scale_down"): + for etype, crashed_count, total_count in await cur.fetchall(): + if etype in ("scale_up", "scale_down"): summary["scale_events"] = summary.get("scale_events", 0) + int( - count + total_count ) + if etype == "scale_down": + summary["crashes"] = summary.get("crashes", 0) + int( + crashed_count or 0 + ) elif etype == "alert": - summary["alert_count"] = int(count) + summary["alert_count"] = int(total_count) # Peak backlog per stream. cur = await conn.execute(