Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 6 additions & 40 deletions service/reader/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"os"
"reflect"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -37,14 +35,6 @@ type Service struct {

// ReadInto reads Data into provided destination, * dDest` is required. It has to be a pointer to `interface{}` or pointer to slice of `T` or `*T`
func (s *Service) ReadInto(ctx context.Context, dest interface{}, aView *view.View, opts ...Option) error {
if os.Getenv("DATLY_DEBUG_READER") == "1" {
defer func() {
if r := recover(); r != nil {
fmt.Printf("[READER DEBUG] panic view=%s dest=%T err=%v\n%s\n", aView.Name, dest, r, debug.Stack())
panic(r)
}
}()
}
session, err := NewSession(dest, aView, opts...)
if err != nil {
return err
Expand Down Expand Up @@ -287,10 +277,9 @@ func (s *Service) readObjects(ctx context.Context, session *Session, batchData *
}

func (s *Service) querySummary(ctx context.Context, session *Session, aView *view.View, statelet *view.Statelet, batchDataCopy *view.BatchData, collector *view.Collector, parentViewMetaParam *expand.ViewContext) (*response.SQLExecution, error) {
selectorDeref := *statelet
selectorDeref.Fields = []string{}
selectorDeref.Columns = []string{}
selector := &selectorDeref
selector := statelet.CloneForSummary()
selector.Fields = []string{}
selector.Columns = []string{}

var indexed *cache.ParmetrizedQuery
var cacheStats *cache.Stats
Expand Down Expand Up @@ -430,9 +419,6 @@ func (s *Service) BuildCriteria(ctx context.Context, value interface{}, options
}

func (s *Service) queryInBatches(ctx context.Context, session *Session, aView *view.View, collector *view.Collector, visitor view.VisitorFn, info *response.SQLExecutions, batchData *view.BatchData, selector *view.Statelet) error {
if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" {
fmt.Printf("[QUERY DEBUG] queryInBatches view=%s selectorTemplateNil=%v batchValues=%d\n", aView.Name, selector == nil || selector.Template == nil, len(batchData.ValuesBatch))
}
wg := &sync.WaitGroup{}
db, err := aView.Db()
if err != nil {
Expand Down Expand Up @@ -471,19 +457,10 @@ func (s *Service) queryObjects(ctx context.Context, session *Session, aView *vie
return s.queryWithPartitions(ctx, session, aView, selector, batchData, db, collector, visitor, partitioned)
}
readData := 0
if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" {
fmt.Printf("[QUERY DEBUG] queryObjects view=%s schema=%v slice=%v collectorView=%s\n", aView.Name, aView.Schema.Type(), aView.Schema.SliceType(), collector.View().Name)
}
parametrizedSQL, columnInMatcher, err := s.buildParametrizedSQL(ctx, aView, selector, batchData, collector, session, nil)
if err != nil {
if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" {
fmt.Printf("[QUERY DEBUG] buildParametrizedSQL error view=%s err=%v\n", aView.Name, err)
}
return nil, err
}
if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" {
fmt.Printf("[QUERY DEBUG] builtSQL view=%s sql=%s args=%#v\n", aView.Name, parametrizedSQL.SQL, parametrizedSQL.Args)
}

var parentProvider func(value interface{}) (interface{}, error)
handler := func(row interface{}) error {
Expand All @@ -505,7 +482,8 @@ func (s *Service) queryObjects(ctx context.Context, session *Session, aView *vie
}
return visitor(row)
}
return s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData)
execs, err := s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData)
return execs, err
}

func (s *Service) getParentContext(ctx context.Context, row interface{}, collector *view.Collector, parentProvider func(value interface{}) (interface{}, error)) (context.Context, error) {
Expand Down Expand Up @@ -548,9 +526,6 @@ func (s *Service) queryWithHandler(ctx context.Context, session *Session, aView

stats, onDone := NewExecutionInfo(parametrizedSQL, cacheStats, collector)
defer onDone()
if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" {
fmt.Printf("[QUERY HANDLER] view=%s sql=%s args=%#v\n", aView.Name, parametrizedSQL.SQL, parametrizedSQL.Args)
}
if session.DryRun {
return []*response.SQLExecution{stats}, nil
}
Expand Down Expand Up @@ -580,16 +555,7 @@ BEGIN:
}
_ = stmt.Close()
}()
debugHandler := handler
if os.Getenv("DATLY_DEBUG_QUERY_HANDLER") == "1" {
debugHandler = func(row interface{}) error {
fmt.Printf("[QUERY HANDLER] view=%s before unwrap row=%T readData=%d\n", aView.Name, row, *readData)
err := handler(row)
fmt.Printf("[QUERY HANDLER] view=%s after handler row=%T readData=%d err=%v\n", aView.Name, row, *readData, err)
return err
}
}
err = reader.QueryAll(ctx, debugHandler, parametrizedSQL.Args...)
err = reader.QueryAll(ctx, handler, parametrizedSQL.Args...)

