Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion tools/talis/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,16 +350,28 @@ func CreateAWSInstances(ctx context.Context, insts []Instance, sshKey, keyName s
close(results)
}()

var created []Instance
var (
created []Instance
failures []string
)
for res := range results {
if res.err != nil {
fmt.Printf("❌ %s failed after %v %v\n", res.inst.Name, res.timeRequired, res.err)
failures = append(failures, fmt.Sprintf("%s: %v", res.inst.Name, res.err))
} else {
created = append(created, res.inst)
fmt.Printf("✅ %s is up (public=%s) in %v\n", res.inst.Name, res.inst.PublicIP, res.timeRequired)
}
fmt.Printf("---- Progress: %d/%d\n", len(created), total)
}
if len(failures) > 0 {
// Surface partial-failure as an error so `talis up` exits
// non-zero; without this, downstream genesis runs against a
// half-provisioned config and fails much later with confusing
// "X has no public IP yet" messages.
return created, fmt.Errorf("%d/%d instance(s) failed to launch: %s",
len(failures), total, strings.Join(failures, "; "))
}
return created, nil
}

Expand Down
23 changes: 22 additions & 1 deletion tools/talis/cmd/evnode-txsim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"io"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"sort"
Expand Down Expand Up @@ -113,7 +114,27 @@ func run(cli cliFlags) error {
return fmt.Errorf("seed random pool: %w", err)
}

httpClient := &http.Client{Timeout: cli.timeout}
// Bump per-host idle connections so concurrent goroutines reuse
// keep-alive sockets instead of churning TCP+TLS handshakes —
// stdlib default MaxIdleConnsPerHost=2 caps in-flight requests
// to 2 keep-alive sockets per target, which serializes any
// concurrency>2 onto fresh connections each request.
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 2 * cli.concurrency
transport.MaxIdleConnsPerHost = 2 * cli.concurrency
transport.MaxConnsPerHost = 2 * cli.concurrency
httpClient := &http.Client{Timeout: cli.timeout, Transport: transport}

// pprof on a dedicated listener — `_ "net/http/pprof"` registers
// handlers on http.DefaultServeMux. Always-on at 127.0.0.1:6060
// since this is a load-tester binary, not a production daemon;
// SSH port-forward to grab profiles under load:
//
// ssh -L 6060:127.0.0.1:6060 root@loadgen \
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=10
go func() {
_ = http.ListenAndServe("127.0.0.1:6060", nil)
}()

ctx, cancel := context.WithCancel(context.Background())
if cli.duration > 0 {
Expand Down
14 changes: 12 additions & 2 deletions tools/talis/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,26 @@ func compressAndDownload(table, localPath, user, host, sshKeyPath string) error
return nil
}

// sshExec runs a command on a remote host via SSH and returns the combined output.
// sshExec runs a command on a remote host via SSH and returns stdout only.
//
// We intentionally do NOT use CombinedOutput here. ssh prints connection
// chatter ("Warning: Permanently added '...' to the list of known hosts.")
// on stderr, and a previous `CombinedOutput` revision caused
// `fmt.Sscanf(out, "%d")` parses to silently return 0 because the leading
// stderr line had no digits. Capturing only stdout keeps numeric output
// parseable; -q + LogLevel=ERROR further suppresses the chatter for any
// caller that does combine streams.
func sshExec(user, host, sshKeyPath, command string) ([]byte, error) {
cmd := exec.Command("ssh",
"-q",
"-o", "LogLevel=ERROR",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-i", sshKeyPath,
fmt.Sprintf("%s@%s", user, host),
command,
)
return cmd.CombinedOutput()
return cmd.Output()
}

