From afc6ca40af0193971e7df04c24e7cdf05e035d5c Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Tue, 21 Apr 2026 11:17:21 -0400 Subject: [PATCH 1/2] Rewrite log streaming with io.Pipe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the SafeBuffer + ticker-polling LogStreamer with reader goroutines that drain io.Pipe endpoints via a manual []byte buffer and bytes.IndexByte line extraction. Exec writes directly to the PipeWriter; readers emit lines as they arrive and force-flush once the buffer reaches 64 KiB with no newline, retaining a 1 KiB tail so patterns straddling the cut still match on the next emission. BoundarySafeProcessor lets the sanitizer redact across that cut. Flush closes the pipes and waits on a WaitGroup, which naturally delivers any trailing partial line via EOF. This removes the polling loop, the 50 ms latency floor, the 200 ms Flush sleep, the Run↔Flush processor race, and the SafeBuffer type entirely. --- src/pkg/buffer.go | 40 ----- src/pkg/k8s.go | 10 +- src/pkg/logs.go | 224 +++++++++++++++++++-------- src/pkg/logs_test.go | 264 ++++++++++++++++++++++++++++++++ src/pkg/sanitizeLogProcessor.go | 7 + 5 files changed, 433 insertions(+), 112 deletions(-) delete mode 100644 src/pkg/buffer.go create mode 100644 src/pkg/logs_test.go diff --git a/src/pkg/buffer.go b/src/pkg/buffer.go deleted file mode 100644 index 11b05ee..0000000 --- a/src/pkg/buffer.go +++ /dev/null @@ -1,40 +0,0 @@ -package pkg - -import ( - "bytes" - "sync" -) - -// SafeBuffer is a goroutine safe bytes.Buffer -type SafeBuffer struct { - buffer bytes.Buffer - mutex sync.Mutex -} - -// Write appends the contents of p to the buffer, growing the buffer as needed. It returns -// the number of bytes written. -func (s *SafeBuffer) Write(p []byte) (n int, err error) { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.buffer.Write(p) -} - -// String returns the contents of the unread portion of the buffer -// as a string. If the Buffer is a nil pointer, it returns "". -func (s *SafeBuffer) String() string { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.buffer.String() -} - -// ReadString reads until the first occurrence of delim in the input, -// returning a string containing the data up to and including the delimiter. -// If ReadString encounters an error before finding a delimiter, -// it returns the data read before the error and the error itself (often io.EOF). -// ReadString returns err != nil if and only if the returned data does not end -// in delim. 
-func (s *SafeBuffer) ReadString(delim byte) (line string, err error) { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.buffer.ReadString(delim) -} diff --git a/src/pkg/k8s.go b/src/pkg/k8s.go index e7a9e25..d10d2a8 100644 --- a/src/pkg/k8s.go +++ b/src/pkg/k8s.go @@ -50,8 +50,8 @@ type JobConfig struct { PodName string ContainerName string Stdin io.Reader - Stdout *SafeBuffer - Stderr *SafeBuffer + Stdout io.Writer + Stderr io.Writer } type JobRunner struct { @@ -278,7 +278,7 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo } // TODO: Remove all usages of "Viper" they should be passed in at JobRunner configuration time -func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, stderr *SafeBuffer) JobOutcome { +func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, stderr io.Writer) JobOutcome { id := string(job.Id) // Once we get off "the old API" method of runner we can circle back around to this // and fix it to generate safe pod names since k8s has limitations. @@ -344,7 +344,7 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std runErr := s.Exec(ctx, stdout, stderr, pod, pod.Spec.Containers[0].Name, s.podConfig.Shell, "-e", "-c", strings.Join(commands, ";\n")) if runErr != nil { return JobOutcome{ - Message: fmt.Sprintf("pod execution failed REASON: %s %s", strings.TrimSuffix(stderr.String(), "\n"), runErr), + Message: fmt.Sprintf("pod execution failed REASON: %s", runErr), Outcome: opslevel.RunnerJobOutcomeEnumFailed, } } @@ -406,7 +406,7 @@ func (s *JobRunner) ExecWithConfig(ctx context.Context, config JobConfig) error }) } -func (s *JobRunner) Exec(ctx context.Context, stdout, stderr *SafeBuffer, pod *corev1.Pod, containerName string, cmd ...string) error { +func (s *JobRunner) Exec(ctx context.Context, stdout, stderr io.Writer, pod *corev1.Pod, containerName string, cmd ...string) error { return s.ExecWithConfig(ctx, JobConfig{ Command: cmd, Namespace: pod.Namespace, diff --git a/src/pkg/logs.go b/src/pkg/logs.go index 52c16ff..3c67915 100644 --- a/src/pkg/logs.go +++ b/src/pkg/logs.go @@ -1,46 +1,183 @@ package pkg import ( + "bytes" "container/ring" "context" - "strings" - "time" + "io" + "sync" "github.com/rs/zerolog" ) +const ( + // maxLineBytes caps a single emitted log line. Longer runs without a + // newline are force-flushed so a misbehaving process cannot grow the + // buffer without bound. + maxLineBytes = 64 * 1024 + + // boundaryOverlap is the tail of the buffer retained across a + // force-flush cut so a pattern (redacted secret, ::set-outcome-var + // directive, etc.) straddling the cut still has a chance to match on + // the next emission. Must be at least as large as the longest pattern + // any processor cares about. + boundaryOverlap = 1024 + + // readChunk is the per-read size pulled from the pipe reader. + readChunk = 4 * 1024 +) + type LogProcessor interface { ProcessStdout(line string) string ProcessStderr(line string) string Flush(outcome JobOutcome) } +// BoundarySafeProcessor is implemented by processors whose redaction must +// apply across force-flush chunk boundaries for oversized no-newline lines. +// When present, LogStreamer runs SanitizeBoundary over the full accumulated +// buffer before cutting, so a secret that would otherwise straddle the cut +// is masked in both the emitted prefix and the retained tail. 
+type BoundarySafeProcessor interface { + SanitizeBoundary(line string) string +} + type LogStreamer struct { - Stdout *SafeBuffer - Stderr *SafeBuffer + Stdout *io.PipeWriter + Stderr *io.PipeWriter + + stdoutR *io.PipeReader + stderrR *io.PipeReader + processors []LogProcessor logger zerolog.Logger - quit chan bool - logBuffer *ring.Ring + + logBuffer *ring.Ring + logBufferMu sync.Mutex + + wg sync.WaitGroup + closeOnce sync.Once } -func NewLogStreamer(logger zerolog.Logger, processors ...LogProcessor) LogStreamer { - quit := make(chan bool) - return LogStreamer{ - Stdout: &SafeBuffer{}, - Stderr: &SafeBuffer{}, +// NewLogStreamer constructs a LogStreamer and starts the two reader +// goroutines that drain Stdout/Stderr. Callers MUST call Flush exactly once +// per streamer or the reader goroutines leak. Callers SHOULD also +// `go Run(ctx)` to propagate ctx cancellation — without it, a stuck exec +// write cannot be unblocked by cancellation. +func NewLogStreamer(logger zerolog.Logger, processors ...LogProcessor) *LogStreamer { + sor, sow := io.Pipe() + ser, sew := io.Pipe() + s := &LogStreamer{ + Stdout: sow, + Stderr: sew, + stdoutR: sor, + stderrR: ser, processors: processors, logger: logger, - quit: quit, logBuffer: ring.New(20), } + s.wg.Add(2) + go s.readLoop(s.stdoutR, LogProcessor.ProcessStdout) + go s.readLoop(s.stderrR, LogProcessor.ProcessStderr) + return s } +// AddProcessor appends a processor to the chain. Must be called before the +// first Write to Stdout/Stderr: the reader goroutines observe s.processors +// without a lock, relying on the io.Pipe Write→Read happens-before to +// publish the slice. Mutating after writes begin is a data race. func (s *LogStreamer) AddProcessor(processor LogProcessor) { s.processors = append(s.processors, processor) } +// Run watches ctx and, on cancellation, closes the pipe readers so any +// in-flight write from the k8s exec stream fails fast rather than blocking. +// Returns only after the internal reader goroutines have exited, so callers +// can rely on the log buffer being fully populated on return. Safe to call +// without a corresponding Flush; safe to omit entirely (Flush alone suffices +// for a successful run — Run only matters for ctx-driven cancellation). +func (s *LogStreamer) Run(ctx context.Context) { + s.logger.Trace().Msg("Starting log streamer ...") + defer s.logger.Trace().Msg("Log streamer stopped.") + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + select { + case <-ctx.Done(): + _ = s.stdoutR.CloseWithError(ctx.Err()) + _ = s.stderrR.CloseWithError(ctx.Err()) + <-done + case <-done: + } +} + +// readLoop reads from r, extracts newline-terminated lines, and emits each +// through processFn. Exits on io.EOF (writer closed) or any read error, +// emitting any trailing no-newline partial line first. +func (s *LogStreamer) readLoop(r io.Reader, processFn func(LogProcessor, string) string) { + defer s.wg.Done() + buf := make([]byte, 0, maxLineBytes+readChunk) + readBuf := make([]byte, readChunk) + for { + n, err := r.Read(readBuf) + if n > 0 { + buf = append(buf, readBuf[:n]...) + buf = s.drain(buf, processFn) + } + if err != nil { + if len(buf) > 0 { + s.emit(string(buf), processFn) + } + return + } + } +} + +// drain emits every complete line in buf and, if the remaining tail has +// reached maxLineBytes without a newline, force-flushes a prefix while +// retaining a boundaryOverlap tail. 
Before cutting, any BoundarySafeProcessor +// masks the entire buffer so a secret straddling the cut is redacted in both +// halves. Returns the residual buffer to carry into the next read. +func (s *LogStreamer) drain(buf []byte, processFn func(LogProcessor, string) string) []byte { + for { + if i := bytes.IndexByte(buf, '\n'); i >= 0 { + s.emit(string(buf[:i]), processFn) + buf = buf[i+1:] + continue + } + if len(buf) < maxLineBytes { + return buf + } + full := string(buf) + for _, p := range s.processors { + if bp, ok := p.(BoundarySafeProcessor); ok { + full = bp.SanitizeBoundary(full) + } + } + if len(full) <= boundaryOverlap { + return append(buf[:0], full...) + } + cut := len(full) - boundaryOverlap + s.emit(full[:cut], processFn) + return append(buf[:0], full[cut:]...) + } +} + +func (s *LogStreamer) emit(line string, processFn func(LogProcessor, string) string) { + for _, processor := range s.processors { + line = processFn(processor, line) + } + s.logBufferMu.Lock() + s.logBuffer.Value = line + s.logBuffer = s.logBuffer.Next() + s.logBufferMu.Unlock() +} + func (s *LogStreamer) GetLogBuffer() []string { + s.logBufferMu.Lock() + defer s.logBufferMu.Unlock() output := make([]string, 0) s.logBuffer.Do(func(line any) { if line != nil { @@ -50,63 +187,16 @@ func (s *LogStreamer) GetLogBuffer() []string { return output } -func (s *LogStreamer) Run(ctx context.Context) { - s.logger.Trace().Msg("Starting log streamer ...") - ticker := time.NewTicker(50 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - s.logger.Trace().Msg("Shutting down log streamer ...") - return - case <-s.quit: - s.logger.Trace().Msg("Shutting down log streamer ...") - return - case <-ticker.C: - for len(s.Stderr.String()) > 0 { - line, err := s.Stderr.ReadString('\n') - if err == nil { - line = strings.TrimSuffix(line, "\n") - for _, processor := range s.processors { - line = processor.ProcessStderr(line) - } - s.logBuffer.Value = line - s.logBuffer = s.logBuffer.Next() - } - } - for len(s.Stdout.String()) > 0 { - line, err := s.Stdout.ReadString('\n') - if err == nil { - line = strings.TrimSuffix(line, "\n") - for _, processor := range s.processors { - line = processor.ProcessStdout(line) - } - s.logBuffer.Value = line - s.logBuffer = s.logBuffer.Next() - } - } - } - } -} - +// Flush closes the pipe writers (signaling EOF to the reader goroutines), +// waits for the readers to drain, then flushes the processor chain in +// reverse order. Safe to call multiple times. 
func (s *LogStreamer) Flush(outcome JobOutcome) { s.logger.Trace().Msg("Starting log streamer flush ...") - ticker := time.NewTicker(200 * time.Millisecond) - defer ticker.Stop() - timeout := time.After(30 * time.Second) - for len(s.Stderr.String()) > 0 || len(s.Stdout.String()) > 0 { - select { - case <-ticker.C: - // Continue waiting - case <-timeout: - s.logger.Warn().Msg("Flush timeout reached, proceeding with remaining data") - goto done - } - } -done: - s.logger.Trace().Msg("Finished log streamer flush ...") - s.quit <- true - time.Sleep(200 * time.Millisecond) // Allow 'Run' goroutine to quit + s.closeOnce.Do(func() { + _ = s.Stdout.Close() + _ = s.Stderr.Close() + }) + s.wg.Wait() s.logger.Trace().Msg("Flushing log processors ...") for i := len(s.processors) - 1; i >= 0; i-- { s.processors[i].Flush(outcome) diff --git a/src/pkg/logs_test.go b/src/pkg/logs_test.go new file mode 100644 index 0000000..0cbd05d --- /dev/null +++ b/src/pkg/logs_test.go @@ -0,0 +1,264 @@ +package pkg + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/opslevel/opslevel-go/v2026" + "github.com/rocktavious/autopilot/v2023" + "github.com/rs/zerolog" +) + +type captureProcessor struct { + mu sync.Mutex + lines []string +} + +func (c *captureProcessor) ProcessStdout(line string) string { + c.mu.Lock() + defer c.mu.Unlock() + c.lines = append(c.lines, line) + return line +} + +func (c *captureProcessor) ProcessStderr(line string) string { + c.mu.Lock() + defer c.mu.Unlock() + c.lines = append(c.lines, line) + return line +} + +func (c *captureProcessor) Flush(_ JobOutcome) {} + +func (c *captureProcessor) snapshot() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.lines)) + copy(out, c.lines) + return out +} + +// A partial write without a newline is buffered until either more data arrives +// with a newline or the pipe is closed (via Flush) — at which point the +// trailing partial line is delivered as a final line. +func TestLogStreamerPartialLineStdout(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + _, _ = s.Stdout.Write([]byte("par")) + _, _ = s.Stdout.Write([]byte("tial\ntrailing-no-newline")) + + s.Flush(JobOutcome{}) + + autopilot.Equals(t, []string{"partial", "trailing-no-newline"}, proc.snapshot()) +} + +func TestLogStreamerPartialLineStderr(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + _, _ = s.Stderr.Write([]byte("par")) + _, _ = s.Stderr.Write([]byte("tial\ntrailing-no-newline")) + + s.Flush(JobOutcome{}) + + autopilot.Equals(t, []string{"partial", "trailing-no-newline"}, proc.snapshot()) +} + +// Flush must not deadlock when Run has already exited via ctx cancellation +// before Flush is called. 
+func TestLogStreamerFlushAfterContextCancel(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + s.Run(ctx) + close(done) + }() + + _, _ = s.Stdout.Write([]byte("before-cancel\n")) + cancel() + <-done + + finished := make(chan struct{}) + go func() { + s.Flush(JobOutcome{}) + close(finished) + }() + select { + case <-finished: + case <-time.After(5 * time.Second): + t.Fatal("Flush deadlocked after ctx cancellation") + } +} + +// A line longer than maxLineBytes must be force-emitted in bounded chunks +// rather than growing the buffer without bound. Assert total bytes preserved, +// at least one force-flush emission plus trailing, and that each segment +// stays within the memory bound. +func TestLogStreamerOversizedPartial(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + total := maxLineBytes + 100 + big := strings.Repeat("x", total) + _, _ = s.Stdout.Write([]byte(big)) + + s.Flush(JobOutcome{}) + + lines := proc.snapshot() + if len(lines) < 2 { + t.Fatalf("expected force-flush + trailing (>=2 segments), got %d", len(lines)) + } + sum := 0 + for _, l := range lines { + if len(l) > maxLineBytes+readChunk { + t.Fatalf("emitted segment of %d bytes exceeds bound %d", len(l), maxLineBytes+readChunk) + } + sum += len(l) + } + autopilot.Equals(t, total, sum) +} + +// A secret that straddles the force-flush cut must still be redacted in both +// the emitted prefix and the retained tail — a BoundarySafeProcessor is +// applied to the full buffer before cutting. +func TestLogStreamerOversizedSanitizerBoundary(t *testing.T) { + const secret = "s3cretpassword-XYZ" // 18 chars + sanitizer := NewSanitizeLogProcessor([]opslevel.RunnerJobVariable{ + {Sensitive: true, Value: secret}, + }) + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), sanitizer, proc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + // Centre the secret on the cut point: force-flush cuts at + // len(buf) - boundaryOverlap, so placing the secret's midpoint at + // maxLineBytes - boundaryOverlap makes it straddle both halves. + prefixLen := maxLineBytes - boundaryOverlap - len(secret)/2 + suffixLen := maxLineBytes - prefixLen - len(secret) + payload := strings.Repeat("x", prefixLen) + secret + strings.Repeat("y", suffixLen) + _, _ = s.Stdout.Write([]byte(payload)) + + s.Flush(JobOutcome{}) + + for i, line := range proc.snapshot() { + if strings.Contains(line, secret) { + t.Fatalf("secret leaked unredacted in emitted segment %d", i) + } + // Also reject any partial-secret fragment of length >= 6 as a + // conservative leak indicator. + for n := 6; n < len(secret); n++ { + if strings.Contains(line, secret[:n]) { + t.Fatalf("secret prefix of len %d leaked in segment %d", n, i) + } + if strings.Contains(line, secret[len(secret)-n:]) { + t.Fatalf("secret suffix of len %d leaked in segment %d", n, i) + } + } + } +} + +// GetLogBuffer must be safe to call concurrently with ongoing line emission. +// Meaningful under `go test -race`. 
+func TestLogStreamerGetLogBufferConcurrentAccess(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 200; i++ { + _, _ = s.Stdout.Write([]byte("line\n")) + } + }() + go func() { + defer wg.Done() + for i := 0; i < 200; i++ { + _ = s.GetLogBuffer() + time.Sleep(time.Millisecond) + } + }() + wg.Wait() + s.Flush(JobOutcome{}) +} + +// Flush is idempotent — calling it multiple times must not panic on +// double-close of the underlying pipes. +func TestLogStreamerFlushIdempotent(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + _, _ = s.Stdout.Write([]byte("hello\n")) + s.Flush(JobOutcome{}) + s.Flush(JobOutcome{}) + + autopilot.Equals(t, []string{"hello"}, proc.snapshot()) +} + +// A writer blocked on io.Pipe must be unblocked when Run observes ctx +// cancellation — Run closes the reader side with the ctx error, which +// propagates to pending writes. Without this, a hung exec would leak the +// streamer. +func TestLogStreamerWriteUnblocksOnCancel(t *testing.T) { + proc := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), proc) + + ctx, cancel := context.WithCancel(context.Background()) + runDone := make(chan struct{}) + go func() { + s.Run(ctx) + close(runDone) + }() + + writeDone := make(chan error, 1) + go func() { + _, err := s.Stdout.Write([]byte(strings.Repeat("x", 4*maxLineBytes))) + writeDone <- err + }() + + time.Sleep(10 * time.Millisecond) + cancel() + + select { + case <-writeDone: + case <-time.After(5 * time.Second): + t.Fatal("Write did not unblock after ctx cancellation") + } + select { + case <-runDone: + case <-time.After(5 * time.Second): + t.Fatal("Run did not return after ctx cancellation") + } + + s.Flush(JobOutcome{}) +} diff --git a/src/pkg/sanitizeLogProcessor.go b/src/pkg/sanitizeLogProcessor.go index 46477a1..a48b8d3 100644 --- a/src/pkg/sanitizeLogProcessor.go +++ b/src/pkg/sanitizeLogProcessor.go @@ -38,4 +38,11 @@ func (s *SanitizeLogProcessor) ProcessStderr(line string) string { return s.Process(line) } +// SanitizeBoundary lets LogStreamer redact secrets across a force-flush +// chunk boundary before the prefix is emitted, so a secret straddling the +// cut is masked in both halves. 
+func (s *SanitizeLogProcessor) SanitizeBoundary(line string) string { + return s.Process(line) +} + func (s *SanitizeLogProcessor) Flush(outcome JobOutcome) {} From c7beda7425534665570aef44277fcd02a792596c Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Wed, 22 Apr 2026 09:34:20 -0400 Subject: [PATCH 2/2] fix for processor panic, and utf-8 handling --- .changes/unreleased/Bugfix-20260422-093744.yaml | 3 +++ src/pkg/logs.go | 10 ++++++++++ 2 files changed, 13 insertions(+) create mode 100644 .changes/unreleased/Bugfix-20260422-093744.yaml diff --git a/.changes/unreleased/Bugfix-20260422-093744.yaml b/.changes/unreleased/Bugfix-20260422-093744.yaml new file mode 100644 index 0000000..ecac4ca --- /dev/null +++ b/.changes/unreleased/Bugfix-20260422-093744.yaml @@ -0,0 +1,3 @@ +kind: Bugfix +body: Re-write log streamer with io.Pipe +time: 2026-04-22T09:37:44.618884-04:00 diff --git a/src/pkg/logs.go b/src/pkg/logs.go index 3c67915..5d02c93 100644 --- a/src/pkg/logs.go +++ b/src/pkg/logs.go @@ -6,6 +6,7 @@ import ( "context" "io" "sync" + "unicode/utf8" "github.com/rs/zerolog" ) @@ -118,6 +119,12 @@ func (s *LogStreamer) Run(ctx context.Context) { // emitting any trailing no-newline partial line first. func (s *LogStreamer) readLoop(r io.Reader, processFn func(LogProcessor, string) string) { defer s.wg.Done() + defer func() { + if rec := recover(); rec != nil { + s.logger.Error().Interface("panic", rec).Msg("log processor panicked; draining reader") + _, _ = io.Copy(io.Discard, r) + } + }() buf := make([]byte, 0, maxLineBytes+readChunk) readBuf := make([]byte, readChunk) for { @@ -160,6 +167,9 @@ func (s *LogStreamer) drain(buf []byte, processFn func(LogProcessor, string) str return append(buf[:0], full...) } cut := len(full) - boundaryOverlap + for cut > 0 && !utf8.RuneStart(full[cut]) { + cut-- + } s.emit(full[:cut], processFn) return append(buf[:0], full[cut:]...) }
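
Reviewer note (not part of the patch): below is a minimal sketch of the intended call pattern for the new streamer, written against the API added in logs.go above. The names echoProcessor and exampleUsage are placeholders introduced only for illustration; real callers hand Stdout/Stderr to JobRunner.Exec rather than writing to them directly, and the processor chain normally includes the sanitizer and outcome-var processors.

package pkg

import (
	"context"
	"fmt"

	"github.com/rs/zerolog"
)

// echoProcessor is a placeholder LogProcessor for this sketch: it passes
// every line through unchanged and has nothing to flush.
type echoProcessor struct{}

func (echoProcessor) ProcessStdout(line string) string { return line }
func (echoProcessor) ProcessStderr(line string) string { return line }
func (echoProcessor) Flush(_ JobOutcome)               {}

// exampleUsage shows the intended lifecycle: construct the streamer, start
// Run for ctx-driven cancellation, hand the pipe writers to the exec as
// plain io.Writers, then call Flush once the exec returns.
func exampleUsage() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	streamer := NewLogStreamer(zerolog.Nop(), echoProcessor{})
	go streamer.Run(ctx) // optional; only needed so cancellation can unblock a stuck write

	// In the runner these writers are passed to JobRunner.Exec; writing
	// directly here stands in for the exec stream.
	_, _ = streamer.Stdout.Write([]byte("hello\nwor"))
	_, _ = streamer.Stdout.Write([]byte("ld\n"))
	_, _ = streamer.Stderr.Write([]byte("oops\n"))

	// Flush closes both pipe writers, waits for the reader goroutines to
	// drain (delivering any trailing partial line), then flushes the
	// processor chain.
	streamer.Flush(JobOutcome{})

	// The ring buffer now holds the processed lines (stdout/stderr order
	// between the two streams is not guaranteed).
	fmt.Println(streamer.GetLogBuffer())
}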