3 changes: 3 additions & 0 deletions .changes/unreleased/Bugfix-20260422-093744.yaml
@@ -0,0 +1,3 @@
kind: Bugfix
body: Rewrite log streamer with io.Pipe
time: 2026-04-22T09:37:44.618884-04:00
40 changes: 0 additions & 40 deletions src/pkg/buffer.go

This file was deleted.

10 changes: 5 additions & 5 deletions src/pkg/k8s.go
@@ -50,8 +50,8 @@ type JobConfig struct {
PodName string
ContainerName string
Stdin io.Reader
Stdout *SafeBuffer
Stderr *SafeBuffer
Stdout io.Writer
Stderr io.Writer
}

type JobRunner struct {
@@ -278,7 +278,7 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo
}

// TODO: Remove all usages of "Viper" they should be passed in at JobRunner configuration time
func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, stderr *SafeBuffer) JobOutcome {
func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, stderr io.Writer) JobOutcome {
id := string(job.Id)
// Once we get off "the old API" method of runner we can circle back around to this
// and fix it to generate safe pod names since k8s has limitations.
@@ -344,7 +344,7 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std
runErr := s.Exec(ctx, stdout, stderr, pod, pod.Spec.Containers[0].Name, s.podConfig.Shell, "-e", "-c", strings.Join(commands, ";\n"))
if runErr != nil {
return JobOutcome{
Message: fmt.Sprintf("pod execution failed REASON: %s %s", strings.TrimSuffix(stderr.String(), "\n"), runErr),
Message: fmt.Sprintf("pod execution failed REASON: %s", runErr),
Outcome: opslevel.RunnerJobOutcomeEnumFailed,
}
}
@@ -406,7 +406,7 @@ func (s *JobRunner) ExecWithConfig(ctx context.Context, config JobConfig) error
})
}

func (s *JobRunner) Exec(ctx context.Context, stdout, stderr *SafeBuffer, pod *corev1.Pod, containerName string, cmd ...string) error {
func (s *JobRunner) Exec(ctx context.Context, stdout, stderr io.Writer, pod *corev1.Pod, containerName string, cmd ...string) error {
return s.ExecWithConfig(ctx, JobConfig{
Command: cmd,
Namespace: pod.Namespace,
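With `Run` and `Exec` now taking plain `io.Writer`, the `*io.PipeWriter` ends of a `LogStreamer` drop in directly at the call site. A minimal sketch, assuming it lives in package `pkg` alongside this code; the `streamJob` helper is hypothetical, for illustration only:

```go
// streamJob wires a LogStreamer into the new io.Writer-based Run signature.
func streamJob(ctx context.Context, runner *JobRunner, job opslevel.RunnerJob, logger zerolog.Logger) JobOutcome {
	s := NewLogStreamer(logger) // starts the two pipe-reader goroutines
	go s.Run(ctx)               // optional, but unblocks stuck writes on ctx cancellation

	// *io.PipeWriter satisfies io.Writer, so the pipe ends pass straight through.
	outcome := runner.Run(ctx, job, s.Stdout, s.Stderr)

	s.Flush(outcome) // required: closes the writers, drains the readers, flushes processors
	return outcome
}
```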
234 changes: 167 additions & 67 deletions src/pkg/logs.go
@@ -1,46 +1,193 @@
package pkg

import (
"bytes"
"container/ring"
"context"
"strings"
"time"
"io"
"sync"
"unicode/utf8"

"github.com/rs/zerolog"
)

const (
// maxLineBytes caps a single emitted log line. Longer runs without a
// newline are force-flushed so a misbehaving process cannot grow the
// buffer without bound.
maxLineBytes = 64 * 1024

// boundaryOverlap is the tail of the buffer retained across a
// force-flush cut so a pattern (redacted secret, ::set-outcome-var
// directive, etc.) straddling the cut still has a chance to match on
// the next emission. Must be at least as large as the longest pattern
// any processor cares about.
boundaryOverlap = 1024

// readChunk is the per-read size pulled from the pipe reader.
readChunk = 4 * 1024
)

type LogProcessor interface {
ProcessStdout(line string) string
ProcessStderr(line string) string
Flush(outcome JobOutcome)
}

// BoundarySafeProcessor is implemented by processors whose redaction must
// apply across force-flush chunk boundaries for oversized no-newline lines.
// When present, LogStreamer runs SanitizeBoundary over the full accumulated
// buffer before cutting, so a secret that would otherwise straddle the cut
// is masked in both the emitted prefix and the retained tail.
type BoundarySafeProcessor interface {
SanitizeBoundary(line string) string
}

type LogStreamer struct {
Stdout *SafeBuffer
Stderr *SafeBuffer
Stdout *io.PipeWriter
Stderr *io.PipeWriter

stdoutR *io.PipeReader
stderrR *io.PipeReader

processors []LogProcessor
logger zerolog.Logger
quit chan bool
logBuffer *ring.Ring

logBuffer *ring.Ring
logBufferMu sync.Mutex

wg sync.WaitGroup
closeOnce sync.Once
}

func NewLogStreamer(logger zerolog.Logger, processors ...LogProcessor) LogStreamer {
quit := make(chan bool)
return LogStreamer{
Stdout: &SafeBuffer{},
Stderr: &SafeBuffer{},
// NewLogStreamer constructs a LogStreamer and starts the two reader
// goroutines that drain Stdout/Stderr. Callers MUST call Flush at least
// once per streamer or the reader goroutines leak (repeat calls are safe).
// Callers SHOULD also
// `go Run(ctx)` to propagate ctx cancellation — without it, a stuck exec
// write cannot be unblocked by cancellation.
func NewLogStreamer(logger zerolog.Logger, processors ...LogProcessor) *LogStreamer {
sor, sow := io.Pipe()
ser, sew := io.Pipe()
s := &LogStreamer{
Stdout: sow,
Stderr: sew,
stdoutR: sor,
stderrR: ser,
processors: processors,
logger: logger,
quit: quit,
logBuffer: ring.New(20),
}
s.wg.Add(2)
go s.readLoop(s.stdoutR, LogProcessor.ProcessStdout)
go s.readLoop(s.stderrR, LogProcessor.ProcessStderr)
return s
}

// AddProcessor appends a processor to the chain. Must be called before the
// first Write to Stdout/Stderr: the reader goroutines observe s.processors
// without a lock, relying on the io.Pipe Write→Read happens-before to
// publish the slice. Mutating after writes begin is a data race.
func (s *LogStreamer) AddProcessor(processor LogProcessor) {
s.processors = append(s.processors, processor)
}

// Run watches ctx and, on cancellation, closes the pipe readers so any
// in-flight write from the k8s exec stream fails fast rather than blocking.
// Returns only after the internal reader goroutines have exited, so callers
// can rely on the log buffer being fully populated on return. Safe to call
// without a corresponding Flush; safe to omit entirely (Flush alone suffices
// for a successful run — Run only matters for ctx-driven cancellation).
func (s *LogStreamer) Run(ctx context.Context) {
s.logger.Trace().Msg("Starting log streamer ...")
defer s.logger.Trace().Msg("Log streamer stopped.")
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
_ = s.stdoutR.CloseWithError(ctx.Err())
_ = s.stderrR.CloseWithError(ctx.Err())
<-done
case <-done:
}
}

// readLoop reads from r, extracts newline-terminated lines, and emits each
// through processFn. Exits on io.EOF (writer closed) or any read error,
// emitting any trailing no-newline partial line first.
func (s *LogStreamer) readLoop(r io.Reader, processFn func(LogProcessor, string) string) {
defer s.wg.Done()
defer func() {
if rec := recover(); rec != nil {
s.logger.Error().Interface("panic", rec).Msg("log processor panicked; draining reader")
_, _ = io.Copy(io.Discard, r)
}
}()
buf := make([]byte, 0, maxLineBytes+readChunk)
readBuf := make([]byte, readChunk)
for {
n, err := r.Read(readBuf)
if n > 0 {
buf = append(buf, readBuf[:n]...)
buf = s.drain(buf, processFn)
}
if err != nil {
if len(buf) > 0 {
s.emit(string(buf), processFn)
}
return
}
}
}

// drain emits every complete line in buf and, if the remaining tail has
// reached maxLineBytes without a newline, force-flushes a prefix while
// retaining a boundaryOverlap tail. Before cutting, any BoundarySafeProcessor
// masks the entire buffer so a secret straddling the cut is redacted in both
// halves. Returns the residual buffer to carry into the next read.
func (s *LogStreamer) drain(buf []byte, processFn func(LogProcessor, string) string) []byte {
for {
if i := bytes.IndexByte(buf, '\n'); i >= 0 {
s.emit(string(buf[:i]), processFn)
buf = buf[i+1:]
continue
}
if len(buf) < maxLineBytes {
return buf
}
full := string(buf)
for _, p := range s.processors {
if bp, ok := p.(BoundarySafeProcessor); ok {
full = bp.SanitizeBoundary(full)
}
}
if len(full) <= boundaryOverlap {
return append(buf[:0], full...)
}
cut := len(full) - boundaryOverlap
for cut > 0 && !utf8.RuneStart(full[cut]) {
cut--
}
s.emit(full[:cut], processFn)
return append(buf[:0], full[cut:]...)
}
}

func (s *LogStreamer) emit(line string, processFn func(LogProcessor, string) string) {
for _, processor := range s.processors {
line = processFn(processor, line)
}
s.logBufferMu.Lock()
s.logBuffer.Value = line
s.logBuffer = s.logBuffer.Next()
s.logBufferMu.Unlock()
}

func (s *LogStreamer) GetLogBuffer() []string {
s.logBufferMu.Lock()
defer s.logBufferMu.Unlock()
output := make([]string, 0)
s.logBuffer.Do(func(line any) {
if line != nil {
@@ -50,63 +197,16 @@ func (s *LogStreamer) GetLogBuffer() []string {
return output
}

func (s *LogStreamer) Run(ctx context.Context) {
s.logger.Trace().Msg("Starting log streamer ...")
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.logger.Trace().Msg("Shutting down log streamer ...")
return
case <-s.quit:
s.logger.Trace().Msg("Shutting down log streamer ...")
return
case <-ticker.C:
for len(s.Stderr.String()) > 0 {
line, err := s.Stderr.ReadString('\n')
if err == nil {
line = strings.TrimSuffix(line, "\n")
for _, processor := range s.processors {
line = processor.ProcessStderr(line)
}
s.logBuffer.Value = line
s.logBuffer = s.logBuffer.Next()
}
}
for len(s.Stdout.String()) > 0 {
line, err := s.Stdout.ReadString('\n')
if err == nil {
line = strings.TrimSuffix(line, "\n")
for _, processor := range s.processors {
line = processor.ProcessStdout(line)
}
s.logBuffer.Value = line
s.logBuffer = s.logBuffer.Next()
}
}
}
}
}

// Flush closes the pipe writers (signaling EOF to the reader goroutines),
// waits for the readers to drain, then flushes the processor chain in
// reverse order. Safe to call multiple times.
func (s *LogStreamer) Flush(outcome JobOutcome) {
s.logger.Trace().Msg("Starting log streamer flush ...")
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(30 * time.Second)
for len(s.Stderr.String()) > 0 || len(s.Stdout.String()) > 0 {
select {
case <-ticker.C:
// Continue waiting
case <-timeout:
s.logger.Warn().Msg("Flush timeout reached, proceeding with remaining data")
goto done
}
}
done:
s.logger.Trace().Msg("Finished log streamer flush ...")
s.quit <- true
time.Sleep(200 * time.Millisecond) // Allow 'Run' goroutine to quit
s.closeOnce.Do(func() {
_ = s.Stdout.Close()
_ = s.Stderr.Close()
})
s.wg.Wait()
s.logger.Trace().Msg("Flushing log processors ...")
for i := len(s.processors) - 1; i >= 0; i-- {
s.processors[i].Flush(outcome)
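For reference, a toy processor sketched against the `LogProcessor` and `BoundarySafeProcessor` interfaces above. The `redactor` type and its masking scheme are hypothetical (not part of this PR), and it assumes `strings` is imported:

```go
// redactor masks a single secret in every emitted line. Implementing
// SanitizeBoundary as well lets drain mask the whole accumulated buffer
// before a force-flush cut, so a secret straddling the cut stays redacted.
type redactor struct {
	secret string // must be no longer than boundaryOverlap to survive a cut
}

func (r *redactor) mask(line string) string {
	return strings.ReplaceAll(line, r.secret, "**********")
}

func (r *redactor) ProcessStdout(line string) string { return r.mask(line) }
func (r *redactor) ProcessStderr(line string) string { return r.mask(line) }
func (r *redactor) Flush(outcome JobOutcome)         {}

func (r *redactor) SanitizeBoundary(line string) string { return r.mask(line) }
```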
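And a sketch of a regression test for the force-flush path, assuming package-internal access to the constants and the usual `strings`/`testing` imports (`zerolog.Nop()` gives a no-op logger):

```go
func TestForceFlushIsLossless(t *testing.T) {
	s := NewLogStreamer(zerolog.Nop())
	long := strings.Repeat("a", maxLineBytes+100) // oversized, no newline

	_, _ = s.Stdout.Write([]byte(long)) // returns once the reader goroutine consumes it
	s.Flush(JobOutcome{})

	// The force-flushed prefix plus the EOF-emitted tail must equal the input.
	if got := strings.Join(s.GetLogBuffer(), ""); got != long {
		t.Fatalf("lost bytes across force-flush: got %d, want %d", len(got), len(long))
	}
}
```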