Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changes

- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
- Add Unix domain socket support for gRPC execution endpoints via `unix:///path/to/socket`.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing PR name.

- Add additive `tx_batch` fields for gRPC execution transaction payloads so Go clients and servers can use contiguous transaction buffers while retaining legacy `txs` compatibility.

## v1.1.1

Expand Down
15 changes: 12 additions & 3 deletions apps/grpc/README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# gRPC Single Sequencer App

This application runs a Evolve node with a single sequencer that connects to a remote execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
This application runs a Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix article in intro sentence.

Use “an Evolve node” instead of “a Evolve node” for grammatical correctness.

✏️ Proposed doc fix
-This application runs a Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
+This application runs an Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
This application runs a Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
This application runs an Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/grpc/README.md` at line 3, Update the README intro sentence to use the
correct indefinite article: replace the phrase "a Evolve node" with "an Evolve
node" in the README content so the sentence reads "This application runs an
Evolve node with a single sequencer..." (look for the README sentence containing
"a Evolve node" to make the change).


## Overview

The gRPC single sequencer app provides:

- A Evolve consensus node with single sequencer
- Connection to remote execution clients via gRPC
- Connection to execution clients via TCP or Unix domain socket gRPC
- Full data availability layer integration
- P2P networking capabilities

Expand Down Expand Up @@ -58,11 +58,20 @@ Start the Evolve node with:
--da.auth-token your-da-token
```

For a same-machine executor, use a Unix domain socket endpoint:

```bash
./evgrpc start \
--root-dir ~/.evgrpc \
--grpc-executor-url unix:///tmp/evolve-executor.sock \
--da.address http://localhost:7980
```

## Command-Line Flags

### gRPC-specific Flags

- `--grpc-executor-url`: URL of the gRPC execution service (default: `http://localhost:50051`)
- `--grpc-executor-url`: URL of the gRPC execution service, either `http://host:port` or `unix:///path/to/socket` (default: `http://localhost:50051`)

### Common Evolve Flags

Expand Down
4 changes: 2 additions & 2 deletions apps/grpc/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const (
grpcDbName = "grpc-single"
// FlagGrpcExecutorURL is the flag for the gRPC executor endpoint
// FlagGrpcExecutorURL is the flag for the gRPC executor endpoint.
FlagGrpcExecutorURL = "grpc-executor-url"
)

