Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions service/executor/handler/sqlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"fmt"
"os"

"github.com/viant/datly/service/executor"
expand "github.com/viant/datly/service/executor/expand"
"github.com/viant/datly/service/executor/sequencer"
Expand Down Expand Up @@ -73,12 +75,18 @@ func (s *Service) Flush(ctx context.Context, tableName string) error {

options = append(options, executor.WithTx(tx))

filtered := s.dataUnit.Statements.FilterByTableName(tableName)
exec := executor.New()
if err := exec.ExecuteStmts(ctx, s, &sqlxIterator{
toExecute: s.dataUnit.Statements.FilterByTableName(tableName),
toExecute: filtered,
}, options...); err != nil {
return err
}
for _, item := range filtered {
if e, ok := item.(*expand.Executable); ok {
e.MarkAsExecuted()
}
}
return nil
}

Expand Down Expand Up @@ -225,15 +233,59 @@ func (s *Service) CanBatch(table string) bool {
return false
}

// Allocate reserves auto-increment IDs by performing a transient INSERT into tableName.
// Pending batched DML (e.g. DELETEs) must be committed first so the sequencer's
// transient INSERT (which runs on a raw DB connection) doesn't hit unique-key conflicts
// against rows that are queued for deletion but not yet visible as deleted.
//
// We use a dedicated transaction (db.BeginTx) instead of s.Flush/s.Tx to avoid
// triggering txNotifier, which would mark the main executor's TX as "transient"
// and prevent it from committing the remaining INSERTs/UPDATEs.
func (s *Service) Allocate(ctx context.Context, tableName string, dest interface{}, selector string) error {
pending := s.dataUnit.Statements.FilterByTableName(tableName)
if len(pending) > 0 {
if os.Getenv("DATLY_DEBUG_MUTABLE") == "1" {
fmt.Printf("[MUTABLE DEBUG] Allocate: committing %d pending statement(s) for %s before sequence allocation\n", len(pending), tableName)
}
if err := s.commitPendingForAllocate(ctx, pending); err != nil {
return fmt.Errorf("failed to commit pending statements for %s before Allocate: %w", tableName, err)
}
}
db, err := s.Db(ctx)
if err != nil {
return err
}
service := sequencer.New(context.Background(), db)
service := sequencer.New(ctx, db)
return service.Next(tableName, dest, selector)
}

// commitPendingForAllocate executes and commits pending DML in a dedicated transaction
// so the sequencer (which operates on the raw DB) can see the changes.
func (s *Service) commitPendingForAllocate(ctx context.Context, pending []interface{}) error {
db, err := s.Db(ctx)
if err != nil {
return err
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
exec := executor.New()
if err := exec.ExecuteStmts(ctx, s, &sqlxIterator{toExecute: pending}, executor.WithTx(tx)); err != nil {
_ = tx.Rollback()
return err
}
if err := tx.Commit(); err != nil {
return err
}
for _, item := range pending {
if e, ok := item.(*expand.Executable); ok {
e.MarkAsExecuted()
}
}
return nil
}

func (s *Service) CanBatchGlobally() bool {
return false
}
Loading