-
Notifications
You must be signed in to change notification settings - Fork 260
feat(execution/grpc): adding support for grpc otlp #3300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7f44037
8a30fb8
2a6efb6
dbe4842
8e86807
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| package grpc | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "connectrpc.com/connect" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/propagation" | ||
| ) | ||
|
|
||
| func inboundPropagationInterceptor() connect.UnaryInterceptorFunc { | ||
| return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { | ||
| return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { | ||
| prop := otel.GetTextMapPropagator() | ||
| ctx = prop.Extract(ctx, propagation.HeaderCarrier(req.Header())) | ||
| return next(ctx, req) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| func outboundPropagationInterceptor() connect.UnaryInterceptorFunc { | ||
| return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { | ||
| return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { | ||
| prop := otel.GetTextMapPropagator() | ||
| prop.Inject(ctx, propagation.HeaderCarrier(req.Header())) | ||
| return next(ctx, req) | ||
| } | ||
| }) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,226 @@ | ||
| package grpc | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http/httptest" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "connectrpc.com/connect" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/baggage" | ||
| "go.opentelemetry.io/otel/propagation" | ||
| sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
| "go.opentelemetry.io/otel/sdk/trace/tracetest" | ||
| "go.opentelemetry.io/otel/trace" | ||
|
|
||
| "github.com/evstack/ev-node/core/execution" | ||
| ) | ||
|
|
||
| func setupTracer(t *testing.T) (*tracetest.SpanRecorder, func()) { | ||
| t.Helper() | ||
| rec := tracetest.NewSpanRecorder() | ||
| tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) | ||
| oldTP := otel.GetTracerProvider() | ||
| oldProp := otel.GetTextMapPropagator() | ||
| otel.SetTracerProvider(tp) | ||
| otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) | ||
| return rec, func() { | ||
| _ = tp.Shutdown(context.Background()) | ||
| otel.SetTracerProvider(oldTP) | ||
| otel.SetTextMapPropagator(oldProp) | ||
| } | ||
| } | ||
|
|
||
| func TestInboundMetadataCreatesChildSpanWithSameTraceID(t *testing.T) { | ||
| rec, cleanup := setupTracer(t) | ||
| defer cleanup() | ||
|
|
||
| tracer := otel.Tracer("test") | ||
| parentCtx, parent := tracer.Start(context.Background(), "parent") | ||
| defer parent.End() | ||
| parentTraceID := parent.SpanContext().TraceID() | ||
|
|
||
| mockExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) { | ||
| _, span := tracer.Start(ctx, "server-child") | ||
| span.End() | ||
| return [][]byte{}, nil | ||
| }} | ||
|
|
||
| handler := NewExecutorServiceHandler(mockExec) | ||
| ts := httptest.NewServer(handler) | ||
| defer ts.Close() | ||
|
|
||
| client, err := NewClient(ts.URL) | ||
| if err != nil { | ||
| t.Fatalf("NewClient failed: %v", err) | ||
| } | ||
|
|
||
| _, err = client.GetTxs(parentCtx) | ||
| if err != nil { | ||
| t.Fatalf("GetTxs failed: %v", err) | ||
| } | ||
|
|
||
| var found bool | ||
| for _, s := range rec.Ended() { | ||
| if s.Name() == "server-child" { | ||
| found = true | ||
| if s.SpanContext().TraceID() != parentTraceID { | ||
| t.Fatalf("trace id mismatch: got %s want %s", s.SpanContext().TraceID(), parentTraceID) | ||
| } | ||
| } | ||
| } | ||
| if !found { | ||
| t.Fatalf("server-child span not found") | ||
| } | ||
| } | ||
|
|
||
| func TestOutboundGRPCCallCarriesTraceparentMetadata(t *testing.T) { | ||
| rec, cleanup := setupTracer(t) | ||
| _ = rec | ||
| defer cleanup() | ||
|
|
||
| tracer := otel.Tracer("test") | ||
| ctx, parent := tracer.Start(context.Background(), "parent") | ||
| defer parent.End() | ||
|
|
||
| gotTraceparent := "" | ||
| captureHeader := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { | ||
| return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { | ||
| gotTraceparent = req.Header().Get("traceparent") | ||
| return next(ctx, req) | ||
| } | ||
| }) | ||
|
|
||
| mockExec := &mockExecutor{} | ||
| handler := NewExecutorServiceHandler(mockExec, connect.WithInterceptors(captureHeader)) | ||
| ts := httptest.NewServer(handler) | ||
| defer ts.Close() | ||
|
|
||
| client, err := NewClient(ts.URL) | ||
| if err != nil { | ||
| t.Fatalf("NewClient failed: %v", err) | ||
| } | ||
|
|
||
| if _, err = client.GetTxs(ctx); err != nil { | ||
| t.Fatalf("GetTxs failed: %v", err) | ||
| } | ||
| if gotTraceparent == "" { | ||
| t.Fatalf("expected traceparent metadata to be propagated") | ||
| } | ||
| } | ||
|
|
||
| func TestOutboundGRPCCallCarriesPropagationHeaders(t *testing.T) { | ||
| rec, cleanup := setupTracer(t) | ||
| _ = rec | ||
| defer cleanup() | ||
|
|
||
| tracer := otel.Tracer("test") | ||
| ctx, parent := tracer.Start(context.Background(), "parent") | ||
| defer parent.End() | ||
| member, err := baggage.NewMember("tenant", "alpha") | ||
| if err != nil { | ||
| t.Fatalf("failed to create baggage member: %v", err) | ||
| } | ||
| bg, err := baggage.New(member) | ||
| if err != nil { | ||
| t.Fatalf("failed to create baggage: %v", err) | ||
| } | ||
| ctx = baggage.ContextWithBaggage(ctx, bg) | ||
|
|
||
| var gotTraceparent string | ||
| var gotBaggage string | ||
| captureHeader := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { | ||
| return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { | ||
| gotTraceparent = req.Header().Get("traceparent") | ||
| gotBaggage = req.Header().Get("baggage") | ||
| return next(ctx, req) | ||
| } | ||
| }) | ||
|
|
||
| mockExec := &mockExecutor{} | ||
| handler := NewExecutorServiceHandler(mockExec, connect.WithInterceptors(captureHeader)) | ||
| ts := httptest.NewServer(handler) | ||
| defer ts.Close() | ||
|
|
||
| client, err := NewClient(ts.URL) | ||
| if err != nil { | ||
| t.Fatalf("NewClient failed: %v", err) | ||
| } | ||
|
|
||
| if _, err = client.GetTxs(ctx); err != nil { | ||
| t.Fatalf("GetTxs failed: %v", err) | ||
| } | ||
|
|
||
| if gotTraceparent == "" { | ||
| t.Fatalf("expected traceparent metadata to be propagated") | ||
| } | ||
| if gotBaggage == "" { | ||
| t.Fatalf("expected baggage metadata to be propagated") | ||
| } | ||
| } | ||
|
|
||
| func TestEndToEndParentChildAcrossServerClientHop(t *testing.T) { | ||
| rec, cleanup := setupTracer(t) | ||
| defer cleanup() | ||
|
|
||
| tracer := otel.Tracer("test") | ||
| var midSpan trace.Span | ||
|
|
||
|
Comment on lines
+168
to
+169
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: cat -n execution/grpc/otel_propagation_test.go | head -200Repository: evstack/ev-node Length of output: 7290 🏁 Script executed: tail -n +195 execution/grpc/otel_propagation_test.go | head -50Repository: evstack/ev-node Length of output: 332 Fix concurrent access to
Suggested fix (channel-based synchronization)- var midSpan trace.Span
+ midTraceIDCh := make(chan trace.TraceID, 1)
@@
upstreamExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) {
ctx, span := tracer.Start(ctx, "upstream-mid")
- midSpan = span
+ midTraceIDCh <- span.SpanContext().TraceID()
defer span.End()
@@
- time.Sleep(10 * time.Millisecond)
-
rootTraceID := root.SpanContext().TraceID()
- if midSpan.SpanContext().TraceID() != rootTraceID {
+ var midTraceID trace.TraceID
+ select {
+ case midTraceID = <-midTraceIDCh:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout waiting for upstream-mid span")
+ }
+ if midTraceID != rootTraceID {
t.Fatalf("mid span trace id mismatch")
}🤖 Prompt for AI Agents |
||
| downstreamExec := &mockExecutor{getExecutionInfoFunc: func(ctx context.Context) (executionInfo execution.ExecutionInfo, err error) { | ||
| _, span := tracer.Start(ctx, "downstream-child") | ||
| span.End() | ||
| return execution.ExecutionInfo{MaxGas: 1}, nil | ||
| }} | ||
| downstreamHandler := NewExecutorServiceHandler(downstreamExec) | ||
| downstreamSrv := httptest.NewServer(downstreamHandler) | ||
| defer downstreamSrv.Close() | ||
| downstreamClient, err := NewClient(downstreamSrv.URL) | ||
| if err != nil { | ||
| t.Fatalf("NewClient failed: %v", err) | ||
| } | ||
|
|
||
| upstreamExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) { | ||
| ctx, span := tracer.Start(ctx, "upstream-mid") | ||
| midSpan = span | ||
| defer span.End() | ||
| _, err := downstreamClient.GetExecutionInfo(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return [][]byte{}, nil | ||
| }} | ||
| upstreamHandler := NewExecutorServiceHandler(upstreamExec) | ||
| upstreamSrv := httptest.NewServer(upstreamHandler) | ||
| defer upstreamSrv.Close() | ||
|
|
||
| client, err := NewClient(upstreamSrv.URL) | ||
| if err != nil { | ||
| t.Fatalf("NewClient failed: %v", err) | ||
| } | ||
|
|
||
| rootCtx, root := tracer.Start(context.Background(), "root") | ||
| defer root.End() | ||
| if _, err := client.GetTxs(rootCtx); err != nil { | ||
| t.Fatalf("GetTxs failed: %v", err) | ||
| } | ||
|
|
||
| time.Sleep(10 * time.Millisecond) | ||
|
|
||
| rootTraceID := root.SpanContext().TraceID() | ||
| if midSpan.SpanContext().TraceID() != rootTraceID { | ||
| t.Fatalf("mid span trace id mismatch") | ||
| } | ||
| var found bool | ||
| for _, s := range rec.Ended() { | ||
| if s.Name() == "downstream-child" { | ||
| found = true | ||
| if s.SpanContext().TraceID() != rootTraceID { | ||
| t.Fatalf("downstream trace id mismatch") | ||
| } | ||
| } | ||
| } | ||
| if !found { | ||
| t.Fatalf("downstream-child span not found") | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 99
🏁 Script executed:
cat -n execution/grpc/otel_propagation_test.go | head -200Repository: evstack/ev-node
Length of output: 7290
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 1021
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 41
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 41
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 865
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 84
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 730
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 1082
🏁 Script executed:
Repository: evstack/ev-node
Length of output: 240
Synchronize header capture and span access to avoid test data races.
gotTraceparent,gotBaggage, andmidSpanare written in handler/interceptor goroutines and read in the test goroutine without synchronization. Although the blocking RPC calls provide implicit ordering, the Go race detector requires explicit synchronization primitives (mutex or channels) to recognize the happens-before relationship. Additionally, the fixed 10ms sleep at line 189 violates the determinism guideline; use explicit synchronization instead of timing-based waits.Suggested fix (mutex-guarded capture)
Apply the same pattern to
gotBaggage(lines 123-131) andmidSpan(lines 156-193). For the end-to-end test, use a channel or sync primitive to wait for the handler to populatemidSpaninstead of the fixed sleep.Also applies to: 97-103, 123-131, 139-148, 156-193
🤖 Prompt for AI Agents