isInvalidConnection = err != nil && strings.Contains(err.Error(), "invalid connection")
if isInvalidConnection && atomic.AddUint32(&retires, 1) < 3 {
Expand Down
41 changes: 41 additions & 0 deletions view/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,44 @@ func (s *State) Init(aView *View) {
func (s *Statelet) IgnoreRead() {
s.Ignore = true
}

// CloneForSummary creates a lock-safe copy of Statelet state for summary/meta work.
// It intentionally does not copy mutex or lock-owner bookkeeping.
func (s *Statelet) CloneForSummary() *Statelet {
if s == nil {
return NewStatelet()
}

ret := &Statelet{
DatabaseFormat: s.DatabaseFormat,
OutputFormat: s.OutputFormat,
Template: s.Template,
QuerySelector: s.QuerySelector,
QuerySettings: s.QuerySettings,
initialized: s.initialized,
result: s.result,
Ignore: s.Ignore,
}

if s._columnNames != nil {
ret._columnNames = make(map[string]bool, len(s._columnNames))
for k, v := range s._columnNames {
ret._columnNames[k] = v
}
} else {
ret._columnNames = map[string]bool{}
}

if len(s.Filters) > 0 {
ret.Filters = append(predicate.Filters(nil), s.Filters...)
}

if len(s.Fields) > 0 {
ret.Fields = append([]string(nil), s.Fields...)
}
if len(s.Columns) > 0 {
ret.Columns = append([]string(nil), s.Columns...)
}

return ret
}
29 changes: 16 additions & 13 deletions view/state/kind/locator/form.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (r *Form) Value(ctx context.Context, rType reflect.Type, name string) (inte
if err := r.request.ParseForm(); err != nil {
return nil, false, err
}
values, ok := r.request.Form[name]
values, ok = r.request.Form[name]
if !ok {
return nil, false, nil
}
Expand All @@ -102,18 +102,26 @@ func NewForm(opts ...Option) (kind.Locator, error) {
return ret, nil
}

// seedFormFromMultipart parses multipart/form-data and copies values into shared maps.
// Mutex is required because multiple Form locators (one per parameter) can call this
// concurrently on the same request. Uses form.Values directly instead of form.Set to
// avoid deadlock (form.Set locks the same mutex).
// seedFormFromMultipart parses multipart/form-data (if needed) and copies textual values to the shared form
func (r *Form) seedFormFromMultipart() {
if r.request == nil || r.form == nil {
return
}

// Session parameter binding runs in parallel. Multipart seeding mutates the
// shared form and request maps, so serialize that work under the form mutex.
mu := r.form.Mutex()
mu.Lock()
defer mu.Unlock()

if r.request.MultipartForm == nil && len(r.form.Values) == 0 {
// Only ParseMultipartForm for form-data; other multipart types aren't
// supported by ParseMultipartForm. If the shared form already has
// values, treat it as authoritative and avoid parsing.
ct := r.request.Header.Get("Content-Type")
if ct != "" {
if mediaType, _, err := mime.ParseMediaType(ct); err == nil && shared.IsFormData(mediaType) {
// Use the same default memory threshold as Body locator
const maxMultipartMemory = 32 << 20 // 32 MiB
_ = r.request.ParseMultipartForm(maxMultipartMemory)
}
Expand All @@ -122,18 +130,13 @@ func (r *Form) seedFormFromMultipart() {
if r.request.MultipartForm == nil {
return
}
// BUG FIX (concurrent map writes):
mu := r.form.Mutex()
mu.Lock()
defer mu.Unlock()
if r.request.Form == nil {
r.request.Form = url.Values{}
if r.form.Values == nil {
r.form.Values = url.Values{}
}
for k, vs := range r.request.MultipartForm.Value {
if len(vs) == 0 {
continue
}
r.form.Values[k] = vs
r.request.Form[k] = vs
r.form.Values[k] = append([]string(nil), vs...)
}
}
Loading