Skip to content

Streaming Chat Responses Specification

Authors
Matt Cockayne, Claude (claude-opus-4-6) (AI drafting assistant)
Date
21 March 2026
Status
DRAFT

Overview

The current ChatClient interface only supports synchronous request/response interactions. For long-running AI responses, users see nothing until the full response is generated โ€” which can take 10-30 seconds for complex queries. Streaming delivers partial responses as they're generated, providing immediate feedback and enabling progressive UI rendering.

All three provider SDKs (Anthropic, OpenAI, Google Gemini) support server-sent events (SSE) streaming natively.


Design Decisions

Separate StreamingChatClient interface: Rather than adding streaming methods to ChatClient, define a new interface. Not all consumers need streaming, and not all providers may support it equally. Providers that support streaming implement both interfaces.

Callback-based delivery: Use a callback function func(StreamEvent) rather than channels. Callbacks are simpler to use correctly โ€” no need to manage channel lifecycle, and backpressure is handled naturally (the callback blocks the stream).

Event types: Stream events include text deltas, tool calls, and completion signals. This maps directly to all three providers' SSE event formats.

No streaming for Ask(): Ask() returns structured data that must be complete before unmarshalling. Streaming only applies to Chat() and a new StreamChat() method.


Public API Changes

New Interface: StreamingChatClient

// StreamingChatClient extends ChatClient with streaming support.
// Implementations that support streaming implement this interface
// in addition to ChatClient.
type StreamingChatClient interface {
    ChatClient

    // StreamChat sends a message and streams the response via the callback.
    // The callback is invoked for each event in the stream. If the callback
    // returns an error, the stream is cancelled.
    // The final return value is the complete response text (concatenation of
    // all text deltas) or an error.
    StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error)
}

// StreamCallback receives streaming events. Return a non-nil error to cancel the stream.
type StreamCallback func(event StreamEvent) error

// StreamEvent represents a single event in a streaming response.
type StreamEvent struct {
    // Type indicates the kind of event.
    Type StreamEventType

    // Delta contains the text fragment for TextDelta events.
    Delta string

    // ToolCall contains tool call information for ToolCallStart/ToolCallDelta events.
    ToolCall *StreamToolCall

    // Error contains error information for Error events.
    Error error
}

// StreamEventType identifies the kind of stream event.
type StreamEventType int

const (
    // EventTextDelta is a partial text response.
    EventTextDelta StreamEventType = iota
    // EventToolCallStart indicates a tool call is beginning.
    EventToolCallStart
    // EventToolCallDelta contains partial tool call arguments.
    EventToolCallDelta
    // EventToolCallEnd indicates a tool call is complete and will be executed.
    EventToolCallEnd
    // EventComplete indicates the stream has finished successfully.
    EventComplete
    // EventError indicates an error occurred during streaming.
    EventError
)

// StreamToolCall contains information about a streaming tool call.
type StreamToolCall struct {
    ID        string
    Name      string
    Arguments string // partial or complete JSON arguments
}

Type Assertion Pattern

// Consumer usage:
client, err := chat.New(ctx, props, cfg)
if streamer, ok := client.(chat.StreamingChatClient); ok {
    result, err := streamer.StreamChat(ctx, "prompt", func(e chat.StreamEvent) error {
        if e.Type == chat.EventTextDelta {
            fmt.Print(e.Delta) // progressive output
        }
        return nil
    })
}

Internal Implementation

Claude Streaming

func (c *Claude) StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error) {
    c.messages = append(c.messages, anthropic.NewUserMessage(anthropic.NewTextBlock(prompt)))

    params := anthropic.MessageNewParams{
        Model:     c.model,
        MaxTokens: c.maxTokens,
        Messages:  c.messages,
        System:    []anthropic.TextBlockParam{{Text: c.system}},
    }

    stream := c.client.Messages.NewStreaming(ctx, params)
    var fullText strings.Builder

    for stream.Next() {
        event := stream.Current()
        switch e := event.(type) {
        case *anthropic.ContentBlockDeltaEvent:
            if e.Delta.Type == "text_delta" {
                fullText.WriteString(e.Delta.Text)
                if err := callback(StreamEvent{Type: EventTextDelta, Delta: e.Delta.Text}); err != nil {
                    return fullText.String(), err
                }
            }
        case *anthropic.MessageStopEvent:
            callback(StreamEvent{Type: EventComplete})
        }
    }

    if err := stream.Err(); err != nil {
        return fullText.String(), errors.Wrap(err, "claude stream error")
    }

    return fullText.String(), nil
}

OpenAI Streaming

func (a *OpenAI) StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error) {
    a.params.Messages = append(a.params.Messages, openai.UserMessage(prompt))
    a.params.StreamOptions = &openai.ChatCompletionStreamOptionsParam{IncludeUsage: true}

    stream := a.oai.Chat.Completions.NewStreaming(ctx, a.params)
    var fullText strings.Builder

    for stream.Next() {
        chunk := stream.Current()
        for _, choice := range chunk.Choices {
            if choice.Delta.Content != "" {
                fullText.WriteString(choice.Delta.Content)
                if err := callback(StreamEvent{Type: EventTextDelta, Delta: choice.Delta.Content}); err != nil {
                    return fullText.String(), err
                }
            }
        }
    }

    if err := stream.Err(); err != nil {
        return fullText.String(), errors.Wrap(err, "openai stream error")
    }

    callback(StreamEvent{Type: EventComplete})
    return fullText.String(), nil
}

Gemini Streaming

func (g *Gemini) StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error) {
    g.history = append(g.history, &genai.Content{
        Role:  "user",
        Parts: []*genai.Part{genai.NewPartFromText(prompt)},
    })

    var fullText strings.Builder
    for result, err := range g.client.Models.GenerateContentStream(ctx, g.model, g.history, g.genCfg) {
        if err != nil {
            return fullText.String(), errors.Wrap(err, "gemini stream error")
        }
        for _, candidate := range result.Candidates {
            for _, part := range candidate.Content.Parts {
                if part.Text != "" {
                    fullText.WriteString(part.Text)
                    if err := callback(StreamEvent{Type: EventTextDelta, Delta: part.Text}); err != nil {
                        return fullText.String(), err
                    }
                }
            }
        }
    }

    callback(StreamEvent{Type: EventComplete})
    return fullText.String(), nil
}

Tool Calls During Streaming

When a tool call appears in the stream, the streaming pauses, the tool is executed (non-streaming), and the result is fed back into a new streaming request. This mirrors the existing ReAct loop but with streaming output for the non-tool-call portions.

// Simplified streaming ReAct loop
for step := range maxSteps {
    // Stream response
    // If tool calls detected โ†’ execute tools โ†’ continue loop
    // If no tool calls โ†’ stream is complete
}

ClaudeLocal

ClaudeLocal uses a CLI subprocess and does not support streaming. It implements ChatClient only, not StreamingChatClient. The type assertion pattern handles this gracefully.


Project Structure

pkg/chat/
โ”œโ”€โ”€ client.go          โ† MODIFIED: StreamingChatClient interface, event types
โ”œโ”€โ”€ streaming.go       โ† NEW: StreamEvent, StreamCallback, StreamEventType
โ”œโ”€โ”€ claude.go          โ† MODIFIED: StreamChat implementation
โ”œโ”€โ”€ openai.go          โ† MODIFIED: StreamChat implementation
โ”œโ”€โ”€ gemini.go          โ† MODIFIED: StreamChat implementation
โ”œโ”€โ”€ claude_local.go    โ† UNCHANGED: does not implement StreamingChatClient
โ”œโ”€โ”€ streaming_test.go  โ† NEW: streaming tests

Testing Strategy

Test Scenario
TestClaude_StreamChat_Success Mock SSE stream โ†’ callback receives text deltas and complete event
TestOpenAI_StreamChat_Success Mock SSE stream โ†’ same
TestGemini_StreamChat_Success Mock stream โ†’ same
TestStreamChat_CallbackError Callback returns error โ†’ stream cancelled, partial text returned
TestStreamChat_ContextCancelled Context cancelled mid-stream โ†’ appropriate error
TestStreamChat_EmptyResponse Provider returns empty stream โ†’ empty string, no error
TestStreamChat_WithToolCalls Stream contains tool call โ†’ tool executed, stream continues
TestClaudeLocal_NotStreaming Type assertion โ†’ StreamingChatClient not satisfied
TestStreamEvent_Types All event types have correct values

Mock SSE Server

func newMockSSEServer(t *testing.T, events []string) *httptest.Server {
    t.Helper()
    return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/event-stream")
        flusher := w.(http.Flusher)
        for _, event := range events {
            fmt.Fprintf(w, "data: %s\n\n", event)
            flusher.Flush()
        }
    }))
}

Coverage

  • Target: 90%+ for pkg/chat/ including streaming paths.

Linting

  • golangci-lint run --fix must pass.
  • No new nolint directives.

Documentation

  • Godoc for StreamingChatClient interface and all event types.
  • Godoc explaining the callback contract (blocking, error cancellation).
  • Update docs/components/chat.md with streaming usage examples and the type assertion pattern.

Backwards Compatibility

  • No breaking changes. ChatClient interface is unchanged. Streaming is opt-in via type assertion.
  • Providers that implement StreamingChatClient still satisfy ChatClient.
  • ClaudeLocal is explicitly excluded โ€” this is documented, not a limitation.

Future Considerations

  • Streaming Ask(): If a structured response schema can be progressively validated, streaming Ask() could deliver partial results. Complex and likely not worth the effort.
  • WebSocket transport: For long-lived connections, WebSocket may be more efficient than SSE. Provider SDKs would need to support this.
  • Stream recording: For debugging, record stream events to a file for replay.

Implementation Phases

Phase 1 โ€” Interface and Types

  1. Define StreamingChatClient interface
  2. Define StreamEvent, StreamCallback, StreamEventType
  3. Add compile-time checks

Phase 2 โ€” Provider Implementations

  1. Implement Claude StreamChat
  2. Implement OpenAI StreamChat
  3. Implement Gemini StreamChat
  4. Handle tool calls within streaming

Phase 3 โ€” Tests

  1. Create mock SSE servers
  2. Add success, error, and cancellation tests for each provider
  3. Add tool call streaming tests
  4. Run with race detector

Verification

go build ./...
go test -race ./pkg/chat/...
go test ./...
golangci-lint run --fix

# Verify streaming interface exists
grep -n 'StreamingChatClient' pkg/chat/client.go

# Verify implementations
grep -n 'func.*StreamChat' pkg/chat/claude.go pkg/chat/openai.go pkg/chat/gemini.go