---
name: chatkit-backend
description: Build FastAPI backend for OpenAI ChatKit with SSE streaming, conversation persistence, and AI agent integration. Handles /chatkit endpoint, ChatKit-compatible SSE format, conversation models, and message storage. Use when implementing chat backend, SSE streaming endpoint, or connecting AI agent to ChatKit frontend.
allowed-tools: Bash, Write, Read, Edit, Glob, Grep
---
ChatKit Backend Skill
Production-ready skill for implementing a FastAPI backend that powers an OpenAI ChatKit frontend.
Overview
ChatKit backend provides:
- SSE Streaming Endpoint - Real-time response streaming in ChatKit format
- Conversation Persistence - Store conversations and messages in database
- AI Agent Integration - Connect OpenAI Agents SDK to ChatKit
- Tool Execution - Execute tools and stream results to frontend
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ FastAPI Backend │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ POST /chatkit │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 1. Validate request (auth, user_id) │ │
│ │ 2. Get/create conversation │ │
│ │ 3. Load conversation history │ │
│ │ 4. Run AI agent with message │ │
│ │ 5. Stream response in ChatKit SSE format │ │
│ │ 6. Store message and response │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ SSE Response Format: │ │
│ │ data: {"type": "text", "content": "Hello"}\n\n │ │
│ │ data: {"type": "tool_call", "name": "add_task", "args": {...}}\n\n│ │
│ │ data: {"type": "tool_result", "result": {...}}\n\n │ │
│ │ data: [DONE]\n\n │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
SSE Event Format
ChatKit expects specific SSE event types:
Text Event (Streaming Content)
yield f"data: {json.dumps({'type': 'text', 'content': 'Hello'})}\n\n"
Tool Call Event
yield f"data: {json.dumps({'type': 'tool_call', 'name': 'add_task', 'args': {'title': 'Buy groceries'}})}\n\n"
Tool Result Event
yield f"data: {json.dumps({'type': 'tool_result', 'name': 'add_task', 'result': {'success': True, 'task_id': 123}})}\n\n"
Done Event
yield "data: [DONE]\n\n"
Or:
yield f"data: {json.dumps({'type': 'done'})}\n\n"
Project Structure
backend/src/
├── routers/
│ ├── chatkit.py # ChatKit SSE endpoint (NEW!)
│ └── conversations.py # Conversation CRUD endpoints
│
├── models/
│ ├── conversation.py # Conversation model
│ └── message.py # Message model
│
├── schemas/
│ └── chatkit.py # ChatKit request/response schemas
│
├── services/
│ └── chatkit_service.py # ChatKit business logic
│
├── agents/ # OpenAI Agents (from openai-agents-setup)
│ ├── config.py # Gemini/LiteLLM config
│ ├── todo_agent.py # Agent definition
│ └── runner.py # Agent execution
│
└── main.py # Register chatkit router
Quick Start
Step 1: Create ChatKit Router
# backend/src/routers/chatkit.py
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from sqlmodel import Session, select
from src.database import get_session
from src.middleware.auth import verify_jwt
from src.models.conversation import Conversation
from src.models.message import Message
from src.agents import run_todo_agent_streaming
from datetime import datetime
import json
import logging
router = APIRouter(tags=["chatkit"])
logger = logging.getLogger(__name__)
@router.post("/chatkit")
async def chatkit_endpoint(
request: Request,
session: Session = Depends(get_session),
current_user: dict = Depends(verify_jwt),
):
"""
ChatKit SSE streaming endpoint.
Receives messages from ChatKit frontend and streams responses
in ChatKit-compatible SSE format.
"""
user_id = current_user["id"]
# Parse request body
body = await request.json()
message = body.get("message", "")
thread_id = body.get("thread_id") # Optional conversation ID
if not message:
raise HTTPException(status_code=400, detail="Message is required")
# Get or create conversation
conversation = await get_or_create_conversation(
session, user_id, thread_id, message
)
# Load conversation history
history = await load_conversation_history(session, conversation.id)
# Store user message
user_msg = Message(
conversation_id=conversation.id,
role="user",
content=message,
)
session.add(user_msg)
session.commit()
async def generate():
response_content = ""
try:
# Stream agent response
async for event in run_todo_agent_streaming(
user_message=message,
user_id=user_id,
conversation_history=history,
):
event_type = event.get("type")
if event_type == "text":
content = event.get("content", "")
response_content += content
yield f"data: {json.dumps(event)}\n\n"
elif event_type == "tool_call":
yield f"data: {json.dumps(event)}\n\n"
elif event_type == "tool_result":
yield f"data: {json.dumps(event)}\n\n"
elif event_type == "thinking":
# Optional: send thinking events
yield f"data: {json.dumps(event)}\n\n"
# Store assistant response
if response_content:
assistant_msg = Message(
conversation_id=conversation.id,
role="assistant",
content=response_content,
)
session.add(assistant_msg)
# Update conversation timestamp
conversation.updated_at = datetime.utcnow()
session.add(conversation)
session.commit()
# Signal completion
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"ChatKit streaming error: {e}")
yield f"data: {json.dumps({'type': 'error', 'message': 'An error occurred'})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def get_or_create_conversation(
session: Session,
user_id: str,
thread_id: int | None,
message: str,
) -> Conversation:
"""Get existing conversation or create new one."""
if thread_id:
conversation = session.exec(
select(Conversation).where(
Conversation.id == thread_id,
Conversation.user_id == user_id,
)
).first()
if conversation:
return conversation
# Create new conversation
title = message[:50] + "..." if len(message) > 50 else message
conversation = Conversation(user_id=user_id, title=title)
session.add(conversation)
session.commit()
session.refresh(conversation)
return conversation
async def load_conversation_history(
session: Session,
conversation_id: int,
) -> list[dict]:
"""Load conversation history for context."""
messages = session.exec(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at)
).all()
return [
{"role": msg.role, "content": msg.content}
for msg in messages
]
Step 2: Register Router
# backend/src/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.routers import tasks, chatkit, conversations
app = FastAPI(title="Todo API")
# CORS for ChatKit
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000",
"https://your-app.vercel.app",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Register routers
app.include_router(tasks.router)
app.include_router(chatkit.router)
app.include_router(conversations.router)
Streaming Agent Runner
Integrate with the OpenAI Agents SDK:
# backend/src/agents/runner.py
from typing import AsyncGenerator
from agents import Runner
from .todo_agent import todo_agent
import asyncio
import logging
logger = logging.getLogger(__name__)
async def run_todo_agent_streaming(
user_message: str,
user_id: str,
conversation_history: list[dict] | None = None,
) -> AsyncGenerator[dict, None]:
"""
Execute agent and yield events for ChatKit streaming.
Event Types:
- {"type": "thinking", "content": "Analyzing request..."}
- {"type": "text", "content": "Hello"}
- {"type": "tool_call", "name": "add_task", "args": {...}}
- {"type": "tool_result", "name": "add_task", "result": {...}}
"""
# Enhance message with user context
enhanced_message = f"[User ID: {user_id}]\n{user_message}"
# Build input with history
input_messages = []
if conversation_history:
input_messages.extend(conversation_history)
input_messages.append({"role": "user", "content": enhanced_message})
try:
# Signal thinking
yield {"type": "thinking", "content": "Processing your request..."}
# Run agent
result = await Runner.run(
todo_agent,
input=input_messages if conversation_history else enhanced_message,
max_turns=10,
)
response_text = result.final_output
# Stream text in chunks for natural feel
chunk_size = 20
for i in range(0, len(response_text), chunk_size):
chunk = response_text[i:i + chunk_size]
yield {"type": "text", "content": chunk}
await asyncio.sleep(0.02) # Natural streaming pace
except Exception as e:
logger.error(f"Agent streaming error: {e}")
yield {"type": "error", "message": str(e)}
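The chunked yield above can be factored into a pure helper, which keeps the pacing logic easy to unit-test in isolation (`chunk_text` is an illustrative name, not part of the SDK):

```python
def chunk_text(text: str, size: int = 20) -> list[str]:
    """Split text into fixed-size chunks for simulated streaming."""
    return [text[i:i + size] for i in range(0, len(text), size)]

# chunk_text("hello world", 4) -> ["hell", "o wo", "rld"]
```
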
Streaming with Tool Events
Full streaming with tool call/result events:
# backend/src/agents/runner.py
async def run_todo_agent_streaming_with_tools(
user_message: str,
user_id: str,
conversation_history: list[dict] | None = None,
) -> AsyncGenerator[dict, None]:
"""
Stream agent response with tool execution events.
"""
from agents import Runner
enhanced_message = f"[User ID: {user_id}]\n{user_message}"
input_messages = []
if conversation_history:
input_messages.extend(conversation_history)
input_messages.append({"role": "user", "content": enhanced_message})
try:
yield {"type": "thinking", "content": "Analyzing your request..."}
        # Runner.run_streamed returns a RunResultStreaming (it is not an
        # async context manager); iterate stream_events() for real-time
        # deltas and tool events
        result = Runner.run_streamed(todo_agent, input=input_messages)
        async for event in result.stream_events():
            if event.type == "raw_response_event":
                # Text delta from the model
                if hasattr(event.data, "delta") and event.data.delta:
                    yield {"type": "text", "content": event.data.delta}
            elif event.type == "run_item_stream_event":
                item = event.item
                if item.type == "tool_call_item":
                    yield {
                        "type": "tool_call",
                        "name": item.raw_item.name,
                        "args": item.raw_item.arguments,
                    }
                elif item.type == "tool_call_output_item":
                    yield {
                        "type": "tool_result",
                        "result": item.output,
                    }
except Exception as e:
logger.error(f"Streaming error: {e}")
yield {"type": "error", "message": str(e)}
Database Models
Conversation Model
# backend/src/models/conversation.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .message import Message
class Conversation(SQLModel, table=True):
__tablename__ = "conversations"
id: int | None = Field(default=None, primary_key=True)
user_id: str = Field(index=True)
title: str | None = Field(default=None, max_length=200)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
messages: list["Message"] = Relationship(back_populates="conversation")
Message Model
# backend/src/models/message.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .conversation import Conversation
class Message(SQLModel, table=True):
__tablename__ = "messages"
id: int | None = Field(default=None, primary_key=True)
conversation_id: int = Field(foreign_key="conversations.id", index=True)
role: str = Field(max_length=20) # "user", "assistant", "system"
content: str = Field()
tool_calls: str | None = Field(default=None) # JSON string
created_at: datetime = Field(default_factory=datetime.utcnow)
conversation: "Conversation" = Relationship(back_populates="messages")
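The `tool_calls` column above is a plain JSON string, so tool events must be serialized before saving and decoded when rendering history. A minimal round-trip (field names follow the models above):

```python
import json

# Tool events captured during streaming
calls = [{"name": "add_task", "args": {"title": "Buy groceries"}}]

# Serialize for storage in Message.tool_calls ...
encoded = json.dumps(calls)
# ... and decode when loading history back out
decoded = json.loads(encoded)

assert decoded == calls
```

A hypothetical usage with the model: `Message(conversation_id=1, role="assistant", content="Task added.", tool_calls=encoded)`.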
Conversation CRUD Endpoints
# backend/src/routers/conversations.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, desc
from src.database import get_session
from src.middleware.auth import verify_jwt
from src.models.conversation import Conversation
from src.models.message import Message
from pydantic import BaseModel
from datetime import datetime
router = APIRouter(prefix="/api/conversations", tags=["conversations"])
class ConversationResponse(BaseModel):
id: int
title: str | None
created_at: datetime
updated_at: datetime
@router.get("/", response_model=list[ConversationResponse])
async def list_conversations(
session: Session = Depends(get_session),
current_user: dict = Depends(verify_jwt),
):
"""List user's conversations."""
conversations = session.exec(
select(Conversation)
.where(Conversation.user_id == current_user["id"])
.order_by(desc(Conversation.updated_at))
).all()
return conversations
@router.delete("/{conversation_id}")
async def delete_conversation(
conversation_id: int,
session: Session = Depends(get_session),
current_user: dict = Depends(verify_jwt),
):
"""Delete conversation and messages."""
conversation = session.exec(
select(Conversation).where(
Conversation.id == conversation_id,
Conversation.user_id == current_user["id"],
)
).first()
if not conversation:
raise HTTPException(status_code=404, detail="Not found")
# Delete messages
for msg in conversation.messages:
session.delete(msg)
# Delete conversation
session.delete(conversation)
session.commit()
return {"status": "deleted"}
Database Migration
# Create migration
cd backend
uv run alembic revision --autogenerate -m "Add conversations and messages"
uv run alembic upgrade head
Environment Variables
# Backend (.env)
DATABASE_URL=postgresql://user:pass@host/db
BETTER_AUTH_SECRET=your_auth_secret
GEMINI_API_KEY=your_gemini_api_key
GEMINI_MODEL=gemini-2.5-flash
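A minimal loader for the variables above might look like this; the defaults are placeholders for local development, and many projects use pydantic-settings instead:

```python
import os

def load_settings() -> dict:
    """Read backend configuration from the environment, with dev fallbacks."""
    return {
        "database_url": os.environ.get("DATABASE_URL", "sqlite:///./dev.db"),
        "better_auth_secret": os.environ.get("BETTER_AUTH_SECRET", ""),
        "gemini_api_key": os.environ.get("GEMINI_API_KEY", ""),
        "gemini_model": os.environ.get("GEMINI_MODEL", "gemini-2.5-flash"),
    }
```
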
Testing
Test SSE Endpoint
# Test with curl
curl -X POST http://localhost:8000/chatkit \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{"message": "Show my tasks"}'
Python Test
import httpx
import asyncio
async def test_chatkit():
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"http://localhost:8000/chatkit",
json={"message": "Show my tasks"},
headers={"Authorization": "Bearer TOKEN"},
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
print(line[6:])
asyncio.run(test_chatkit())
Verification Checklist
- [ ] /chatkit endpoint created
- [ ] SSE format matches ChatKit expectations
- [ ] Conversation model with user_id
- [ ] Message model with role and content
- [ ] Messages stored after streaming
- [ ] Conversation updated_at updated
- [ ] CORS configured for frontend
- [ ] Agent integration working
- [ ] Tool events streaming correctly
- [ ] Error handling with SSE error events
See Also
- REFERENCE.md - SSE format reference
- examples.md - Full code examples
- chatkit-frontend skill - Frontend integration
- openai-agents-setup skill - Agent configuration