NEW: Fenrir v1.2.2 is now available — Logo & Favicon Patch! Read the changelog
server-sent-events.md
docs server-sent-events.md

Server-Sent Events (SSE)

Basic SSE Endpoint

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from fenrir import EventSourceResponse
import asyncio

@app.get("/events")
async def get_events():
    async def event_generator():
        for i in range(10):
            yield f"data: Message {i}\n\n"
            await asyncio.sleep(1)

    return EventSourceResponse(event_generator())

SSE with Named Events

1
2
3
4
5
6
7
8
@app.get("/stream")
async def stream_events():
    async def event_generator():
        for i in range(5):
            yield f"event: update\ndata: {{'count': {i}}}\n\n"
            await asyncio.sleep(2)

    return EventSourceResponse(event_generator())

Real-time Notifications

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
from typing import Set

subscribers: Set = set()

@app.get("/subscribe")
async def subscribe():
    async def event_generator():
        queue = asyncio.Queue()
        subscribers.add(queue)

        try:
            while True:
                message = await queue.get()
                yield f"data: {message}\n\n"
        finally:
            subscribers.remove(queue)

    return EventSourceResponse(event_generator())

@app.post("/notify")
async def notify(message: str = Query(...)):
    for queue in subscribers:
        await queue.put(message)
    return {"notified": len(subscribers)}
Edit on GitHub Last Updated: Jun 04, 2026
© 2026 Fenrir Project.
main*
v1.2.2
Ln 1, Col 1
UTF-8
Prettier
Light Mode
Markdown