diff --git a/CHANGELOG.md b/CHANGELOG.md index c8d7e9f6ba..3cf651e997 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes +- Add OpenTelemetry trace context propagation to the execution/grpc client and server. [#3300](https://github.com/evstack/ev-node/pull/3300) - Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) - Add Unix domain socket support for gRPC execution endpoints via `unix:///path/to/socket` [#3297](https://github.com/evstack/ev-node/pull/3297) - **BREAKING:** (execution/grpc) - - Move execution service where it belongs in execution/grpc. []() + - Move execution service where it belongs in execution/grpc. [#3302](https://github.com/evstack/ev-node/pull/3302) - Replace legacy gRPC execution `txs` payload fields with `tx_batch` so clients and servers use contiguous transaction buffers [#3297](https://github.com/evstack/ev-node/pull/3297) - Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298) - Reduce tx cache retention to avoid OOM under (really) heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299) diff --git a/execution/grpc/client.go b/execution/grpc/client.go index 253dbcb464..f5640c4950 100644 --- a/execution/grpc/client.go +++ b/execution/grpc/client.go @@ -98,6 +98,7 @@ func NewClient(url string, opts ...connect.ClientOption) (*Client, error) { if err != nil { return nil, err } + opts = append([]connect.ClientOption{connect.WithInterceptors(outboundPropagationInterceptor())}, opts...) 
return &Client{ client: v1connect.NewExecutorServiceClient( httpClient, diff --git a/execution/grpc/go.mod b/execution/grpc/go.mod index b702023db3..b2021c0bd4 100644 --- a/execution/grpc/go.mod +++ b/execution/grpc/go.mod @@ -6,8 +6,20 @@ require ( connectrpc.com/connect v1.19.2 connectrpc.com/grpcreflect v1.3.0 github.com/evstack/ev-node/core v1.0.0 + go.opentelemetry.io/otel v1.43.0 + go.opentelemetry.io/otel/sdk v1.43.0 + go.opentelemetry.io/otel/trace v1.43.0 golang.org/x/net v0.53.0 google.golang.org/protobuf v1.36.11 ) -require golang.org/x/text v0.36.0 // indirect +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/metric v1.43.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/text v0.36.0 // indirect +) diff --git a/execution/grpc/go.sum b/execution/grpc/go.sum index e0bdd7d74e..7feb97ed8e 100644 --- a/execution/grpc/go.sum +++ b/execution/grpc/go.sum @@ -2,13 +2,46 @@ connectrpc.com/connect v1.19.2 h1:McQ83FGdzL+t60peksi0gXC7MQ/iLKgLduAnThbM0mo= connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= connectrpc.com/grpcreflect v1.3.0 h1:Y4V+ACf8/vOb1XOc251Qun7jMB75gCUNw6llvB9csXc= connectrpc.com/grpcreflect v1.3.0/go.mod h1:nfloOtCS8VUQOQ1+GTdFzVg2CJo4ZGaat8JIovCtDYs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= 
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= +go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod 
h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/execution/grpc/handler.go b/execution/grpc/handler.go index c3b3bc2ab6..76db8e87f2 100644 --- a/execution/grpc/handler.go +++ b/execution/grpc/handler.go @@ -25,6 +25,7 @@ import ( // - http.Handler: The configured HTTP handler func NewExecutorServiceHandler(executor execution.Executor, opts ...connect.HandlerOption) http.Handler { server := NewServer(executor) + opts = append([]connect.HandlerOption{connect.WithInterceptors(inboundPropagationInterceptor())}, opts...) 
mux := http.NewServeMux()
diff --git a/execution/grpc/otel_propagation.go b/execution/grpc/otel_propagation.go
new file mode 100644
index 0000000000..da4d1595ed
--- /dev/null
+++ b/execution/grpc/otel_propagation.go
@@ -0,0 +1,37 @@
+package grpc
+
+import (
+	"context"
+
+	"connectrpc.com/connect"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
+)
+
+// inboundPropagationInterceptor returns a unary interceptor that reads
+// propagation fields from the incoming request headers, using the global
+// OpenTelemetry text-map propagator, and passes the resulting context to the
+// next handler. The propagator is resolved from the otel globals on every call.
+func inboundPropagationInterceptor() connect.UnaryInterceptorFunc {
+	return func(next connect.UnaryFunc) connect.UnaryFunc {
+		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
+			carrier := propagation.HeaderCarrier(req.Header())
+			ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
+			return next(ctx, req)
+		}
+	}
+}
+
+// outboundPropagationInterceptor returns a unary interceptor that writes the
+// propagation fields for ctx into the outgoing request headers, using the
+// global OpenTelemetry text-map propagator, before invoking the next handler.
+// The propagator is resolved from the otel globals on every call.
+func outboundPropagationInterceptor() connect.UnaryInterceptorFunc {
+	return func(next connect.UnaryFunc) connect.UnaryFunc {
+		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
+			carrier := propagation.HeaderCarrier(req.Header())
+			otel.GetTextMapPropagator().Inject(ctx, carrier)
+			return next(ctx, req)
+		}
+	}
+}
diff --git a/execution/grpc/otel_propagation_test.go b/execution/grpc/otel_propagation_test.go
new file mode 100644
index 0000000000..5815732f2a
--- /dev/null
+++ b/execution/grpc/otel_propagation_test.go
@@ -0,0 +1,224 @@
+package grpc
+
+import (
+	"context"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"connectrpc.com/connect"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/baggage"
+	"go.opentelemetry.io/otel/propagation"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/sdk/trace/tracetest"
+	"go.opentelemetry.io/otel/trace"
+
+	"github.com/evstack/ev-node/core/execution"
+)
+
+// setupTracer installs a recording tracer provider and a composite
+// TraceContext+Baggage propagator as the OpenTelemetry globals. The returned
+// function shuts the provider down and restores the previous globals.
+func setupTracer(t *testing.T) (*tracetest.SpanRecorder, func()) {
+	t.Helper()
+	rec := tracetest.NewSpanRecorder()
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec))
+	oldTP := otel.GetTracerProvider()
+	oldProp := otel.GetTextMapPropagator()
+	otel.SetTracerProvider(tp)
+	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
+	return rec, func() {
+		if err := tp.Shutdown(context.Background()); err != nil {
+			t.Errorf("tracer provider shutdown failed: %v", err)
+		}
+		otel.SetTracerProvider(oldTP)
+		otel.SetTextMapPropagator(oldProp)
+	}
+}
+
+// requireEndedSpanInTrace fails the test unless rec holds at least one ended
+// span called name and every such span carries the trace ID want.
+func requireEndedSpanInTrace(t *testing.T, rec *tracetest.SpanRecorder, name string, want trace.TraceID) {
+	t.Helper()
+	var found bool
+	for _, s := range rec.Ended() {
+		if s.Name() != name {
+			continue
+		}
+		found = true
+		if got := s.SpanContext().TraceID(); got != want {
+			t.Fatalf("%s span trace id mismatch: got %s want %s", name, got, want)
+		}
+	}
+	if !found {
+		t.Fatalf("%s span not found", name)
+	}
+}
+
+// TestInboundMetadataCreatesChildSpanWithSameTraceID checks that a span started
+// inside the server-side executor joins the trace of the calling client span.
+func TestInboundMetadataCreatesChildSpanWithSameTraceID(t *testing.T) {
+	rec, cleanup := setupTracer(t)
+	defer cleanup()
+
+	tracer := otel.Tracer("test")
+	parentCtx, parent := tracer.Start(context.Background(), "parent")
+	defer parent.End()
+
+	mockExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) {
+		_, span := tracer.Start(ctx, "server-child")
+		span.End()
+		return [][]byte{}, nil
+	}}
+
+	ts := httptest.NewServer(NewExecutorServiceHandler(mockExec))
+	defer ts.Close()
+
+	client, err := NewClient(ts.URL)
+	if err != nil {
+		t.Fatalf("NewClient failed: %v", err)
+	}
+	if _, err := client.GetTxs(parentCtx); err != nil {
+		t.Fatalf("GetTxs failed: %v", err)
+	}
+
+	requireEndedSpanInTrace(t, rec, "server-child", parent.SpanContext().TraceID())
+}
+
+// TestOutboundGRPCCallCarriesTraceparentMetadata checks that the server sees a
+// traceparent header carrying the trace ID of the client's active span.
+func TestOutboundGRPCCallCarriesTraceparentMetadata(t *testing.T) {
+	_, cleanup := setupTracer(t)
+	defer cleanup()
+
+	tracer := otel.Tracer("test")
+	ctx, parent := tracer.Start(context.Background(), "parent")
+	defer parent.End()
+
+	var gotTraceparent string
+	captureHeader := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
+		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
+			gotTraceparent = req.Header().Get("traceparent")
+			return next(ctx, req)
+		}
+	})
+
+	handler := NewExecutorServiceHandler(&mockExecutor{}, connect.WithInterceptors(captureHeader))
+	ts := httptest.NewServer(handler)
+	defer ts.Close()
+
+	client, err := NewClient(ts.URL)
+	if err != nil {
+		t.Fatalf("NewClient failed: %v", err)
+	}
+	if _, err := client.GetTxs(ctx); err != nil {
+		t.Fatalf("GetTxs failed: %v", err)
+	}
+
+	if gotTraceparent == "" {
+		t.Fatal("expected traceparent metadata to be propagated")
+	}
+	if wantID := parent.SpanContext().TraceID().String(); !strings.Contains(gotTraceparent, wantID) {
+		t.Fatalf("traceparent %q does not carry trace id %s", gotTraceparent, wantID)
+	}
+}
+
+// TestOutboundGRPCCallCarriesPropagationHeaders checks that both the
+// traceparent and baggage headers reach the server.
+func TestOutboundGRPCCallCarriesPropagationHeaders(t *testing.T) {
+	_, cleanup := setupTracer(t)
+	defer cleanup()
+
+	tracer := otel.Tracer("test")
+	ctx, parent := tracer.Start(context.Background(), "parent")
+	defer parent.End()
+	member, err := baggage.NewMember("tenant", "alpha")
+	if err != nil {
+		t.Fatalf("failed to create baggage member: %v", err)
+	}
+	bg, err := baggage.New(member)
+	if err != nil {
+		t.Fatalf("failed to create baggage: %v", err)
+	}
+	ctx = baggage.ContextWithBaggage(ctx, bg)
+
+	var gotTraceparent, gotBaggage string
+	captureHeader := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
+		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
+			gotTraceparent = req.Header().Get("traceparent")
+			gotBaggage = req.Header().Get("baggage")
+			return next(ctx, req)
+		}
+	})
+
+	handler := NewExecutorServiceHandler(&mockExecutor{}, connect.WithInterceptors(captureHeader))
+	ts := httptest.NewServer(handler)
+	defer ts.Close()
+
+	client, err := NewClient(ts.URL)
+	if err != nil {
+		t.Fatalf("NewClient failed: %v", err)
+	}
+	if _, err := client.GetTxs(ctx); err != nil {
+		t.Fatalf("GetTxs failed: %v", err)
+	}
+
+	if gotTraceparent == "" {
+		t.Fatal("expected traceparent metadata to be propagated")
+	}
+	if !strings.Contains(gotBaggage, "tenant=alpha") {
+		t.Fatalf("expected baggage metadata to carry tenant=alpha, got %q", gotBaggage)
+	}
+}
+
+// TestEndToEndParentChildAcrossServerClientHop checks that a trace started by
+// the caller is continued through an upstream server and on into a downstream
+// server that the upstream executor calls.
+func TestEndToEndParentChildAcrossServerClientHop(t *testing.T) {
+	rec, cleanup := setupTracer(t)
+	defer cleanup()
+
+	tracer := otel.Tracer("test")
+
+	downstreamExec := &mockExecutor{getExecutionInfoFunc: func(ctx context.Context) (execution.ExecutionInfo, error) {
+		_, span := tracer.Start(ctx, "downstream-child")
+		span.End()
+		return execution.ExecutionInfo{MaxGas: 1}, nil
+	}}
+	downstreamSrv := httptest.NewServer(NewExecutorServiceHandler(downstreamExec))
+	defer downstreamSrv.Close()
+	downstreamClient, err := NewClient(downstreamSrv.URL)
+	if err != nil {
+		t.Fatalf("NewClient failed: %v", err)
+	}
+
+	upstreamExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) {
+		ctx, span := tracer.Start(ctx, "upstream-mid")
+		defer span.End()
+		if _, err := downstreamClient.GetExecutionInfo(ctx); err != nil {
+			return nil, err
+		}
+		return [][]byte{}, nil
+	}}
+	upstreamSrv := httptest.NewServer(NewExecutorServiceHandler(upstreamExec))
+	defer upstreamSrv.Close()
+
+	client, err := NewClient(upstreamSrv.URL)
+	if err != nil {
+		t.Fatalf("NewClient failed: %v", err)
+	}
+
+	rootCtx, root := tracer.Start(context.Background(), "root")
+	defer root.End()
+	if _, err := client.GetTxs(rootCtx); err != nil {
+		t.Fatalf("GetTxs failed: %v", err)
+	}
+
+	// Both handler-side spans end before their responses are written and the
+	// recorder is attached as a synchronous span processor, so no wait is
+	// needed before inspecting rec.
+	rootTraceID := root.SpanContext().TraceID()
+	requireEndedSpanInTrace(t, rec, "upstream-mid", rootTraceID)
+	requireEndedSpanInTrace(t, rec, "downstream-child", rootTraceID)
+}