Expand Down Expand Up @@ -169,5 +169,5 @@ func createGRPCExecutionClient(cmd *cobra.Command) (execution.Executor, error) {

// addGRPCFlags adds flags specific to the gRPC execution client
func addGRPCFlags(cmd *cobra.Command) {
cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service")
cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service, or unix:///path/to/executor.sock")
}
23 changes: 19 additions & 4 deletions execution/grpc/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# gRPC Execution Client

This package provides a gRPC-based implementation of the Evolve execution interface. It allows Evolve to communicate with remote execution clients via gRPC using the Connect-RPC framework.
This package provides a gRPC-based implementation of the Evolve execution interface. It allows Evolve to communicate with execution clients via gRPC using the Connect-RPC framework.

## Overview

The gRPC execution client enables separation between the consensus layer (Evolve) and the execution layer by providing a network interface for communication. This allows execution clients to run in separate processes or even on different machines.
The gRPC execution client enables separation between the consensus layer (Evolve) and the execution layer by providing a process boundary for communication. Execution clients can run on different machines over TCP, or on the same machine over a Unix domain socket to avoid TCP/IP overhead.

## Usage

Expand All @@ -17,12 +17,15 @@ import (
"github.com/evstack/ev-node/execution/grpc"
)

// Create a new gRPC client
// Create a new gRPC client over TCP
client := grpc.NewClient("http://localhost:50051")

// Or connect to an executor on the same machine over a Unix domain socket
client := grpc.NewClient("unix:///tmp/evolve-executor.sock")

// Use the client as an execution.Executor
ctx := context.Background()
stateRoot, maxBytes, err := client.InitChain(ctx, time.Now(), 1, "my-chain")
stateRoot, err := client.InitChain(ctx, time.Now(), 1, "my-chain")
```

### Server
Expand All @@ -42,6 +45,14 @@ handler := grpc.NewExecutorServiceHandler(myExecutor)
http.ListenAndServe(":50051", handler)
```

To serve on a Unix domain socket:

```go
import "github.com/evstack/ev-node/execution/grpc"

err := grpc.ListenAndServeUnix("/tmp/evolve-executor.sock", myExecutor)
```

## Protocol

The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides the following methods:
Expand All @@ -50,13 +61,17 @@ The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides th
- `GetTxs`: Fetch transactions from the mempool
- `ExecuteTxs`: Execute transactions and update state
- `SetFinal`: Mark a block as finalized
- `GetExecutionInfo`: Return current execution limits
- `FilterTxs`: Validate and filter force-included transactions

## Features

- Full implementation of the `execution.Executor` interface
- Support for HTTP/1.1 and HTTP/2 (via h2c)
- Support for Unix domain socket connections with `unix:///path/to/socket`
- gRPC reflection for debugging and service discovery
- Compression for efficient data transfer
- Additive contiguous transaction batch encoding to reduce per-transaction protobuf overhead while keeping legacy `txs` fields in the protobuf API
- Comprehensive error handling and validation

## Testing
Expand Down
66 changes: 60 additions & 6 deletions execution/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package grpc
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"

"connectrpc.com/connect"
Expand All @@ -26,6 +28,11 @@ type Client struct {
client v1connect.ExecutorServiceClient
}

const (
unixURLPrefix = "unix://"
unixHTTPBaseURL = "http://unix"
)

// newHTTP2Client creates an HTTP/2 client that supports cleartext (h2c) connections.
// This is required to connect to native gRPC servers without TLS.
func newHTTP2Client() *http.Client {
Expand All @@ -40,21 +47,53 @@ func newHTTP2Client() *http.Client {
}
}

// newUnixHTTP2Client creates an HTTP/2 client that speaks h2c over a Unix domain socket.
func newUnixHTTP2Client(socketPath string) *http.Client {
return &http.Client{
Transport: &http2.Transport{
AllowHTTP: true,
DialTLSContext: func(ctx context.Context, _, _ string, _ *tls.Config) (net.Conn, error) {
if socketPath == "" {
return nil, errors.New("unix socket path is required")
}
var d net.Dialer
return d.DialContext(ctx, "unix", socketPath)
},
},
}
}

func clientTransportForTarget(target string) (*http.Client, string) {
socketPath, ok := unixSocketPath(target)
if ok {
return newUnixHTTP2Client(socketPath), unixHTTPBaseURL
}
return newHTTP2Client(), target
}

func unixSocketPath(target string) (string, bool) {
if !strings.HasPrefix(target, unixURLPrefix) {
return "", false
}
return strings.TrimPrefix(target, unixURLPrefix), true
}

// NewClient creates a new gRPC execution client.
//
// Parameters:
// - url: The URL of the gRPC server (e.g., "http://localhost:50051")
// - url: The URL of the gRPC server (e.g., "http://localhost:50051" or "unix:///tmp/executor.sock")
// - opts: Optional Connect client options for configuring the connection
//
// Returns:
// - *Client: The initialized gRPC client
func NewClient(url string, opts ...connect.ClientOption) *Client {
// Prepend WithGRPC to use the native gRPC protocol (required for tonic/gRPC servers)
opts = append([]connect.ClientOption{connect.WithGRPC()}, opts...)
httpClient, targetURL := clientTransportForTarget(url)
return &Client{
client: v1connect.NewExecutorServiceClient(
newHTTP2Client(),
url,
httpClient,
targetURL,
opts...,
),
}
Expand Down Expand Up @@ -91,7 +130,12 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) {
return nil, fmt.Errorf("grpc client: failed to get txs: %w", err)
}

return resp.Msg.Txs, nil
txs, err := decodeTxBatchOrTxs(resp.Msg.TxBatch, resp.Msg.Txs)
if err != nil {
return nil, fmt.Errorf("grpc client: invalid get txs response: %w", err)
}

return txs, nil
}

