From 11ff9ecde0be2a113a9078dbef87c7394a76d199 Mon Sep 17 00:00:00 2001 From: Ryan Hendrickson Date: Mon, 15 Jun 2026 21:02:45 -0400 Subject: [PATCH 1/2] first draft of surfacing the reason transaction abort/error occurred --- dgraph/cmd/alpha/txn_test.go | 63 ++++++++++++++++++++++++++++++++++++ dgraph/cmd/zero/oracle.go | 62 +++++++++++++++++++++++++++++++---- edgraph/server.go | 10 ++++++ worker/mutation.go | 12 +++++++ 4 files changed, 140 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/alpha/txn_test.go b/dgraph/cmd/alpha/txn_test.go index 0e20c8d8271..54f78682a1d 100644 --- a/dgraph/cmd/alpha/txn_test.go +++ b/dgraph/cmd/alpha/txn_test.go @@ -21,6 +21,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/dgraph-io/dgo/v250" "github.com/dgraph-io/dgo/v250/protos/api" @@ -248,6 +253,64 @@ func TestConflict(t *testing.T) { require.True(t, bytes.Equal(resp.Json, []byte("{\"me\":[{\"name\":\"Manish\"}]}"))) } +// TestConflictAbortReason proves the server emits the categorized abort reason on the +// gRPC status of a write-write conflict: the code stays codes.Aborted (so existing +// clients keep retrying) and the message is prefixed with "conflict: ". This is exactly +// the unflattened status a gRPC client such as dgraph4j receives and parses into +// TxnConflictException.AbortReason. +// +// It must inspect the raw CommitOrAbort response rather than dgo's high-level +// Txn.Commit, because dgo intentionally replaces any codes.Aborted error with the +// static dgo.ErrAborted (txn.go), discarding the reason for the Go client. +func TestConflictAbortReason(t *testing.T) { + op := &api.Operation{} + op.DropAll = true + require.NoError(t, dg.Alter(context.Background(), op)) + + // First transaction creates a node with a name. 
+ txn := dg.NewTxn() + mu := &api.Mutation{} + mu.SetJson = []byte(`{"name": "Manish"}`) + assigned, err := txn.Mutate(context.Background(), mu) + require.NoError(t, err) + require.Len(t, assigned.Uids, 1) + var uid string + for _, u := range assigned.Uids { + uid = u + } + + // Second transaction writes the same predicate on the same uid -> conflicts. + txn2 := dg.NewTxn() + mu = &api.Mutation{} + mu.SetJson = []byte(fmt.Sprintf(`{"uid": %q, "name": "Manish"}`, uid)) + resp2, err := txn2.Mutate(context.Background(), mu) + require.NoError(t, err) + require.NotEmpty(t, resp2.GetTxn().GetKeys(), "mutation response must carry conflict keys") + + // First commit wins; its commitTs is now greater than txn2's startTs. + require.NoError(t, txn.Commit(context.Background())) + + // Commit the loser via the raw gRPC stub so we observe the unflattened status the + // server sends (dgo's Txn.Commit would replace it with the reasonless ErrAborted). + conn, err := grpc.NewClient(alphaSockAdd, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer func() { _ = conn.Close() }() + raw := api.NewDgraphClient(conn) + + // TestMain enables ACL, so the raw stub (unlike the logged-in dg client) must + // carry the access JWT; CommitOrAbort goes through the same auth interceptor. 
+ ctx := metadata.NewOutgoingContext(context.Background(), + metadata.Pairs("accessJwt", hc.AccessJwt)) + _, err = raw.CommitOrAbort(ctx, resp2.GetTxn()) + require.Error(t, err) + + st := status.Convert(err) + require.Equal(t, codes.Aborted, st.Code(), + "abort must keep codes.Aborted so existing clients still retry") + require.True(t, strings.HasPrefix(st.Message(), "conflict: "), + "abort reason should be categorized as conflict; got %q", st.Message()) +} + func TestConflictTimeout(t *testing.T) { var uid string txn := dg.NewTxn() diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index cee15fae72f..900907d35ae 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -18,6 +18,8 @@ import ( "go.opentelemetry.io/otel" attribute "go.opentelemetry.io/otel/attribute" trace "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/dgraph-io/badger/v4/y" "github.com/dgraph-io/dgo/v250/protos/api" @@ -338,21 +340,56 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { return nil } +// Abort-reason codes. When Zero decides to abort a transaction it surfaces the category to the +// client as the prefix of a codes.Aborted gRPC status message, formatted as "<code>: <detail>". +// api.TxnContext (external dgo module) has no field for the reason, so it rides on the error; +// the status code stays codes.Aborted, so existing abort handling is unaffected. The dgraph4j +// client parses these prefixes into TxnConflictException.AbortReason — keep the two in sync. +const ( + abortReasonConflict = "conflict" + abortReasonStaleStartTs = "stale-startts" + abortReasonPredicateMove = "predicate-move" +) + +// abortReason builds the wire string the client parses: "<code>: <detail>". 
+func abortReason(code, detail string) string { + return code + ": " + detail +} + func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { span := trace.SpanFromContext(ctx) span.SetAttributes(attribute.Int64("startTs", int64(src.StartTs))) if src.Aborted { + // Client-initiated discard (txn.Discard); not a server-decided abort, so no reason. return s.proposeTxn(ctx, src) } + // abortWithReason marks the txn aborted, proposes it (advancing the watermark exactly as the + // previous code did), and then surfaces the categorized reason to the caller as a gRPC + // Aborted status. proposeTxn still runs identically — only the post-propose return changes. + abortWithReason := func(reason string) error { + span.SetAttributes(attribute.Bool("abort", true)) + src.Aborted = true + if err := s.proposeTxn(ctx, src); err != nil { + return err + } + return status.Error(codes.Aborted, reason) + } + // Use the start timestamp to check if we have a conflict, before we need to assign a commit ts. s.orc.RLock() conflict := s.orc.hasConflict(src) + // A txn whose startTs predates this leader's lease is aborted by hasConflict, but it's a + // leader change rather than a write-write conflict — report it distinctly. + stale := src.StartTs < s.orc.startTxnTs s.orc.RUnlock() if conflict { - span.SetAttributes(attribute.Bool("abort", true)) - src.Aborted = true - return s.proposeTxn(ctx, src) + if stale { + return abortWithReason(abortReason(abortReasonStaleStartTs, + "Transaction has been aborted due to a leader change. Please retry")) + } + return abortWithReason(abortReason(abortReasonConflict, + "Transaction has been aborted. Please retry")) } checkPreds := func() error { @@ -385,9 +422,9 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { return nil } if err := checkPreds(); err != nil { - span.SetAttributes(attribute.Bool("abort", true)) - src.Aborted = true - return s.proposeTxn(ctx, src) + // checkPreds builds rich messages, e.g. 
"Commits on predicate %s are blocked due to + // predicate move" — forward them instead of swallowing the reason. + return abortWithReason(abortReason(abortReasonPredicateMove, err.Error())) } num := pb.Num{Val: 1, Type: pb.Num_TXN_TS} @@ -402,16 +439,27 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { span.SetAttributes(attribute.Int64("nodeId", int64(s.Node.Id))) span.AddEvent(fmt.Sprintf("TXN Context: %+v", src)) + aborted := false if err := s.orc.commit(src); err != nil { span.SetAttributes(attribute.Bool("abort", true)) src.Aborted = true + aborted = true } if err := ctx.Err(); err != nil { span.SetAttributes(attribute.Bool("abort", true)) src.Aborted = true + aborted = true } // Propose txn should be used to set watermark as done. - return s.proposeTxn(ctx, src) + if err := s.proposeTxn(ctx, src); err != nil { + return err + } + if aborted { + // A late write-write conflict detected at commit time (keyCommit), or a cancelled ctx. + return status.Error(codes.Aborted, abortReason(abortReasonConflict, + "Transaction has been aborted. Please retry")) + } + return nil } // CommitOrAbort either commits a transaction or aborts it. diff --git a/edgraph/server.go b/edgraph/server.go index 0da2fd32c62..26744ee3818 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -646,6 +646,10 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo if err == dgo.ErrAborted { err = status.Error(codes.Aborted, err.Error()) resp.Txn.Aborted = true + } else if status.Code(err) == codes.Aborted { + // Server-decided abort carrying a categorized reason; err is already a codes.Aborted + // status (e.g. "conflict: ...", "predicate-move: ...") — surface it unchanged. 
+ resp.Txn.Aborted = true } return err @@ -2085,6 +2089,12 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx return tctx, status.Error(codes.Aborted, err.Error()) } + if status.Code(err) == codes.Aborted { + // Server-decided abort carrying a categorized reason; err is already a codes.Aborted + // status (e.g. "conflict: ...", "predicate-move: ...") — surface it unchanged. + tctx.Aborted = true + return tctx, err + } tctx.StartTs = tc.StartTs tctx.CommitTs = commitTs return tctx, err diff --git a/worker/mutation.go b/worker/mutation.go index fdac2a41c1b..1f0298bed14 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -20,7 +20,9 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -908,6 +910,16 @@ func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) tctx, err := zc.CommitOrAbort(ctx, tc) if err != nil { + // Zero signals a server-decided abort as a codes.Aborted status carrying a categorized + // reason (e.g. "conflict: ...", "predicate-move: ..."). Record the abort metric and + // forward the reason rather than treating it as a generic transport error or flattening + // it to the reasonless dgo.ErrAborted. 
+ if status.Code(err) == codes.Aborted { + if !clientDiscard { + ostats.Record(ctx, x.TxnAborts.M(1)) + } + return 0, err + } span.AddEvent("Error in CommitOrAbort", trace.WithAttributes( attribute.String("error", err.Error()))) return 0, err From 1042bb19b9dbb633254f997e40a4e3378ca9d3d1 Mon Sep 17 00:00:00 2001 From: Ryan Hendrickson Date: Wed, 17 Jun 2026 01:25:36 -0400 Subject: [PATCH 2/2] adding test --- dgraph/cmd/alpha/txn_test.go | 53 ++++++++++++ dgraph/cmd/zero/oracle.go | 26 ++++-- dgraph/cmd/zero/oracle_reason_test.go | 63 ++++++++++++++ systest/integration2/txn_abort_reason_test.go | 86 +++++++++++++++++++ 4 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 dgraph/cmd/zero/oracle_reason_test.go create mode 100644 systest/integration2/txn_abort_reason_test.go diff --git a/dgraph/cmd/alpha/txn_test.go b/dgraph/cmd/alpha/txn_test.go index 54f78682a1d..aac226b6e9e 100644 --- a/dgraph/cmd/alpha/txn_test.go +++ b/dgraph/cmd/alpha/txn_test.go @@ -311,6 +311,59 @@ func TestConflictAbortReason(t *testing.T) { "abort reason should be categorized as conflict; got %q", st.Message()) } +// TestPredicateMoveAbortReason proves the "predicate-move" category is surfaced on the +// gRPC status. Zero's checkPreds aborts a commit whose predicate keys don't match the +// tablet's serving group (the same code path that rejects commits during a predicate +// move). We reach it deterministically by committing a real txn context whose Preds have +// been rewritten to claim a group that doesn't serve the predicate, with conflict Keys +// cleared so hasConflict passes and checkPreds runs. +// +// As with TestConflictAbortReason, it uses the raw CommitOrAbort stub to observe the +// unflattened status the server sends. +func TestPredicateMoveAbortReason(t *testing.T) { + op := &api.Operation{} + op.DropAll = true + require.NoError(t, dg.Alter(context.Background(), op)) + + // A normal mutation gives us a valid, fresh TxnContext (real StartTs and Preds). 
+ txn := dg.NewTxn() + mu := &api.Mutation{SetJson: []byte(`{"name": "Alice"}`)} + resp, err := txn.Mutate(context.Background(), mu) + require.NoError(t, err) + + tc := resp.GetTxn() + require.NotEmpty(t, tc.GetPreds(), "mutation response must carry predicate keys") + + // Rewrite each predicate key's group id to a group that does not serve it, and drop + // the conflict keys so hasConflict is false and the commit reaches checkPreds. + doctored := make([]string, 0, len(tc.GetPreds())) + for _, p := range tc.GetPreds() { + // Preds look like "<group id>-<predicate>"; claim a nonexistent group 100. + if idx := strings.IndexByte(p, '-'); idx >= 0 { + doctored = append(doctored, "100"+p[idx:]) + } + } + require.NotEmpty(t, doctored, "expected parseable predicate keys") + tc.Preds = doctored + tc.Keys = nil + + conn, err := grpc.NewClient(alphaSockAdd, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer func() { _ = conn.Close() }() + raw := api.NewDgraphClient(conn) + + ctx := metadata.NewOutgoingContext(context.Background(), + metadata.Pairs("accessJwt", hc.AccessJwt)) + _, err = raw.CommitOrAbort(ctx, tc) + require.Error(t, err) + + st := status.Convert(err) + require.Equal(t, codes.Aborted, st.Code(), + "abort must keep codes.Aborted so existing clients still retry") + require.True(t, strings.HasPrefix(st.Message(), "predicate-move: "), + "abort reason should be categorized as predicate-move; got %q", st.Message()) +} + func TestConflictTimeout(t *testing.T) { var uid string txn := dg.NewTxn() diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 900907d35ae..d29207ca814 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -351,11 +351,27 @@ const ( abortReasonPredicateMove = "predicate-move" ) +// Human-readable details paired with the conflict abort codes. +const ( + abortDetailConflict = "Transaction has been aborted. 
Please retry" + abortDetailStaleStartTs = "Transaction has been aborted due to a leader change. Please retry" +) + // abortReason builds the wire string the client parses: "<code>: <detail>". func abortReason(code, detail string) string { return code + ": " + detail } +// conflictAbortReason returns the wire reason for a hasConflict abort, distinguishing a +// write-write conflict from a stale start timestamp (a txn that predates the current +// leader's lease, i.e. a leader change rather than a real conflict). +func conflictAbortReason(stale bool) string { + if stale { + return abortReason(abortReasonStaleStartTs, abortDetailStaleStartTs) + } + return abortReason(abortReasonConflict, abortDetailConflict) +} + func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { span := trace.SpanFromContext(ctx) span.SetAttributes(attribute.Int64("startTs", int64(src.StartTs))) @@ -384,12 +400,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { stale := src.StartTs < s.orc.startTxnTs s.orc.RUnlock() if conflict { - if stale { - return abortWithReason(abortReason(abortReasonStaleStartTs, - "Transaction has been aborted due to a leader change. Please retry")) - } - return abortWithReason(abortReason(abortReasonConflict, - "Transaction has been aborted. Please retry")) + return abortWithReason(conflictAbortReason(stale)) } checkPreds := func() error { @@ -456,8 +467,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { } if aborted { // A late write-write conflict detected at commit time (keyCommit), or a cancelled ctx. - return status.Error(codes.Aborted, abortReason(abortReasonConflict, - "Transaction has been aborted. 
Please retry")) + return status.Error(codes.Aborted, conflictAbortReason(false)) } return nil } diff --git a/dgraph/cmd/zero/oracle_reason_test.go b/dgraph/cmd/zero/oracle_reason_test.go new file mode 100644 index 00000000000..2e2d982c151 --- /dev/null +++ b/dgraph/cmd/zero/oracle_reason_test.go @@ -0,0 +1,63 @@ +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package zero + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgo/v250/protos/api" +) + +// The abort-reason wire format is a contract with gRPC clients (e.g. dgraph4j parses the +// "<code>: " prefix into TxnConflictException.AbortReason). These unit tests pin the +// category prefixes and the logic that selects between them, so the contract can't drift +// silently without an integration cluster. + +func TestAbortReasonFormat(t *testing.T) { + require.Equal(t, "conflict: boom", abortReason(abortReasonConflict, "boom")) + require.Equal(t, "stale-startts: x", abortReason(abortReasonStaleStartTs, "x")) + require.Equal(t, "predicate-move: y", abortReason(abortReasonPredicateMove, "y")) +} + +func TestConflictAbortReason(t *testing.T) { + // Write-write conflict. + r := conflictAbortReason(false) + require.True(t, strings.HasPrefix(r, abortReasonConflict+": "), + "want conflict prefix, got %q", r) + require.Equal(t, abortReason(abortReasonConflict, abortDetailConflict), r) + + // Stale start timestamp (leader change). 
+ r = conflictAbortReason(true) + require.True(t, strings.HasPrefix(r, abortReasonStaleStartTs+": "), + "want stale-startts prefix, got %q", r) + require.Equal(t, abortReason(abortReasonStaleStartTs, abortDetailStaleStartTs), r) + require.Contains(t, r, "leader change") +} + +// TestHasConflictStaleStartTs pins the exact discriminator commit() uses to choose the +// stale-startts reason: a txn whose startTs predates the leader's startTxnTs lease is a +// conflict, and is flagged stale; a fresh startTs with no conflicting keys is neither. +func TestHasConflictStaleStartTs(t *testing.T) { + o := &Oracle{} + o.Init() + defer o.close() + + o.updateStartTxnTs(100) + + // startTs below the lease floor: hasConflict true, and the stale discriminator true. + stale := &api.TxnContext{StartTs: 42} + require.True(t, o.hasConflict(stale), "txn below startTxnTs must conflict") + require.True(t, stale.StartTs < o.startTxnTs, "must be flagged stale") + require.Equal(t, conflictAbortReason(true), conflictAbortReason(stale.StartTs < o.startTxnTs)) + + // startTs at/above the lease floor with no keys: not a conflict, not stale. + fresh := &api.TxnContext{StartTs: 100} + require.False(t, o.hasConflict(fresh), "fresh txn with no keys must not conflict") + require.False(t, fresh.StartTs < o.startTxnTs, "must not be flagged stale") +} diff --git a/systest/integration2/txn_abort_reason_test.go b/systest/integration2/txn_abort_reason_test.go new file mode 100644 index 00000000000..acf87f21167 --- /dev/null +++ b/systest/integration2/txn_abort_reason_test.go @@ -0,0 +1,86 @@ +//go:build integration2 + +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/dgraph-io/dgo/v250/protos/api" + "github.com/dgraph-io/dgraph/v25/dgraphtest" +) + +// TestStaleStartTsAbortReason proves the "stale-startts" abort category end-to-end against a +// real cluster. A transaction's start timestamp becomes "stale" when it predates the current +// Zero leader's lease — i.e. after a leader change. We reproduce that deterministically by +// opening a transaction, then restarting the (single) Zero: on restart Zero renews its lease and +// advances startTxnTs past every previously-leased start ts, so committing the now-old txn aborts +// with the stale-startts reason rather than a plain write-write conflict. +// +// As in the alpha-level reason tests, the commit goes through the raw CommitOrAbort stub so we +// observe the unflattened codes.Aborted status (dgo's Txn.Commit would replace it with the +// reasonless ErrAborted). +func TestStaleStartTsAbortReason(t *testing.T) { + conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1) + c, err := dgraphtest.NewLocalCluster(conf) + require.NoError(t, err) + t.Cleanup(func() { c.Cleanup(t.Failed()) }) + require.NoError(t, c.Start()) + + gc, cleanup, err := c.Client() + require.NoError(t, err) + defer cleanup() + + ctx := context.Background() + require.NoError(t, gc.Alter(ctx, &api.Operation{DropAll: true})) + + // Open a transaction and mutate so it gets a real (soon-to-be-stale) start ts and keys. + txn := gc.NewTxn() + resp, err := txn.Mutate(ctx, &api.Mutation{SetJson: []byte(`{"name": "Manish"}`)}) + require.NoError(t, err) + tc := resp.GetTxn() + require.NotZero(t, tc.GetStartTs(), "mutation must yield a start ts") + + // Restart Zero. 
On coming back up it renews its lease and sets startTxnTs to MaxTxnTs+1, + // which is strictly greater than the start ts leased above — making our open txn stale. + require.NoError(t, c.StopZero(0)) + require.NoError(t, c.StartZero(0)) + require.NoError(t, c.HealthCheck(false)) + + // Wait until a Zero leader is established again (lease renewal, hence startTxnTs bump, runs + // when a Zero becomes leader). Avoids racing the commit against the leaderless window. + require.Eventually(t, func() bool { + _, err := c.GetZeroLeader(0) + return err == nil + }, 60*time.Second, time.Second, "zero leader did not re-establish after restart") + + // Commit the stale txn via the raw stub to observe the categorized status. + port, err := c.GetAlphaGrpcPublicPort(0) + require.NoError(t, err) + conn, err := grpc.NewClient("0.0.0.0:"+port, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer func() { _ = conn.Close() }() + raw := api.NewDgraphClient(conn) + + _, err = raw.CommitOrAbort(ctx, tc) + require.Error(t, err, "committing a txn whose start ts predates the new leader must abort") + + st := status.Convert(err) + require.Equal(t, codes.Aborted, st.Code(), + "abort must keep codes.Aborted so existing clients still retry") + require.True(t, strings.HasPrefix(st.Message(), "stale-startts: "), + "abort reason should be categorized as stale-startts; got %q", st.Message()) +}