Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions dgraph/cmd/alpha/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/dgraph-io/dgo/v250"
"github.com/dgraph-io/dgo/v250/protos/api"
Expand Down Expand Up @@ -248,6 +253,117 @@ func TestConflict(t *testing.T) {
require.True(t, bytes.Equal(resp.Json, []byte("{\"me\":[{\"name\":\"Manish\"}]}")))
}

// TestConflictAbortReason proves the server emits the categorized abort reason on the
// gRPC status of a write-write conflict: the code stays codes.Aborted (so existing
// clients keep retrying) and the message is prefixed with "conflict: ". This is exactly
// the unflattened status a gRPC client such as dgraph4j receives and parses into
// TxnConflictException.AbortReason.
//
// It must inspect the raw CommitOrAbort response rather than dgo's high-level
// Txn.Commit, because dgo intentionally replaces any codes.Aborted error with the
// static dgo.ErrAborted (txn.go), discarding the reason for the Go client.
func TestConflictAbortReason(t *testing.T) {
op := &api.Operation{}
op.DropAll = true
require.NoError(t, dg.Alter(context.Background(), op))

// First transaction creates a node with a name.
txn := dg.NewTxn()
mu := &api.Mutation{}
mu.SetJson = []byte(`{"name": "Manish"}`)
assigned, err := txn.Mutate(context.Background(), mu)
require.NoError(t, err)
require.Len(t, assigned.Uids, 1)
var uid string
for _, u := range assigned.Uids {
uid = u
}

// Second transaction writes the same predicate on the same uid -> conflicts.
txn2 := dg.NewTxn()
mu = &api.Mutation{}
mu.SetJson = []byte(fmt.Sprintf(`{"uid": %q, "name": "Manish"}`, uid))
resp2, err := txn2.Mutate(context.Background(), mu)
require.NoError(t, err)
require.NotEmpty(t, resp2.GetTxn().GetKeys(), "mutation response must carry conflict keys")

// First commit wins; its commitTs is now greater than txn2's startTs.
require.NoError(t, txn.Commit(context.Background()))

// Commit the loser via the raw gRPC stub so we observe the unflattened status the
// server sends (dgo's Txn.Commit would replace it with the reasonless ErrAborted).
conn, err := grpc.NewClient(alphaSockAdd, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer func() { _ = conn.Close() }()
raw := api.NewDgraphClient(conn)

// TestMain enables ACL, so the raw stub (unlike the logged-in dg client) must
// carry the access JWT; CommitOrAbort goes through the same auth interceptor.
ctx := metadata.NewOutgoingContext(context.Background(),
metadata.Pairs("accessJwt", hc.AccessJwt))
_, err = raw.CommitOrAbort(ctx, resp2.GetTxn())
require.Error(t, err)

st := status.Convert(err)
require.Equal(t, codes.Aborted, st.Code(),
"abort must keep codes.Aborted so existing clients still retry")
require.True(t, strings.HasPrefix(st.Message(), "conflict: "),
"abort reason should be categorized as conflict; got %q", st.Message())
}

// TestPredicateMoveAbortReason proves the "predicate-move" category is surfaced on the
// gRPC status. Zero's checkPreds aborts a commit whose predicate keys don't match the
// tablet's serving group (the same code path that rejects commits during a predicate
// move). We reach it deterministically by committing a real txn context whose Preds have
// been rewritten to claim a group that doesn't serve the predicate, with conflict Keys
// cleared so hasConflict passes and checkPreds runs.
//
// As with TestConflictAbortReason, it uses the raw CommitOrAbort stub to observe the
// unflattened status the server sends.
func TestPredicateMoveAbortReason(t *testing.T) {
op := &api.Operation{}
op.DropAll = true
require.NoError(t, dg.Alter(context.Background(), op))

// A normal mutation gives us a valid, fresh TxnContext (real StartTs and Preds).
txn := dg.NewTxn()
mu := &api.Mutation{SetJson: []byte(`{"name": "Alice"}`)}
resp, err := txn.Mutate(context.Background(), mu)
require.NoError(t, err)

tc := resp.GetTxn()
require.NotEmpty(t, tc.GetPreds(), "mutation response must carry predicate keys")

// Rewrite each predicate key's group id to a group that does not serve it, and drop
// the conflict keys so hasConflict is false and the commit reaches checkPreds.
doctored := make([]string, 0, len(tc.GetPreds()))
for _, p := range tc.GetPreds() {
// Preds look like "<gid>-<predicate>"; claim a nonexistent group 100.
if idx := strings.IndexByte(p, '-'); idx >= 0 {
doctored = append(doctored, "100"+p[idx:])
}
}
require.NotEmpty(t, doctored, "expected parseable predicate keys")
tc.Preds = doctored
tc.Keys = nil

conn, err := grpc.NewClient(alphaSockAdd, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer func() { _ = conn.Close() }()
raw := api.NewDgraphClient(conn)

ctx := metadata.NewOutgoingContext(context.Background(),
metadata.Pairs("accessJwt", hc.AccessJwt))
_, err = raw.CommitOrAbort(ctx, tc)
require.Error(t, err)

st := status.Convert(err)
require.Equal(t, codes.Aborted, st.Code(),
"abort must keep codes.Aborted so existing clients still retry")
require.True(t, strings.HasPrefix(st.Message(), "predicate-move: "),
"abort reason should be categorized as predicate-move; got %q", st.Message())
}

func TestConflictTimeout(t *testing.T) {
var uid string
txn := dg.NewTxn()
Expand Down
72 changes: 65 additions & 7 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
trace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/dgo/v250/protos/api"
Expand Down Expand Up @@ -338,21 +340,67 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
return nil
}

// Abort-reason codes. When Zero decides to abort a transaction it surfaces the category to the
// client as the prefix of a codes.Aborted gRPC status message, formatted as "<code>: <detail>".
// api.TxnContext (external dgo module) has no field for the reason, so it rides on the error;
// the status code stays codes.Aborted, so existing abort handling is unaffected. The dgraph4j
// client parses these prefixes into TxnConflictException.AbortReason — keep the two in sync.
const (
abortReasonConflict = "conflict"
abortReasonStaleStartTs = "stale-startts"
abortReasonPredicateMove = "predicate-move"
)

// Human-readable details paired with the conflict abort codes.
const (
abortDetailConflict = "Transaction has been aborted. Please retry"
abortDetailStaleStartTs = "Transaction has been aborted due to a leader change. Please retry"
)

// abortReason builds the wire string the client parses: "<code>: <detail>".
func abortReason(code, detail string) string {
return code + ": " + detail
}

// conflictAbortReason returns the wire reason for a hasConflict abort, distinguishing a
// write-write conflict from a stale start timestamp (a txn that predates the current
// leader's lease, i.e. a leader change rather than a real conflict).
func conflictAbortReason(stale bool) string {
if stale {
return abortReason(abortReasonStaleStartTs, abortDetailStaleStartTs)
}
return abortReason(abortReasonConflict, abortDetailConflict)
}

func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.Int64("startTs", int64(src.StartTs)))
if src.Aborted {
// Client-initiated discard (txn.Discard); not a server-decided abort, so no reason.
return s.proposeTxn(ctx, src)
}

// abortWithReason marks the txn aborted, proposes it (advancing the watermark exactly as the
// previous code did), and then surfaces the categorized reason to the caller as a gRPC
// Aborted status. proposeTxn still runs identically — only the post-propose return changes.
abortWithReason := func(reason string) error {
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
if err := s.proposeTxn(ctx, src); err != nil {
return err
}
return status.Error(codes.Aborted, reason)
}

// Use the start timestamp to check if we have a conflict, before we need to assign a commit ts.
s.orc.RLock()
conflict := s.orc.hasConflict(src)
// A txn whose startTs predates this leader's lease is aborted by hasConflict, but it's a
// leader change rather than a write-write conflict — report it distinctly.
stale := src.StartTs < s.orc.startTxnTs
s.orc.RUnlock()
if conflict {
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
return s.proposeTxn(ctx, src)
return abortWithReason(conflictAbortReason(stale))
}

checkPreds := func() error {
Expand Down Expand Up @@ -385,9 +433,9 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
return nil
}
if err := checkPreds(); err != nil {
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
return s.proposeTxn(ctx, src)
// checkPreds builds rich messages, e.g. "Commits on predicate %s are blocked due to
// predicate move" — forward them instead of swallowing the reason.
return abortWithReason(abortReason(abortReasonPredicateMove, err.Error()))
}

num := pb.Num{Val: 1, Type: pb.Num_TXN_TS}
Expand All @@ -402,16 +450,26 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
span.SetAttributes(attribute.Int64("nodeId", int64(s.Node.Id)))
span.AddEvent(fmt.Sprintf("TXN Context: %+v", src))

aborted := false
if err := s.orc.commit(src); err != nil {
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
aborted = true
}
if err := ctx.Err(); err != nil {
span.SetAttributes(attribute.Bool("abort", true))
src.Aborted = true
aborted = true
}
// Propose txn should be used to set watermark as done.
return s.proposeTxn(ctx, src)
if err := s.proposeTxn(ctx, src); err != nil {
return err
}
if aborted {
// A late write-write conflict detected at commit time (keyCommit), or a cancelled ctx.
return status.Error(codes.Aborted, conflictAbortReason(false))
}
return nil
}

