Skip to content

Commit

Permalink
refactor Go code to remove race condition and ensure cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
UncleGedd committed Nov 6, 2024
1 parent da154c4 commit 8eabe03
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions src/pkg/api/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,42 +58,70 @@ func streamExistingLogs(ctx context.Context, clientSet kubernetes.Interface, w h
func handleLogStream(ctx context.Context, logStream io.ReadCloser, w http.ResponseWriter) error {
defer logStream.Close()

buffer := &strings.Builder{}
shouldFlush := make(chan struct{}, 1)
lines := make(chan []byte, 100) // Buffer some lines to reduce blocking
reader := bufio.NewReader(logStream)
flusher := w.(http.Flusher)

// Start log reading goroutine
writerCtx, cancelWriter := context.WithCancel(ctx)
defer cancelWriter()
// Start log reading goroutine
go func() {
defer func() {
close(lines)
slog.Debug("Writer goroutine and channel closed")
}()

for {
select {
case <-writerCtx.Done():
return
default:
}

line, err := reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
fmt.Fprintf(buffer, "ERROR: %v\n", err)
if err == io.EOF || errors.Is(err, context.Canceled) {
// no error, just end of stream
return
}
close(shouldFlush)
slog.Debug("Writer returned error", "error", err)
return
}

buffer.Write(line)

// Send line to channel or exit if context is cancelled
select {
case shouldFlush <- struct{}{}:
default:
case lines <- line:
case <-writerCtx.Done():
slog.Debug("Writer context cancelled while sending line")
return
}
}
}()

// Main flush loop
buffer := &strings.Builder{}
ticker := time.NewTicker(500 * time.Millisecond) // Batch flushes
defer ticker.Stop()

for {
select {
case <-ctx.Done():
slog.Debug("Log stream closed")
cancelWriter()
return nil
case _, ok := <-shouldFlush:
case line, ok := <-lines:
if !ok {
// Flush any remaining content
if buffer.Len() > 0 {
fmt.Fprintf(w, "data: %s\n\n", buffer.String())
flusher.Flush()
}
cancelWriter()
return nil
}
buffer.Write(line)
case <-ticker.C:
if buffer.Len() > 0 {
fmt.Fprintf(w, "data: %s\n\n", buffer.String())
buffer.Reset()
Expand Down

0 comments on commit 8eabe03

Please sign in to comment.