?
Templateintermédiaire

Streaming et Server-Sent Events

Implémenter le streaming pour une UX réactive avec l'API Claude.

Pourquoi le streaming ?

Sans streaming, l'utilisateur attend la réponse complète de Claude avant de voir quoi que ce soit. Avec le streaming, les tokens apparaissent au fur et à mesure de leur génération, créant une expérience fluide et réactive.

Streaming avec le SDK TypeScript

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.stream({
  model: "claude-sonnet-4-20250514",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Explique le théorème de Bayes." }]
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}

// Accéder au message final complet
const finalMessage = await stream.finalMessage();
console.log(finalMessage.usage);

Streaming avec le SDK Python

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explique le théorème de Bayes."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Implémenter un endpoint SSE (Next.js)

// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";

export async function POST(req: Request) {
  const { message } = await req.json();
  const client = new Anthropic();

  const stream = await client.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: message }]
  });

  // Convertir en ReadableStream pour le client
  const readableStream = new ReadableStream({
    async start(controller) {
      for await (const event of stream) {
        if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
          controller.enqueue(
            new TextEncoder().encode(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`)
          );
        }
      }
      controller.enqueue(new TextEncoder().encode("data: [DONE]\n\n"));
      controller.close();
    }
  });

  return new Response(readableStream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive"
    }
  });
}

Consommer côté client (React)

async function streamChat(message: string, onToken: (text: string) => void) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message })
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split("\n").filter(l => l.startsWith("data: "));

    for (const line of lines) {
      const data = line.slice(6);
      if (data === "[DONE]") return;
      const { text } = JSON.parse(data);
      onToken(text);
    }
  }
}

Événements du stream

Les principaux types d'événements : - message_start : Début du message, contient les métadonnées - content_block_start : Début d'un bloc (texte ou tool_use) - content_block_delta : Contenu incrémental - content_block_stop : Fin d'un bloc - message_delta : Mise à jour du message (stop_reason, usage) - message_stop : Fin du message

Bonnes pratiques

  • Timeout : Implémentez un timeout côté client (30-60s sans événement)
  • Reconnexion : Gérez les déconnexions réseau avec retry
  • Buffering : Accumulez quelques tokens avant de re-render pour éviter le flickering
  • Annulation : Utilisez AbortController pour permettre à l'utilisateur d'interrompre

Sources

APIstreamingUX