// CommitOrAbort either commits a transaction or aborts it.
Expand Down
63 changes: 63 additions & 0 deletions dgraph/cmd/zero/oracle_reason_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package zero

import (
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgo/v250/protos/api"
)

// The abort-reason wire format is a contract with gRPC clients (e.g. dgraph4j parses the
// "<code>: " prefix into TxnConflictException.AbortReason). These unit tests pin the
// category prefixes and the logic that selects between them, so the contract can't drift
// silently without an integration cluster.

func TestAbortReasonFormat(t *testing.T) {
require.Equal(t, "conflict: boom", abortReason(abortReasonConflict, "boom"))
require.Equal(t, "stale-startts: x", abortReason(abortReasonStaleStartTs, "x"))
require.Equal(t, "predicate-move: y", abortReason(abortReasonPredicateMove, "y"))
}

func TestConflictAbortReason(t *testing.T) {
// Write-write conflict.
r := conflictAbortReason(false)
require.True(t, strings.HasPrefix(r, abortReasonConflict+": "),
"want conflict prefix, got %q", r)
require.Equal(t, abortReason(abortReasonConflict, abortDetailConflict), r)

// Stale start timestamp (leader change).
r = conflictAbortReason(true)
require.True(t, strings.HasPrefix(r, abortReasonStaleStartTs+": "),
"want stale-startts prefix, got %q", r)
require.Equal(t, abortReason(abortReasonStaleStartTs, abortDetailStaleStartTs), r)
require.Contains(t, r, "leader change")
}

