diff --git a/service/reader/service.go b/service/reader/service.go index 56074977..7cfaa790 100644 --- a/service/reader/service.go +++ b/service/reader/service.go @@ -4,9 +4,7 @@ import ( "context" "database/sql" "fmt" - "os" "reflect" - "runtime/debug" "strings" "sync" "sync/atomic" @@ -37,14 +35,6 @@ type Service struct { // ReadInto reads Data into provided destination, * dDest` is required. It has to be a pointer to `interface{}` or pointer to slice of `T` or `*T` func (s *Service) ReadInto(ctx context.Context, dest interface{}, aView *view.View, opts ...Option) error { - if os.Getenv("DATLY_DEBUG_READER") == "1" { - defer func() { - if r := recover(); r != nil { - fmt.Printf("[READER DEBUG] panic view=%s dest=%T err=%v\n%s\n", aView.Name, dest, r, debug.Stack()) - panic(r) - } - }() - } session, err := NewSession(dest, aView, opts...) if err != nil { return err @@ -287,10 +277,9 @@ func (s *Service) readObjects(ctx context.Context, session *Session, batchData * } func (s *Service) querySummary(ctx context.Context, session *Session, aView *view.View, statelet *view.Statelet, batchDataCopy *view.BatchData, collector *view.Collector, parentViewMetaParam *expand.ViewContext) (*response.SQLExecution, error) { - selectorDeref := *statelet - selectorDeref.Fields = []string{} - selectorDeref.Columns = []string{} - selector := &selectorDeref + selector := statelet.CloneForSummary() + selector.Fields = []string{} + selector.Columns = []string{} var indexed *cache.ParmetrizedQuery var cacheStats *cache.Stats @@ -430,9 +419,6 @@ func (s *Service) BuildCriteria(ctx context.Context, value interface{}, options } func (s *Service) queryInBatches(ctx context.Context, session *Session, aView *view.View, collector *view.Collector, visitor view.VisitorFn, info *response.SQLExecutions, batchData *view.BatchData, selector *view.Statelet) error { - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] queryInBatches view=%s selectorTemplateNil=%v batchValues=%d\n", aView.Name, selector == nil || selector.Template == nil, len(batchData.ValuesBatch)) - } wg := &sync.WaitGroup{} db, err := aView.Db() if err != nil { @@ -471,19 +457,10 @@ func (s *Service) queryObjects(ctx context.Context, session *Session, aView *vie return s.queryWithPartitions(ctx, session, aView, selector, batchData, db, collector, visitor, partitioned) } readData := 0 - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] queryObjects view=%s schema=%v slice=%v collectorView=%s\n", aView.Name, aView.Schema.Type(), aView.Schema.SliceType(), collector.View().Name) - } parametrizedSQL, columnInMatcher, err := s.buildParametrizedSQL(ctx, aView, selector, batchData, collector, session, nil) if err != nil { - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] buildParametrizedSQL error view=%s err=%v\n", aView.Name, err) - } return nil, err } - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY DEBUG] builtSQL view=%s sql=%s args=%#v\n", aView.Name, parametrizedSQL.SQL, parametrizedSQL.Args) - } var parentProvider func(value interface{}) (interface{}, error) handler := func(row interface{}) error { @@ -505,7 +482,8 @@ func (s *Service) queryObjects(ctx context.Context, session *Session, aView *vie } return visitor(row) } - return s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData) + execs, err := s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData) + return execs, err } func (s *Service) getParentContext(ctx context.Context, row interface{}, collector *view.Collector, parentProvider func(value interface{}) (interface{}, error)) (context.Context, error) { @@ -548,9 +526,6 @@ func (s *Service) queryWithHandler(ctx context.Context, session *Session, aView stats, onDone := NewExecutionInfo(parametrizedSQL, cacheStats, collector) defer onDone() - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - fmt.Printf("[QUERY HANDLER] view=%s sql=%s args=%#v\n", aView.Name, parametrizedSQL.SQL, parametrizedSQL.Args) - } if session.DryRun { return []*response.SQLExecution{stats}, nil } @@ -580,16 +555,7 @@ BEGIN: } _ = stmt.Close() }() - debugHandler := handler - if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" { - debugHandler = func(row interface{}) error { - fmt.Printf("[QUERY HANDLER] view=%s before unwrap row=%T readData=%d\n", aView.Name, row, *readData) - err := handler(row) - fmt.Printf("[QUERY HANDLER] view=%s after handler row=%T readData=%d err=%v\n", aView.Name, row, *readData, err) - return err - } - } - err = reader.QueryAll(ctx, debugHandler, parametrizedSQL.Args...) + err = reader.QueryAll(ctx, handler, parametrizedSQL.Args...) isInvalidConnection = err != nil && strings.Contains(err.Error(), "invalid connection") if isInvalidConnection && atomic.AddUint32(&retires, 1) < 3 { diff --git a/view/state.go b/view/state.go index a187cd4b..370b9d38 100644 --- a/view/state.go +++ b/view/state.go @@ -151,3 +151,44 @@ func (s *State) Init(aView *View) { func (s *Statelet) IgnoreRead() { s.Ignore = true } + +// CloneForSummary creates a lock-safe copy of Statelet state for summary/meta work. +// It intentionally does not copy mutex or lock-owner bookkeeping. +func (s *Statelet) CloneForSummary() *Statelet { + if s == nil { + return NewStatelet() + } + + ret := &Statelet{ + DatabaseFormat: s.DatabaseFormat, + OutputFormat: s.OutputFormat, + Template: s.Template, + QuerySelector: s.QuerySelector, + QuerySettings: s.QuerySettings, + initialized: s.initialized, + result: s.result, + Ignore: s.Ignore, + } + + if s._columnNames != nil { + ret._columnNames = make(map[string]bool, len(s._columnNames)) + for k, v := range s._columnNames { + ret._columnNames[k] = v + } + } else { + ret._columnNames = map[string]bool{} + } + + if len(s.Filters) > 0 { + ret.Filters = append(predicate.Filters(nil), s.Filters...) + } + + if len(s.Fields) > 0 { + ret.Fields = append([]string(nil), s.Fields...) + } + if len(s.Columns) > 0 { + ret.Columns = append([]string(nil), s.Columns...) + } + + return ret +} diff --git a/view/state/kind/locator/form.go b/view/state/kind/locator/form.go index 9b37fdb6..b4fee35b 100644 --- a/view/state/kind/locator/form.go +++ b/view/state/kind/locator/form.go @@ -77,7 +77,7 @@ func (r *Form) Value(ctx context.Context, rType reflect.Type, name string) (inte if err := r.request.ParseForm(); err != nil { return nil, false, err } - values, ok := r.request.Form[name] + values, ok = r.request.Form[name] if !ok { return nil, false, nil } @@ -102,18 +102,26 @@ func NewForm(opts ...Option) (kind.Locator, error) { return ret, nil } -// seedFormFromMultipart parses multipart/form-data and copies values into shared maps. -// Mutex is required because multiple Form locators (one per parameter) can call this -// concurrently on the same request. Uses form.Values directly instead of form.Set to -// avoid deadlock (form.Set locks the same mutex). +// seedFormFromMultipart parses multipart/form-data (if needed) and copies textual values to the shared form func (r *Form) seedFormFromMultipart() { if r.request == nil || r.form == nil { return } + + // Session parameter binding runs in parallel. Multipart seeding mutates the + // shared form and request maps, so serialize that work under the form mutex. + mu := r.form.Mutex() + mu.Lock() + defer mu.Unlock() + if r.request.MultipartForm == nil && len(r.form.Values) == 0 { + // Only ParseMultipartForm for form-data; other multipart types aren't + // supported by ParseMultipartForm. If the shared form already has + // values, treat it as authoritative and avoid parsing. ct := r.request.Header.Get("Content-Type") if ct != "" { if mediaType, _, err := mime.ParseMediaType(ct); err == nil && shared.IsFormData(mediaType) { + // Use the same default memory threshold as Body locator const maxMultipartMemory = 32 << 20 // 32 MiB _ = r.request.ParseMultipartForm(maxMultipartMemory) } @@ -122,18 +130,13 @@ func (r *Form) seedFormFromMultipart() { if r.request.MultipartForm == nil { return } - // BUG FIX (concurrent map writes): - mu := r.form.Mutex() - mu.Lock() - defer mu.Unlock() - if r.request.Form == nil { - r.request.Form = url.Values{} + if r.form.Values == nil { + r.form.Values = url.Values{} } for k, vs := range r.request.MultipartForm.Value { if len(vs) == 0 { continue } - r.form.Values[k] = vs - r.request.Form[k] = vs + r.form.Values[k] = append([]string(nil), vs...) } }