Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 85 additions & 22 deletions block/internal/da/fiber_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,43 +87,106 @@
}
}

flat := flattenBlobs(data)
// Fibre's per-upload cap is ~128 MiB (hard server-side reject:
// "data size %d exceeds maximum 134217723"). flattenBlobs adds
// 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk
// to leave overhead room and avoid borderline rejects.
chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
nsID := namespace[len(namespace)-10:]
result, err := c.fiber.Upload(context.Background(), nsID, flat)
if err != nil {
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}
c.logger.Error().Err(err).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
SubmittedCount: uint64(len(data) - 1),
BlobSize: blobSize,
Timestamp: time.Now(),
},

ids := make([][]byte, 0, len(chunks))
var submitted int
for chunkIdx, chunk := range chunks {
flat := flattenBlobs(chunk)
uploadStart := time.Now()
result, err := c.fiber.Upload(context.Background(), nsID, flat)

Check failure on line 102 in block/internal/da/fiber_client.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

Non-inherited new context, use function like `context.WithXXX` instead (contextcheck)
uploadDuration := time.Since(uploadStart)
if err != nil {
c.logger.Warn().
Dur("duration", uploadDuration).
Int("flat_size", len(flat)).
Int("blob_count", len(chunk)).
Int("chunk_idx", chunkIdx).
Int("chunk_total", len(chunks)).
Err(err).
Msg("fiber upload duration (failed)")
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}
c.logger.Error().Err(err).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err),
SubmittedCount: uint64(submitted),
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}
c.logger.Info().
Dur("duration", uploadDuration).
Int("flat_size", len(flat)).
Int("blob_count", len(chunk)).
Int("chunk_idx", chunkIdx).
Int("chunk_total", len(chunks)).
Msg("fiber upload duration (ok)")
ids = append(ids, result.BlobID)
submitted += len(chunk)
}

c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")

return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: datypes.StatusSuccess,
IDs: [][]byte{result.BlobID},
SubmittedCount: uint64(len(data)),
IDs: ids,
SubmittedCount: uint64(submitted),
Height: 0, /* TODO */
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}

// fibreUploadChunkBudget is the target maximum flattened size of a single
// Fibre Upload call. Fibre rejects payloads above ~128 MiB
// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for
// flattenBlobs's per-blob length prefixes and for any future overhead.
const fibreUploadChunkBudget = 120 * 1024 * 1024

// chunkBlobsForFibre groups data into chunks whose flattened size stays
// below budget. Per-blob length-prefix overhead matches flattenBlobs.
// A single oversized blob (already validated against DefaultMaxBlobSize
// above) lands in its own chunk; the upload still fails server-side but
// at least we don't drag healthy peers down with it.
func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte {
if len(data) == 0 {
return nil
}
chunks := make([][][]byte, 0, 1)
cur := make([][]byte, 0, len(data))
curSize := 4 // flattenBlobs's count prefix
for _, b := range data {
entry := 4 + len(b)
if len(cur) > 0 && curSize+entry > budget {
chunks = append(chunks, cur)
cur = make([][]byte, 0, len(data))
curSize = 4
}
cur = append(cur, b)
curSize += entry
}
if len(cur) > 0 {
chunks = append(chunks, cur)
}
return chunks
}

func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
return c.retrieve(ctx, height, namespace, true)
}
Expand Down
7 changes: 6 additions & 1 deletion tools/celestia-node-fiber/cmd/evnode-fibre/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,12 @@ func run(cli cliFlags) error {
// Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s,
// DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 120 MiB blob cap.
cfg.ApplyFiberDefaults()
block.SetMaxBlobSize(120 * 1024 * 1024)
// 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we
// hit `data size exceeds maximum 134217723` at 128 MiB - 5 B).
// Set the per-block data cap below that so each block_data item
// fits in a single Fibre upload after the submitter splits a
// multi-blob batch into ≤120 MiB chunks.
block.SetMaxBlobSize(100 * 1024 * 1024)
cfg.P2P.ListenAddress = cli.p2pListen
cfg.P2P.DisableConnectionGater = true
cfg.RPC.Address = cli.rpcListen
Expand Down
Loading