func sftpDownload(remotePath, localPath, user, host, sshKeyPath string) error {
Expand Down
34 changes: 29 additions & 5 deletions tools/talis/fibre_bootstrap_evnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,45 @@ to fetch, then SCPs to each evnode-*.`,
defer wg.Done()
log.Printf("[%s] pushing JWT + keyring", ev.Name)

// JWT is small + atomic on the receive side because
// it's a single file, so we push it directly.
if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, localJWT, "/root/bridge-jwt.txt", false); err != nil {
errCh <- fmt.Errorf("[%s] push JWT: %w", ev.Name, err)
return
}

// mkdir the parent so scp lands at the exact path
// evnode_init.sh waits for.
if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, "mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test"); err != nil {
errCh <- fmt.Errorf("[%s] mkdir keyring-fibre: %w", ev.Name, err)
// Keyring push is staged through a tmp dir and
// promoted via mv. Without staging, evnode_init.sh's
// poll loop (which tests `[ -d keyring-test ]`)
// passes the moment scp -r mkdir's the directory,
// long before fibre-0.info is on disk. evnode then
// launches mid-scp and dies with `keyring entry
// "fibre-0" not found`. mv is atomic on the same
// filesystem so the init script either sees nothing
// (keep waiting) or the fully-populated dir (start
// the daemon cleanly).
stageDir := "/root/.keyring-fibre.staging"
prep := fmt.Sprintf(
"rm -rf %s && mkdir -p %s && mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test",
stageDir, stageDir,
)
if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, prep); err != nil {
errCh <- fmt.Errorf("[%s] stage keyring: %w", ev.Name, err)
return
}
if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), "/root/keyring-fibre/keyring-test", true); err != nil {
stageDest := stageDir + "/keyring-test"
if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), stageDest, true); err != nil {
errCh <- fmt.Errorf("[%s] push keyring: %w", ev.Name, err)
return
}
promote := fmt.Sprintf(
"mv %s /root/keyring-fibre/keyring-test && rmdir %s",
stageDest, stageDir,
)
if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, promote); err != nil {
errCh <- fmt.Errorf("[%s] promote keyring: %w", ev.Name, err)
return
}

log.Printf("[%s] ✓ pushed; daemon should start within ~10s", ev.Name)
}(ev)
Expand Down
89 changes: 89 additions & 0 deletions tools/talis/fibre_experiment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"fmt"
"os"
"os/exec"

"github.com/spf13/cobra"
)

