Streaming Chat Responses Specification¶
- Authors: Matt Cockayne, Claude (claude-opus-4-6) (AI drafting assistant)
- Date: 21 March 2026
- Status: DRAFT
Overview¶
The current ChatClient interface only supports synchronous request/response interactions. For long-running AI responses, users see nothing until the full response is generated, which can take 10-30 seconds for complex queries. Streaming delivers partial responses as they're generated, providing immediate feedback and enabling progressive UI rendering.
All three provider SDKs (Anthropic, OpenAI, Google Gemini) support server-sent events (SSE) streaming natively.
Design Decisions¶
Separate StreamingChatClient interface: Rather than adding streaming methods to ChatClient, define a new interface. Not all consumers need streaming, and not all providers may support it equally. Providers that support streaming implement both interfaces.
Callback-based delivery: Use a callback function func(StreamEvent) rather than channels. Callbacks are simpler to use correctly: there is no channel lifecycle to manage, and backpressure is handled naturally (the callback blocks the stream).
Event types: Stream events include text deltas, tool calls, and completion signals. This maps directly to all three providers' SSE event formats.
No streaming for Ask(): Ask() returns structured data that must be complete before unmarshalling. Streaming only applies to Chat() and a new StreamChat() method.
Public API Changes¶
New Interface: StreamingChatClient¶
// StreamingChatClient extends ChatClient with streaming support.
// Implementations that support streaming implement this interface
// in addition to ChatClient.
type StreamingChatClient interface {
	ChatClient
	// StreamChat sends a message and streams the response via the callback.
	// The callback is invoked for each event in the stream. If the callback
	// returns an error, the stream is cancelled.
	// The final return value is the complete response text (concatenation of
	// all text deltas) or an error.
	StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error)
}
// StreamCallback receives streaming events. Return a non-nil error to cancel the stream.
type StreamCallback func(event StreamEvent) error
// StreamEvent represents a single event in a streaming response.
type StreamEvent struct {
	// Type indicates the kind of event.
	Type StreamEventType
	// Delta contains the text fragment for TextDelta events.
	Delta string
	// ToolCall contains tool call information for ToolCallStart/ToolCallDelta events.
	ToolCall *StreamToolCall
	// Error contains error information for Error events.
	Error error
}
// StreamEventType identifies the kind of stream event.
type StreamEventType int
const (
	// EventTextDelta is a partial text response.
	EventTextDelta StreamEventType = iota
	// EventToolCallStart indicates a tool call is beginning.
	EventToolCallStart
	// EventToolCallDelta contains partial tool call arguments.
	EventToolCallDelta
	// EventToolCallEnd indicates a tool call is complete and will be executed.
	EventToolCallEnd
	// EventComplete indicates the stream has finished successfully.
	EventComplete
	// EventError indicates an error occurred during streaming.
	EventError
)
// StreamToolCall contains information about a streaming tool call.
type StreamToolCall struct {
	ID        string
	Name      string
	Arguments string // partial or complete JSON arguments
}
Type Assertion Pattern¶
// Consumer usage:
client, err := chat.New(ctx, props, cfg)
if err != nil {
	return err
}
if streamer, ok := client.(chat.StreamingChatClient); ok {
	full, err := streamer.StreamChat(ctx, "prompt", func(e chat.StreamEvent) error {
		if e.Type == chat.EventTextDelta {
			fmt.Print(e.Delta) // progressive output
		}
		return nil
	})
	if err != nil {
		return err
	}
	_ = full // complete response text
}
Internal Implementation¶
Claude Streaming¶
func (c *Claude) StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error) {
	c.messages = append(c.messages, anthropic.NewUserMessage(anthropic.NewTextBlock(prompt)))
	params := anthropic.MessageNewParams{
		Model:     c.model,
		MaxTokens: c.maxTokens,
		Messages:  c.messages,
		System:    []anthropic.TextBlockParam{{Text: c.system}},
	}
	stream := c.client.Messages.NewStreaming(ctx, params)
	var fullText strings.Builder
	for stream.Next() {
		event := stream.Current()
		switch e := event.(type) {
		case *anthropic.ContentBlockDeltaEvent:
			if e.Delta.Type == "text_delta" {
				fullText.WriteString(e.Delta.Text)
				if err := callback(StreamEvent{Type: EventTextDelta, Delta: e.Delta.Text}); err != nil {
					return fullText.String(), err
				}
			}
		case *anthropic.MessageStopEvent:
			_ = callback(StreamEvent{Type: EventComplete}) // stream already ended; nothing to cancel
		}
	}
	if err := stream.Err(); err != nil {
		return fullText.String(), errors.Wrap(err, "claude stream error")
	}
	return fullText.String(), nil
}
OpenAI Streaming¶
func (a *OpenAI) StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error) {
	a.params.Messages = append(a.params.Messages, openai.UserMessage(prompt))
	a.params.StreamOptions = &openai.ChatCompletionStreamOptionsParam{IncludeUsage: true}
	stream := a.oai.Chat.Completions.NewStreaming(ctx, a.params)
	var fullText strings.Builder
	for stream.Next() {
		chunk := stream.Current()
		for _, choice := range chunk.Choices {
			if choice.Delta.Content != "" {
				fullText.WriteString(choice.Delta.Content)
				if err := callback(StreamEvent{Type: EventTextDelta, Delta: choice.Delta.Content}); err != nil {
					return fullText.String(), err
				}
			}
		}
	}
	if err := stream.Err(); err != nil {
		return fullText.String(), errors.Wrap(err, "openai stream error")
	}
	_ = callback(StreamEvent{Type: EventComplete}) // stream already ended; nothing to cancel
	return fullText.String(), nil
}
Gemini Streaming¶
func (g *Gemini) StreamChat(ctx context.Context, prompt string, callback StreamCallback) (string, error) {
	g.history = append(g.history, &genai.Content{
		Role:  "user",
		Parts: []*genai.Part{genai.NewPartFromText(prompt)},
	})
	var fullText strings.Builder
	for result, err := range g.client.Models.GenerateContentStream(ctx, g.model, g.history, g.genCfg) {
		if err != nil {
			return fullText.String(), errors.Wrap(err, "gemini stream error")
		}
		for _, candidate := range result.Candidates {
			if candidate.Content == nil {
				continue
			}
			for _, part := range candidate.Content.Parts {
				if part.Text != "" {
					fullText.WriteString(part.Text)
					if err := callback(StreamEvent{Type: EventTextDelta, Delta: part.Text}); err != nil {
						return fullText.String(), err
					}
				}
			}
		}
	}
	_ = callback(StreamEvent{Type: EventComplete}) // stream already ended; nothing to cancel
	return fullText.String(), nil
}
Tool Calls During Streaming¶
When a tool call appears in the stream, the streaming pauses, the tool is executed (non-streaming), and the result is fed back into a new streaming request. This mirrors the existing ReAct loop but with streaming output for the non-tool-call portions.
// Simplified streaming ReAct loop
for step := range maxSteps {
	// Stream response.
	// If tool calls detected → execute tools → continue loop.
	// If no tool calls → stream is complete.
}
ClaudeLocal¶
ClaudeLocal uses a CLI subprocess and does not support streaming. It implements ChatClient only, not StreamingChatClient. The type assertion pattern handles this gracefully.
Project Structure¶
pkg/chat/
├── client.go         - MODIFIED: StreamingChatClient interface
├── streaming.go      - NEW: StreamEvent, StreamCallback, StreamEventType
├── claude.go         - MODIFIED: StreamChat implementation
├── openai.go         - MODIFIED: StreamChat implementation
├── gemini.go         - MODIFIED: StreamChat implementation
├── claude_local.go   - UNCHANGED: does not implement StreamingChatClient
└── streaming_test.go - NEW: streaming tests
Testing Strategy¶
| Test | Scenario |
|---|---|
| `TestClaude_StreamChat_Success` | Mock SSE stream → callback receives text deltas and complete event |
| `TestOpenAI_StreamChat_Success` | Mock SSE stream → same |
| `TestGemini_StreamChat_Success` | Mock stream → same |
| `TestStreamChat_CallbackError` | Callback returns error → stream cancelled, partial text returned |
| `TestStreamChat_ContextCancelled` | Context cancelled mid-stream → appropriate error |
| `TestStreamChat_EmptyResponse` | Provider returns empty stream → empty string, no error |
| `TestStreamChat_WithToolCalls` | Stream contains tool call → tool executed, stream continues |
| `TestClaudeLocal_NotStreaming` | Type assertion → StreamingChatClient not satisfied |
| `TestStreamEvent_Types` | All event types have correct values |
Mock SSE Server¶
func newMockSSEServer(t *testing.T, events []string) *httptest.Server {
	t.Helper()
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		flusher := w.(http.Flusher)
		for _, event := range events {
			fmt.Fprintf(w, "data: %s\n\n", event)
			flusher.Flush()
		}
	}))
}
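The helper can be exercised end to end with a plain SSE reader. This `testing.T`-free variant and the `readSSE` client below are illustrative only; the real tests would point each provider SDK's base URL at the mock server instead:

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// mockSSEServer is a testing.T-free variant of the spec's helper:
// it replays the given payloads as SSE "data:" frames.
func mockSSEServer(events []string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		flusher := w.(http.Flusher)
		for _, ev := range events {
			fmt.Fprintf(w, "data: %s\n\n", ev)
			flusher.Flush()
		}
	}))
}

// readSSE collects the data payloads from an SSE endpoint, in order.
func readSSE(url string) ([]string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var out []string
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if line := sc.Text(); strings.HasPrefix(line, "data: ") {
			out = append(out, strings.TrimPrefix(line, "data: "))
		}
	}
	return out, sc.Err()
}

func main() {
	srv := mockSSEServer([]string{`{"delta":"Hel"}`, `{"delta":"lo"}`})
	defer srv.Close()
	got, err := readSSE(srv.URL)
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // the payloads, in order
}
```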
Coverage¶
- Target: 90%+ for `pkg/chat/`, including streaming paths.
Linting¶
- `golangci-lint run --fix` must pass.
- No new `nolint` directives.
Documentation¶
- Godoc for `StreamingChatClient` interface and all event types.
- Godoc explaining the callback contract (blocking, error cancellation).
- Update `docs/components/chat.md` with streaming usage examples and the type assertion pattern.
Backwards Compatibility¶
- No breaking changes. `ChatClient` interface is unchanged. Streaming is opt-in via type assertion.
- Providers that implement `StreamingChatClient` still satisfy `ChatClient`.
- `ClaudeLocal` is explicitly excluded: this is documented, not a limitation.
Future Considerations¶
- Streaming Ask(): If a structured response schema can be progressively validated, streaming `Ask()` could deliver partial results. Complex and likely not worth the effort.
- WebSocket transport: For long-lived connections, WebSocket may be more efficient than SSE. Provider SDKs would need to support this.
- Stream recording: For debugging, record stream events to a file for replay.
Implementation Phases¶
Phase 1 โ Interface and Types¶
- Define `StreamingChatClient` interface
- Define `StreamEvent`, `StreamCallback`, `StreamEventType`
- Add compile-time checks
Phase 2 โ Provider Implementations¶
- Implement Claude `StreamChat`
- Implement OpenAI `StreamChat`
- Implement Gemini `StreamChat`
- Handle tool calls within streaming
Phase 3 โ Tests¶
- Create mock SSE servers
- Add success, error, and cancellation tests for each provider
- Add tool call streaming tests
- Run with race detector
Verification¶
go build ./...
go test -race ./pkg/chat/...
go test ./...
golangci-lint run --fix
# Verify streaming interface exists
grep -n 'StreamingChatClient' pkg/chat/client.go
# Verify implementations
grep -n 'func.*StreamChat' pkg/chat/claude.go pkg/chat/openai.go pkg/chat/gemini.go