// ExecuteTxs processes transactions to produce a new block state.
Expand All @@ -100,8 +144,13 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) {
// returns the updated state root after execution. The execution service ensures
// deterministic execution and validates the state transition.
func (c *Client) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, err error) {
txBatch, err := encodeTxBatch(txs)
if err != nil {
return nil, fmt.Errorf("grpc client: failed to encode tx batch: %w", err)
}

req := connect.NewRequest(&pb.ExecuteTxsRequest{
Txs: txs,
TxBatch: txBatch,
BlockHeight: blockHeight,
Timestamp: timestamppb.New(timestamp),
PrevStateRoot: prevStateRoot,
Expand Down Expand Up @@ -154,8 +203,13 @@ func (c *Client) GetExecutionInfo(ctx context.Context) (execution.ExecutionInfo,
// This method sends transactions to the remote execution service for validation.
// Returns a slice of FilterStatus for each transaction.
func (c *Client) FilterTxs(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) {
txBatch, err := encodeTxBatch(txs)
if err != nil {
return nil, fmt.Errorf("grpc client: failed to encode tx batch: %w", err)
}

req := connect.NewRequest(&pb.FilterTxsRequest{
Txs: txs,
TxBatch: txBatch,
MaxBytes: maxBytes,
MaxGas: maxGas,
HasForceIncludedTransaction: hasForceIncludedTransaction,
Expand Down
117 changes: 117 additions & 0 deletions execution/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package grpc

import (
"context"
"errors"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"
Expand Down Expand Up @@ -214,3 +217,117 @@ func TestClient_SetFinal(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
}

func TestClient_FilterTxs(t *testing.T) {
ctx := context.Background()
txs := [][]byte{[]byte("tx1"), []byte{}, []byte("tx3")}
maxBytes := uint64(100)
maxGas := uint64(200)
hasForced := true
expectedStatuses := []execution.FilterStatus{
execution.FilterOK,
execution.FilterRemove,
execution.FilterPostpone,
}

mockExec := &mockExecutor{
filterTxsFunc: func(ctx context.Context, txsIn [][]byte, mb, mg uint64, forced bool) ([]execution.FilterStatus, error) {
if len(txsIn) != len(txs) {
t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn))
}
for i, tx := range txsIn {
if string(tx) != string(txs[i]) {
t.Fatalf("tx %d: expected %q, got %q", i, txs[i], tx)
}
}
if mb != maxBytes {
t.Fatalf("expected max bytes %d, got %d", maxBytes, mb)
}
if mg != maxGas {
t.Fatalf("expected max gas %d, got %d", maxGas, mg)
}
if forced != hasForced {
t.Fatalf("expected forced=%t, got %t", hasForced, forced)
}
return expectedStatuses, nil
},
}

handler := NewExecutorServiceHandler(mockExec)
server := httptest.NewServer(handler)
defer server.Close()

client := NewClient(server.URL)

statuses, err := client.FilterTxs(ctx, txs, maxBytes, maxGas, hasForced)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(statuses) != len(expectedStatuses) {
t.Fatalf("expected %d statuses, got %d", len(expectedStatuses), len(statuses))
}
for i, status := range statuses {
if status != expectedStatuses[i] {
t.Fatalf("status %d: expected %v, got %v", i, expectedStatuses[i], status)
}
}
}

func TestClient_UnixSocket(t *testing.T) {
ctx := context.Background()
socketPath := testUnixSocketPath(t)
expectedTxs := [][]byte{[]byte("tx1"), []byte("tx2")}

mockExec := &mockExecutor{
getTxsFunc: func(ctx context.Context) ([][]byte, error) {
return expectedTxs, nil
},
}

startUnixTestServer(t, mockExec, socketPath)

client := NewClient("unix://" + socketPath)
txs, err := client.GetTxs(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(txs) != len(expectedTxs) {
t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(txs))
}
for i, tx := range txs {
if string(tx) != string(expectedTxs[i]) {
t.Fatalf("tx %d: expected %q, got %q", i, expectedTxs[i], tx)
}
}
}

func startUnixTestServer(t *testing.T, executor execution.Executor, socketPath string) {
t.Helper()

listener, err := ListenUnix(socketPath)
if err != nil {
t.Fatalf("listen unix socket: %v", err)
}

server := &http.Server{Handler: NewExecutorServiceHandler(executor)}
done := make(chan error, 1)
go func() {
err := server.Serve(listener)
if errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) {
err = nil
}
done <- err
}()

t.Cleanup(func() {
_ = server.Close()
select {
case err := <-done:
if err != nil {
t.Errorf("unix socket server error: %v", err)
}
case <-time.After(time.Second):
t.Error("unix socket server did not stop")
}
})
}
2 changes: 2 additions & 0 deletions execution/grpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/evstack/ev-node/execution/grpc

go 1.25.7

replace github.com/evstack/ev-node => ../..

require (
connectrpc.com/connect v1.19.2
connectrpc.com/grpcreflect v1.3.0
Expand Down
Loading
Loading