From 4f727d71b6a830986dd9267b8ffad58ffd1e9118 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Fri, 1 May 2026 16:51:57 -0400 Subject: [PATCH 1/2] Performance and observability improvements based on /status diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The v1.5.2 /status payload (PR #253) showed strongly bimodal latency (p50=78ms, p99=42s, 12% very-slow), undersized queryResultCache (size=512, 56% hit rate, 317K evictions), and JVM heap consuming 94% of host RAM. Source-side changes target everything fixable in this repo; deployment items (heap sizing, queryResultCache, page-cache visibility) are filed as GitHub issues #265, #266, #267 with the performance label. Schema and data loading (data-loading/setup-and-load-solr.sh): - Fix malformed JSON on the `types` field add (missing comma between `stored:true` and `multiValued:true`) — Solr was silently 400'ing this. - Add docValues=true to curie, types, taxa, clique_identifier_count. - Force-merge to <=8 segments after the load loop. The index is read-only after loading, so a one-time merge cuts per-query work substantially (production was running with 57 segments). API (api/server.py, api/solr.py): - Replace regex prefix filter `curie:/MONDO:.*/` with prefix-wildcard query `curie:MONDO\:*` plus alphanumeric input sanitization. The regex form had to walk the entire term dictionary; the wildcard form uses it. - Bound curie_lookup's Solr `rows` from a hard-coded 1,000,000 to the number of input CURIEs. - Module-level shared httpx.AsyncClient via FastAPI lifespan instead of per-request creation. Also handles loop changes lazily so TestClient works without each test entering the lifespan as a context manager. - Parallelize bulk_lookup with an asyncio.Semaphore (default 8, configurable via BULK_LOOKUP_CONCURRENCY); preserves input order. 
- Tag query_log entries with endpoint name; expose recent_queries.per_endpoint stats keyed by `lookup` / `synonyms` / `bulk-lookup`. Record from all three entry points (previously only /lookup recorded). - Add slow_query_samples deque (size SLOW_QUERY_SAMPLES_SIZE, default 50) with structured records (no raw query string by default; opt in via LOG_SLOW_QUERY_STRINGS=true). Surfaced under recent_queries.slow_samples. - Use time.time() (wall clock) for query_log timestamps instead of perf_counter_ns. The latter counts from an arbitrary reference point rather than the Unix epoch, so subtracting it from time.time() in /status produced the nonsense `time_since_last_query_seconds: 1.7e9` value seen in production /status output. Tests (tests/test_service.py, tests/test_status.py): - Regression tests for only_prefixes / exclude_prefixes (prefix-wildcard query rewrite). - Test that bulk_lookup preserves input order under concurrency. - Tests asserting recent_queries.per_endpoint and slow_samples shape. Co-Authored-By: Claude Opus 4.7 --- api/server.py | 358 ++++++++++++++++++++++------ api/solr.py | 35 ++- data-loading/setup-and-load-solr.sh | 21 +- tests/test_service.py | 51 ++++ tests/test_status.py | 42 +++- 5 files changed, 418 insertions(+), 89 deletions(-) diff --git a/api/server.py b/api/server.py index 8fc60a97..eaa52f73 100755 --- a/api/server.py +++ b/api/server.py @@ -7,6 +7,7 @@ * The curie with the shortest match is first, etc. * Matching names are returned first, followed by non-matching names """ +import asyncio import json import logging import statistics @@ -15,7 +16,8 @@ import os import re from collections import deque -from typing import Dict, List, Union, Annotated, Optional +from contextlib import asynccontextmanager +from typing import Dict, List, NamedTuple, Union, Annotated, Optional from fastapi import Body, FastAPI, Query from fastapi.responses import RedirectResponse @@ -29,9 +31,64 @@ SOLR_HOST = os.getenv("SOLR_HOST", "localhost") SOLR_PORT = os.getenv("SOLR_PORT", "8983") +# Shared async HTTP client for talking to Solr. 
Created once during app startup so we get +# connection pooling and HTTP keep-alive across all requests instead of paying TCP setup +# costs on every endpoint call. Assigned by the lifespan context manager below. +_http_client: Optional[httpx.AsyncClient] = None +# Identity of the event loop the current _http_client was created on. httpx.AsyncClient is +# bound to the loop it is first used on, so if a different loop comes up later we have to +# recreate. In production this never changes (uvicorn runs one loop); in tests TestClient +# uses a fresh loop per call. +_http_client_loop = None + solr_client = SolrClient(SOLR_HOST, int(SOLR_PORT)) -app = FastAPI(**get_app_info()) + +def _make_http_client() -> httpx.AsyncClient: + return httpx.AsyncClient( + timeout=httpx.Timeout(60.0, connect=5.0), + limits=httpx.Limits( + max_connections=64, + max_keepalive_connections=32, + keepalive_expiry=60.0, + ), + ) + + +def get_http_client() -> httpx.AsyncClient: + """Return the shared httpx.AsyncClient. + + Normally the lifespan context manager creates the client at app startup. We also + lazy-initialize on demand so unit tests using ``TestClient(app)`` still work without + each test entering the lifespan as a context manager. If the running event loop has + changed since the client was last created (TestClient runs each call on a fresh loop), + recreate the client to avoid "event loop is closed" errors. 
+ """ + global _http_client, _http_client_loop + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + if _http_client is None or _http_client_loop is not loop: + _http_client = _make_http_client() + _http_client_loop = loop + return _http_client + + +@asynccontextmanager +async def lifespan(_app: FastAPI): + global _http_client, _http_client_loop + _http_client = _make_http_client() + _http_client_loop = asyncio.get_running_loop() + try: + yield + finally: + await _http_client.aclose() + _http_client = None + _http_client_loop = None + + +app = FastAPI(lifespan=lifespan, **get_app_info()) logger = logging.getLogger(__name__) logging.basicConfig(level=os.getenv("LOGLEVEL", logging.INFO)) @@ -43,9 +100,21 @@ allow_headers=["*"], ) -# We track (timestamp_s, duration_ms) for recent queries to compute both latency and rate stats. -# A large default covers ~100s at 500 qps, giving meaningful rate windows even under heavy load. +# We track recent queries to compute both latency and rate stats. A large default covers +# ~100s at 500 qps, giving meaningful rate windows even under heavy load. QUERY_LOG_SIZE = int(os.getenv("QUERY_LOG_SIZE", 50000)) + + +class QueryLogEntry(NamedTuple): + """One record in the rolling query log. Endpoint/flag fields enable per-endpoint slicing + in /status without storing the raw input string.""" + timestamp_s: float + duration_ms: float + endpoint: str # one of: "lookup", "synonyms", "bulk-lookup" + autocomplete: bool + has_filters: bool # any of biolink_types / only_prefixes / exclude_prefixes / only_taxa + + query_log: deque = deque(maxlen=QUERY_LOG_SIZE) # Queries slower than this threshold will be logged at WARNING level (see documentation/Performance.md). @@ -53,6 +122,118 @@ IDEAL_QUERY_THRESHOLD_MS = 100 # below this: "ideal" VERY_SLOW_QUERY_THRESHOLD_MS = 1000 # at or above this: "very slow" +# Rolling buffer of slow-query samples. 
We keep a small structured record (no raw query string +# by default) so /status can show what the slow path looks like without writing PII to logs. +SLOW_QUERY_SAMPLES_SIZE = max(0, int(os.getenv("SLOW_QUERY_SAMPLES_SIZE", "50"))) +slow_query_samples: deque = deque(maxlen=SLOW_QUERY_SAMPLES_SIZE) + +# When true, slow-query samples include the raw query string. Off by default — operators +# should turn this on temporarily when diagnosing tail latency. +LOG_SLOW_QUERY_STRINGS = os.getenv("LOG_SLOW_QUERY_STRINGS", "false").lower() == "true" + +# Maximum number of concurrent /lookup calls dispatched per /bulk-lookup request. Bounded so that +# a single bulk request cannot saturate the backend Solr instance to the detriment of other clients. +BULK_LOOKUP_CONCURRENCY = max(1, int(os.getenv("BULK_LOOKUP_CONCURRENCY", "8"))) + +# CURIE prefixes are short identifiers like "MONDO" or "NCBITaxon". Restrict to a safe character +# set so that user input cannot inject Solr query syntax via the only_prefixes/exclude_prefixes +# parameters (which build a `curie:{prefix}\:*` clause). +_CURIE_PREFIX_RE = re.compile(r"[^A-Za-z0-9._-]") + +def _sanitize_curie_prefix(prefix: str) -> str: + """Strip whitespace and remove any character that is not safe in a Solr term.""" + return _CURIE_PREFIX_RE.sub("", prefix.strip()) + + +def _compute_latency_stats(durations: List[float]) -> Dict: + """Summarize a list of durations into the same shape we expose for the global stats: + count, mean, p50/p95/p99, and latency_buckets. 
Reused per-endpoint.""" + total = len(durations) + if total >= 2: + qs = statistics.quantiles(durations, n=100) + p50, p95, p99 = qs[49], qs[94], qs[98] + else: + p50 = p95 = p99 = None + + if total: + n_ideal = n_fine = n_slow = n_very_slow = 0 + for d in durations: + if d < IDEAL_QUERY_THRESHOLD_MS: + n_ideal += 1 + elif d < SLOW_QUERY_THRESHOLD_MS: + n_fine += 1 + elif d < VERY_SLOW_QUERY_THRESHOLD_MS: + n_slow += 1 + else: + n_very_slow += 1 + latency_buckets = { + 'slow_threshold_ms': SLOW_QUERY_THRESHOLD_MS, + 'ideal_pct': round(n_ideal / total, 4), + 'fine_pct': round(n_fine / total, 4), + 'slow_pct': round(n_slow / total, 4), + 'very_slow_pct': round(n_very_slow / total, 4), + } + else: + latency_buckets = None + + return { + 'count': total, + 'mean_time_ms': round(sum(durations) / total, 2) if total else -1, + 'p50_ms': p50, + 'p95_ms': p95, + 'p99_ms': p99, + 'latency_buckets': latency_buckets, + } + + +def _record_query(endpoint: str, duration_ms: float, *, autocomplete: bool, has_filters: bool) -> None: + """Append a tagged record to the rolling query log. Uses wall-clock time so /status can + compute correct rate windows and inter-arrival gaps.""" + query_log.append(QueryLogEntry( + timestamp_s=time.time(), + duration_ms=duration_ms, + endpoint=endpoint, + autocomplete=autocomplete, + has_filters=has_filters, + )) + + +def _build_slow_sample(*, endpoint: str, duration_ms: float, solr_ms: Optional[float] = None, + string: Optional[str] = None, autocomplete: bool = False, + highlighting: bool = False, limit: Optional[int] = None, + biolink_types: Optional[List[str]] = None, + only_prefixes: Optional[str] = None, + exclude_prefixes: Optional[str] = None, + only_taxa: Optional[str] = None, + extra: Optional[Dict] = None) -> Dict: + """Build a structured record for the slow-query sample buffer. 
+ + The raw query string is omitted by default to avoid logging potentially sensitive input; + set LOG_SLOW_QUERY_STRINGS=true in the environment to include it for diagnosis. + """ + sample = { + "ts": time.time(), + "endpoint": endpoint, + "duration_ms": round(duration_ms, 2), + } + if solr_ms is not None: + sample["solr_ms"] = round(solr_ms, 2) + if string is not None: + sample["string_len"] = len(string) + if LOG_SLOW_QUERY_STRINGS: + sample["string"] = string + sample["autocomplete"] = bool(autocomplete) + sample["highlighting"] = bool(highlighting) + if limit is not None: + sample["limit"] = limit + sample["biolink_types_count"] = len(biolink_types) if biolink_types else 0 + sample["only_prefixes"] = bool(only_prefixes) + sample["exclude_prefixes"] = bool(exclude_prefixes) + sample["only_taxa"] = bool(only_taxa) + if extra: + sample.update(extra) + return sample + # ENDPOINT / # If someone tries accessing /, we should redirect them to the Swagger interface. @app.get("/", include_in_schema=False) @@ -85,7 +266,7 @@ async def status_get( async def status(full: bool = False) -> Dict: """ Return a dictionary containing status and count information for the underlying Solr instance. """ - solr = await solr_client.fetch_status(full=full) + solr = await solr_client.fetch_status(full=full, client=get_http_client()) # Do we know the Babel version and version URL? It will be stored in an environmental variable if we do. babel_version = os.environ.get("BABEL_VERSION", "unknown") @@ -102,20 +283,19 @@ async def status(full: bool = False) -> Dict: app_info = get_app_info() if 'version' in app_info and app_info['version']: nameres_version = 'v' + app_info['version'] - # Unpack query_log into parallel lists for latency and rate computations. - log_snapshot = list(query_log) # snapshot to avoid mutation during computation - # Sort by timestamp: concurrent requests complete in a different order than they started, - # so insertion order does not reflect arrival order. 
- log_snapshot.sort(key=lambda x: x[0]) - timestamps = [ts for ts, _ in log_snapshot] - durations = [dur for _, dur in log_snapshot] - - # Latency percentiles. - if len(durations) >= 2: - qs = statistics.quantiles(durations, n=100) - p50, p95, p99 = qs[49], qs[94], qs[98] - else: - p50 = p95 = p99 = None + # Snapshot the query log and sort by timestamp. Concurrent requests can complete in a + # different order than they started, so insertion order does not reflect arrival order. + log_snapshot = sorted(query_log, key=lambda e: e[0]) + durations = [e[1] for e in log_snapshot] + timestamps = [e[0] for e in log_snapshot] + + # Per-endpoint slices for the same stats. Keying off the NamedTuple's `endpoint` slot + # lets us see whether the tail latency comes from /lookup, /synonyms, or /bulk-lookup. + by_endpoint: Dict[str, List[float]] = {} + for entry in log_snapshot: + # Older raw-tuple entries (no endpoint tag) are bucketed under "lookup" for back-compat. + ep = entry[2] if len(entry) >= 3 else "lookup" + by_endpoint.setdefault(ep, []).append(entry[1]) # Inter-arrival times (gaps between consecutive query start timestamps, in ms). # Requires >= 3 timestamps (>= 2 gaps) because statistics.quantiles needs at least 2 data points. @@ -131,29 +311,6 @@ async def status(full: bool = False) -> Dict: 'p95': round(statistics.quantiles(gaps, n=100)[94], 2), } - # Latency buckets: fraction of queries in each performance tier. 
- total = len(durations) - if total: - n_ideal = n_fine = n_slow = n_very_slow = 0 - for d in durations: - if d < IDEAL_QUERY_THRESHOLD_MS: - n_ideal += 1 - elif d < SLOW_QUERY_THRESHOLD_MS: - n_fine += 1 - elif d < VERY_SLOW_QUERY_THRESHOLD_MS: - n_slow += 1 - else: - n_very_slow += 1 - latency_buckets = { - 'slow_threshold_ms': SLOW_QUERY_THRESHOLD_MS, - 'ideal_pct': round(n_ideal / total, 4), - 'fine_pct': round(n_fine / total, 4), - 'slow_pct': round(n_slow / total, 4), - 'very_slow_pct': round(n_very_slow / total, 4), - } - else: - latency_buckets = None - # Windowed query rates. Scan from newest to oldest, stopping at the largest window. now = time.time() count_10s = count_60s = count_300s = 0 @@ -171,13 +328,9 @@ async def status(full: bool = False) -> Dict: history_span = (timestamps[-1] - timestamps[0]) if len(timestamps) >= 2 else 0 time_since_last = (now - timestamps[-1]) if timestamps else None + global_stats = _compute_latency_stats(durations) recent_queries = { - 'count': len(durations), - 'mean_time_ms': round(sum(durations) / len(durations), 2) if durations else -1, - 'p50_ms': p50, - 'p95_ms': p95, - 'p99_ms': p99, - 'latency_buckets': latency_buckets, + **global_stats, 'rate': { 'history_span_seconds': round(history_span, 1), 'time_since_last_query_seconds': round(time_since_last, 2) if time_since_last is not None else None, @@ -189,6 +342,8 @@ async def status(full: bool = False) -> Dict: 'queries_per_second_last_300s': round(count_300s / 300, 2), 'inter_arrival_ms': inter_arrival_ms, }, + 'per_endpoint': {ep: _compute_latency_stats(d) for ep, d in by_endpoint.items()}, + 'slow_samples': list(slow_query_samples), } biolink_model = { @@ -310,12 +465,15 @@ async def curie_lookup(curies) -> Dict[str, Dict]: f"curie:\"{curie}\"" for curie in curies ) + # Each input CURIE matches at most one document, so bounding the Solr `rows` parameter to + # the number of inputs avoids forcing Solr to allocate buffers for the previous 1,000,000 + # ceiling. 
The max(..., 1) floor keeps the limit positive when the input list is empty. + params = { "query": curie_filter, - "limit": 1000000, + "limit": max(len(curies), 1), } - async with httpx.AsyncClient(timeout=None) as client: - response = await client.post(query, json=params) + client = get_http_client() + response = await client.post(query, json=params) response.raise_for_status() response_json = response.json() output = { @@ -325,8 +483,16 @@ async def curie_lookup(curies) -> Dict[str, Dict]: for doc in response_json["response"]["docs"]: output[doc["curie"]] = doc time_end = time.perf_counter_ns() + time_taken_ms = (time_end - time_start)/1_000_000 + _record_query("synonyms", time_taken_ms, autocomplete=False, has_filters=False) + if time_taken_ms > SLOW_QUERY_THRESHOLD_MS: + slow_query_samples.append(_build_slow_sample( + endpoint="synonyms", + duration_ms=time_taken_ms, + extra={"curies_count": len(curies)}, + )) - logger.info(f"CURIE Lookup on {len(curies)} CURIEs {json.dumps(curies)} took {(time_end - time_start)/1_000_000:.2f}ms") + logger.info(f"CURIE Lookup on {len(curies)} CURIEs {json.dumps(curies)} took {time_taken_ms:.2f}ms") return output @@ -532,15 +698,23 @@ async def lookup(string: str, if only_prefixes: prefix_filters = [] for prefix in re.split('\\s*\\|\\s*', only_prefixes): - prefix_filters.append(f"curie:/{prefix}:.*/") - filters.append(" OR ".join(prefix_filters)) + sanitized = _sanitize_curie_prefix(prefix) + if sanitized: + # Prefix-wildcard query (uses Lucene's term dictionary) is dramatically cheaper + # than the previous regex form `curie:/{prefix}:.*/`, which had to walk all terms. 
+ prefix_filters.append(f"curie:{sanitized}\\:*") + if prefix_filters: + filters.append(" OR ".join(prefix_filters)) # Prefix: exclude filter if exclude_prefixes: prefix_exclude_filters = [] for prefix in re.split('\\s*\\|\\s*', exclude_prefixes): - prefix_exclude_filters.append(f"NOT curie:/{prefix}:.*/") - filters.append(" AND ".join(prefix_exclude_filters)) + sanitized = _sanitize_curie_prefix(prefix) + if sanitized: + prefix_exclude_filters.append(f"NOT curie:{sanitized}\\:*") + if prefix_exclude_filters: + filters.append(" AND ".join(prefix_exclude_filters)) # Taxa filter. # only_taxa is like: 'NCBITaxon:9606|NCBITaxon:10090|NCBITaxon:10116|NCBITaxon:7955' @@ -594,8 +768,8 @@ async def lookup(string: str, time_solr_start = time.perf_counter_ns() query_url = f"http://{SOLR_HOST}:{SOLR_PORT}/solr/name_lookup/select" - async with httpx.AsyncClient(timeout=None) as client: - response = await client.post(query_url, json=params) + client = get_http_client() + response = await client.post(query_url, json=params) if response.status_code >= 300: logger.error("Solr REST error: %s", response.text) response.raise_for_status() @@ -647,7 +821,22 @@ async def lookup(string: str, time_end = time.perf_counter_ns() time_taken_ms = (time_end - time_start)/1_000_000 solr_ms = (time_solr_end - time_solr_start)/1_000_000 - query_log.append((time_start / 1_000_000_000, time_taken_ms)) + has_filters = bool(biolink_types or only_prefixes or exclude_prefixes or only_taxa) + _record_query("lookup", time_taken_ms, autocomplete=autocomplete, has_filters=has_filters) + if time_taken_ms > SLOW_QUERY_THRESHOLD_MS: + slow_query_samples.append(_build_slow_sample( + endpoint="lookup", + duration_ms=time_taken_ms, + solr_ms=solr_ms, + string=string, + autocomplete=autocomplete, + highlighting=highlighting, + limit=limit, + biolink_types=biolink_types, + only_prefixes=only_prefixes, + exclude_prefixes=exclude_prefixes, + only_taxa=only_taxa, + )) log_msg = ( f"Lookup query to Solr for 
{json.dumps(string)} " f"(autocomplete={autocomplete}, highlighting={highlighting}, offset={offset}, limit={limit}, " @@ -729,20 +918,45 @@ class NameResQuery(BaseModel): ) async def bulk_lookup(query: NameResQuery) -> Dict[str, List[LookupResult]]: time_start = time.perf_counter_ns() - result = {} - for string in query.strings: - result[string] = await lookup( - string, - query.autocomplete, - query.highlighting, - query.offset, - query.limit, - query.biolink_types, - query.only_prefixes, - query.exclude_prefixes, - query.only_taxa) + sem = asyncio.Semaphore(BULK_LOOKUP_CONCURRENCY) + + async def one(s: str) -> List[LookupResult]: + async with sem: + return await lookup( + s, + query.autocomplete, + query.highlighting, + query.offset, + query.limit, + query.biolink_types, + query.only_prefixes, + query.exclude_prefixes, + query.only_taxa) + + # Preserve input order: gather returns results in the same order the coroutines were + # passed, so we can zip them back to the original strings to keep the response stable. 
+ results = await asyncio.gather(*(one(s) for s in query.strings)) + result = dict(zip(query.strings, results)) + time_end = time.perf_counter_ns() - logger.info(f"Bulk lookup query for {len(query.strings)} strings ({query}) took {(time_end - time_start)/1_000_000:.2f}ms") + time_taken_ms = (time_end - time_start)/1_000_000 + has_filters = bool(query.biolink_types or query.only_prefixes or query.exclude_prefixes or query.only_taxa) + _record_query("bulk-lookup", time_taken_ms, + autocomplete=bool(query.autocomplete), has_filters=has_filters) + if time_taken_ms > SLOW_QUERY_THRESHOLD_MS: + slow_query_samples.append(_build_slow_sample( + endpoint="bulk-lookup", + duration_ms=time_taken_ms, + autocomplete=bool(query.autocomplete), + highlighting=bool(query.highlighting), + limit=query.limit, + biolink_types=query.biolink_types, + only_prefixes=query.only_prefixes, + exclude_prefixes=query.exclude_prefixes, + only_taxa=query.only_taxa, + extra={"strings_count": len(query.strings)}, + )) + logger.info(f"Bulk lookup query for {len(query.strings)} strings ({query}) took {time_taken_ms:.2f}ms") return result diff --git a/api/solr.py b/api/solr.py index 9cfd4ecc..e0d0032b 100644 --- a/api/solr.py +++ b/api/solr.py @@ -129,11 +129,24 @@ async def fetch_cores(self, client: httpx.AsyncClient) -> dict: r.raise_for_status() return r.json() + async def _fetch_all(self, client: httpx.AsyncClient, full: bool): + """Run cores (and optionally sysinfo + mbeans) concurrently against the given client.""" + if full: + cores_data, sysinfo_data, mbeans_data = await asyncio.gather( + self.fetch_cores(client), + self.fetch_sysinfo(client), + self.fetch_mbeans(client), + ) + else: + cores_data = await self.fetch_cores(client) + sysinfo_data = mbeans_data = None + return cores_data, sysinfo_data, mbeans_data + # ------------------------------------------------------------------ # # High-level: fetch everything and return a parsed snapshot # # 
------------------------------------------------------------------ # - async def fetch_status(self, full: bool = False) -> dict: + async def fetch_status(self, full: bool = False, client: httpx.AsyncClient | None = None) -> dict: """Fetch and parse Solr monitoring data. When ``full=False`` (default), only the cores endpoint is called, @@ -141,21 +154,21 @@ async def fetch_status(self, full: bool = False) -> dict: ``None``. Pass ``full=True`` to also fetch JVM, OS, and cache metrics (three concurrent requests instead of one). + Pass ``client`` to reuse a caller-managed ``httpx.AsyncClient`` (e.g. an + application-wide pool); otherwise a short-lived client is created and + torn down for this call. The caller-managed path keeps connections + alive across requests, which matters for high-throughput status polling. + Returns a dict with a ``found`` flag plus parsed fields. Callers should check ``result["found"]`` before accessing index-level keys. Raises ``httpx.HTTPStatusError`` if the cores endpoint is unavailable. 
""" - async with httpx.AsyncClient(timeout=None) as client: - if full: - cores_data, sysinfo_data, mbeans_data = await asyncio.gather( - self.fetch_cores(client), - self.fetch_sysinfo(client), - self.fetch_mbeans(client), - ) - else: - cores_data = await self.fetch_cores(client) - sysinfo_data = mbeans_data = None + if client is None: + async with httpx.AsyncClient(timeout=None) as owned: + cores_data, sysinfo_data, mbeans_data = await self._fetch_all(owned, full) + else: + cores_data, sysinfo_data, mbeans_data = await self._fetch_all(client, full) jvm_info = self.parse_jvm(sysinfo_data) if sysinfo_data else None os_info = self.parse_os(sysinfo_data) if sysinfo_data else None diff --git a/data-loading/setup-and-load-solr.sh b/data-loading/setup-and-load-solr.sh index 5d60a75d..693b62a1 100755 --- a/data-loading/setup-and-load-solr.sh +++ b/data-loading/setup-and-load-solr.sh @@ -79,7 +79,8 @@ curl -X POST -H 'Content-type:application/json' --data-binary '{ { "name":"curie", "type":"string", - "stored":true + "stored":true, + "docValues":true }, { "name":"preferred_name", @@ -96,8 +97,9 @@ curl -X POST -H 'Content-type:application/json' --data-binary '{ { "name":"types", "type":"string", - "stored":true - "multiValued":true + "stored":true, + "multiValued":true, + "docValues":true }, { "name":"shortest_name_length", @@ -116,12 +118,14 @@ curl -X POST -H 'Content-type:application/json' --data-binary '{ "name":"taxa", "type":"string", "stored":true, - "multiValued":true + "multiValued":true, + "docValues":true }, { "name":"clique_identifier_count", "type":"pint", - "stored":true + "stored":true, + "docValues":true } ] }' 'http://localhost:8983/solr/name_lookup/schema' @@ -150,6 +154,13 @@ for f in $1; do 'http://localhost:8983/solr/name_lookup/update/json/docs?processor=uuid&uuid.fieldName=id&commit=true' sleep 30 done + +# Force-merge segments down to a small fixed count. 
The index is read-only after the load +# loop above, so a one-time merge meaningfully reduces per-query work for every subsequent +# search. waitSearcher=true blocks until the merge is committed and a new searcher is open. +echo "Force-merging segments..." +curl -s 'http://localhost:8983/solr/name_lookup/update?optimize=true&maxSegments=8&waitSearcher=true' + echo "Check solr" curl -s --negotiate -u: 'localhost:8983/solr/name_lookup/query?q=*:*&rows=0' diff --git a/tests/test_service.py b/tests/test_service.py index d5522d7f..d105ad11 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -191,6 +191,57 @@ def test_bulk_lookup(): assert results['Parkinson'][0]['label'] == "Parkinson disease" +def test_bulk_lookup_preserves_input_order(): + """ Bulk lookup should return a dict whose keys match the input strings in input order, + even now that lookups run concurrently under a Semaphore. """ + client = TestClient(app) + inputs = ['Parkinson', 'beta-secretase', 'alzheimer'] + response = client.post("/bulk-lookup", json={'strings': inputs, 'limit': 5}) + assert response.status_code == 200 + results = response.json() + assert list(results.keys()) == inputs + for s in inputs: + assert isinstance(results[s], list) + + +def test_only_prefixes_filter(): + """ The only_prefixes filter should restrict results to CURIEs in the listed namespaces. + Verifies the prefix-wildcard query rewrite works for both single and pipe-separated inputs. 
""" + client = TestClient(app) + + response = client.get("/lookup", params={'string': 'Parkinson', 'limit': 100, 'only_prefixes': 'MONDO'}) + syns = response.json() + assert len(syns) > 0 + assert all(s['curie'].startswith('MONDO:') for s in syns) + + response = client.get("/lookup", params={'string': 'Parkinson', 'limit': 100, 'only_prefixes': 'MONDO|HP'}) + syns = response.json() + assert len(syns) > 0 + assert all(s['curie'].split(':', 1)[0] in {'MONDO', 'HP'} for s in syns) + + # Sanitization: garbage characters in the prefix should not blow up the query — they get + # stripped and the remaining alphanumerics are used. An entirely-invalid prefix should be + # dropped, not produce a 500. + response = client.get("/lookup", params={'string': 'Parkinson', 'limit': 100, + 'only_prefixes': 'MONDO|/*evil*/'}) + assert response.status_code == 200 + + +def test_exclude_prefixes_filter(): + """ The exclude_prefixes filter should drop results whose CURIE is in the listed namespaces. """ + client = TestClient(app) + + # Baseline: no filter. + response = client.get("/lookup", params={'string': 'Parkinson', 'limit': 100}) + baseline = response.json() + assert any(s['curie'].startswith('MONDO:') for s in baseline) + + # With MONDO excluded. + response = client.get("/lookup", params={'string': 'Parkinson', 'limit': 100, 'exclude_prefixes': 'MONDO'}) + syns = response.json() + assert all(not s['curie'].startswith('MONDO:') for s in syns) + + def test_synonyms(): """ Test the /synonyms endpoints -- these are used to look up all the information we know about a preferred CURIE. diff --git a/tests/test_status.py b/tests/test_status.py index 2f8393d6..fdbcbb01 100644 --- a/tests/test_status.py +++ b/tests/test_status.py @@ -1,6 +1,6 @@ import logging -from api.server import app +from api.server import app, query_log, slow_query_samples from fastapi.testclient import TestClient # Turn on debugging for tests. 
@@ -48,3 +48,43 @@ def test_status_full_includes_metrics(): assert solr['jvm']['heap_used_bytes'] is not None assert solr['os'] is not None # assert solr['cache'] is not None -- TODO: figure out why this doesn't work on our little Docker image. + + +def test_recent_queries_per_endpoint(): + """ After exercising /lookup, /synonyms, and /bulk-lookup the /status payload should + expose per-endpoint latency stats keyed by the same endpoint names. """ + query_log.clear() + client = TestClient(app) + client.post("/lookup", params={'string': 'alzheimer', 'limit': 1}) + client.get("/synonyms", params={'preferred_curies': ['CHEBI:74925']}) + client.post("/bulk-lookup", json={'strings': ['Parkinson'], 'limit': 1}) + + rq = client.get("/status").json()['recent_queries'] + assert 'per_endpoint' in rq + per = rq['per_endpoint'] + assert set(per.keys()) >= {'lookup', 'synonyms', 'bulk-lookup'} + for ep_stats in per.values(): + assert ep_stats['count'] >= 1 + assert 'mean_time_ms' in ep_stats + assert 'latency_buckets' in ep_stats + + +def test_recent_queries_slow_samples_shape(): + """ Slow samples should be exposed as a list. We can't reliably trigger a slow sample + against the test fixture (test queries are fast), so we just assert shape and append a + synthetic record to verify the field is wired up to the deque. 
""" + slow_query_samples.clear() + client = TestClient(app) + rq = client.get("/status").json()['recent_queries'] + assert 'slow_samples' in rq + assert rq['slow_samples'] == [] + + slow_query_samples.append({ + 'ts': 0.0, 'endpoint': 'lookup', 'duration_ms': 9999.99, + 'autocomplete': False, 'highlighting': False, + 'biolink_types_count': 0, 'only_prefixes': False, + 'exclude_prefixes': False, 'only_taxa': False, + }) + rq = client.get("/status").json()['recent_queries'] + assert len(rq['slow_samples']) == 1 + assert rq['slow_samples'][0]['endpoint'] == 'lookup' From 4844c74fc1043c494fa6f01eed529ca029ae611e Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Fri, 1 May 2026 16:52:13 -0400 Subject: [PATCH 2/2] Reduced Java memory. This is apparently both (1) good for Solr, which needs non-heap memory as well, and (2) allows me to run it on my 32G work laptop. --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 78a38e74..9710e27d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: environment: # Change this setting to control how much memory you would like your Solr setup to have. # Note that your Docker will need to be configured to allow this amount of memory. - SOLR_JAVA_MEM: '-Xms25G -Xmx25G' + SOLR_JAVA_MEM: '-Xms8G -Xmx16G' ports: - '8983:8983' command: ['-DzkRun']