From ac2c4e5c3bc9df0ce0012faa196aa15a3abcab24 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:37 +0200 Subject: [PATCH 1/3] feat(fibre): log per-Submit upload duration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Fibre Submit path was opaque: failures showed up as DeadlineExceeded with no signal of how long the upload actually took, and successes only logged at debug level inside the upstream library. During load-test debugging this turned into a guessing game — was the cluster slow, the deadline too tight, or something stuck mid-RPC? Add a single info-level (warn-on-failure) log line in fiberDAClient.Submit covering the Upload call: duration, flat blob bytes, blob count. Cheap (one time.Since) and gives the operator concrete numbers — e.g. "17 blobs / 115 MiB / 1.5 s" — to reason about whether RPCTimeout, pending cap, or batch sizing is the right knob to turn next. --- block/internal/da/fiber_client.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 3a4f9eb754..31df5603a2 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -89,7 +89,23 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na flat := flattenBlobs(data) nsID := namespace[len(namespace)-10:] + uploadStart := time.Now() result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(data)). + Err(err). + Msg("fiber upload duration (failed)") + } else { + c.logger.Info(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(data)). + Msg("fiber upload duration (ok)") + } if err != nil { code := datypes.StatusError switch { From cb167b3ec803a45786f910467c8039324f73743d Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:43:46 +0200 Subject: [PATCH 2/3] fix(fibre): split DA Submit batches at Fibre's 128 MiB upload cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under sustained txsim load (~50 MiB/s) the DA submitter batched 10 block_data items into one Upload(), producing a flat payload of 144 MiB. Fibre's per-upload cap is hard at ~128 MiB ("blob size exceeds maximum allowed size: data size 144366912 exceeds maximum 134217723") and rejected every batched upload. With MaxPendingHeadersAndData=10 that took down 170 consecutive submissions before the node halted itself with "Data exceeds DA blob size limit". Wrap the Upload call in a chunker that groups input blobs into ≤120 MiB chunks (8 MiB headroom under Fibre's cap for the per-blob length-prefix overhead added by flattenBlobs) and uploads each chunk separately. Aggregates submitted counts and BlobIDs across chunks; on first chunk failure, returns the error with the partially-submitted count so the submitter's retry/backoff logic sees a coherent state instead of all-or-nothing. Single oversized blobs (already validated against DefaultMaxBlobSize earlier in Submit) still land alone and fail server-side, but at least don't drag healthy peers into the same rejected batch. --- block/internal/da/fiber_client.go | 117 +++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 35 deletions(-) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 31df5603a2..70902673e9 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -87,52 +87,65 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - flat := flattenBlobs(data) + // Fibre's per-upload cap is ~128 MiB (hard server-side reject: + // "data size %d exceeds maximum 134217723"). flattenBlobs adds + // 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk + // to leave overhead room and avoid borderline rejects. + chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget) nsID := namespace[len(namespace)-10:] - uploadStart := time.Now() - result, err := c.fiber.Upload(context.Background(), nsID, flat) - uploadDuration := time.Since(uploadStart) - if err != nil { - c.logger.Warn(). - Dur("duration", uploadDuration). - Int("flat_size", len(flat)). - Int("blob_count", len(data)). - Err(err). - Msg("fiber upload duration (failed)") - } else { + + ids := make([][]byte, 0, len(chunks)) + var submitted int + for chunkIdx, chunk := range chunks { + flat := flattenBlobs(chunk) + uploadStart := time.Now() + result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). + Err(err). + Msg("fiber upload duration (failed)") + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + c.logger.Error().Err(err).Msg("fiber upload failed") + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err), + SubmittedCount: uint64(submitted), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } c.logger.Info(). Dur("duration", uploadDuration). Int("flat_size", len(flat)). - Int("blob_count", len(data)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). Msg("fiber upload duration (ok)") - } - if err != nil { - code := datypes.StatusError - switch { - case errors.Is(err, context.Canceled): - code = datypes.StatusContextCanceled - case errors.Is(err, context.DeadlineExceeded): - code = datypes.StatusContextDeadline - } - c.logger.Error().Err(err).Msg("fiber upload failed") - return datypes.ResultSubmit{ - BaseResult: datypes.BaseResult{ - Code: code, - Message: fmt.Sprintf("fiber upload failed for blob: %v", err), - SubmittedCount: uint64(len(data) - 1), - BlobSize: blobSize, - Timestamp: time.Now(), - }, - } + ids = append(ids, result.BlobID) + submitted += len(chunk) } - c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") + c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusSuccess, - IDs: [][]byte{result.BlobID}, - SubmittedCount: uint64(len(data)), + IDs: ids, + SubmittedCount: uint64(submitted), Height: 0, /* TODO */ BlobSize: blobSize, Timestamp: time.Now(), @@ -140,6 +153,40 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } +// fibreUploadChunkBudget is the target maximum flattened size of a single +// Fibre Upload call. Fibre rejects payloads above ~128 MiB +// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for +// flattenBlobs's per-blob length prefixes and for any future overhead. +const fibreUploadChunkBudget = 120 * 1024 * 1024 + +// chunkBlobsForFibre groups data into chunks whose flattened size stays +// below budget. Per-blob length-prefix overhead matches flattenBlobs. +// A single oversized blob (already validated against DefaultMaxBlobSize +// above) lands in its own chunk; the upload still fails server-side but +// at least we don't drag healthy peers down with it. +func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte { + if len(data) == 0 { + return nil + } + chunks := make([][][]byte, 0, 1) + cur := make([][]byte, 0, len(data)) + curSize := 4 // flattenBlobs's count prefix + for _, b := range data { + entry := 4 + len(b) + if len(cur) > 0 && curSize+entry > budget { + chunks = append(chunks, cur) + cur = make([][]byte, 0, len(data)) + curSize = 4 + } + cur = append(cur, b) + curSize += entry + } + if len(cur) > 0 { + chunks = append(chunks, cur) + } + return chunks +} + func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { return c.retrieve(ctx, height, namespace, true) } From 1bc9e3e3cd4954b5e49426706f85e85310d3e39f Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:43:58 +0200 Subject: [PATCH 3/3] fix(evnode-fibre): cap per-block data at 100 MiB to fit a Fibre upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to the submitter chunking fix. The submitter can split a multi-blob batch into ≤120 MiB Fibre uploads, but a *single* block_data item that exceeds 128 MiB still ends up alone in its own chunk and fails server-side ("blob size exceeds maximum allowed size"). Lower the per-block cap to 100 MiB so under high-throughput txsim a single block can't grow past Fibre's hard limit, and update the comment to explain the relationship between this cap and Fibre's ~128 MiB upload reject threshold. --- tools/celestia-node-fiber/cmd/evnode-fibre/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 7b8b73080f..77322116bd 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -302,7 +302,12 @@ func run(cli cliFlags) error { // Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s, // DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 120 MiB blob cap. cfg.ApplyFiberDefaults() - block.SetMaxBlobSize(120 * 1024 * 1024) + // 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we + // hit `data size exceeds maximum 134217723` at 128 MiB - 5 B). + // Set the per-block data cap below that so each block_data item + // fits in a single Fibre upload after the submitter splits a + // multi-blob batch into ≤120 MiB chunks. + block.SetMaxBlobSize(100 * 1024 * 1024) cfg.P2P.ListenAddress = cli.p2pListen cfg.P2P.DisableConnectionGater = true cfg.RPC.Address = cli.rpcListen