From d7480ee5091b478482f9a60bfddfb1b74ab78573 Mon Sep 17 00:00:00 2001 From: Badr Ezzir Date: Tue, 14 Apr 2026 21:01:05 +0100 Subject: [PATCH] ENG-54618 ENG-Refactor statement execution with pending DML handling and sequence allocation improvements --- service/executor/handler/sqlx.go | 56 ++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/service/executor/handler/sqlx.go b/service/executor/handler/sqlx.go index 97048251..ef7fad1b 100644 --- a/service/executor/handler/sqlx.go +++ b/service/executor/handler/sqlx.go @@ -4,6 +4,8 @@ import ( "context" "database/sql" "fmt" + "os" + "github.com/viant/datly/service/executor" expand "github.com/viant/datly/service/executor/expand" "github.com/viant/datly/service/executor/sequencer" @@ -73,12 +75,18 @@ func (s *Service) Flush(ctx context.Context, tableName string) error { options = append(options, executor.WithTx(tx)) + filtered := s.dataUnit.Statements.FilterByTableName(tableName) exec := executor.New() if err := exec.ExecuteStmts(ctx, s, &sqlxIterator{ - toExecute: s.dataUnit.Statements.FilterByTableName(tableName), + toExecute: filtered, }, options...); err != nil { return err } + for _, item := range filtered { + if e, ok := item.(*expand.Executable); ok { + e.MarkAsExecuted() + } + } return nil } @@ -225,15 +233,59 @@ func (s *Service) CanBatch(table string) bool { return false } +// Allocate reserves auto-increment IDs by performing a transient INSERT into tableName. +// Pending batched DML (e.g. DELETEs) must be committed first so the sequencer's +// transient INSERT (which runs on a raw DB connection) doesn't hit unique-key conflicts +// against rows that are queued for deletion but not yet visible as deleted. +// +// We use a dedicated transaction (db.BeginTx) instead of s.Flush/s.Tx to avoid +// triggering txNotifier, which would mark the main executor's TX as "transient" +// and prevent it from committing the remaining INSERTs/UPDATEs. func (s *Service) Allocate(ctx context.Context, tableName string, dest interface{}, selector string) error { + pending := s.dataUnit.Statements.FilterByTableName(tableName) + if len(pending) > 0 { + if os.Getenv("DATLY_DEBUG_MUTABLE") == "1" { + fmt.Printf("[MUTABLE DEBUG] Allocate: committing %d pending statement(s) for %s before sequence allocation\n", len(pending), tableName) + } + if err := s.commitPendingForAllocate(ctx, pending); err != nil { + return fmt.Errorf("failed to commit pending statements for %s before Allocate: %w", tableName, err) + } + } db, err := s.Db(ctx) if err != nil { return err } - service := sequencer.New(context.Background(), db) + service := sequencer.New(ctx, db) return service.Next(tableName, dest, selector) } +// commitPendingForAllocate executes and commits pending DML in a dedicated transaction +// so the sequencer (which operates on the raw DB) can see the changes. +func (s *Service) commitPendingForAllocate(ctx context.Context, pending []interface{}) error { + db, err := s.Db(ctx) + if err != nil { + return err + } + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + exec := executor.New() + if err := exec.ExecuteStmts(ctx, s, &sqlxIterator{toExecute: pending}, executor.WithTx(tx)); err != nil { + _ = tx.Rollback() + return err + } + if err := tx.Commit(); err != nil { + return err + } + for _, item := range pending { + if e, ok := item.(*expand.Executable); ok { + e.MarkAsExecuted() + } + } + return nil +} + func (s *Service) CanBatchGlobally() bool { return false }