From ee0dede6800674b73c1a45b66094d440203b22bf Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 28 Apr 2026 17:10:40 +0200 Subject: [PATCH 1/3] add grpc socket and flattn tx batches to allow for lower allocations --- apps/grpc/README.md | 15 +- apps/grpc/cmd/run.go | 4 +- execution/grpc/README.md | 23 ++- execution/grpc/client.go | 66 ++++++++- execution/grpc/client_test.go | 117 +++++++++++++++ execution/grpc/go.mod | 2 + execution/grpc/server.go | 21 ++- execution/grpc/server_test.go | 123 +++++++++++++++- execution/grpc/tx_batch.go | 74 ++++++++++ execution/grpc/tx_batch_test.go | 69 +++++++++ execution/grpc/unix.go | 64 +++++++++ execution/grpc/unix_test.go | 59 ++++++++ proto/evnode/v1/execution.proto | 21 ++- types/pb/evnode/v1/execution.pb.go | 220 ++++++++++++++++++----------- 14 files changed, 768 insertions(+), 110 deletions(-) create mode 100644 execution/grpc/tx_batch.go create mode 100644 execution/grpc/tx_batch_test.go create mode 100644 execution/grpc/unix.go create mode 100644 execution/grpc/unix_test.go diff --git a/apps/grpc/README.md b/apps/grpc/README.md index a2d1f2ae11..5f248319f5 100644 --- a/apps/grpc/README.md +++ b/apps/grpc/README.md @@ -1,13 +1,13 @@ # gRPC Single Sequencer App -This application runs a Evolve node with a single sequencer that connects to a remote execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface. +This application runs a Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface. ## Overview The gRPC single sequencer app provides: - A Evolve consensus node with single sequencer -- Connection to remote execution clients via gRPC +- Connection to execution clients via TCP or Unix domain socket gRPC - Full data availability layer integration - P2P networking capabilities @@ -58,11 +58,20 @@ Start the Evolve node with: --da.auth-token your-da-token ``` +For a same-machine executor, use a Unix domain socket endpoint: + +```bash +./evgrpc start \ + --root-dir ~/.evgrpc \ + --grpc-executor-url unix:///tmp/evolve-executor.sock \ + --da.address http://localhost:7980 +``` + ## Command-Line Flags ### gRPC-specific Flags -- `--grpc-executor-url`: URL of the gRPC execution service (default: `http://localhost:50051`) +- `--grpc-executor-url`: URL of the gRPC execution service, either `http://host:port` or `unix:///path/to/socket` (default: `http://localhost:50051`) ### Common Evolve Flags diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 22ca71f587..449714b941 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -28,7 +28,7 @@ import ( const ( grpcDbName = "grpc-single" - // FlagGrpcExecutorURL is the flag for the gRPC executor endpoint + // FlagGrpcExecutorURL is the flag for the gRPC executor endpoint. FlagGrpcExecutorURL = "grpc-executor-url" ) @@ -169,5 +169,5 @@ func createGRPCExecutionClient(cmd *cobra.Command) (execution.Executor, error) { // addGRPCFlags adds flags specific to the gRPC execution client func addGRPCFlags(cmd *cobra.Command) { - cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service") + cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service, or unix:///path/to/executor.sock") } diff --git a/execution/grpc/README.md b/execution/grpc/README.md index 8647b2ad90..3216926f97 100644 --- a/execution/grpc/README.md +++ b/execution/grpc/README.md @@ -1,10 +1,10 @@ # gRPC Execution Client -This package provides a gRPC-based implementation of the Evolve execution interface. It allows Evolve to communicate with remote execution clients via gRPC using the Connect-RPC framework. +This package provides a gRPC-based implementation of the Evolve execution interface. It allows Evolve to communicate with execution clients via gRPC using the Connect-RPC framework. ## Overview -The gRPC execution client enables separation between the consensus layer (Evolve) and the execution layer by providing a network interface for communication. This allows execution clients to run in separate processes or even on different machines. +The gRPC execution client enables separation between the consensus layer (Evolve) and the execution layer by providing a process boundary for communication. Execution clients can run on different machines over TCP, or on the same machine over a Unix domain socket to avoid TCP/IP overhead. ## Usage @@ -17,12 +17,15 @@ import ( "github.com/evstack/ev-node/execution/grpc" ) -// Create a new gRPC client +// Create a new gRPC client over TCP client := grpc.NewClient("http://localhost:50051") +// Or connect to an executor on the same machine over a Unix domain socket +client := grpc.NewClient("unix:///tmp/evolve-executor.sock") + // Use the client as an execution.Executor ctx := context.Background() -stateRoot, maxBytes, err := client.InitChain(ctx, time.Now(), 1, "my-chain") +stateRoot, err := client.InitChain(ctx, time.Now(), 1, "my-chain") ``` ### Server @@ -42,6 +45,14 @@ handler := grpc.NewExecutorServiceHandler(myExecutor) http.ListenAndServe(":50051", handler) ``` +To serve on a Unix domain socket: + +```go +import "github.com/evstack/ev-node/execution/grpc" + +err := grpc.ListenAndServeUnix("/tmp/evolve-executor.sock", myExecutor) +``` + ## Protocol The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides the following methods: @@ -50,13 +61,17 @@ The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides th - `GetTxs`: Fetch transactions from the mempool - `ExecuteTxs`: Execute transactions and update state - `SetFinal`: Mark a block as finalized +- `GetExecutionInfo`: Return current execution limits +- `FilterTxs`: Validate and filter force-included transactions ## Features - Full implementation of the `execution.Executor` interface - Support for HTTP/1.1 and HTTP/2 (via h2c) +- Support for Unix domain socket connections with `unix:///path/to/socket` - gRPC reflection for debugging and service discovery - Compression for efficient data transfer +- Contiguous transaction batch encoding to reduce per-transaction protobuf overhead - Comprehensive error handling and validation ## Testing diff --git a/execution/grpc/client.go b/execution/grpc/client.go index efb9d2f840..2769d681f3 100644 --- a/execution/grpc/client.go +++ b/execution/grpc/client.go @@ -3,9 +3,11 @@ package grpc import ( "context" "crypto/tls" + "errors" "fmt" "net" "net/http" + "strings" "time" "connectrpc.com/connect" @@ -26,6 +28,11 @@ type Client struct { client v1connect.ExecutorServiceClient } +const ( + unixURLPrefix = "unix://" + unixHTTPBaseURL = "http://unix" +) + // newHTTP2Client creates an HTTP/2 client that supports cleartext (h2c) connections. // This is required to connect to native gRPC servers without TLS. func newHTTP2Client() *http.Client { @@ -40,10 +47,41 @@ func newHTTP2Client() *http.Client { } } +// newUnixHTTP2Client creates an HTTP/2 client that speaks h2c over a Unix domain socket. +func newUnixHTTP2Client(socketPath string) *http.Client { + return &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, _, _ string, _ *tls.Config) (net.Conn, error) { + if socketPath == "" { + return nil, errors.New("unix socket path is required") + } + var d net.Dialer + return d.DialContext(ctx, "unix", socketPath) + }, + }, + } +} + +func clientTransportForTarget(target string) (*http.Client, string) { + socketPath, ok := unixSocketPath(target) + if ok { + return newUnixHTTP2Client(socketPath), unixHTTPBaseURL + } + return newHTTP2Client(), target +} + +func unixSocketPath(target string) (string, bool) { + if !strings.HasPrefix(target, unixURLPrefix) { + return "", false + } + return strings.TrimPrefix(target, unixURLPrefix), true +} + // NewClient creates a new gRPC execution client. // // Parameters: -// - url: The URL of the gRPC server (e.g., "http://localhost:50051") +// - url: The URL of the gRPC server (e.g., "http://localhost:50051" or "unix:///tmp/executor.sock") // - opts: Optional Connect client options for configuring the connection // // Returns: @@ -51,10 +89,11 @@ func newHTTP2Client() *http.Client { func NewClient(url string, opts ...connect.ClientOption) *Client { // Prepend WithGRPC to use the native gRPC protocol (required for tonic/gRPC servers) opts = append([]connect.ClientOption{connect.WithGRPC()}, opts...) + httpClient, targetURL := clientTransportForTarget(url) return &Client{ client: v1connect.NewExecutorServiceClient( - newHTTP2Client(), - url, + httpClient, + targetURL, opts..., ), } @@ -91,7 +130,12 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) { return nil, fmt.Errorf("grpc client: failed to get txs: %w", err) } - return resp.Msg.Txs, nil + txs, err := decodeTxBatch(resp.Msg.TxBatch) + if err != nil { + return nil, fmt.Errorf("grpc client: invalid get txs response: %w", err) + } + + return txs, nil } // ExecuteTxs processes transactions to produce a new block state. @@ -100,8 +144,13 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) { // returns the updated state root after execution. The execution service ensures // deterministic execution and validates the state transition. func (c *Client) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, err error) { + txBatch, err := encodeTxBatch(txs) + if err != nil { + return nil, fmt.Errorf("grpc client: failed to encode tx batch: %w", err) + } + req := connect.NewRequest(&pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: txBatch, BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, @@ -154,8 +203,13 @@ func (c *Client) GetExecutionInfo(ctx context.Context) (execution.ExecutionInfo, // This method sends transactions to the remote execution service for validation. // Returns a slice of FilterStatus for each transaction. func (c *Client) FilterTxs(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) { + txBatch, err := encodeTxBatch(txs) + if err != nil { + return nil, fmt.Errorf("grpc client: failed to encode tx batch: %w", err) + } + req := connect.NewRequest(&pb.FilterTxsRequest{ - Txs: txs, + TxBatch: txBatch, MaxBytes: maxBytes, MaxGas: maxGas, HasForceIncludedTransaction: hasForceIncludedTransaction, diff --git a/execution/grpc/client_test.go b/execution/grpc/client_test.go index 59ec6416ff..b715f2909a 100644 --- a/execution/grpc/client_test.go +++ b/execution/grpc/client_test.go @@ -2,6 +2,9 @@ package grpc import ( "context" + "errors" + "net" + "net/http" "net/http/httptest" "testing" "time" @@ -214,3 +217,117 @@ func TestClient_SetFinal(t *testing.T) { t.Fatalf("unexpected error: %v", err) } } + +func TestClient_FilterTxs(t *testing.T) { + ctx := context.Background() + txs := [][]byte{[]byte("tx1"), []byte{}, []byte("tx3")} + maxBytes := uint64(100) + maxGas := uint64(200) + hasForced := true + expectedStatuses := []execution.FilterStatus{ + execution.FilterOK, + execution.FilterRemove, + execution.FilterPostpone, + } + + mockExec := &mockExecutor{ + filterTxsFunc: func(ctx context.Context, txsIn [][]byte, mb, mg uint64, forced bool) ([]execution.FilterStatus, error) { + if len(txsIn) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn)) + } + for i, tx := range txsIn { + if string(tx) != string(txs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, txs[i], tx) + } + } + if mb != maxBytes { + t.Fatalf("expected max bytes %d, got %d", maxBytes, mb) + } + if mg != maxGas { + t.Fatalf("expected max gas %d, got %d", maxGas, mg) + } + if forced != hasForced { + t.Fatalf("expected forced=%t, got %t", hasForced, forced) + } + return expectedStatuses, nil + }, + } + + handler := NewExecutorServiceHandler(mockExec) + server := httptest.NewServer(handler) + defer server.Close() + + client := NewClient(server.URL) + + statuses, err := client.FilterTxs(ctx, txs, maxBytes, maxGas, hasForced) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(statuses) != len(expectedStatuses) { + t.Fatalf("expected %d statuses, got %d", len(expectedStatuses), len(statuses)) + } + for i, status := range statuses { + if status != expectedStatuses[i] { + t.Fatalf("status %d: expected %v, got %v", i, expectedStatuses[i], status) + } + } +} + +func TestClient_UnixSocket(t *testing.T) { + ctx := context.Background() + socketPath := testUnixSocketPath(t) + expectedTxs := [][]byte{[]byte("tx1"), []byte("tx2")} + + mockExec := &mockExecutor{ + getTxsFunc: func(ctx context.Context) ([][]byte, error) { + return expectedTxs, nil + }, + } + + startUnixTestServer(t, mockExec, socketPath) + + client := NewClient("unix://" + socketPath) + txs, err := client.GetTxs(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(txs) != len(expectedTxs) { + t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(txs)) + } + for i, tx := range txs { + if string(tx) != string(expectedTxs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, expectedTxs[i], tx) + } + } +} + +func startUnixTestServer(t *testing.T, executor execution.Executor, socketPath string) { + t.Helper() + + listener, err := ListenUnix(socketPath) + if err != nil { + t.Fatalf("listen unix socket: %v", err) + } + + server := &http.Server{Handler: NewExecutorServiceHandler(executor)} + done := make(chan error, 1) + go func() { + err := server.Serve(listener) + if errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) { + err = nil + } + done <- err + }() + + t.Cleanup(func() { + _ = server.Close() + select { + case err := <-done: + if err != nil { + t.Errorf("unix socket server error: %v", err) + } + case <-time.After(time.Second): + t.Error("unix socket server did not stop") + } + }) +} diff --git a/execution/grpc/go.mod b/execution/grpc/go.mod index 25031ee32f..bbedeaf599 100644 --- a/execution/grpc/go.mod +++ b/execution/grpc/go.mod @@ -2,6 +2,8 @@ module github.com/evstack/ev-node/execution/grpc go 1.25.7 +replace github.com/evstack/ev-node => ../.. + require ( connectrpc.com/connect v1.19.2 connectrpc.com/grpcreflect v1.3.0 diff --git a/execution/grpc/server.go b/execution/grpc/server.go index e0488b7655..8f1cbb48ce 100644 --- a/execution/grpc/server.go +++ b/execution/grpc/server.go @@ -77,8 +77,13 @@ func (s *Server) GetTxs( return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get txs: %w", err)) } + txBatch, err := encodeTxBatch(txs) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to encode tx batch: %w", err)) + } + return connect.NewResponse(&pb.GetTxsResponse{ - Txs: txs, + TxBatch: txBatch, }), nil } @@ -102,9 +107,14 @@ func (s *Server) ExecuteTxs( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("prev_state_root is required")) } + txs, err := decodeTxBatch(req.Msg.TxBatch) + if err != nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid tx_batch: %w", err)) + } + updatedStateRoot, err := s.executor.ExecuteTxs( ctx, - req.Msg.Txs, + txs, req.Msg.BlockHeight, req.Msg.Timestamp.AsTime(), req.Msg.PrevStateRoot, @@ -162,7 +172,12 @@ func (s *Server) FilterTxs( ctx context.Context, req *connect.Request[pb.FilterTxsRequest], ) (*connect.Response[pb.FilterTxsResponse], error) { - result, err := s.executor.FilterTxs(ctx, req.Msg.Txs, req.Msg.MaxBytes, req.Msg.MaxGas, req.Msg.HasForceIncludedTransaction) + txs, err := decodeTxBatch(req.Msg.TxBatch) + if err != nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid tx_batch: %w", err)) + } + + result, err := s.executor.FilterTxs(ctx, txs, req.Msg.MaxBytes, req.Msg.MaxGas, req.Msg.HasForceIncludedTransaction) if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to filter transactions: %w", err)) } diff --git a/execution/grpc/server_test.go b/execution/grpc/server_test.go index e2a01b4bc4..2219d47f47 100644 --- a/execution/grpc/server_test.go +++ b/execution/grpc/server_test.go @@ -9,6 +9,7 @@ import ( "connectrpc.com/connect" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/evstack/ev-node/core/execution" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -172,8 +173,12 @@ func TestServer_GetTxs(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if len(resp.Msg.Txs) != len(expectedTxs) { - t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(resp.Msg.Txs)) + txs, err := decodeTxBatch(resp.Msg.TxBatch) + if err != nil { + t.Fatalf("unexpected tx batch decode error: %v", err) + } + if len(txs) != len(expectedTxs) { + t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(txs)) } }) } @@ -197,7 +202,7 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "success", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, @@ -210,7 +215,7 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "missing block height", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, }, @@ -220,7 +225,7 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "missing timestamp", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, PrevStateRoot: prevStateRoot, }, @@ -230,17 +235,28 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "missing prev state root", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), }, wantErr: true, wantCode: connect.CodeInvalidArgument, }, + { + name: "invalid tx batch", + req: &pb.ExecuteTxsRequest{ + TxBatch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{3}}, + BlockHeight: blockHeight, + Timestamp: timestamppb.New(timestamp), + PrevStateRoot: prevStateRoot, + }, + wantErr: true, + wantCode: connect.CodeInvalidArgument, + }, { name: "executor error", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, @@ -362,3 +378,96 @@ func TestServer_SetFinal(t *testing.T) { }) } } + +func TestServer_FilterTxs(t *testing.T) { + ctx := context.Background() + txs := [][]byte{[]byte("tx1"), []byte("tx2")} + expectedStatuses := []execution.FilterStatus{execution.FilterOK, execution.FilterPostpone} + + tests := []struct { + name string + req *pb.FilterTxsRequest + mockFunc func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) + wantErr bool + wantCode connect.Code + }{ + { + name: "success", + req: &pb.FilterTxsRequest{ + TxBatch: mustEncodeTxBatch(t, txs), + MaxBytes: 100, + MaxGas: 200, + HasForceIncludedTransaction: true, + }, + mockFunc: func(ctx context.Context, txsIn [][]byte, maxBytes, maxGas uint64, forced bool) ([]execution.FilterStatus, error) { + if len(txsIn) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn)) + } + if maxBytes != 100 { + t.Fatalf("expected max bytes 100, got %d", maxBytes) + } + if maxGas != 200 { + t.Fatalf("expected max gas 200, got %d", maxGas) + } + if !forced { + t.Fatalf("expected forced transaction flag") + } + return expectedStatuses, nil + }, + wantErr: false, + }, + { + name: "invalid tx batch", + req: &pb.FilterTxsRequest{ + TxBatch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{3}}, + }, + wantErr: true, + wantCode: connect.CodeInvalidArgument, + }, + { + name: "executor error", + req: &pb.FilterTxsRequest{ + TxBatch: mustEncodeTxBatch(t, txs), + }, + mockFunc: func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) { + return nil, errors.New("filter failed") + }, + wantErr: true, + wantCode: connect.CodeInternal, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockExec := &mockExecutor{ + filterTxsFunc: tt.mockFunc, + } + server := NewServer(mockExec) + + req := connect.NewRequest(tt.req) + resp, err := server.FilterTxs(ctx, req) + + if tt.wantErr { + if err == nil { + t.Fatalf("expected error but got none") + } + var connectErr *connect.Error + if errors.As(err, &connectErr) { + if connectErr.Code() != tt.wantCode { + t.Errorf("expected error code %v, got %v", tt.wantCode, connectErr.Code()) + } + } else { + t.Errorf("expected connect error, got %v", err) + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(resp.Msg.Statuses) != len(expectedStatuses) { + t.Fatalf("expected %d statuses, got %d", len(expectedStatuses), len(resp.Msg.Statuses)) + } + }) + } +} diff --git a/execution/grpc/tx_batch.go b/execution/grpc/tx_batch.go new file mode 100644 index 0000000000..2e3e600596 --- /dev/null +++ b/execution/grpc/tx_batch.go @@ -0,0 +1,74 @@ +package grpc + +import ( + "fmt" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +// maxTxBatchTxSize is the largest transaction length representable in TxBatch.TxSizes: +// 4 GiB - 1 byte, or 4,294,967,295 bytes. +const maxTxBatchTxSize = uint64(1<<32 - 1) + +func encodeTxBatch(txs [][]byte) (*pb.TxBatch, error) { + if len(txs) == 0 { + return &pb.TxBatch{}, nil + } + + maxInt := uint64(int(^uint(0) >> 1)) + var total uint64 + txSizes := make([]uint32, len(txs)) + for i, tx := range txs { + txLen := uint64(len(tx)) + if txLen > maxTxBatchTxSize { + return nil, fmt.Errorf("tx %d size %d exceeds uint32", i, txLen) + } + total += txLen + if total > maxInt { + return nil, fmt.Errorf("tx batch size %d exceeds int", total) + } + txSizes[i] = uint32(txLen) + } + + data := make([]byte, 0, int(total)) + for _, tx := range txs { + data = append(data, tx...) + } + + return &pb.TxBatch{ + Data: data, + TxSizes: txSizes, + }, nil +} + +func decodeTxBatch(batch *pb.TxBatch) ([][]byte, error) { + if batch == nil { + return nil, nil + } + if len(batch.TxSizes) == 0 { + if len(batch.Data) != 0 { + return nil, fmt.Errorf("tx batch has %d data bytes but no tx sizes", len(batch.Data)) + } + return nil, nil + } + + var total uint64 + for i, txSize := range batch.TxSizes { + total += uint64(txSize) + if total > uint64(len(batch.Data)) { + return nil, fmt.Errorf("tx sizes exceed data length at index %d", i) + } + } + if total != uint64(len(batch.Data)) { + return nil, fmt.Errorf("tx sizes total %d does not match data length %d", total, len(batch.Data)) + } + + txs := make([][]byte, len(batch.TxSizes)) + offset := 0 + for i, txSize := range batch.TxSizes { + end := offset + int(txSize) + txs[i] = batch.Data[offset:end:end] + offset = end + } + return txs, nil +} diff --git a/execution/grpc/tx_batch_test.go b/execution/grpc/tx_batch_test.go new file mode 100644 index 0000000000..6637507e52 --- /dev/null +++ b/execution/grpc/tx_batch_test.go @@ -0,0 +1,69 @@ +package grpc + +import ( + "bytes" + "testing" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +func mustEncodeTxBatch(t *testing.T, txs [][]byte) *pb.TxBatch { + t.Helper() + + batch, err := encodeTxBatch(txs) + if err != nil { + t.Fatalf("encode tx batch: %v", err) + } + return batch +} + +func TestEncodeDecodeTxBatch(t *testing.T) { + txs := [][]byte{[]byte("tx1"), nil, []byte("tx3"), []byte{}} + + batch := mustEncodeTxBatch(t, txs) + decoded, err := decodeTxBatch(batch) + if err != nil { + t.Fatalf("decode tx batch: %v", err) + } + if len(decoded) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(decoded)) + } + for i := range txs { + if !bytes.Equal(decoded[i], txs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, txs[i], decoded[i]) + } + } + + decoded[0] = append(decoded[0], 'x') + if !bytes.Equal(decoded[2], txs[2]) { + t.Fatalf("decoded tx slices should not have capacity overlap") + } +} + +func TestDecodeTxBatchRejectsMalformedInput(t *testing.T) { + tests := []struct { + name string + batch *pb.TxBatch + }{ + { + name: "data without sizes", + batch: &pb.TxBatch{Data: []byte("tx")}, + }, + { + name: "sizes exceed data", + batch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{3}}, + }, + { + name: "sizes do not consume data", + batch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{1}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if _, err := decodeTxBatch(tt.batch); err == nil { + t.Fatalf("expected decode error") + } + }) + } +} diff --git a/execution/grpc/unix.go b/execution/grpc/unix.go new file mode 100644 index 0000000000..5eee823c59 --- /dev/null +++ b/execution/grpc/unix.go @@ -0,0 +1,64 @@ +package grpc + +import ( + "errors" + "fmt" + "net" + "net/http" + "os" + + "connectrpc.com/connect" + + "github.com/evstack/ev-node/core/execution" +) + +// ListenUnix creates a Unix domain socket listener for the gRPC execution service. +// +// If socketPath already exists, ListenUnix removes it only when it is a stale +// socket. Regular files, directories, and other path types are left untouched. +func ListenUnix(socketPath string) (net.Listener, error) { + if socketPath == "" { + return nil, errors.New("unix socket path is required") + } + if err := removeStaleUnixSocket(socketPath); err != nil { + return nil, err + } + listener, err := net.Listen("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("listen unix socket %q: %w", socketPath, err) + } + return listener, nil +} + +// ListenAndServeUnix serves the gRPC execution service over a Unix domain socket. +func ListenAndServeUnix(socketPath string, executor execution.Executor, opts ...connect.HandlerOption) error { + listener, err := ListenUnix(socketPath) + if err != nil { + return err + } + defer func() { + _ = listener.Close() + }() + defer func() { + _ = removeStaleUnixSocket(socketPath) + }() + + return http.Serve(listener, NewExecutorServiceHandler(executor, opts...)) +} + +func removeStaleUnixSocket(socketPath string) error { + info, err := os.Lstat(socketPath) + if errors.Is(err, os.ErrNotExist) { + return nil + } + if err != nil { + return fmt.Errorf("stat unix socket %q: %w", socketPath, err) + } + if info.Mode()&os.ModeSocket == 0 { + return fmt.Errorf("refusing to remove non-socket path %q", socketPath) + } + if err := os.Remove(socketPath); err != nil { + return fmt.Errorf("remove stale unix socket %q: %w", socketPath, err) + } + return nil +} diff --git a/execution/grpc/unix_test.go b/execution/grpc/unix_test.go new file mode 100644 index 0000000000..d90133a8e9 --- /dev/null +++ b/execution/grpc/unix_test.go @@ -0,0 +1,59 @@ +package grpc + +import ( + "fmt" + "net" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestListenUnixRejectsNonSocketPath(t *testing.T) { + socketPath := filepath.Join(t.TempDir(), "executor.sock") + if err := os.WriteFile(socketPath, []byte("not a socket"), 0o600); err != nil { + t.Fatalf("write test file: %v", err) + } + + listener, err := ListenUnix(socketPath) + if err == nil { + _ = listener.Close() + t.Fatal("expected error for non-socket path") + } + if !strings.Contains(err.Error(), "refusing to remove non-socket path") { + t.Fatalf("expected non-socket refusal, got %v", err) + } +} + +func TestListenUnixRemovesStaleSocket(t *testing.T) { + socketPath := testUnixSocketPath(t) + staleListener, err := net.Listen("unix", socketPath) + if err != nil { + t.Fatalf("create stale unix socket: %v", err) + } + if err := staleListener.Close(); err != nil { + t.Fatalf("close stale unix socket: %v", err) + } + + listener, err := ListenUnix(socketPath) + if err != nil { + t.Fatalf("listen unix socket: %v", err) + } + if err := listener.Close(); err != nil { + t.Fatalf("close unix socket: %v", err) + } +} + +func testUnixSocketPath(t *testing.T) string { + t.Helper() + + socketPath := filepath.Join( + os.TempDir(), + fmt.Sprintf("ev-node-grpc-%d-%d.sock", os.Getpid(), time.Now().UnixNano()), + ) + t.Cleanup(func() { + _ = os.Remove(socketPath) + }) + return socketPath +} diff --git a/proto/evnode/v1/execution.proto b/proto/evnode/v1/execution.proto index a3abbea36a..fe2ca90c5f 100644 --- a/proto/evnode/v1/execution.proto +++ b/proto/evnode/v1/execution.proto @@ -49,16 +49,25 @@ message GetTxsRequest { // Empty for now, may include filtering criteria in the future } +// TxBatch stores ordered transactions in one contiguous bytes buffer. +message TxBatch { + // Concatenated transaction bytes. + bytes data = 1; + + // Byte length for each transaction in order. + repeated uint32 tx_sizes = 2; +} + // GetTxsResponse contains the available transactions message GetTxsResponse { - // Slice of valid transactions from mempool - repeated bytes txs = 1; + // Valid transactions from mempool. + TxBatch tx_batch = 1; } // ExecuteTxsRequest contains transactions and block context for execution message ExecuteTxsRequest { - // Ordered list of transactions to execute - repeated bytes txs = 1; + // Ordered transactions to execute. + TxBatch tx_batch = 1; // Height of block being created (must be > 0) uint64 block_height = 2; @@ -112,8 +121,8 @@ enum FilterStatus { // FilterTxsRequest contains transactions to validate and filter message FilterTxsRequest { - // All transactions (force-included + mempool) - repeated bytes txs = 1; + // All transactions (force-included + mempool). + TxBatch tx_batch = 1; // Maximum cumulative size allowed (0 means no size limit) uint64 max_bytes = 2; diff --git a/types/pb/evnode/v1/execution.pb.go b/types/pb/evnode/v1/execution.pb.go index 2b33c910d2..fc6732ad87 100644 --- a/types/pb/evnode/v1/execution.pb.go +++ b/types/pb/evnode/v1/execution.pb.go @@ -222,18 +222,73 @@ func (*GetTxsRequest) Descriptor() ([]byte, []int) { return file_evnode_v1_execution_proto_rawDescGZIP(), []int{2} } +// TxBatch stores ordered transactions in one contiguous bytes buffer. +type TxBatch struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Concatenated transaction bytes. + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + // Byte length for each transaction in order. + TxSizes []uint32 `protobuf:"varint,2,rep,packed,name=tx_sizes,json=txSizes,proto3" json:"tx_sizes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TxBatch) Reset() { + *x = TxBatch{} + mi := &file_evnode_v1_execution_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TxBatch) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TxBatch) ProtoMessage() {} + +func (x *TxBatch) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_execution_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TxBatch.ProtoReflect.Descriptor instead. +func (*TxBatch) Descriptor() ([]byte, []int) { + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{3} +} + +func (x *TxBatch) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *TxBatch) GetTxSizes() []uint32 { + if x != nil { + return x.TxSizes + } + return nil +} + // GetTxsResponse contains the available transactions type GetTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - // Slice of valid transactions from mempool - Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` + // Valid transactions from mempool. + TxBatch *TxBatch `protobuf:"bytes,1,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *GetTxsResponse) Reset() { *x = GetTxsResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[3] + mi := &file_evnode_v1_execution_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -245,7 +300,7 @@ func (x *GetTxsResponse) String() string { func (*GetTxsResponse) ProtoMessage() {} func (x *GetTxsResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[3] + mi := &file_evnode_v1_execution_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -258,12 +313,12 @@ func (x *GetTxsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTxsResponse.ProtoReflect.Descriptor instead. func (*GetTxsResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{3} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{4} } -func (x *GetTxsResponse) GetTxs() [][]byte { +func (x *GetTxsResponse) GetTxBatch() *TxBatch { if x != nil { - return x.Txs + return x.TxBatch } return nil } @@ -271,8 +326,8 @@ func (x *GetTxsResponse) GetTxs() [][]byte { // ExecuteTxsRequest contains transactions and block context for execution type ExecuteTxsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - // Ordered list of transactions to execute - Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` + // Ordered transactions to execute. + TxBatch *TxBatch `protobuf:"bytes,1,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` // Height of block being created (must be > 0) BlockHeight uint64 `protobuf:"varint,2,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` // Block creation time in UTC @@ -285,7 +340,7 @@ type ExecuteTxsRequest struct { func (x *ExecuteTxsRequest) Reset() { *x = ExecuteTxsRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[4] + mi := &file_evnode_v1_execution_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -297,7 +352,7 @@ func (x *ExecuteTxsRequest) String() string { func (*ExecuteTxsRequest) ProtoMessage() {} func (x *ExecuteTxsRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[4] + mi := &file_evnode_v1_execution_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -310,12 +365,12 @@ func (x *ExecuteTxsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ExecuteTxsRequest.ProtoReflect.Descriptor instead. func (*ExecuteTxsRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{4} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{5} } -func (x *ExecuteTxsRequest) GetTxs() [][]byte { +func (x *ExecuteTxsRequest) GetTxBatch() *TxBatch { if x != nil { - return x.Txs + return x.TxBatch } return nil } @@ -354,7 +409,7 @@ type ExecuteTxsResponse struct { func (x *ExecuteTxsResponse) Reset() { *x = ExecuteTxsResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[5] + mi := &file_evnode_v1_execution_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -366,7 +421,7 @@ func (x *ExecuteTxsResponse) String() string { func (*ExecuteTxsResponse) ProtoMessage() {} func (x *ExecuteTxsResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[5] + mi := &file_evnode_v1_execution_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -379,7 +434,7 @@ func (x *ExecuteTxsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ExecuteTxsResponse.ProtoReflect.Descriptor instead. func (*ExecuteTxsResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{5} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{6} } func (x *ExecuteTxsResponse) GetUpdatedStateRoot() []byte { @@ -407,7 +462,7 @@ type SetFinalRequest struct { func (x *SetFinalRequest) Reset() { *x = SetFinalRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[6] + mi := &file_evnode_v1_execution_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -419,7 +474,7 @@ func (x *SetFinalRequest) String() string { func (*SetFinalRequest) ProtoMessage() {} func (x *SetFinalRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[6] + mi := &file_evnode_v1_execution_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -432,7 +487,7 @@ func (x *SetFinalRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SetFinalRequest.ProtoReflect.Descriptor instead. func (*SetFinalRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{6} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{7} } func (x *SetFinalRequest) GetBlockHeight() uint64 { @@ -451,7 +506,7 @@ type SetFinalResponse struct { func (x *SetFinalResponse) Reset() { *x = SetFinalResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[7] + mi := &file_evnode_v1_execution_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -463,7 +518,7 @@ func (x *SetFinalResponse) String() string { func (*SetFinalResponse) ProtoMessage() {} func (x *SetFinalResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[7] + mi := &file_evnode_v1_execution_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -476,7 +531,7 @@ func (x *SetFinalResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SetFinalResponse.ProtoReflect.Descriptor instead. func (*SetFinalResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{7} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{8} } // GetExecutionInfoRequest requests execution layer parameters @@ -488,7 +543,7 @@ type GetExecutionInfoRequest struct { func (x *GetExecutionInfoRequest) Reset() { *x = GetExecutionInfoRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[8] + mi := &file_evnode_v1_execution_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -500,7 +555,7 @@ func (x *GetExecutionInfoRequest) String() string { func (*GetExecutionInfoRequest) ProtoMessage() {} func (x *GetExecutionInfoRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[8] + mi := &file_evnode_v1_execution_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -513,7 +568,7 @@ func (x *GetExecutionInfoRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetExecutionInfoRequest.ProtoReflect.Descriptor instead. func (*GetExecutionInfoRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{8} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{9} } // GetExecutionInfoResponse contains execution layer parameters @@ -528,7 +583,7 @@ type GetExecutionInfoResponse struct { func (x *GetExecutionInfoResponse) Reset() { *x = GetExecutionInfoResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[9] + mi := &file_evnode_v1_execution_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -540,7 +595,7 @@ func (x *GetExecutionInfoResponse) String() string { func (*GetExecutionInfoResponse) ProtoMessage() {} func (x *GetExecutionInfoResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[9] + mi := &file_evnode_v1_execution_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -553,7 +608,7 @@ func (x *GetExecutionInfoResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetExecutionInfoResponse.ProtoReflect.Descriptor instead. func (*GetExecutionInfoResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{9} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{10} } func (x *GetExecutionInfoResponse) GetMaxGas() uint64 { @@ -566,8 +621,8 @@ func (x *GetExecutionInfoResponse) GetMaxGas() uint64 { // FilterTxsRequest contains transactions to validate and filter type FilterTxsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - // All transactions (force-included + mempool) - Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` + // All transactions (force-included + mempool). + TxBatch *TxBatch `protobuf:"bytes,1,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` // Maximum cumulative size allowed (0 means no size limit) MaxBytes uint64 `protobuf:"varint,2,opt,name=max_bytes,json=maxBytes,proto3" json:"max_bytes,omitempty"` // Maximum cumulative gas allowed (0 means no gas limit) @@ -580,7 +635,7 @@ type FilterTxsRequest struct { func (x *FilterTxsRequest) Reset() { *x = FilterTxsRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[10] + mi := &file_evnode_v1_execution_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -592,7 +647,7 @@ func (x *FilterTxsRequest) String() string { func (*FilterTxsRequest) ProtoMessage() {} func (x *FilterTxsRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[10] + mi := &file_evnode_v1_execution_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -605,12 +660,12 @@ func (x *FilterTxsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use FilterTxsRequest.ProtoReflect.Descriptor instead. func (*FilterTxsRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{10} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{11} } -func (x *FilterTxsRequest) GetTxs() [][]byte { +func (x *FilterTxsRequest) GetTxBatch() *TxBatch { if x != nil { - return x.Txs + return x.TxBatch } return nil } @@ -647,7 +702,7 @@ type FilterTxsResponse struct { func (x *FilterTxsResponse) Reset() { *x = FilterTxsResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[11] + mi := &file_evnode_v1_execution_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -659,7 +714,7 @@ func (x *FilterTxsResponse) String() string { func (*FilterTxsResponse) ProtoMessage() {} func (x *FilterTxsResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[11] + mi := &file_evnode_v1_execution_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -672,7 +727,7 @@ func (x *FilterTxsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use FilterTxsResponse.ProtoReflect.Descriptor instead. func (*FilterTxsResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{11} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{12} } func (x *FilterTxsResponse) GetStatuses() []FilterStatus { @@ -694,11 +749,14 @@ const file_evnode_v1_execution_proto_rawDesc = "" + "\x11InitChainResponse\x12\x1d\n" + "\n" + "state_root\x18\x01 \x01(\fR\tstateRoot\"\x0f\n" + - "\rGetTxsRequest\"\"\n" + - "\x0eGetTxsResponse\x12\x10\n" + - "\x03txs\x18\x01 \x03(\fR\x03txs\"\xaa\x01\n" + - "\x11ExecuteTxsRequest\x12\x10\n" + - "\x03txs\x18\x01 \x03(\fR\x03txs\x12!\n" + + "\rGetTxsRequest\"8\n" + + "\aTxBatch\x12\x12\n" + + "\x04data\x18\x01 \x01(\fR\x04data\x12\x19\n" + + "\btx_sizes\x18\x02 \x03(\rR\atxSizes\"?\n" + + "\x0eGetTxsResponse\x12-\n" + + "\btx_batch\x18\x01 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\"\xc7\x01\n" + + "\x11ExecuteTxsRequest\x12-\n" + + "\btx_batch\x18\x01 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\x12!\n" + "\fblock_height\x18\x02 \x01(\x04R\vblockHeight\x128\n" + "\ttimestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12&\n" + "\x0fprev_state_root\x18\x04 \x01(\fR\rprevStateRoot\"_\n" + @@ -710,9 +768,9 @@ const file_evnode_v1_execution_proto_rawDesc = "" + "\x10SetFinalResponse\"\x19\n" + "\x17GetExecutionInfoRequest\"3\n" + "\x18GetExecutionInfoResponse\x12\x17\n" + - "\amax_gas\x18\x01 \x01(\x04R\x06maxGas\"\x9f\x01\n" + - "\x10FilterTxsRequest\x12\x10\n" + - "\x03txs\x18\x01 \x03(\fR\x03txs\x12\x1b\n" + + "\amax_gas\x18\x01 \x01(\x04R\x06maxGas\"\xbc\x01\n" + + "\x10FilterTxsRequest\x12-\n" + + "\btx_batch\x18\x01 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\x12\x1b\n" + "\tmax_bytes\x18\x02 \x01(\x04R\bmaxBytes\x12\x17\n" + "\amax_gas\x18\x03 \x01(\x04R\x06maxGas\x12C\n" + "\x1ehas_force_included_transaction\x18\x04 \x01(\bR\x1bhasForceIncludedTransaction\"H\n" + @@ -744,44 +802,48 @@ func file_evnode_v1_execution_proto_rawDescGZIP() []byte { } var file_evnode_v1_execution_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_evnode_v1_execution_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_evnode_v1_execution_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_evnode_v1_execution_proto_goTypes = []any{ (FilterStatus)(0), // 0: evnode.v1.FilterStatus (*InitChainRequest)(nil), // 1: evnode.v1.InitChainRequest (*InitChainResponse)(nil), // 2: evnode.v1.InitChainResponse (*GetTxsRequest)(nil), // 3: evnode.v1.GetTxsRequest - (*GetTxsResponse)(nil), // 4: evnode.v1.GetTxsResponse - (*ExecuteTxsRequest)(nil), // 5: evnode.v1.ExecuteTxsRequest - (*ExecuteTxsResponse)(nil), // 6: evnode.v1.ExecuteTxsResponse - (*SetFinalRequest)(nil), // 7: evnode.v1.SetFinalRequest - (*SetFinalResponse)(nil), // 8: evnode.v1.SetFinalResponse - (*GetExecutionInfoRequest)(nil), // 9: evnode.v1.GetExecutionInfoRequest - (*GetExecutionInfoResponse)(nil), // 10: evnode.v1.GetExecutionInfoResponse - (*FilterTxsRequest)(nil), // 11: evnode.v1.FilterTxsRequest - (*FilterTxsResponse)(nil), // 12: evnode.v1.FilterTxsResponse - (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp + (*TxBatch)(nil), // 4: evnode.v1.TxBatch + (*GetTxsResponse)(nil), // 5: evnode.v1.GetTxsResponse + (*ExecuteTxsRequest)(nil), // 6: evnode.v1.ExecuteTxsRequest + (*ExecuteTxsResponse)(nil), // 7: evnode.v1.ExecuteTxsResponse + (*SetFinalRequest)(nil), // 8: evnode.v1.SetFinalRequest + (*SetFinalResponse)(nil), // 9: evnode.v1.SetFinalResponse + (*GetExecutionInfoRequest)(nil), // 10: evnode.v1.GetExecutionInfoRequest + (*GetExecutionInfoResponse)(nil), // 11: evnode.v1.GetExecutionInfoResponse + (*FilterTxsRequest)(nil), // 12: evnode.v1.FilterTxsRequest + (*FilterTxsResponse)(nil), // 13: evnode.v1.FilterTxsResponse + (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp } var file_evnode_v1_execution_proto_depIdxs = []int32{ - 13, // 0: evnode.v1.InitChainRequest.genesis_time:type_name -> google.protobuf.Timestamp - 13, // 1: evnode.v1.ExecuteTxsRequest.timestamp:type_name -> google.protobuf.Timestamp - 0, // 2: evnode.v1.FilterTxsResponse.statuses:type_name -> evnode.v1.FilterStatus - 1, // 3: evnode.v1.ExecutorService.InitChain:input_type -> evnode.v1.InitChainRequest - 3, // 4: evnode.v1.ExecutorService.GetTxs:input_type -> evnode.v1.GetTxsRequest - 5, // 5: evnode.v1.ExecutorService.ExecuteTxs:input_type -> evnode.v1.ExecuteTxsRequest - 7, // 6: evnode.v1.ExecutorService.SetFinal:input_type -> evnode.v1.SetFinalRequest - 9, // 7: evnode.v1.ExecutorService.GetExecutionInfo:input_type -> evnode.v1.GetExecutionInfoRequest - 11, // 8: evnode.v1.ExecutorService.FilterTxs:input_type -> evnode.v1.FilterTxsRequest - 2, // 9: evnode.v1.ExecutorService.InitChain:output_type -> evnode.v1.InitChainResponse - 4, // 10: evnode.v1.ExecutorService.GetTxs:output_type -> evnode.v1.GetTxsResponse - 6, // 11: evnode.v1.ExecutorService.ExecuteTxs:output_type -> evnode.v1.ExecuteTxsResponse - 8, // 12: evnode.v1.ExecutorService.SetFinal:output_type -> evnode.v1.SetFinalResponse - 10, // 13: evnode.v1.ExecutorService.GetExecutionInfo:output_type -> evnode.v1.GetExecutionInfoResponse - 12, // 14: evnode.v1.ExecutorService.FilterTxs:output_type -> evnode.v1.FilterTxsResponse - 9, // [9:15] is the sub-list for method output_type - 3, // [3:9] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 14, // 0: evnode.v1.InitChainRequest.genesis_time:type_name -> google.protobuf.Timestamp + 4, // 1: evnode.v1.GetTxsResponse.tx_batch:type_name -> evnode.v1.TxBatch + 4, // 2: evnode.v1.ExecuteTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch + 14, // 3: evnode.v1.ExecuteTxsRequest.timestamp:type_name -> google.protobuf.Timestamp + 4, // 4: evnode.v1.FilterTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch + 0, // 5: evnode.v1.FilterTxsResponse.statuses:type_name -> evnode.v1.FilterStatus + 1, // 6: evnode.v1.ExecutorService.InitChain:input_type -> evnode.v1.InitChainRequest + 3, // 7: evnode.v1.ExecutorService.GetTxs:input_type -> evnode.v1.GetTxsRequest + 6, // 8: evnode.v1.ExecutorService.ExecuteTxs:input_type -> evnode.v1.ExecuteTxsRequest + 8, // 9: evnode.v1.ExecutorService.SetFinal:input_type -> evnode.v1.SetFinalRequest + 10, // 10: evnode.v1.ExecutorService.GetExecutionInfo:input_type -> evnode.v1.GetExecutionInfoRequest + 12, // 11: evnode.v1.ExecutorService.FilterTxs:input_type -> evnode.v1.FilterTxsRequest + 2, // 12: evnode.v1.ExecutorService.InitChain:output_type -> evnode.v1.InitChainResponse + 5, // 13: evnode.v1.ExecutorService.GetTxs:output_type -> evnode.v1.GetTxsResponse + 7, // 14: evnode.v1.ExecutorService.ExecuteTxs:output_type -> evnode.v1.ExecuteTxsResponse + 9, // 15: evnode.v1.ExecutorService.SetFinal:output_type -> evnode.v1.SetFinalResponse + 11, // 16: evnode.v1.ExecutorService.GetExecutionInfo:output_type -> evnode.v1.GetExecutionInfoResponse + 13, // 17: evnode.v1.ExecutorService.FilterTxs:output_type -> evnode.v1.FilterTxsResponse + 12, // [12:18] is the sub-list for method output_type + 6, // [6:12] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_evnode_v1_execution_proto_init() } @@ -795,7 +857,7 @@ func file_evnode_v1_execution_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_execution_proto_rawDesc), len(file_evnode_v1_execution_proto_rawDesc)), NumEnums: 1, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, From 58a300240547db88e3d43c6697578ecda909ff6a Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 28 Apr 2026 17:28:53 +0200 Subject: [PATCH 2/3] redo proto --- execution/grpc/README.md | 2 +- execution/grpc/client.go | 2 +- execution/grpc/server.go | 4 +- execution/grpc/server_test.go | 31 +++++++++++++ execution/grpc/tx_batch.go | 11 +++++ execution/grpc/tx_batch_test.go | 33 ++++++++++++++ proto/evnode/v1/execution.proto | 15 +++++-- types/pb/evnode/v1/execution.pb.go | 72 +++++++++++++++++++++--------- 8 files changed, 142 insertions(+), 28 deletions(-) diff --git a/execution/grpc/README.md b/execution/grpc/README.md index 3216926f97..5d6fe9f08a 100644 --- a/execution/grpc/README.md +++ b/execution/grpc/README.md @@ -71,7 +71,7 @@ The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides th - Support for Unix domain socket connections with `unix:///path/to/socket` - gRPC reflection for debugging and service discovery - Compression for efficient data transfer -- Contiguous transaction batch encoding to reduce per-transaction protobuf overhead +- Additive contiguous transaction batch encoding to reduce per-transaction protobuf overhead while keeping legacy `txs` fields in the protobuf API - Comprehensive error handling and validation ## Testing diff --git a/execution/grpc/client.go b/execution/grpc/client.go index 2769d681f3..6406f63d09 100644 --- a/execution/grpc/client.go +++ b/execution/grpc/client.go @@ -130,7 +130,7 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) { return nil, fmt.Errorf("grpc client: failed to get txs: %w", err) } - txs, err := decodeTxBatch(resp.Msg.TxBatch) + txs, err := decodeTxBatchOrTxs(resp.Msg.TxBatch, resp.Msg.Txs) if err != nil { return nil, fmt.Errorf("grpc client: invalid get txs response: %w", err) } diff --git a/execution/grpc/server.go b/execution/grpc/server.go index 8f1cbb48ce..781258cd05 100644 --- a/execution/grpc/server.go +++ b/execution/grpc/server.go @@ -107,7 +107,7 @@ func (s *Server) ExecuteTxs( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("prev_state_root is required")) } - txs, err := decodeTxBatch(req.Msg.TxBatch) + txs, err := decodeTxBatchOrTxs(req.Msg.TxBatch, req.Msg.Txs) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid tx_batch: %w", err)) } @@ -172,7 +172,7 @@ func (s *Server) FilterTxs( ctx context.Context, req *connect.Request[pb.FilterTxsRequest], ) (*connect.Response[pb.FilterTxsResponse], error) { - txs, err := decodeTxBatch(req.Msg.TxBatch) + txs, err := decodeTxBatchOrTxs(req.Msg.TxBatch, req.Msg.Txs) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid tx_batch: %w", err)) } diff --git a/execution/grpc/server_test.go b/execution/grpc/server_test.go index 2219d47f47..af84bf03b9 100644 --- a/execution/grpc/server_test.go +++ b/execution/grpc/server_test.go @@ -212,6 +212,22 @@ func TestServer_ExecuteTxs(t *testing.T) { }, wantErr: false, }, + { + name: "legacy txs fallback", + req: &pb.ExecuteTxsRequest{ + Txs: txs, + BlockHeight: blockHeight, + Timestamp: timestamppb.New(timestamp), + PrevStateRoot: prevStateRoot, + }, + mockFunc: func(ctx context.Context, txsIn [][]byte, bh uint64, ts time.Time, psr []byte) ([]byte, error) { + if len(txsIn) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn)) + } + return expectedStateRoot, nil + }, + wantErr: false, + }, { name: "missing block height", req: &pb.ExecuteTxsRequest{ @@ -416,6 +432,21 @@ func TestServer_FilterTxs(t *testing.T) { }, wantErr: false, }, + { + name: "legacy txs fallback", + req: &pb.FilterTxsRequest{ + Txs: txs, + MaxBytes: 100, + MaxGas: 200, + }, + mockFunc: func(ctx context.Context, txsIn [][]byte, maxBytes, maxGas uint64, forced bool) ([]execution.FilterStatus, error) { + if len(txsIn) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn)) + } + return expectedStatuses, nil + }, + wantErr: false, + }, { name: "invalid tx batch", req: &pb.FilterTxsRequest{ diff --git a/execution/grpc/tx_batch.go b/execution/grpc/tx_batch.go index 2e3e600596..feab69ba7f 100644 --- a/execution/grpc/tx_batch.go +++ b/execution/grpc/tx_batch.go @@ -72,3 +72,14 @@ func decodeTxBatch(batch *pb.TxBatch) ([][]byte, error) { } return txs, nil } + +func decodeTxBatchOrTxs(batch *pb.TxBatch, txs [][]byte) ([][]byte, error) { + if hasTxBatchData(batch) { + return decodeTxBatch(batch) + } + return txs, nil +} + +func hasTxBatchData(batch *pb.TxBatch) bool { + return batch != nil && (len(batch.Data) != 0 || len(batch.TxSizes) != 0) +} diff --git a/execution/grpc/tx_batch_test.go b/execution/grpc/tx_batch_test.go index 6637507e52..08ff6ddf54 100644 --- a/execution/grpc/tx_batch_test.go +++ b/execution/grpc/tx_batch_test.go @@ -67,3 +67,36 @@ func TestDecodeTxBatchRejectsMalformedInput(t *testing.T) { }) } } + +func TestDecodeTxBatchOrTxsFallsBackToLegacyTxs(t *testing.T) { + legacyTxs := [][]byte{[]byte("legacy1"), []byte("legacy2")} + + txs, err := decodeTxBatchOrTxs(nil, legacyTxs) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(txs) != len(legacyTxs) { + t.Fatalf("expected %d txs, got %d", len(legacyTxs), len(txs)) + } + for i := range txs { + if !bytes.Equal(txs[i], legacyTxs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, legacyTxs[i], txs[i]) + } + } +} + +func TestDecodeTxBatchOrTxsPrefersTxBatch(t *testing.T) { + batchTxs := [][]byte{[]byte("batch")} + legacyTxs := [][]byte{[]byte("legacy")} + + txs, err := decodeTxBatchOrTxs(mustEncodeTxBatch(t, batchTxs), legacyTxs) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(txs) != len(batchTxs) { + t.Fatalf("expected %d txs, got %d", len(batchTxs), len(txs)) + } + if !bytes.Equal(txs[0], batchTxs[0]) { + t.Fatalf("expected tx_batch %q, got %q", batchTxs[0], txs[0]) + } +} diff --git a/proto/evnode/v1/execution.proto b/proto/evnode/v1/execution.proto index fe2ca90c5f..a8cf48565c 100644 --- a/proto/evnode/v1/execution.proto +++ b/proto/evnode/v1/execution.proto @@ -61,13 +61,16 @@ message TxBatch { // GetTxsResponse contains the available transactions message GetTxsResponse { // Valid transactions from mempool. - TxBatch tx_batch = 1; + repeated bytes txs = 1; + + // Valid transactions from mempool in contiguous batch form. + TxBatch tx_batch = 2; } // ExecuteTxsRequest contains transactions and block context for execution message ExecuteTxsRequest { // Ordered transactions to execute. - TxBatch tx_batch = 1; + repeated bytes txs = 1; // Height of block being created (must be > 0) uint64 block_height = 2; @@ -77,6 +80,9 @@ message ExecuteTxsRequest { // Previous block's state root hash bytes prev_state_root = 4; + + // Ordered transactions to execute in contiguous batch form. + TxBatch tx_batch = 5; } // ExecuteTxsResponse contains the result of transaction execution @@ -122,7 +128,7 @@ enum FilterStatus { // FilterTxsRequest contains transactions to validate and filter message FilterTxsRequest { // All transactions (force-included + mempool). - TxBatch tx_batch = 1; + repeated bytes txs = 1; // Maximum cumulative size allowed (0 means no size limit) uint64 max_bytes = 2; @@ -132,6 +138,9 @@ message FilterTxsRequest { // Whether force-included transactions are present bool has_force_included_transaction = 4; + + // All transactions (force-included + mempool) in contiguous batch form. + TxBatch tx_batch = 5; } // FilterTxsResponse contains the filter status for each transaction diff --git a/types/pb/evnode/v1/execution.pb.go b/types/pb/evnode/v1/execution.pb.go index fc6732ad87..eca22291fa 100644 --- a/types/pb/evnode/v1/execution.pb.go +++ b/types/pb/evnode/v1/execution.pb.go @@ -281,7 +281,9 @@ func (x *TxBatch) GetTxSizes() []uint32 { type GetTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Valid transactions from mempool. - TxBatch *TxBatch `protobuf:"bytes,1,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` + Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` + // Valid transactions from mempool in contiguous batch form. + TxBatch *TxBatch `protobuf:"bytes,2,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -316,6 +318,13 @@ func (*GetTxsResponse) Descriptor() ([]byte, []int) { return file_evnode_v1_execution_proto_rawDescGZIP(), []int{4} } +func (x *GetTxsResponse) GetTxs() [][]byte { + if x != nil { + return x.Txs + } + return nil +} + func (x *GetTxsResponse) GetTxBatch() *TxBatch { if x != nil { return x.TxBatch @@ -327,13 +336,15 @@ func (x *GetTxsResponse) GetTxBatch() *TxBatch { type ExecuteTxsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // Ordered transactions to execute. - TxBatch *TxBatch `protobuf:"bytes,1,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` + Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` // Height of block being created (must be > 0) BlockHeight uint64 `protobuf:"varint,2,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` // Block creation time in UTC Timestamp *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Previous block's state root hash PrevStateRoot []byte `protobuf:"bytes,4,opt,name=prev_state_root,json=prevStateRoot,proto3" json:"prev_state_root,omitempty"` + // Ordered transactions to execute in contiguous batch form. + TxBatch *TxBatch `protobuf:"bytes,5,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -368,9 +379,9 @@ func (*ExecuteTxsRequest) Descriptor() ([]byte, []int) { return file_evnode_v1_execution_proto_rawDescGZIP(), []int{5} } -func (x *ExecuteTxsRequest) GetTxBatch() *TxBatch { +func (x *ExecuteTxsRequest) GetTxs() [][]byte { if x != nil { - return x.TxBatch + return x.Txs } return nil } @@ -396,6 +407,13 @@ func (x *ExecuteTxsRequest) GetPrevStateRoot() []byte { return nil } +func (x *ExecuteTxsRequest) GetTxBatch() *TxBatch { + if x != nil { + return x.TxBatch + } + return nil +} + // ExecuteTxsResponse contains the result of transaction execution type ExecuteTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -622,15 +640,17 @@ func (x *GetExecutionInfoResponse) GetMaxGas() uint64 { type FilterTxsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // All transactions (force-included + mempool). - TxBatch *TxBatch `protobuf:"bytes,1,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` + Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` // Maximum cumulative size allowed (0 means no size limit) MaxBytes uint64 `protobuf:"varint,2,opt,name=max_bytes,json=maxBytes,proto3" json:"max_bytes,omitempty"` // Maximum cumulative gas allowed (0 means no gas limit) MaxGas uint64 `protobuf:"varint,3,opt,name=max_gas,json=maxGas,proto3" json:"max_gas,omitempty"` // Whether force-included transactions are present HasForceIncludedTransaction bool `protobuf:"varint,4,opt,name=has_force_included_transaction,json=hasForceIncludedTransaction,proto3" json:"has_force_included_transaction,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // All transactions (force-included + mempool) in contiguous batch form. + TxBatch *TxBatch `protobuf:"bytes,5,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *FilterTxsRequest) Reset() { @@ -663,9 +683,9 @@ func (*FilterTxsRequest) Descriptor() ([]byte, []int) { return file_evnode_v1_execution_proto_rawDescGZIP(), []int{11} } -func (x *FilterTxsRequest) GetTxBatch() *TxBatch { +func (x *FilterTxsRequest) GetTxs() [][]byte { if x != nil { - return x.TxBatch + return x.Txs } return nil } @@ -691,6 +711,13 @@ func (x *FilterTxsRequest) GetHasForceIncludedTransaction() bool { return false } +func (x *FilterTxsRequest) GetTxBatch() *TxBatch { + if x != nil { + return x.TxBatch + } + return nil +} + // FilterTxsResponse contains the filter status for each transaction type FilterTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -752,14 +779,16 @@ const file_evnode_v1_execution_proto_rawDesc = "" + "\rGetTxsRequest\"8\n" + "\aTxBatch\x12\x12\n" + "\x04data\x18\x01 \x01(\fR\x04data\x12\x19\n" + - "\btx_sizes\x18\x02 \x03(\rR\atxSizes\"?\n" + - "\x0eGetTxsResponse\x12-\n" + - "\btx_batch\x18\x01 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\"\xc7\x01\n" + - "\x11ExecuteTxsRequest\x12-\n" + - "\btx_batch\x18\x01 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\x12!\n" + + "\btx_sizes\x18\x02 \x03(\rR\atxSizes\"Q\n" + + "\x0eGetTxsResponse\x12\x10\n" + + "\x03txs\x18\x01 \x03(\fR\x03txs\x12-\n" + + "\btx_batch\x18\x02 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\"\xd9\x01\n" + + "\x11ExecuteTxsRequest\x12\x10\n" + + "\x03txs\x18\x01 \x03(\fR\x03txs\x12!\n" + "\fblock_height\x18\x02 \x01(\x04R\vblockHeight\x128\n" + "\ttimestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12&\n" + - "\x0fprev_state_root\x18\x04 \x01(\fR\rprevStateRoot\"_\n" + + "\x0fprev_state_root\x18\x04 \x01(\fR\rprevStateRoot\x12-\n" + + "\btx_batch\x18\x05 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\"_\n" + "\x12ExecuteTxsResponse\x12,\n" + "\x12updated_state_root\x18\x01 \x01(\fR\x10updatedStateRoot\x12\x1b\n" + "\tmax_bytes\x18\x02 \x01(\x04R\bmaxBytes\"4\n" + @@ -768,12 +797,13 @@ const file_evnode_v1_execution_proto_rawDesc = "" + "\x10SetFinalResponse\"\x19\n" + "\x17GetExecutionInfoRequest\"3\n" + "\x18GetExecutionInfoResponse\x12\x17\n" + - "\amax_gas\x18\x01 \x01(\x04R\x06maxGas\"\xbc\x01\n" + - "\x10FilterTxsRequest\x12-\n" + - "\btx_batch\x18\x01 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\x12\x1b\n" + + "\amax_gas\x18\x01 \x01(\x04R\x06maxGas\"\xce\x01\n" + + "\x10FilterTxsRequest\x12\x10\n" + + "\x03txs\x18\x01 \x03(\fR\x03txs\x12\x1b\n" + "\tmax_bytes\x18\x02 \x01(\x04R\bmaxBytes\x12\x17\n" + "\amax_gas\x18\x03 \x01(\x04R\x06maxGas\x12C\n" + - "\x1ehas_force_included_transaction\x18\x04 \x01(\bR\x1bhasForceIncludedTransaction\"H\n" + + "\x1ehas_force_included_transaction\x18\x04 \x01(\bR\x1bhasForceIncludedTransaction\x12-\n" + + "\btx_batch\x18\x05 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatch\"H\n" + "\x11FilterTxsResponse\x123\n" + "\bstatuses\x18\x01 \x03(\x0e2\x17.evnode.v1.FilterStatusR\bstatuses*E\n" + "\fFilterStatus\x12\r\n" + @@ -823,8 +853,8 @@ var file_evnode_v1_execution_proto_goTypes = []any{ var file_evnode_v1_execution_proto_depIdxs = []int32{ 14, // 0: evnode.v1.InitChainRequest.genesis_time:type_name -> google.protobuf.Timestamp 4, // 1: evnode.v1.GetTxsResponse.tx_batch:type_name -> evnode.v1.TxBatch - 4, // 2: evnode.v1.ExecuteTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch - 14, // 3: evnode.v1.ExecuteTxsRequest.timestamp:type_name -> google.protobuf.Timestamp + 14, // 2: evnode.v1.ExecuteTxsRequest.timestamp:type_name -> google.protobuf.Timestamp + 4, // 3: evnode.v1.ExecuteTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch 4, // 4: evnode.v1.FilterTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch 0, // 5: evnode.v1.FilterTxsResponse.statuses:type_name -> evnode.v1.FilterStatus 1, // 6: evnode.v1.ExecutorService.InitChain:input_type -> evnode.v1.InitChainRequest From 10f09feb5c601cc7cccfa77d5af15efd62d8926a Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 28 Apr 2026 17:34:19 +0200 Subject: [PATCH 3/3] docs: update changelog for grpc execution transport --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0097fb829b..fe6e10762d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes - Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) +- Add Unix domain socket support for gRPC execution endpoints via `unix:///path/to/socket`. +- Add additive `tx_batch` fields for gRPC execution transaction payloads so Go clients and servers can use contiguous transaction buffers while retaining legacy `txs` compatibility. ## v1.1.1