From 6a1892ce5656461c6b0ce064d2583a7c012b0cc3 Mon Sep 17 00:00:00 2001 From: vc42 Date: Wed, 22 Apr 2026 12:28:21 -0400 Subject: [PATCH 1/3] concurrent map fix in seedFormFromMultipart --- view/state/kind/locator/form.go | 53 +++++++++++++++++---------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/view/state/kind/locator/form.go b/view/state/kind/locator/form.go index 9b37fdb6..4c4ee643 100644 --- a/view/state/kind/locator/form.go +++ b/view/state/kind/locator/form.go @@ -71,23 +71,18 @@ func (r *Form) Value(ctx context.Context, rType reflect.Type, name string) (inte } return nil, false, nil } - // Non-multipart: parse form/query values and preserve repeated values. + // Non-multipart: use standard FormValue fallback r.form.Mutex().Lock() defer r.form.Mutex().Unlock() - if err := r.request.ParseForm(); err != nil { - return nil, false, err - } - values, ok := r.request.Form[name] - if !ok { - return nil, false, nil - } - if len(values) > 1 { - return values, true, nil - } - if len(values) == 1 { - return values[0], true, nil + value := r.request.FormValue(name) + if value == "" { + if r.request.Form == nil { + return nil, false, nil + } + _, ok := r.request.Form[name] + return "", ok, nil } - return "", true, nil + return value, true, nil } if len(values) > 1 { return values, true, nil @@ -102,18 +97,26 @@ func NewForm(opts ...Option) (kind.Locator, error) { return ret, nil } -// seedFormFromMultipart parses multipart/form-data and copies values into shared maps. -// Mutex is required because multiple Form locators (one per parameter) can call this -// concurrently on the same request. Uses form.Values directly instead of form.Set to -// avoid deadlock (form.Set locks the same mutex). +// seedFormFromMultipart parses multipart/form-data (if needed) and copies textual values to the shared form func (r *Form) seedFormFromMultipart() { if r.request == nil || r.form == nil { return } + + // Session parameter binding runs in parallel. Multipart seeding mutates the + // shared form and request maps, so serialize that work under the form mutex. + mu := r.form.Mutex() + mu.Lock() + defer mu.Unlock() + if r.request.MultipartForm == nil && len(r.form.Values) == 0 { + // Only ParseMultipartForm for form-data; other multipart types aren't + // supported by ParseMultipartForm. If the shared form already has + // values, treat it as authoritative and avoid parsing. ct := r.request.Header.Get("Content-Type") if ct != "" { if mediaType, _, err := mime.ParseMediaType(ct); err == nil && shared.IsFormData(mediaType) { + // Use the same default memory threshold as Body locator const maxMultipartMemory = 32 << 20 // 32 MiB _ = r.request.ParseMultipartForm(maxMultipartMemory) } @@ -122,18 +125,18 @@ func (r *Form) seedFormFromMultipart() { if r.request.MultipartForm == nil { return } - // BUG FIX (concurrent map writes): - mu := r.form.Mutex() - mu.Lock() - defer mu.Unlock() - if r.request.Form == nil { + if r.form.Values == nil { + r.form.Values = url.Values{} + } + if len(r.request.Form) == 0 { r.request.Form = url.Values{} } for k, vs := range r.request.MultipartForm.Value { if len(vs) == 0 { continue } - r.form.Values[k] = vs - r.request.Form[k] = vs + seeded := append([]string(nil), vs...) + r.form.Values[k] = seeded + r.request.Form[k] = append([]string(nil), seeded...) } } From 704013dc252be9b164a055404d4ffe61ad1395f7 Mon Sep 17 00:00:00 2001 From: vc42 Date: Wed, 22 Apr 2026 12:48:42 -0400 Subject: [PATCH 2/3] concurrent map fix in seedFormFromMultipart 2 --- view/state/kind/locator/form.go | 34 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/view/state/kind/locator/form.go b/view/state/kind/locator/form.go index 4c4ee643..35c7695f 100644 --- a/view/state/kind/locator/form.go +++ b/view/state/kind/locator/form.go @@ -5,7 +5,6 @@ import ( "mime" "mime/multipart" "net/http" - "net/url" "reflect" "sync" @@ -71,18 +70,23 @@ func (r *Form) Value(ctx context.Context, rType reflect.Type, name string) (inte } return nil, false, nil } - // Non-multipart: use standard FormValue fallback + // Non-multipart: parse form/query values and preserve repeated values. r.form.Mutex().Lock() defer r.form.Mutex().Unlock() - value := r.request.FormValue(name) - if value == "" { - if r.request.Form == nil { - return nil, false, nil - } - _, ok := r.request.Form[name] - return "", ok, nil + if err := r.request.ParseForm(); err != nil { + return nil, false, err + } + values, ok = r.request.Form[name] + if !ok { + return nil, false, nil + } + if len(values) > 1 { + return values, true, nil } - return value, true, nil + if len(values) == 1 { + return values[0], true, nil + } + return "", true, nil } if len(values) > 1 { return values, true, nil @@ -125,18 +129,10 @@ func (r *Form) seedFormFromMultipart() { if r.request.MultipartForm == nil { return } - if r.form.Values == nil { - r.form.Values = url.Values{} - } - if len(r.request.Form) == 0 { - r.request.Form = url.Values{} - } for k, vs := range r.request.MultipartForm.Value { if len(vs) == 0 { continue } - seeded := append([]string(nil), vs...) - r.form.Values[k] = seeded - r.request.Form[k] = append([]string(nil), seeded...) + r.form.Set(k, append([]string(nil), vs...)...) } } From 72d783ff5202e8b61e53f3ea5ca351bed01e1c67 Mon Sep 17 00:00:00 2001 From: vc42 Date: Thu, 23 Apr 2026 14:21:34 -0400 Subject: [PATCH 3/3] mutex corruption fix --- service/reader/service.go | 46 +++++---------------------------- view/state.go | 41 +++++++++++++++++++++++++++++ view/state/kind/locator/form.go | 6 ++++- 3 files changed, 52 insertions(+), 41 deletions(-) diff --git a/service/reader/service.go b/service/reader/service.go index 56074977..7cfaa790 100644 --- a/service/reader/service.go +++ b/service/reader/service.go @@ -4,9 +4,7 @@ import ( "context" "database/sql" "fmt" - "os" "reflect" - "runtime/debug" "strings" "sync" "sync/atomic" @@ -37,14 +35,6 @@ type Service struct { // ReadInto reads Data into provided destination, * dDest` is required. It has to be a pointer to `interface{}` or pointer to slice of `T` or `*T` func (s *Service) ReadInto(ctx context.Context, dest interface{}, aView *view.View, opts ...Option) error { - if os.Getenv("DATLY_DEBUG_READER") == "1" { - defer func() { - if r := recover(); r != nil { - fmt.Printf("[READER DEBUG] panic view=%s dest=%T err=%v\n%s\n", aView.Name, dest, r, debug.Stack()) - panic(r) - } - }() - } session, err := NewSession(dest, aView, opts...) if err != nil { return err @@ -287,10 +277,9 @@ func (s *Service) readObjects(ctx context.Context, session *Session, batchData * } func (s *Service) querySummary(ctx context.Context, session *Session, aView *view.View, statelet *view.Statelet, batchDataCopy *view.BatchData, collector *view.Collector, parentViewMetaParam *expand.ViewContext) (*response.SQLExecution, error) { - selectorDeref := *statelet - selectorDeref.Fields = []string{} - selectorDeref.Columns = []string{} - selector := &selectorDeref + selector := statelet.CloneForSummary() + selector.Fields = []string{} + selector.Columns = []string{} var indexed *cache.ParmetrizedQuery var cacheStats *cache.Stats @@ -430,9 +419,6 @@ func (s *Service) BuildCriteria(ctx context.Context, value interface{}, options } func (s *Service) queryInBatches(ctx context.Context, session *Session, aView *view.View, collector *view.Collector, visitor view.VisitorFn, info *response.SQLExecutions, batchData *view.BatchData, selector *view.Statelet) error { - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] queryInBatches view=%s selectorTemplateNil=%v batchValues=%d\n", aView.Name, selector == nil || selector.Template == nil, len(batchData.ValuesBatch)) - } wg := &sync.WaitGroup{} db, err := aView.Db() if err != nil { @@ -471,19 +457,10 @@ func (s *Service) queryObjects(ctx context.Context, session *Session, aView *vie return s.queryWithPartitions(ctx, session, aView, selector, batchData, db, collector, visitor, partitioned) } readData := 0 - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] queryObjects view=%s schema=%v slice=%v collectorView=%s\n", aView.Name, aView.Schema.Type(), aView.Schema.SliceType(), collector.View().Name) - } parametrizedSQL, columnInMatcher, err := s.buildParametrizedSQL(ctx, aView, selector, batchData, collector, session, nil) if err != nil { - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] buildParametrizedSQL error view=%s err=%v\n", aView.Name, err) - } return nil, err } - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] builtSQL view=%s sql=%s args=%#v\n", aView.Name, parametrizedSQL.SQL, parametrizedSQL.Args) - } var parentProvider func(value interface{}) (interface{}, error) handler := func(row interface{}) error { @@ -505,7 +482,8 @@ func (s *Service) queryObjects(ctx context.Context, session *Session, aView *vie } return visitor(row) } - return s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData) + execs, err := s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData) + return execs, err } func (s *Service) getParentContext(ctx context.Context, row interface{}, collector *view.Collector, parentProvider func(value interface{}) (interface{}, error)) (context.Context, error) { @@ -548,9 +526,6 @@ func (s *Service) queryWithHandler(ctx context.Context, session *Session, aView stats, onDone := NewExecutionInfo(parametrizedSQL, cacheStats, collector) defer onDone() - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY HANDLER] view=%s sql=%s args=%#v\n", aView.Name, parametrizedSQL.SQL, parametrizedSQL.Args) - } if session.DryRun { return []*response.SQLExecution{stats}, nil } @@ -580,16 +555,7 @@ BEGIN: } _ = stmt.Close() }() - debugHandler := handler - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - debugHandler = func(row interface{}) error { - fmt.Printf("[QUERY HANDLER] view=%s before unwrap row=%T readData=%d\n", aView.Name, row, *readData) - err := handler(row) - fmt.Printf("[QUERY HANDLER] view=%s after handler row=%T readData=%d err=%v\n", aView.Name, row, *readData, err) - return err - } - } - err = reader.QueryAll(ctx, debugHandler, parametrizedSQL.Args...) + err = reader.QueryAll(ctx, handler, parametrizedSQL.Args...) isInvalidConnection = err != nil && strings.Contains(err.Error(), "invalid connection") if isInvalidConnection && atomic.AddUint32(&retires, 1) < 3 { diff --git a/view/state.go b/view/state.go index a187cd4b..370b9d38 100644 --- a/view/state.go +++ b/view/state.go @@ -151,3 +151,44 @@ func (s *State) Init(aView *View) { func (s *Statelet) IgnoreRead() { s.Ignore = true } + +// CloneForSummary creates a lock-safe copy of Statelet state for summary/meta work. +// It intentionally does not copy mutex or lock-owner bookkeeping. +func (s *Statelet) CloneForSummary() *Statelet { + if s == nil { + return NewStatelet() + } + + ret := &Statelet{ + DatabaseFormat: s.DatabaseFormat, + OutputFormat: s.OutputFormat, + Template: s.Template, + QuerySelector: s.QuerySelector, + QuerySettings: s.QuerySettings, + initialized: s.initialized, + result: s.result, + Ignore: s.Ignore, + } + + if s._columnNames != nil { + ret._columnNames = make(map[string]bool, len(s._columnNames)) + for k, v := range s._columnNames { + ret._columnNames[k] = v + } + } else { + ret._columnNames = map[string]bool{} + } + + if len(s.Filters) > 0 { + ret.Filters = append(predicate.Filters(nil), s.Filters...) + } + + if len(s.Fields) > 0 { + ret.Fields = append([]string(nil), s.Fields...) + } + if len(s.Columns) > 0 { + ret.Columns = append([]string(nil), s.Columns...) + } + + return ret +} diff --git a/view/state/kind/locator/form.go b/view/state/kind/locator/form.go index 35c7695f..b4fee35b 100644 --- a/view/state/kind/locator/form.go +++ b/view/state/kind/locator/form.go @@ -5,6 +5,7 @@ import ( "mime" "mime/multipart" "net/http" + "net/url" "reflect" "sync" @@ -129,10 +130,13 @@ func (r *Form) seedFormFromMultipart() { if r.request.MultipartForm == nil { return } + if r.form.Values == nil { + r.form.Values = url.Values{} + } for k, vs := range r.request.MultipartForm.Value { if len(vs) == 0 { continue } - r.form.Set(k, append([]string(nil), vs...)...) + r.form.Values[k] = append([]string(nil), vs...) } }