Skip to content

Rewrite log streaming with io.Pipe#291

Closed
jasonopslevel wants to merge 2 commits into
mainfrom
jason/logs-io-pipe-rewrite
Closed

Rewrite log streaming with io.Pipe#291
jasonopslevel wants to merge 2 commits into
mainfrom
jason/logs-io-pipe-rewrite

Conversation

@jasonopslevel
Copy link
Copy Markdown
Contributor

@jasonopslevel jasonopslevel commented Apr 21, 2026

Replace the SafeBuffer + ticker-polling LogStreamer with reader goroutines that drain io.Pipe endpoints via a manual []byte buffer and bytes.IndexByte line extraction. Exec writes directly to the PipeWriter; readers emit lines as they arrive and force-flush once the buffer reaches 64 KiB with no newline, retaining a 1 KiB tail so patterns straddling the cut still match on the next emission. BoundarySafeProcessor lets the sanitizer redact across that cut. Flush closes the pipes and waits on a WaitGroup, which naturally delivers any trailing partial line via EOF.

This removes the polling loop, the 50 ms latency floor, the 200 ms Flush sleep, the Run↔Flush processor race, and the SafeBuffer type entirely.

⏺                     Concurrency / Data Flow — LogStreamer (afc6ca4)                                                                                                                                                                                
                                                               
    ┌──────────────────────────┐                                                                                                                                                                                                                     
    │ k8s exec (main goroutine │                                     ctx cancellation                                                                                                                                                                
    │  or caller's goroutine)  │                               ┌────────────────────┐                                                                                                                                                                
    │                          │                               │  Run(ctx) goroutine│                                                                                                                                                                
    │  Stdout.Write(...)       │                               │  (optional)        │                                                                                                                                                                
    │  Stderr.Write(...)       │                               │                    │                                                                                                                                                                
    └────────────┬─────────────┘                               │  select {          │                                                                                                                                                                
                 │                                             │   <-ctx.Done():    │                                                                                                                                                                
                 │ bytes                                       │     stdoutR.Close  │                                                                                                                                                                
                 ▼                                             │      WithError()   │                                                                                                                                                                
        ┌────────────────┐          ┌────────────────┐         │     stderrR.Close  │                                                                                                                                                                
        │ io.Pipe (stdout│          │ io.Pipe (stderr│◄────────┤      WithError()   │                                                                                                                                                                
        │  PW ──► PR     │          │  PW ──► PR     │         │   <-done:          │                                                                                                                                                                
        └───────┬────────┘          └───────┬────────┘         │  }                 │                                                                                                                                                                
                │ Read(readBuf=4KiB)        │ Read(readBuf=4KiB│                    │                                                                                                                                                                
                ▼                           ▼                  │  wg.Wait() ──► done│                                                                                                                                                                
     ┌───────────────────────┐  ┌───────────────────────┐      └──────────▲─────────┘                                                                                                                                                                
     │ readLoop goroutine #1 │  │ readLoop goroutine #2 │                 │                                                                                                                                                                          
     │ (stdout)              │  │ (stderr)              │                 │ wg.Done x2                                                                                                                                                               
     │                       │  │                       │                 │                                                                                                                                                                          
     │  append → buf         │  │  append → buf         │                 │                                                                                                                                                                          
     │  drain(buf):          │  │  drain(buf):          │                 │                                                                                                                                                                          
     │   for '\n' in buf:    │  │   for '\n' in buf:    │                 │
     │     emit(line)        │  │     emit(line)        │                 │                                                                                                                                                                          
     │   if len(buf)≥64KiB:  │  │   if len(buf)≥64KiB:  │                 │
     │     SanitizeBoundary  │  │     SanitizeBoundary  │                 │                                                                                                                                                                          
     │      (full buf)       │  │      (full buf)       │                 │
     │     emit(prefix)      │  │     emit(prefix)      │                 │                                                                                                                                                                          
     │     keep 1KiB tail    │  │     keep 1KiB tail    │                 │                                                                                                                                                                          
     │  on err/EOF:          │  │  on err/EOF:          │                 │                                                                                                                                                                          
     │   emit(trailing)      │  │   emit(trailing)      │                 │                                                                                                                                                                          
     │   wg.Done() ──────────┼──┼───────────────────────┼─────────────────┘                                                                                                                                                                          
     └──────────┬────────────┘  └──────────┬────────────┘                                                                                                                                                                                            
                │                          │                                                                                                                                                                                                         
                │ emit(line)               │ emit(line)                                                                                                                                                                                              
                └────────────┬─────────────┘                                                                                                                                                                                                         
                             ▼
             ┌─────────────────────────────────┐                                                                                                                                                                                                     
             │  processor chain (in order)     │
             │   line = p[0].ProcessStd*(line) │ sanitize → prefix → set-outcome-var                                                                                                                                                                 
             │   line = p[1].ProcessStd*(line) │ → opslevelAppend / faktory...                                                                                                                                                                       
             │     ...                         │                                                                                                                                                                                                     
             └────────────────┬────────────────┘                                                                                                                                                                                                     
                              ▼                                                                                                                                                                                                                      
                  ┌──────────────────────┐                                                                                                                                                                                                           
                  │ ring buffer (20)     │  ◄── logBufferMu (also used by
                  │  logBuffer.Value=line│       GetLogBuffer readers)                                                                                                                                                                               
                  └──────────────────────┘                                                                                                                                                                                                           
                                                                                                                                                                                                                                                     
    ───────────────────────────────────────────────────────────────────────────────                                                                                                                                                                  
    Flush(outcome)  (caller, exactly once):
       closeOnce:  Stdout.Close() ; Stderr.Close()      ── signals EOF to pipes                                                                                                                                                                      
       wg.Wait()                                          ── readers drain + exit,                                                                                                                                                                   
                                                             trailing partial lines                                                                                                                                                                  
                                                             emitted on EOF                                                                                                                                                                          
       for i := N-1; i >= 0; i--: processors[i].Flush(outcome)   ── reverse order                                                                                                                                                                    
    ───────────────────────────────────────────────────────────────────────────────                                                                                                                                                                  
                                                                                                                                                                                                                                                     
    Synchronization summary                                                                                                                                                                                                                          
      • 2 reader goroutines, 1 optional Run(ctx) goroutine, 1+ writers (exec)
      • io.Pipe provides Write→Read happens-before (publishes s.processors slice)                                                                                                                                                                    
      • WaitGroup: readers wg.Done on EOF/err; Flush & Run both wg.Wait                                                                                                                                                                              
      • closeOnce: Flush idempotent; Run's CloseWithError races safely with Close                                                                                                                                                                    
      • logBufferMu: protects ring buffer between emit() and GetLogBuffer()                                                                                                                                                                          
      • Cancellation path: ctx → CloseWithError → Read returns err → readers drain                                                                                                                                                                   
        buf, emit trailing, wg.Done → Run returns → Flush's Close is a no-op close                                                                                                                                                                   
                                                                                                                                                                                                                                                     
  Key invariants from the commit: no polling loop, no 50ms latency floor, no Run↔Flush race (Flush owns the close→wait sequence; Run only force-closes on ctx and then waits on the same WG). The 1 KiB retained tail +                              
  BoundarySafeProcessor.SanitizeBoundary keep secret redaction correct across a forced 64 KiB cut.    

Issues

Changelog

  • List your changes here
  • Make a changie entry

Tophatting

Replace the SafeBuffer + ticker-polling LogStreamer with reader goroutines
that drain io.Pipe endpoints via a manual []byte buffer and bytes.IndexByte
line extraction. Exec writes directly to the PipeWriter; readers emit
lines as they arrive and force-flush once the buffer reaches 64 KiB with
no newline, retaining a 1 KiB tail so patterns straddling the cut still
match on the next emission. BoundarySafeProcessor lets the sanitizer
redact across that cut. Flush closes the pipes and waits on a WaitGroup,
which naturally delivers any trailing partial line via EOF.

This removes the polling loop, the 50 ms latency floor, the 200 ms Flush
sleep, the Run↔Flush processor race, and the SafeBuffer type entirely.
@jasonopslevel jasonopslevel force-pushed the jason/logs-io-pipe-rewrite branch from f04be9d to c7beda7 Compare April 22, 2026 13:38
@jasonopslevel
Copy link
Copy Markdown
Contributor Author

We can save this larger refactor for later

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant