diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go
index 3a4f9eb754..70902673e9 100644
--- a/block/internal/da/fiber_client.go
+++ b/block/internal/da/fiber_client.go
@@ -87,36 +87,65 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na
 		}
 	}
 
-	flat := flattenBlobs(data)
+	// Fibre's per-upload cap is ~128 MiB (hard server-side reject:
+	// "data size %d exceeds maximum 134217723"). flattenBlobs adds a
+	// 4-byte length prefix per blob plus a 4-byte count prefix, so we
+	// target 120 MiB per chunk to leave headroom for that overhead
+	// and avoid borderline rejects.
+	chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
 	nsID := namespace[len(namespace)-10:]
-	result, err := c.fiber.Upload(context.Background(), nsID, flat)
-	if err != nil {
-		code := datypes.StatusError
-		switch {
-		case errors.Is(err, context.Canceled):
-			code = datypes.StatusContextCanceled
-		case errors.Is(err, context.DeadlineExceeded):
-			code = datypes.StatusContextDeadline
-		}
-		c.logger.Error().Err(err).Msg("fiber upload failed")
-		return datypes.ResultSubmit{
-			BaseResult: datypes.BaseResult{
-				Code:           code,
-				Message:        fmt.Sprintf("fiber upload failed for blob: %v", err),
-				SubmittedCount: uint64(len(data) - 1),
-				BlobSize:       blobSize,
-				Timestamp:      time.Now(),
-			},
+
+	ids := make([][]byte, 0, len(chunks))
+	var submitted int
+	for chunkIdx, chunk := range chunks {
+		flat := flattenBlobs(chunk)
+		uploadStart := time.Now()
+		result, err := c.fiber.Upload(context.Background(), nsID, flat)
+		uploadDuration := time.Since(uploadStart)
+		if err != nil {
+			c.logger.Warn().
+				Dur("duration", uploadDuration).
+				Int("flat_size", len(flat)).
+				Int("blob_count", len(chunk)).
+				Int("chunk_idx", chunkIdx).
+				Int("chunk_total", len(chunks)).
+				Err(err).
+				Msg("fiber upload duration (failed)")
+			code := datypes.StatusError
+			switch {
+			case errors.Is(err, context.Canceled):
+				code = datypes.StatusContextCanceled
+			case errors.Is(err, context.DeadlineExceeded):
+				code = datypes.StatusContextDeadline
+			}
+			c.logger.Error().Err(err).Msg("fiber upload failed")
+			return datypes.ResultSubmit{
+				BaseResult: datypes.BaseResult{
+					Code:           code,
+					Message:        fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err),
+					SubmittedCount: uint64(submitted),
+					BlobSize:       blobSize,
+					Timestamp:      time.Now(),
+				},
+			}
 		}
+		c.logger.Info().
+			Dur("duration", uploadDuration).
+			Int("flat_size", len(flat)).
+			Int("blob_count", len(chunk)).
+			Int("chunk_idx", chunkIdx).
+			Int("chunk_total", len(chunks)).
+			Msg("fiber upload duration (ok)")
+		ids = append(ids, result.BlobID)
+		submitted += len(chunk)
 	}
-	c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
+	c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
 	return datypes.ResultSubmit{
 		BaseResult: datypes.BaseResult{
 			Code:           datypes.StatusSuccess,
-			IDs:            [][]byte{result.BlobID},
-			SubmittedCount: uint64(len(data)),
+			IDs:            ids,
+			SubmittedCount: uint64(submitted),
 			Height:         0, /* TODO */
 			BlobSize:       blobSize,
 			Timestamp:      time.Now(),
@@ -124,6 +153,40 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na
 	}
 }
 
+// fibreUploadChunkBudget is the target maximum flattened size of a single
+// Fibre Upload call. Fibre rejects payloads above ~128 MiB
+// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for
+// flattenBlobs's per-blob length prefixes and for any future overhead.
+const fibreUploadChunkBudget = 120 * 1024 * 1024
+
+// chunkBlobsForFibre groups data into chunks whose flattened size stays
+// below budget. The per-blob length-prefix overhead matches flattenBlobs.
+// A single oversized blob (already validated against DefaultMaxBlobSize
+// above) lands in its own chunk; that upload still fails server-side, but
+// at least it does not take the rest of the batch down with it.
+func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte {
+	if len(data) == 0 {
+		return nil
+	}
+	chunks := make([][][]byte, 0, 1)
+	cur := make([][]byte, 0, len(data))
+	curSize := 4 // flattenBlobs's count prefix
+	for _, b := range data {
+		entry := 4 + len(b)
+		if len(cur) > 0 && curSize+entry > budget {
+			chunks = append(chunks, cur)
+			cur = make([][]byte, 0, len(data))
+			curSize = 4
+		}
+		cur = append(cur, b)
+		curSize += entry
+	}
+	if len(cur) > 0 {
+		chunks = append(chunks, cur)
+	}
+	return chunks
+}
+
 func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
 	return c.retrieve(ctx, height, namespace, true)
 }
diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go
index 7b8b73080f..77322116bd 100644
--- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go
+++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go
@@ -302,7 +302,12 @@ func run(cli cliFlags) error {
 	// Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s,
 	// DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 120 MiB blob cap.
 	cfg.ApplyFiberDefaults()
-	block.SetMaxBlobSize(120 * 1024 * 1024)
+	// 100 MiB, bounded by Fibre's hard ~128 MiB per-upload cap (uploads
+	// fail with `data size exceeds maximum 134217723`, i.e. 128 MiB minus
+	// 5 bytes). Set the per-block data cap below that so each block_data
+	// item fits in a single Fibre upload after the submitter splits a
+	// multi-blob batch into ≤120 MiB chunks.
+	block.SetMaxBlobSize(100 * 1024 * 1024)
 	cfg.P2P.ListenAddress = cli.p2pListen
 	cfg.P2P.DisableConnectionGater = true
 	cfg.RPC.Address = cli.rpcListen
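The three size constants in play (the 134217723-byte server cap quoted in the error message, the 120 MiB chunk budget, and the 100 MiB per-block cap) have to nest correctly for the comments above to hold. A throwaway sanity check of that arithmetic, using only the values visible in this diff (nothing here is part of the change itself):

package main

import "fmt"

func main() {
	const (
		mib      = 1024 * 1024
		fibreCap = 134217723 // server-side maximum quoted in the reject message
		budget   = 120 * mib // fibreUploadChunkBudget
		maxBlob  = 100 * mib // value passed to block.SetMaxBlobSize
	)
	fmt.Println(128*mib - fibreCap)    // 5 -> the cap really is 128 MiB minus 5 bytes
	fmt.Println(4+4+maxBlob <= budget) // true -> one max-size blob flattens into a single chunk
	fmt.Println(budget <= fibreCap)    // true -> a full chunk clears the server cap
}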
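chunkBlobsForFibre's invariants (each multi-blob chunk stays under budget, nothing is dropped or reordered, an oversized blob gets isolated) are also easy to pin down in a test. The sketch below is illustrative rather than part of the change; it assumes it sits next to fiber_client.go in the same package, and the test name, budget, and blob sizes are made up:

package da

import (
	"bytes"
	"testing"
)

func TestChunkBlobsForFibreRespectsBudget(t *testing.T) {
	const budget = 64 // tiny budget so a handful of blobs forces several chunks
	blobs := [][]byte{
		bytes.Repeat([]byte{0xA}, 20),
		bytes.Repeat([]byte{0xB}, 20),
		bytes.Repeat([]byte{0xC}, 20),
		bytes.Repeat([]byte{0xD}, 100), // oversized: must land alone in its own chunk
		bytes.Repeat([]byte{0xE}, 20),
	}

	chunks := chunkBlobsForFibre(blobs, budget)

	// Invariant 1: the flattened size of every chunk stays within budget,
	// except a chunk holding a single blob that is itself over budget.
	for i, chunk := range chunks {
		size := 4 // count prefix, matching flattenBlobs
		for _, b := range chunk {
			size += 4 + len(b)
		}
		if size > budget && len(chunk) > 1 {
			t.Errorf("chunk %d: flattened size %d exceeds budget %d", i, size, budget)
		}
	}

	// Invariant 2: chunking preserves every blob and their order.
	var got [][]byte
	for _, chunk := range chunks {
		got = append(got, chunk...)
	}
	if len(got) != len(blobs) {
		t.Fatalf("blob count changed: got %d, want %d", len(got), len(blobs))
	}
	for i := range blobs {
		if !bytes.Equal(got[i], blobs[i]) {
			t.Errorf("blob %d reordered or mutated", i)
		}
	}
}

Worth noting about the flush-before-append shape of the loop: the over-budget check fires while cur is still non-empty, so earlier blobs are sealed into their own chunk before the oversized blob is appended, which is exactly what keeps that blob isolated.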