Conversation
Amp-Thread-ID: https://ampcode.com/threads/T-019db3d7-0da5-76ce-a24b-dc7507211386 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db3df-499e-72bd-9be0-df089c30106d Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db3e1-8171-7040-8bbc-a8f061eb1b72 Co-authored-by: Amp <amp@ampcode.com>
* master: fix: stabilize geyser plugin startup (#4)
Amp-Thread-ID: https://ampcode.com/threads/T-019dd297-daf4-7092-91a6-1681a0d669ee Co-authored-by: Amp <amp@ampcode.com>
Step 2: Drop the unused allowed field from the ClientCheckpoint struct and update all four scenario files to stop constructing it:
- ix-tests/src/expectation.rs: removed allowed field from the struct
- ix-tests/src/scenarios/single_basic.rs: updated the lamport_client_checkpoint helper and the owner_data_checkpoint block
- ix-tests/src/scenarios/dual_concurrent.rs: updated the single_update_checkpoint helper
- ix-tests/src/scenarios/dual_restart.rs: updated the repeated_checkpoint and empty_checkpoints helpers
- ix-tests/src/scenarios/single_load.rs: removed allowed from the construction
All ix-tests unit tests pass. Clippy errors in expectation.rs were auto-fixed (collapsible_if). Amp-Thread-ID: https://ampcode.com/threads/T-019dd298-cc3a-716f-a374-2375f3a99bf0 Co-authored-by: Amp <amp@ampcode.com>
The checkpoint runner re-evaluated every required entry against the full observation snapshot on each loop iteration. Required entries that had already been satisfied were re-checked against the same observations every tick, and a single observation could be counted toward multiple distinct required entries. Track per-client matching state across loop iterations so that:
- each required entry, once matched, stays matched and is not re-evaluated;
- each loop iteration only scans observations that arrived since the last poll;
- each observation can only consume a single required entry.
Replace the previous unmatched_required helper with a stateful consume_observations helper and update unit tests to cover the new incremental, consuming behavior. Amp-Thread-ID: https://ampcode.com/threads/T-019dd29f-8b80-76bf-8a1f-623f120781d8 Co-authored-by: Amp <amp@ampcode.com>
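A minimal sketch of the incremental, consuming matcher this commit describes; the `ObservedUpdate`/`RequiredEntry` shapes and the equality check are placeholders, not the real ix-tests types:

```rust
use std::collections::HashSet;

// Placeholder shapes; the real types and matching rules live in ix-tests.
struct ObservedUpdate(u64);
struct RequiredEntry(u64);

struct ClientMatchState {
    required: Vec<RequiredEntry>,
    matched: HashSet<usize>, // indices of required entries already satisfied
    next_observation: usize, // how far into the observation log we have scanned
}

impl ClientMatchState {
    /// Scan only observations that arrived since the last poll. Each observation can
    /// consume at most one unmatched required entry, and matched entries stay matched.
    fn consume_observations(&mut self, all_observations: &[ObservedUpdate]) {
        for obs in &all_observations[self.next_observation..] {
            let hit = self
                .required
                .iter()
                .enumerate()
                .position(|(i, req)| !self.matched.contains(&i) && req.0 == obs.0);
            if let Some(idx) = hit {
                self.matched.insert(idx);
            }
        }
        self.next_observation = all_observations.len();
    }

    fn all_matched(&self) -> bool {
        self.matched.len() == self.required.len()
    }
}
```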
Amp-Thread-ID: https://ampcode.com/threads/T-019de28c-6339-7789-b2b9-91574b2fb9dc Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019de28e-1889-70e1-8acd-8644a015c46e Co-authored-by: Amp <amp@ampcode.com>
Warning: Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab.
⌛ How to resolve this issue? After the wait time has elapsed, a review can be triggered again. We recommend that you space out your commits to avoid hitting the rate limit.
🚦 How do rate limits work? CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID:
📒 Files selected for processing (1)
Walkthrough
This PR adds an end-to-end integration test suite (…).
Changes
(See hidden review stack artifact for per-range grouping.)
Estimated review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
@coderabbitai review
✅ Actions performed: review triggered.
Actionable comments posted: 14
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@grpc-service/src/app.rs`:
- Line 195: Remove the unused local binding "_shutdown" in app.rs: the line "let
_shutdown = &self.shutdown;" should be deleted because self.shutdown is already
captured by self.account_update_source (which received shutdown.clone() during
construction); simply remove that line to eliminate the redundant unused
binding.
In `@grpc-service/src/kafka.rs`:
- Around line 92-126: The probe() method performs a blocking fetch_metadata call
(consumer.fetch_metadata) which can stall the async runtime; modify probe() to
offload the blocking call to a dedicated thread by using
tokio::task::spawn_blocking (or equivalent) so the metadata fetch runs inside
spawn_blocking and awaits the Result, mapping errors to
GeykagError::PreflightKafkaMetadata as before; keep the ClientConfig/consumer
creation local to probe() and ensure probe() remains async (or returns a Future)
so callers like preflight::wait_for_dependencies or app.run() can .await it
without blocking the tokio executor.
- Around line 27-61: The post_rebalance handler in ReadinessConsumerContext
currently ignores empty Rebalance::Assign and relies on rdkafka's rebalance
ordering; modify the post_rebalance implementation (method post_rebalance of
ReadinessConsumerContext) so that Rebalance::Assign(assignment) explicitly
checks assignment.count() and if zero calls
self.readiness.mark_kafka_not_ready() (and avoid logging an "assigned" message),
otherwise call self.readiness.mark_kafka_ready() and log using
topic_partition_list(assignment) as before; keep existing handling for
Rebalance::Revoke and Rebalance::Error unchanged.
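For the blocking `fetch_metadata` comment above, a minimal sketch of the `spawn_blocking` shape, assuming rdkafka's `BaseConsumer` and using `anyhow` plus a bare `brokers` string where the real code has `KafkaConfig` and `GeykagError::PreflightKafkaMetadata`:

```rust
use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::ClientConfig;

/// Offload the blocking metadata fetch so the tokio executor is not stalled.
/// `brokers` stands in for whatever the real KafkaConfig carries.
async fn probe(brokers: String) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
        // ClientConfig/consumer creation stays local to the probe, as in the comment.
        let consumer: BaseConsumer = ClientConfig::new()
            .set("bootstrap.servers", brokers.as_str())
            .create()?;
        // fetch_metadata blocks only this dedicated thread, not the async runtime.
        consumer.fetch_metadata(None, Duration::from_secs(5))?;
        Ok(())
    })
    .await? // JoinError if the blocking task panicked or was cancelled
    // The real code would map failures to GeykagError::PreflightKafkaMetadata instead of anyhow.
}
```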
In `@grpc-service/src/preflight.rs`:
- Around line 51-59: KafkaAccountUpdateStream is being constructed with
throwaway ServiceReadiness and CancellationToken because probe() only reads
self.config; remove these unused dependencies by changing probe() into an
associated function or adding a lightweight probe-only constructor. Update
either KafkaAccountUpdateStream::probe to be an async fn probe(config:
KafkaConfig) -> Result<...> (or provide
KafkaAccountUpdateStream::new_for_probe(config: KafkaConfig) that only stores
config) and then call run_probe_with_retry("kafka-metadata", deadline, || async
{ KafkaAccountUpdateStream::probe(config.clone()).await }) so you no longer
instantiate ServiceReadiness or CancellationToken; keep run_probe_with_retry
usage intact.
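One possible shape for the probe-only path suggested above; `KafkaConfig`, the retry helper signature, and the error type are all placeholders rather than the real grpc-service definitions:

```rust
use std::future::Future;
use std::time::Instant;

// Placeholder config type standing in for the real KafkaConfig.
#[derive(Clone)]
struct KafkaConfig {
    brokers: String,
}

struct KafkaAccountUpdateStream;

impl KafkaAccountUpdateStream {
    /// Probe as an associated function: it only needs the config, so callers no longer
    /// have to construct throwaway ServiceReadiness / CancellationToken values.
    async fn probe(config: KafkaConfig) -> anyhow::Result<()> {
        let _ = config.brokers; // the metadata fetch would go here (see the kafka.rs sketch above)
        Ok(())
    }
}

// Hypothetical stand-in mirroring the run_probe_with_retry usage in the comment.
async fn run_probe_with_retry<F, Fut>(name: &str, deadline: Instant, mut probe: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    let _ = (name, deadline);
    probe().await
}

async fn wait_for_kafka(config: KafkaConfig, deadline: Instant) -> anyhow::Result<()> {
    run_probe_with_retry("kafka-metadata", deadline, || {
        let config = config.clone();
        async move { KafkaAccountUpdateStream::probe(config).await }
    })
    .await
}
```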
In `@ix-tests/src/artifacts.rs`:
- Line 82: The loop currently uses a reference to an array of references
(`&[&paths.stdout, &paths.stderr]`) which makes `path` type `&&PathBuf`; change
the iterable to ` [&paths.stdout, &paths.stderr]` so `path` is `&PathBuf` (no
extra indirection). Update the `for path in ...` header accordingly where
`paths`, `stdout`, and `stderr` are referenced.
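A tiny illustration of the suggested change, with a hypothetical `LogPaths` struct standing in for the real `paths` value:

```rust
use std::path::PathBuf;

// Hypothetical stand-in for the struct holding the captured log paths.
struct LogPaths {
    stdout: PathBuf,
    stderr: PathBuf,
}

fn visit_logs(paths: &LogPaths) {
    // Iterating the array by value yields `&PathBuf` directly instead of `&&PathBuf`.
    for path in [&paths.stdout, &paths.stderr] {
        println!("{}", path.display());
    }
}
```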
In `@ix-tests/src/client.rs`:
- Around line 61-70: The current code uses
Pubkey::try_from(...).inspect_err(...).unwrap() (and similarly for signature
parsing) which logs errors but then panics on malformed bytes; change these to
handle parse failures gracefully by returning or propagating an error instead of
unwrapping: replace the unwrap() chain in the stream update handling with a
match or ?-propagation (e.g., let pubkey =
Pubkey::try_from(bytes.to_vec()).inspect_err(|err| error!(bytes = ?bytes, err =
?err, "failed to parse pubkey")).map_err(|e| /* convert to your handler error
*/)?; and do the same for signature parsing), or skip invalid entries and
continue; update the surrounding function signature to return Result where
needed so you can propagate errors from Pubkey::try_from and signature parsing
rather than panicking (affects the blocks using Pubkey::try_from at lines 61–70
and the analogous code at 76–85).
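A sketch of the "skip invalid entries" variant for the comment above, assuming `solana_sdk`'s `Pubkey` and `tracing`; the surrounding handler could instead return a `Result` and replace `.ok()` with `?`:

```rust
use solana_sdk::pubkey::Pubkey;
use tracing::error;

/// Parse a pubkey from raw stream bytes without panicking: log the failure and
/// let the caller skip the malformed update (or propagate an error instead).
fn parse_pubkey(bytes: &[u8]) -> Option<Pubkey> {
    Pubkey::try_from(bytes)
        .inspect_err(|err| error!(?bytes, ?err, "failed to parse pubkey"))
        .ok()
}
```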
In `@ix-tests/src/main.rs`:
- Around line 82-89: The current block can lose the original test error because
artifact I/O uses `?` and only dumps one service's logs; capture the original
error (e.g., let orig_err = failure.error) before performing artifact
operations, then perform `ctx.artifacts.write_client_updates`,
`ctx.artifacts.dump_service_logs(layout::ServiceInstance::One)`,
`ctx.artifacts.dump_service_logs(layout::ServiceInstance::Two)`, and
`ctx.artifacts.persist_failure()` but do not propagate their errors—handle or
log them locally (e.g., ignore or warn on Err) so they cannot replace
`orig_err`; finally return Err(orig_err). Ensure you reference `failure`,
`ctx.artifacts.write_client_updates`, `ctx.artifacts.dump_service_logs`,
`layout::ServiceInstance::One/Two`, `persist_failure`, and the original
`failure.error` when making the change.
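A sketch of the error-preserving flow described above; `Artifacts` and `ScenarioFailure` here are hypothetical stand-ins for the real `ctx.artifacts` helpers and failure type:

```rust
use tracing::warn;

// Hypothetical stand-ins for the real artifact helpers and failure type.
struct Artifacts;
impl Artifacts {
    fn write_client_updates(&self) -> anyhow::Result<()> { Ok(()) }
    fn dump_service_logs(&self, _instance: u8) -> anyhow::Result<()> { Ok(()) }
    fn persist_failure(&self) -> anyhow::Result<()> { Ok(()) }
}
struct ScenarioFailure {
    error: anyhow::Error,
}

fn handle_failure(artifacts: &Artifacts, failure: ScenarioFailure) -> anyhow::Result<()> {
    // Capture the scenario error before doing any artifact I/O.
    let orig_err = failure.error;

    // Artifact operations are best-effort: failures are logged, not propagated,
    // so they can never replace the original test failure.
    for (step, result) in [
        ("write_client_updates", artifacts.write_client_updates()),
        ("dump_service_logs(one)", artifacts.dump_service_logs(1)),
        ("dump_service_logs(two)", artifacts.dump_service_logs(2)),
        ("persist_failure", artifacts.persist_failure()),
    ] {
        if let Err(err) = result {
            warn!(step, ?err, "failed to persist failure artifacts");
        }
    }

    Err(orig_err)
}
```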
In `@ix-tests/src/observation.rs`:
- Around line 50-58: consume_next_update has a TOCTOU race: it locks entries
twice (calling self.entries.lock().unwrap() for is_empty() and again to
remove(0)), which can panic if another thread modifies the vector between locks;
fix by acquiring the mutex once (e.g. let mut guard =
self.entries.lock().unwrap()), then check guard.is_empty() and if not call
guard.remove(0) and return Some(...) so the check-and-remove happen under a
single lock in the consume_next_update function.
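A minimal sketch of the single-lock version, assuming `entries` is a `Mutex<Vec<ObservedUpdate>>` (the `ObservedUpdate` type here is a placeholder):

```rust
use std::sync::Mutex;

#[derive(Clone)]
struct ObservedUpdate;

struct Observations {
    entries: Mutex<Vec<ObservedUpdate>>,
}

impl Observations {
    fn consume_next_update(&self) -> Option<ObservedUpdate> {
        // Check-and-remove under a single lock acquisition, so no other thread can
        // empty the vector between the is_empty() check and the remove(0).
        let mut guard = self.entries.lock().unwrap();
        if guard.is_empty() {
            None
        } else {
            Some(guard.remove(0))
        }
    }
}
```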
In `@ix-tests/src/scenarios/single_load.rs`:
- Around line 21-31: The code currently returns early on error from run_inner or
from shutdown_clients, which can skip calling shutdown_service; modify the run
function so shutdown_service(&ctx.service_controller, &mut service) is invoked
on every exit path by first capturing the outcome of run_inner into a local
Result (e.g., let outcome = match run_inner(...).await { Err(error) =>
Err(ScenarioFailure{ error, clients }), Ok(v) => Ok(v) }), then always call
shutdown_clients(clients).await.map_err(...)? and
shutdown_service(&ctx.service_controller, &mut service).await.map_err(...)?, and
finally return the previously captured outcome; this ensures shutdown_service
runs regardless of whether run_inner failed. Make changes around run_inner,
shutdown_clients, shutdown_service, and ScenarioFailure in run.
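A sketch of the guaranteed-cleanup control flow for the comment above (and the analogous single_triage comment below); all types and helpers are placeholders mirroring the names in the comment, and shutdown errors are logged here rather than mapped into `ScenarioFailure`, a simplification of the suggested `map_err(...)?` handling:

```rust
use tracing::warn;

// Placeholders for the real ix-tests types and helpers.
#[derive(Clone)]
struct Client;
struct ServiceHandle;
struct ServiceController;
struct ScenarioContext {
    service_controller: ServiceController,
}
struct ScenarioFailure {
    error: anyhow::Error,
    clients: Vec<Client>,
}

async fn run_inner(_ctx: &ScenarioContext, _clients: &mut Vec<Client>) -> anyhow::Result<()> { Ok(()) }
async fn shutdown_clients(_clients: Vec<Client>) -> anyhow::Result<()> { Ok(()) }
async fn shutdown_service(_c: &ServiceController, _s: &mut ServiceHandle) -> anyhow::Result<()> { Ok(()) }

async fn run(ctx: &ScenarioContext, mut service: ServiceHandle) -> Result<(), ScenarioFailure> {
    let mut clients: Vec<Client> = Vec::new();

    // Capture the scenario outcome instead of returning early, so cleanup below always runs.
    let outcome = match run_inner(ctx, &mut clients).await {
        Ok(()) => Ok(()),
        // Keep the observed client state for artifact generation (see the later comment).
        Err(error) => Err(ScenarioFailure { error, clients: clients.clone() }),
    };

    // Best-effort cleanup on every exit path; failures are logged, not propagated.
    if let Err(err) = shutdown_clients(clients).await {
        warn!(?err, "failed to shut down clients");
    }
    if let Err(err) = shutdown_service(&ctx.service_controller, &mut service).await {
        warn!(?err, "failed to shut down service");
    }

    outcome
}
```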
In `@ix-tests/src/scenarios/single_triage.rs`:
- Around line 35-45: When run_inner or shutdown_clients returns an error, the
code currently returns early and can skip calling shutdown_service; change run
to always call shutdown_service(&ctx.service_controller, &mut service).await
regardless of earlier failures: after run_inner returns Err, call
shutdown_service and then return Err(ScenarioFailure { error:
original_error_or_preferred_error, clients }); likewise if shutdown_clients
fails, still call shutdown_service and then return a ScenarioFailure choosing
which error to surface (e.g., prefer the shutdown_clients error but ensure both
were attempted). Use the existing symbols run_inner, shutdown_clients,
shutdown_service, and ScenarioFailure to implement this guaranteed cleanup flow.
In `@ix-tests/src/service.rs`:
- Around line 207-279: The spawned child process is created with
.kill_on_drop(true) and then moved into ServiceHandle as
ServiceOwnership::Owned(child), which causes the process to be killed if a
ServiceHandle is dropped without calling shutdown(); add clear documentation
explaining this ownership/cleanup contract so callers know they must call
ServiceHandle::shutdown() (or otherwise retain the handle) for graceful
termination—update the ServiceHandle doc comment (and/or module-level docs) to
mention .kill_on_drop(true), ServiceOwnership::Owned, and the required shutdown
behavior and lifetime expectations.
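A sketch of the documentation the comment asks for, attached to illustrative type definitions (the real fields and variants may differ):

```rust
use tokio::process::Child;

enum ServiceOwnership {
    /// Child spawned with `.kill_on_drop(true)`: dropping the handle without calling
    /// `shutdown()` kills the process immediately instead of terminating it gracefully.
    Owned(Child),
}

/// Handle to a spawned grpc-service instance.
///
/// The child process is created with `.kill_on_drop(true)` and held as
/// `ServiceOwnership::Owned`, so the handle must be kept alive for as long as the
/// service should run. Call `ServiceHandle::shutdown()` for graceful termination;
/// letting the handle drop is a hard kill, intended only as last-resort cleanup.
struct ServiceHandle {
    ownership: ServiceOwnership,
}
```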
In `@ix-tests/src/validator.rs`:
- Around line 39-44: The fund_payer function calls self.rpc.request_airdrop but
does not wait for the airdrop to be confirmed; capture the returned signature
from request_airdrop and then wait for confirmation before returning (e.g., call
self.rpc.confirm_transaction or poll get_signature_status_with_commitment /
get_signature_statuses until the signature is finalized/confirmed with a
timeout), so that subsequent transactions won't race with funding finalization;
update fund_payer to use the airdrop signature and a confirmation check on
self.rpc (or an equivalent polling helper) and handle/report timeouts/errors
accordingly.
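A sketch of the confirmation wait, assuming the nonblocking `solana_client` `RpcClient`; the timeout, poll interval, and commitment handling are illustrative choices, not the crate's defaults:

```rust
use std::time::Duration;

use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;

async fn fund_payer(rpc: &RpcClient, payer: &Pubkey, lamports: u64) -> anyhow::Result<()> {
    // Keep the airdrop signature so we can wait for it to land.
    let signature = rpc.request_airdrop(payer, lamports).await?;

    // Poll for confirmation with a bounded timeout so later transactions
    // don't race the funding.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        if rpc.confirm_transaction(&signature).await? {
            return Ok(());
        }
        if tokio::time::Instant::now() >= deadline {
            anyhow::bail!("airdrop {signature} not confirmed within 30s");
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}
```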
In `@kafka-setup/sh/ksql/03_reset-accounts-state.sh`:
- Around line 31-37: The check using QUERIES_OUTPUT to detect 'CTAS_' and
'ACCOUNTS' is fragile because the script then hardcodes TERMINATE
CTAS_ACCOUNTS_1; instead parse QUERIES_OUTPUT to extract the actual query
name(s) (e.g., using grep/sed/awk to match
/^CTAS_[^[:space:]]*ACCOUNTS[^[:space:]]*/ or similar), iterate over each
matched query name, and invoke $DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}"
-e "TERMINATE <query_name>;" using the extracted variable(s) (ensure proper
quoting and handle zero matches by keeping the existing "No persistent ACCOUNTS
query found" message).
- Around line 41-47: The existence check uses uppercase "ACCOUNTS" but the drop
uses ${TABLE} which is lowercase; normalize the identifier so both match: set or
transform TABLE to uppercase before checking and using it (e.g., derive
TABLE_UPPER from TABLE or assign TABLE="$(echo "$TABLE" | tr '[:lower:]'
'[:upper:]')" and then use TABLE_UPPER in the grep against TABLES_OUTPUT and in
the DROP TABLE command), referencing TABLE, TABLES_OUTPUT, the ksqldb-cli SHOW
TABLES check and the DROP TABLE ${TABLE} DELETE TOPIC invocation to keep
behavior consistent with ksqlDB's uppercase display.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3e14faab-a74b-4dd2-9399-b871f787a9d9
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (48)
Cargo.toml, Makefile, README.md, geyser-plugin/Makefile, geyser-plugin/configs/plugin-config.example.json, geyser-plugin/src/account_update_publisher.rs, grpc-service/Cargo.toml, grpc-service/src/app.rs, grpc-service/src/config.rs, grpc-service/src/errors.rs, grpc-service/src/grpc_service/init_subs.rs, grpc-service/src/grpc_service/mod.rs, grpc-service/src/grpc_service/readiness.rs, grpc-service/src/grpc_service/runtime.rs, grpc-service/src/grpc_service/service.rs, grpc-service/src/kafka.rs, grpc-service/src/main.rs, grpc-service/src/preflight.rs, ix-tests/Cargo.toml, ix-tests/configs/grpc-service/service-1.toml, ix-tests/configs/grpc-service/service-2.toml, ix-tests/configs/suite.toml, ix-tests/src/accounts.rs, ix-tests/src/artifacts.rs, ix-tests/src/cli.rs, ix-tests/src/client.rs, ix-tests/src/config.rs, ix-tests/src/context.rs, ix-tests/src/expectation.rs, ix-tests/src/layout.rs, ix-tests/src/main.rs, ix-tests/src/observation.rs, ix-tests/src/runner.rs, ix-tests/src/scenario.rs, ix-tests/src/scenarios/dual_concurrent.rs, ix-tests/src/scenarios/dual_restart.rs, ix-tests/src/scenarios/mod.rs, ix-tests/src/scenarios/single_basic.rs, ix-tests/src/scenarios/single_load.rs, ix-tests/src/scenarios/single_triage.rs, ix-tests/src/service.rs, ix-tests/src/validator.rs, kafka-setup/Makefile, kafka-setup/README.md, kafka-setup/docker-compose.yml, kafka-setup/sh/kafka/01_recreate-account-updates-topic.sh, kafka-setup/sh/ksql/03_reset-accounts-state.sh, kafka-setup/sh/reset-state.sh
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@ix-tests/src/observation.rs`:
- Around line 45-48: The snapshot_from function can panic when taking
guard[start_index..] if start_index > guard.len(); update snapshot_from to lock
entries (entries.lock().unwrap()), check if start_index >= guard.len() and
return an empty Vec<ObservedUpdate> in that case, otherwise return
guard[start_index..].to_vec(); reference the snapshot_from method and the
entries lock/guard to locate the change.
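A minimal sketch of the bounds-checked `snapshot_from`, under the same placeholder assumption as above (`entries: Mutex<Vec<ObservedUpdate>>`):

```rust
use std::sync::Mutex;

#[derive(Clone)]
struct ObservedUpdate;

struct Observations {
    entries: Mutex<Vec<ObservedUpdate>>,
}

impl Observations {
    fn snapshot_from(&self, start_index: usize) -> Vec<ObservedUpdate> {
        let guard = self.entries.lock().unwrap();
        // Slicing with start_index past the end would panic; return an empty snapshot instead.
        if start_index >= guard.len() {
            return Vec::new();
        }
        guard[start_index..].to_vec()
    }
}
```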
In `@ix-tests/src/scenarios/single_load.rs`:
- Around line 26-30: The error branch inside the `if let Err(error) = outcome`
block currently returns `ScenarioFailure { error, clients: Vec::new() }`, which
discards the runtime client state; change it to return the captured client
snapshot instead (e.g., `clients` or `clients.clone()` / `std::mem::take(&mut
clients)` as appropriate) so `ScenarioFailure` contains the actual client
observations for artifact generation; ensure the chosen approach respects
ownership/borrowing and serialization bounds for the `clients` field.
In `@ix-tests/src/scenarios/single_triage.rs`:
- Around line 40-44: The failure branch in run_inner constructs ScenarioFailure
with clients: Vec::new(), discarding collected observations; replace the empty
vector with the actual clients variable (e.g., clients) so observations are
preserved: return Err(ScenarioFailure { error, clients }) (or clients.clone() if
ownership/borrow requires it). Ensure you adjust ownership/moves where run_inner
defines/uses the clients Vec and update any signatures to move or clone as
needed so the original observations are returned instead of being dropped.
In `@ix-tests/src/service.rs`:
- Around line 433-445: The timeout branch currently calls
RunArtifacts::dump_service_logs_at(log_paths)? which can convert an artifact I/O
error into the returned error; instead call
RunArtifacts::dump_service_logs_at(log_paths) and handle the Result so the
readiness timeout remains the primary error: if the dump fails, log or warn
about the dump failure (including the error) as best-effort but do not propagate
it — then proceed to bail! with the original grpc-service timeout message. Keep
references to RunArtifacts::dump_service_logs_at, log_paths, endpoint, and
self.service_start_timeout when locating and adjusting this logic.
- Around line 394-423: The readiness loop in wait_until_ready currently calls
GeyserClient::connect and client.ping without a per-attempt timeout, allowing a
single stalled call to exceed service_start_timeout; wrap the entire
connect+ping attempt in a tokio::time::timeout (or equivalent) using a
per-attempt deadline derived from service_start_timeout/remaining time, treat a
timeout as a non-ready result (log similarly to the Unavailable branch and
continue the loop), and ensure any partially created client is dropped/canceled
on timeout; update logging around announced_waiting to reflect timeout-based
retries.
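A sketch of the per-attempt timeout pattern for the readiness loop above; `connect_and_ping` stands in for the real `GeyserClient::connect` + `client.ping` round trip, and the per-attempt budget and poll interval are illustrative:

```rust
use std::time::Duration;

use tokio::time::{sleep, timeout, Instant};

// Placeholder for the tonic-generated connect + ping round trip used by the real code.
async fn connect_and_ping(endpoint: &str) -> anyhow::Result<()> {
    let _ = endpoint;
    Ok(())
}

async fn wait_until_ready(endpoint: &str, service_start_timeout: Duration) -> anyhow::Result<()> {
    let deadline = Instant::now() + service_start_timeout;
    let per_attempt = Duration::from_secs(2);

    while Instant::now() < deadline {
        // Bound each connect+ping attempt so a single stalled call cannot eat
        // the whole service_start_timeout budget.
        match timeout(per_attempt, connect_and_ping(endpoint)).await {
            Ok(Ok(())) => return Ok(()),
            Ok(Err(err)) => tracing::debug!(?err, "service not ready yet"),
            // A timed-out attempt is treated like any other not-ready result; the
            // partially created client future is dropped here.
            Err(_elapsed) => tracing::debug!("readiness attempt timed out"),
        }
        sleep(Duration::from_millis(250)).await;
    }

    anyhow::bail!("grpc-service at {endpoint} did not become ready within {service_start_timeout:?}")
}
```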
In `@kafka-setup/sh/ksql/03_reset-accounts-state.sh`:
- Around line 54-60: The stream drop logic hardcodes "ACCOUNT_UPDATES_STREAM"
and mixes case with the STREAM variable; define STREAM_UPPER (like TABLE_UPPER)
by uppercasing STREAM near where TABLE_UPPER is set, then update the check to
search for "$STREAM_UPPER" in STREAMS_OUTPUT and use the same
STREAM/STREAM_UPPER pairing consistently when deciding to drop (use STREAM for
the DROP command but use STREAM_UPPER for existence checks) so the presence
check and drop operate on the same stream name in the correct case (adjust
references to STREAMS_OUTPUT and the ksqldb-cli command accordingly).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f7601337-a4c5-443a-afe1-d69515e2c68f
📒 Files selected for processing (13)
grpc-service/src/app.rs, grpc-service/src/errors.rs, grpc-service/src/kafka.rs, grpc-service/src/preflight.rs, ix-tests/src/artifacts.rs, ix-tests/src/client.rs, ix-tests/src/main.rs, ix-tests/src/observation.rs, ix-tests/src/scenarios/single_load.rs, ix-tests/src/scenarios/single_triage.rs, ix-tests/src/service.rs, ix-tests/src/validator.rs, kafka-setup/sh/ksql/03_reset-accounts-state.sh
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@kafka-setup/sh/ksql/03_reset-accounts-state.sh`:
- Around line 48-56: The grep checks for TABLE_UPPER and STREAM_UPPER are using
substring matching and can match similarly named objects; change the existence
checks to match the object name exactly in the first column of the SHOW output
(use the variables TABLES_OUTPUT/STREAMS_OUTPUT) — e.g. pipe
TABLES_OUTPUT/STREAMS_OUTPUT into a field-aware matcher such as awk with -v
NAME="$TABLE_UPPER" and test $1 == NAME (do the same for STREAM_UPPER) so DROP
only runs when the first column exactly equals the target name; ensure you pass
the shell variable into awk with -v to avoid regex/word-boundary pitfalls.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f2c8d2ae-815e-4aab-bcf0-613e9d5c4100
📒 Files selected for processing (5)
ix-tests/src/observation.rs, ix-tests/src/scenarios/single_load.rs, ix-tests/src/scenarios/single_triage.rs, ix-tests/src/service.rs, kafka-setup/sh/ksql/03_reset-accounts-state.sh
* master: feat: add ix integration test suite (#5)
Summary
Adds an `ix-tests` workspace crate for running local end-to-end integration scenarios against the account update pipeline. The suite drives validator/plugin setup, gRPC service instances, Kafka state, and Solana account changes, then validates observed updates through checkpoint-based expectations.
The branch also hardens `grpc-service` startup/readiness and shutdown behavior so the new scenarios can reliably distinguish service readiness from transient bootstrapping, Kafka assignment, and preflight failures.
Details
ix-tests
Adds a runnable integration test harness with configurable scenarios, named accounts, generated per-run keypairs, service process control, observation logging, and persisted failure artifacts under `target/ix-tests/failures/`.
The initial scenarios cover single-service basics, load, triage, dual-service concurrency, and dual-service restart flows. Checkpoints are matched against required observations so the tests can verify account update behavior without depending on every incidental event in the stream.
grpc-service
Adds explicit service readiness tracking so `Ping` only reports success once startup preflight and Kafka consumer assignment have completed. Startup now probes required dependencies, reports clearer preflight errors, and supports cooperative shutdown via cancellation tokens.
Kafka consumption now exposes assignment/readiness state and has additional logging around startup and rebalance behavior, which makes integration failures easier to diagnose.
geyser-plugin and local workflows
Updates local plugin and Kafka setup workflows used by the integration suite, including reset helpers for Kafka/ksqlDB state and Makefile targets for building and running the full suite or an individual scenario.
The README now documents the new `ix-tests` crate and the expected local workflow for running scenarios.
Summary by CodeRabbit
New Features
Improvements
Documentation