// fibreExperimentCmd is the one-command driver for the Fibre throughput
// experiment. It assumes the operator has already populated the
// experiment directory with a config.json + scripts/ + base config.toml +
// app.toml (i.e. ran `talis init` + `talis add` for validators / bridge
// / evnode / loadgen) and that build artefacts are at $rootDir/build.
//
// It then invokes — in order — the same subcommands the operator would
// run by hand:
//
// 1. up — provision instances
// 2. genesis -b <build> — stage validator/bridge/evnode/loadgen payloads
// 3. deploy — ship payloads + start init scripts
// 4. setup-fibre — register host + deposit escrow on each validator
// 5. start-fibre — launch the fibre server on each validator
// 6. fibre-bootstrap-evnode — scp bridge JWT + fibre keyring onto evnode-*
//
// Each step is invoked via os/exec on the running binary. Any failure
// surfaces immediately; nothing is retried at this layer (the
// individual subcommands handle their own waits + retries).
//
// After step 6 returns, evnode-* daemons start within ~10 s and the
// load-gen's init script auto-launches evnode-txsim. The operator
// reads the final TXSIM: line from the load-gen.
func fibreExperimentCmd() *cobra.Command {
var (
rootDir string
buildDir string
)

cmd := &cobra.Command{
Use: "fibre-experiment",
Short: "End-to-end driver: up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode",
Long: `Run every step needed to bring up a Fibre throughput experiment from a
prepared root directory. Equivalent to invoking each subcommand in
sequence; included so the operator doesn't have to remember the order
or watch for inter-step races.`,
RunE: func(cmd *cobra.Command, args []string) error {
self, err := os.Executable()
if err != nil {
return fmt.Errorf("locate own binary: %w", err)
}

steps := []struct {
name string
args []string
}{
{"up", []string{"up", "-d", rootDir}},
{"genesis", []string{"genesis", "-d", rootDir, "-b", buildDir}},
{"deploy", []string{"deploy", "-d", rootDir}},
{"setup-fibre", []string{"setup-fibre", "-d", rootDir}},
{"start-fibre", []string{"start-fibre", "-d", rootDir}},
{"fibre-bootstrap-evnode", []string{"fibre-bootstrap-evnode", "-d", rootDir}},
}

for _, s := range steps {
fmt.Printf("\n=== talis %s ===\n", s.name)
c := exec.Command(self, s.args...)
c.Stdout = os.Stdout
c.Stderr = os.Stderr
c.Env = os.Environ()
if err := c.Run(); err != nil {
return fmt.Errorf("step %q failed: %w", s.name, err)
}
}

fmt.Println()
fmt.Println("=== fibre-experiment complete ===")
fmt.Println("evnode aggregator(s) start within ~10 s and load-gen init")
fmt.Println("scripts auto-launch evnode-txsim once evnode's /stats responds.")
fmt.Println("Final TXSIM: line lands at /root/txsim.log on each load-gen host.")
return nil
},
}

cmd.Flags().StringVarP(&rootDir, "directory", "d", ".", "experiment root directory")
cmd.Flags().StringVarP(&buildDir, "build-dir", "b", "./build", "directory containing the cross-compiled linux/amd64 binaries")

return cmd
}
126 changes: 121 additions & 5 deletions tools/talis/fibre_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,104 @@ func setupFibreCmd() *cobra.Command {
// Build script: register host + deposit escrow for validator + all fibre accounts
var sb strings.Builder

// 0. Block until the chain has produced at least one block.
// Without this, the very next tx returns
// `celestia-app is not ready; please wait for first block`
// from the local node — the call appears to succeed at
// the CLI level (`--yes` returns the txhash before block
// inclusion), but the tx never lands. Polling explicitly
// avoids the `sleep 10` heuristic that used to be here.
sb.WriteString(
"echo 'waiting for chain to produce first block...'\n" +
"DEADLINE=$(( $(date +%s) + 300 ))\n" +
"while true; do\n" +
" H=$(celestia-appd status 2>/dev/null | " +
" grep -oE '\"latest_block_height\":\"[0-9]+\"' | " +
" grep -oE '[0-9]+' | head -1)\n" +
" if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n" +
" echo \"chain is at height $H\"\n" +
" break\n" +
" fi\n" +
" if [ $(date +%s) -gt $DEADLINE ]; then\n" +
" echo 'FATAL: chain never produced a block within 5m' >&2\n" +
" exit 1\n" +
" fi\n" +
" sleep 3\n" +
"done\n",
)

// 1. Register fibre host address. Plain `host:port` form —
// x/valaddr requires it; the gRPC client dials it via the
// passthrough resolver. Don't prefix `dns:///` here.
//
// Retry until `query valaddr providers` shows OUR host
// — `--yes` returns the txhash before inclusion, so a
// single one-shot call can succeed at the RPC layer
// while the chain rejects the tx (mempool full, signer
// not yet in validator set, …) and we'd never know.
// 5-minute deadline so a stuck chain doesn't loop
// forever.
sb.WriteString(fmt.Sprintf(
"celestia-appd tx valaddr set-host %s:%d "+
"HOST=%s:%d\n"+
"DEADLINE=$(( $(date +%%s) + 300 ))\n"+
"while true; do\n"+
" celestia-appd tx valaddr set-host \"$HOST\" "+
"--from validator --keyring-backend=test --home .celestia-app "+
"--chain-id %s --fees %s --yes\n",
"--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+
" sleep 6\n"+
" if celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null \\\n"+
" | grep -q \"\\\"host\\\": *\\\"$HOST\\\"\"; then\n"+
" echo \"set-host confirmed: $HOST\"\n"+
" break\n"+
" fi\n"+
" if [ $(date +%%s) -gt $DEADLINE ]; then\n"+
" echo \"FATAL: set-host did not register $HOST after 5m\" >&2\n"+
" exit 1\n"+
" fi\n"+
" echo 'set-host pending, retrying...'\n"+
"done\n",
val.PublicIP, fibrePort,
cfg.ChainID, fees,
cfg.ChainID,
))

// 2. Deposit escrow for fibre-0 inside a retry loop.
// Same silent-failure mode as set-host: `--yes` returns
// the txhash before inclusion, so a single bounced tx
// (mempool full, signer not yet propagated, …) leaves
// the runner failing every upload with
// `escrow account not found for signer …`. fibre-0 is
// the one the runner actually signs with by default,
// so it's the only one we hard-block on.
sb.WriteString(fmt.Sprintf(
"FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+
"DEADLINE=$(( $(date +%%s) + 300 ))\n"+
"while true; do\n"+
" celestia-appd tx fibre deposit-to-escrow %s "+
"--from fibre-0 --keyring-backend=test --home .celestia-app "+
"--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+
" sleep 6\n"+
" if celestia-appd query fibre escrow-account \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+
" | grep -q '\"found\":true'; then\n"+
" echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+
" break\n"+
" fi\n"+
" if [ $(date +%%s) -gt $DEADLINE ]; then\n"+
" echo \"FATAL: fibre-0 escrow did not land after 5m\" >&2\n"+
" exit 1\n"+
" fi\n"+
" echo 'fibre-0 escrow pending, retrying...'\n"+
"done\n",
escrowAmount,
cfg.ChainID, fees,
cfg.ChainID,
))
sb.WriteString("sleep 10\n")

// 2. Deposit escrow for each fibre worker account
for i := range fibreAccounts {
// 3. Best-effort fund fibre-1..N. The runner only signs
// with fibre-0 by default, so a missing one of these
// doesn't block uploads — they exist as headroom for
// future signer rotation.
for i := 1; i < fibreAccounts; i++ {
keyName := fmt.Sprintf("fibre-%d", i)
sb.WriteString(fmt.Sprintf(
"celestia-appd tx fibre deposit-to-escrow %s "+
Expand Down Expand Up @@ -103,6 +187,38 @@ func setupFibreCmd() *cobra.Command {
if err := waitForTmuxSessions(cfg.Validators, resolvedSSHKeyPath, SetupFibreSessionName, 10*time.Minute); err != nil {
return fmt.Errorf("waiting for setup-fibre sessions: %w", err)
}

// CLI-side verification that every validator's host is on
// the chain's provider list before we hand off to start-
// fibre / fibre-bootstrap-evnode. The per-validator script
// above already self-verifies its own host, but we
// re-check here from a single vantage point so a
// concurrent set-host race across validators surfaces
// before downstream steps cache an empty registry.
if len(cfg.Validators) > 0 {
expected := len(cfg.Validators)
queryHost := cfg.Validators[0].PublicIP
queryCmd := fmt.Sprintf(
"celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null | grep -o '\"host\"' | wc -l",
cfg.ChainID,
)
deadline := time.Now().Add(5 * time.Minute)
for {
out, err := sshExec("root", queryHost, resolvedSSHKeyPath, queryCmd)
if err == nil {
count := 0
_, _ = fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count)
if count >= expected {
break
}
fmt.Printf(" valaddr providers: %d/%d registered, retrying...\n", count, expected)
}
if time.Now().After(deadline) {
return fmt.Errorf("only some validators registered as fibre providers within 5m — re-run setup-fibre")
}
time.Sleep(5 * time.Second)
}
}
fmt.Println("Validator setup done!")

// Deposit escrow for encoder accounts.
Expand Down
Loading
Loading