diff --git a/tools/talis/aws.go b/tools/talis/aws.go index 249dc89bbd..afcef6ebaa 100644 --- a/tools/talis/aws.go +++ b/tools/talis/aws.go @@ -350,16 +350,28 @@ func CreateAWSInstances(ctx context.Context, insts []Instance, sshKey, keyName s close(results) }() - var created []Instance + var ( + created []Instance + failures []string + ) for res := range results { if res.err != nil { fmt.Printf("❌ %s failed after %v %v\n", res.inst.Name, res.timeRequired, res.err) + failures = append(failures, fmt.Sprintf("%s: %v", res.inst.Name, res.err)) } else { created = append(created, res.inst) fmt.Printf("✅ %s is up (public=%s) in %v\n", res.inst.Name, res.inst.PublicIP, res.timeRequired) } fmt.Printf("---- Progress: %d/%d\n", len(created), total) } + if len(failures) > 0 { + // Surface partial-failure as an error so `talis up` exits + // non-zero; without this, downstream genesis runs against a + // half-provisioned config and fails much later with confusing + // "X has no public IP yet" messages. + return created, fmt.Errorf("%d/%d instance(s) failed to launch: %s", + len(failures), total, strings.Join(failures, "; ")) + } return created, nil } diff --git a/tools/talis/cmd/evnode-txsim/main.go b/tools/talis/cmd/evnode-txsim/main.go index d8fd1a9435..39edd66fd1 100644 --- a/tools/talis/cmd/evnode-txsim/main.go +++ b/tools/talis/cmd/evnode-txsim/main.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "net/http" + _ "net/http/pprof" "os" "os/signal" "sort" @@ -113,7 +114,27 @@ func run(cli cliFlags) error { return fmt.Errorf("seed random pool: %w", err) } - httpClient := &http.Client{Timeout: cli.timeout} + // Bump per-host idle connections so concurrent goroutines reuse + // keep-alive sockets instead of churning TCP+TLS handshakes — + // stdlib default MaxIdleConnsPerHost=2 caps in-flight requests + // to 2 keep-alive sockets per target, which serializes any + // concurrency>2 onto fresh connections each request. + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 2 * cli.concurrency + transport.MaxIdleConnsPerHost = 2 * cli.concurrency + transport.MaxConnsPerHost = 2 * cli.concurrency + httpClient := &http.Client{Timeout: cli.timeout, Transport: transport} + + // pprof on a dedicated listener — `_ "net/http/pprof"` registers + // handlers on http.DefaultServeMux. Always-on at 127.0.0.1:6060 + // since this is a load-tester binary, not a production daemon; + // SSH port-forward to grab profiles under load: + // + // ssh -L 6060:127.0.0.1:6060 root@loadgen \ + // go tool pprof http://localhost:6060/debug/pprof/profile?seconds=10 + go func() { + _ = http.ListenAndServe("127.0.0.1:6060", nil) + }() ctx, cancel := context.WithCancel(context.Background()) if cli.duration > 0 { diff --git a/tools/talis/download.go b/tools/talis/download.go index 99284d0326..ddff69e102 100644 --- a/tools/talis/download.go +++ b/tools/talis/download.go @@ -193,16 +193,26 @@ func compressAndDownload(table, localPath, user, host, sshKeyPath string) error return nil } -// sshExec runs a command on a remote host via SSH and returns the combined output. +// sshExec runs a command on a remote host via SSH and returns stdout only. +// +// We intentionally do NOT use CombinedOutput here. ssh prints connection +// chatter ("Warning: Permanently added '...' to the list of known hosts.") +// on stderr, and a previous `CombinedOutput` revision caused +// `fmt.Sscanf(out, "%d")` parses to silently return 0 because the leading +// stderr line had no digits. Capturing only stdout keeps numeric output +// parseable; -q + LogLevel=ERROR further suppresses the chatter for any +// caller that does combine streams. func sshExec(user, host, sshKeyPath, command string) ([]byte, error) { cmd := exec.Command("ssh", + "-q", + "-o", "LogLevel=ERROR", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", sshKeyPath, fmt.Sprintf("%s@%s", user, host), command, ) - return cmd.CombinedOutput() + return cmd.Output() } func sftpDownload(remotePath, localPath, user, host, sshKeyPath string) error { diff --git a/tools/talis/fibre_bootstrap_evnode.go b/tools/talis/fibre_bootstrap_evnode.go index 5df12d41ae..c8d76822b3 100644 --- a/tools/talis/fibre_bootstrap_evnode.go +++ b/tools/talis/fibre_bootstrap_evnode.go @@ -125,21 +125,45 @@ to fetch, then SCPs to each evnode-*.`, defer wg.Done() log.Printf("[%s] pushing JWT + keyring", ev.Name) + // JWT is small + atomic on the receive side because + // it's a single file, so we push it directly. if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, localJWT, "/root/bridge-jwt.txt", false); err != nil { errCh <- fmt.Errorf("[%s] push JWT: %w", ev.Name, err) return } - // mkdir the parent so scp lands at the exact path - // evnode_init.sh waits for. - if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, "mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test"); err != nil { - errCh <- fmt.Errorf("[%s] mkdir keyring-fibre: %w", ev.Name, err) + // Keyring push is staged through a tmp dir and + // promoted via mv. Without staging, evnode_init.sh's + // poll loop (which tests `[ -d keyring-test ]`) + // passes the moment scp -r mkdir's the directory, + // long before fibre-0.info is on disk. evnode then + // launches mid-scp and dies with `keyring entry + // "fibre-0" not found`. mv is atomic on the same + // filesystem so the init script either sees nothing + // (keep waiting) or the fully-populated dir (start + // the daemon cleanly). + stageDir := "/root/.keyring-fibre.staging" + prep := fmt.Sprintf( + "rm -rf %s && mkdir -p %s && mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test", + stageDir, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, prep); err != nil { + errCh <- fmt.Errorf("[%s] stage keyring: %w", ev.Name, err) return } - if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), "/root/keyring-fibre/keyring-test", true); err != nil { + stageDest := stageDir + "/keyring-test" + if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), stageDest, true); err != nil { errCh <- fmt.Errorf("[%s] push keyring: %w", ev.Name, err) return } + promote := fmt.Sprintf( + "mv %s /root/keyring-fibre/keyring-test && rmdir %s", + stageDest, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, promote); err != nil { + errCh <- fmt.Errorf("[%s] promote keyring: %w", ev.Name, err) + return + } log.Printf("[%s] ✓ pushed; daemon should start within ~10s", ev.Name) }(ev) diff --git a/tools/talis/fibre_experiment.go b/tools/talis/fibre_experiment.go new file mode 100644 index 0000000000..585992d5fe --- /dev/null +++ b/tools/talis/fibre_experiment.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + + "github.com/spf13/cobra" +) + +// fibreExperimentCmd is the one-command driver for the Fibre throughput +// experiment. It assumes the operator has already populated the +// experiment directory with a config.json + scripts/ + base config.toml + +// app.toml (i.e. ran `talis init` + `talis add` for validators / bridge +// / evnode / loadgen) and that build artefacts are at $rootDir/build. +// +// It then invokes — in order — the same subcommands the operator would +// run by hand: +// +// 1. up — provision instances +// 2. genesis -b — stage validator/bridge/evnode/loadgen payloads +// 3. deploy — ship payloads + start init scripts +// 4. setup-fibre — register host + deposit escrow on each validator +// 5. start-fibre — launch the fibre server on each validator +// 6. fibre-bootstrap-evnode — scp bridge JWT + fibre keyring onto evnode-* +// +// Each step is invoked via os/exec on the running binary. Any failure +// surfaces immediately; nothing is retried at this layer (the +// individual subcommands handle their own waits + retries). +// +// After step 6 returns, evnode-* daemons start within ~10 s and the +// load-gen's init script auto-launches evnode-txsim. The operator +// reads the final TXSIM: line from the load-gen. +func fibreExperimentCmd() *cobra.Command { + var ( + rootDir string + buildDir string + ) + + cmd := &cobra.Command{ + Use: "fibre-experiment", + Short: "End-to-end driver: up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode", + Long: `Run every step needed to bring up a Fibre throughput experiment from a +prepared root directory. Equivalent to invoking each subcommand in +sequence; included so the operator doesn't have to remember the order +or watch for inter-step races.`, + RunE: func(cmd *cobra.Command, args []string) error { + self, err := os.Executable() + if err != nil { + return fmt.Errorf("locate own binary: %w", err) + } + + steps := []struct { + name string + args []string + }{ + {"up", []string{"up", "-d", rootDir}}, + {"genesis", []string{"genesis", "-d", rootDir, "-b", buildDir}}, + {"deploy", []string{"deploy", "-d", rootDir}}, + {"setup-fibre", []string{"setup-fibre", "-d", rootDir}}, + {"start-fibre", []string{"start-fibre", "-d", rootDir}}, + {"fibre-bootstrap-evnode", []string{"fibre-bootstrap-evnode", "-d", rootDir}}, + } + + for _, s := range steps { + fmt.Printf("\n=== talis %s ===\n", s.name) + c := exec.Command(self, s.args...) + c.Stdout = os.Stdout + c.Stderr = os.Stderr + c.Env = os.Environ() + if err := c.Run(); err != nil { + return fmt.Errorf("step %q failed: %w", s.name, err) + } + } + + fmt.Println() + fmt.Println("=== fibre-experiment complete ===") + fmt.Println("evnode aggregator(s) start within ~10 s and load-gen init") + fmt.Println("scripts auto-launch evnode-txsim once evnode's /stats responds.") + fmt.Println("Final TXSIM: line lands at /root/txsim.log on each load-gen host.") + return nil + }, + } + + cmd.Flags().StringVarP(&rootDir, "directory", "d", ".", "experiment root directory") + cmd.Flags().StringVarP(&buildDir, "build-dir", "b", "./build", "directory containing the cross-compiled linux/amd64 binaries") + + return cmd +} diff --git a/tools/talis/fibre_setup.go b/tools/talis/fibre_setup.go index 28f7115b80..382e780c50 100644 --- a/tools/talis/fibre_setup.go +++ b/tools/talis/fibre_setup.go @@ -51,20 +51,104 @@ func setupFibreCmd() *cobra.Command { // Build script: register host + deposit escrow for validator + all fibre accounts var sb strings.Builder + // 0. Block until the chain has produced at least one block. + // Without this, the very next tx returns + // `celestia-app is not ready; please wait for first block` + // from the local node — the call appears to succeed at + // the CLI level (`--yes` returns the txhash before block + // inclusion), but the tx never lands. Polling explicitly + // avoids the `sleep 10` heuristic that used to be here. + sb.WriteString( + "echo 'waiting for chain to produce first block...'\n" + + "DEADLINE=$(( $(date +%s) + 300 ))\n" + + "while true; do\n" + + " H=$(celestia-appd status 2>/dev/null | " + + " grep -oE '\"latest_block_height\":\"[0-9]+\"' | " + + " grep -oE '[0-9]+' | head -1)\n" + + " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n" + + " echo \"chain is at height $H\"\n" + + " break\n" + + " fi\n" + + " if [ $(date +%s) -gt $DEADLINE ]; then\n" + + " echo 'FATAL: chain never produced a block within 5m' >&2\n" + + " exit 1\n" + + " fi\n" + + " sleep 3\n" + + "done\n", + ) + // 1. Register fibre host address. Plain `host:port` form — // x/valaddr requires it; the gRPC client dials it via the // passthrough resolver. Don't prefix `dns:///` here. + // + // Retry until `query valaddr providers` shows OUR host + // — `--yes` returns the txhash before inclusion, so a + // single one-shot call can succeed at the RPC layer + // while the chain rejects the tx (mempool full, signer + // not yet in validator set, …) and we'd never know. + // 5-minute deadline so a stuck chain doesn't loop + // forever. sb.WriteString(fmt.Sprintf( - "celestia-appd tx valaddr set-host %s:%d "+ + "HOST=%s:%d\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " celestia-appd tx valaddr set-host \"$HOST\" "+ "--from validator --keyring-backend=test --home .celestia-app "+ - "--chain-id %s --fees %s --yes\n", + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q \"\\\"host\\\": *\\\"$HOST\\\"\"; then\n"+ + " echo \"set-host confirmed: $HOST\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: set-host did not register $HOST after 5m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " echo 'set-host pending, retrying...'\n"+ + "done\n", val.PublicIP, fibrePort, cfg.ChainID, fees, + cfg.ChainID, + )) + + // 2. Deposit escrow for fibre-0 inside a retry loop. + // Same silent-failure mode as set-host: `--yes` returns + // the txhash before inclusion, so a single bounced tx + // (mempool full, signer not yet propagated, …) leaves + // the runner failing every upload with + // `escrow account not found for signer …`. fibre-0 is + // the one the runner actually signs with by default, + // so it's the only one we hard-block on. + sb.WriteString(fmt.Sprintf( + "FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " celestia-appd tx fibre deposit-to-escrow %s "+ + "--from fibre-0 --keyring-backend=test --home .celestia-app "+ + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query fibre escrow-account \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q '\"found\":true'; then\n"+ + " echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: fibre-0 escrow did not land after 5m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " echo 'fibre-0 escrow pending, retrying...'\n"+ + "done\n", + escrowAmount, + cfg.ChainID, fees, + cfg.ChainID, )) - sb.WriteString("sleep 10\n") - // 2. Deposit escrow for each fibre worker account - for i := range fibreAccounts { + // 3. Best-effort fund fibre-1..N. The runner only signs + // with fibre-0 by default, so a missing one of these + // doesn't block uploads — they exist as headroom for + // future signer rotation. + for i := 1; i < fibreAccounts; i++ { keyName := fmt.Sprintf("fibre-%d", i) sb.WriteString(fmt.Sprintf( "celestia-appd tx fibre deposit-to-escrow %s "+ @@ -103,6 +187,38 @@ func setupFibreCmd() *cobra.Command { if err := waitForTmuxSessions(cfg.Validators, resolvedSSHKeyPath, SetupFibreSessionName, 10*time.Minute); err != nil { return fmt.Errorf("waiting for setup-fibre sessions: %w", err) } + + // CLI-side verification that every validator's host is on + // the chain's provider list before we hand off to start- + // fibre / fibre-bootstrap-evnode. The per-validator script + // above already self-verifies its own host, but we + // re-check here from a single vantage point so a + // concurrent set-host race across validators surfaces + // before downstream steps cache an empty registry. + if len(cfg.Validators) > 0 { + expected := len(cfg.Validators) + queryHost := cfg.Validators[0].PublicIP + queryCmd := fmt.Sprintf( + "celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null | grep -o '\"host\"' | wc -l", + cfg.ChainID, + ) + deadline := time.Now().Add(5 * time.Minute) + for { + out, err := sshExec("root", queryHost, resolvedSSHKeyPath, queryCmd) + if err == nil { + count := 0 + _, _ = fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count) + if count >= expected { + break + } + fmt.Printf(" valaddr providers: %d/%d registered, retrying...\n", count, expected) + } + if time.Now().After(deadline) { + return fmt.Errorf("only some validators registered as fibre providers within 5m — re-run setup-fibre") + } + time.Sleep(5 * time.Second) + } + } fmt.Println("Validator setup done!") // Deposit escrow for encoder accounts. diff --git a/tools/talis/genesis.go b/tools/talis/genesis.go index 9a4f481a81..e1b229efaa 100644 --- a/tools/talis/genesis.go +++ b/tools/talis/genesis.go @@ -558,9 +558,17 @@ mkdir -p "$EVNODE_HOME" # 2. /root/keyring-fibre/keyring-test cosmos-sdk file keyring with # a Fibre payment account # Without them the daemon would crash immediately on startup. -echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR..." +# +# We check for the *specific* fibre-0.info file (not just the +# keyring-test directory) because a non-atomic scp -r would create +# the directory before transferring its contents — so testing -d +# alone passes mid-scp and the daemon would launch with an empty +# keyring. fibre-bootstrap-evnode now also stages-and-mvs so this +# check should never see a partial state, but keep the file-level +# guard as a defence-in-depth. +echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR/keyring-test/fibre-0.info..." WAITED=0 -until [ -s "$BRIDGE_JWT_FILE" ] && [ -d "$FIBRE_KEYRING_DIR/keyring-test" ]; do +until [ -s "$BRIDGE_JWT_FILE" ] && [ -f "$FIBRE_KEYRING_DIR/keyring-test/fibre-0.info" ]; do sleep 5 WAITED=$((WAITED + 5)) if [ $((WAITED % 60)) -eq 0 ]; then diff --git a/tools/talis/main.go b/tools/talis/main.go index 016b1ee340..899fe8cf54 100644 --- a/tools/talis/main.go +++ b/tools/talis/main.go @@ -37,6 +37,7 @@ func main() { fibreTxsimCmd(), fibreThroughputCmd(), fibreBootstrapEvnodeCmd(), + fibreExperimentCmd(), resourceMonitorCmd(), downloadResourcesCmd(), syncNodeCmd(),