// TestHasConflictStaleStartTs pins the exact discriminator commit() uses to choose the
// stale-startts reason: a txn whose startTs predates the leader's startTxnTs lease is a
// conflict, and is flagged stale; a fresh startTs with no conflicting keys is neither.
func TestHasConflictStaleStartTs(t *testing.T) {
o := &Oracle{}
o.Init()
defer o.close()

o.updateStartTxnTs(100)

// startTs below the lease floor: hasConflict true, and the stale discriminator true.
stale := &api.TxnContext{StartTs: 42}
require.True(t, o.hasConflict(stale), "txn below startTxnTs must conflict")
require.True(t, stale.StartTs < o.startTxnTs, "must be flagged stale")
require.Equal(t, conflictAbortReason(true), conflictAbortReason(stale.StartTs < o.startTxnTs))

// startTs at/above the lease floor with no keys: not a conflict, not stale.
fresh := &api.TxnContext{StartTs: 100}
require.False(t, o.hasConflict(fresh), "fresh txn with no keys must not conflict")
require.False(t, fresh.StartTs < o.startTxnTs, "must not be flagged stale")
}
10 changes: 10 additions & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,10 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
if err == dgo.ErrAborted {
err = status.Error(codes.Aborted, err.Error())
resp.Txn.Aborted = true
} else if status.Code(err) == codes.Aborted {
// Server-decided abort carrying a categorized reason; err is already a codes.Aborted
// status (e.g. "conflict: ...", "predicate-move: ...") — surface it unchanged.
resp.Txn.Aborted = true
}

return err
Expand Down Expand Up @@ -2085,6 +2089,12 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx

return tctx, status.Error(codes.Aborted, err.Error())
}
if status.Code(err) == codes.Aborted {
// Server-decided abort carrying a categorized reason; err is already a codes.Aborted
// status (e.g. "conflict: ...", "predicate-move: ...") — surface it unchanged.
tctx.Aborted = true
return tctx, err
}
tctx.StartTs = tc.StartTs
tctx.CommitTs = commitTs
return tctx, err
Expand Down
Loading
Loading