Websockets

WebSockets provide a persistent, full-duplex communication channel over a single TCP connection, allowing for real-time, bi-directional communication between clients and servers. FastAPI supports WebSockets, making it a suitable choice for applications that require real-time updates, like chat applications, live notifications, or online games.

Getting Started with WebSockets in FastAPI

Here's a step-by-step guide to implementing WebSockets with FastAPI.

1. Setting Up the Project

Create a new project directory:

fastapi_websockets/
│
├── main.py
└── requirements.txt

Install FastAPI and Uvicorn if you haven't already:

pip install fastapi uvicorn

2. Implementing WebSocket Endpoint

In your main.py file, set up a basic WebSocket endpoint.

# main.py
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Chat</title>
</head>
<body>
    <h1>WebSocket Chat</h1>
    <textarea id="chat-log" cols="100" rows="20"></textarea><br>
    <input id="messageText" type="text" size="100">
    <button onclick="sendMessage()">Send</button>

    <script>
        const ws = new WebSocket("ws://localhost:8000/ws");

        ws.onmessage = function(event) {
            const chatLog = document.getElementById("chat-log");
            chatLog.value += event.data + "\\n";
        };

        function sendMessage() {
            const input = document.getElementById("messageText");
            ws.send(input.value);
            input.value = '';
        }
    </script>
</body>
</html>
"""

@app.get("/")
async def get():
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")
  • The @app.get("/") endpoint serves a simple HTML page with a chat interface.
  • The @app.websocket("/ws") endpoint establishes a WebSocket connection. It listens for incoming messages and echoes them back to the client.

3. Running the Application

Start the FastAPI application using Uvicorn:

uvicorn main:app --reload
  • Open your browser and navigate to http://localhost:8000 to interact with the WebSocket server.

4. Understanding WebSocket Communication

  • Connection Establishment: The client connects to the WebSocket server at ws://localhost:8000/ws.
  • Message Handling: The server listens for messages sent from the client and responds with the same message prefixed with "Message text was:".
  • Client-Side Script: The HTML includes JavaScript that opens a WebSocket connection, sends messages, and updates the chat log.

Advanced WebSocket Usage

For more complex applications, you might want to handle multiple clients, broadcast messages, or manage different WebSocket events.

Broadcasting Messages to Multiple Clients

To broadcast messages to all connected clients, you need to maintain a list of active WebSocket connections and iterate over them to send messages.

# main.py (updated)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Chat</title>
</head>
<body>
    <h1>WebSocket Chat</h1>
    <textarea id="chat-log" cols="100" rows="20"></textarea><br>
    <input id="messageText" type="text" size="100">
    <button onclick="sendMessage()">Send</button>

    <script>
        const ws = new WebSocket("ws://localhost:8000/ws");

        ws.onmessage = function(event) {
            const chatLog = document.getElementById("chat-log");
            chatLog.value += event.data + "\\n";
        };

        function sendMessage() {
            const input = document.getElementById("messageText");
            ws.send(input.value);
            input.value = '';
        }
    </script>
</body>
</html>
"""

@app.get("/")
async def get():
    return HTMLResponse(html)

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Message: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client left the chat")
  • ConnectionManager: This class manages WebSocket connections and provides methods to connect, disconnect, and broadcast messages.
  • WebSocket Endpoint: Uses ConnectionManager to handle incoming WebSocket connections and broadcasts received messages to all connected clients.

WebSocket Event Handling

You can handle various WebSocket events such as connection, disconnection, and message reception.

  • Connection Handling: Accept and store new WebSocket connections.
  • Disconnection Handling: Remove connections from the list when clients disconnect.
  • Message Reception: Process incoming messages, and potentially validate or transform them before broadcasting.

Example: Multi-Room Chat Application

For more sophisticated applications, like a chat application with multiple rooms, you can extend the ConnectionManager to handle rooms and message routing.

from typing import List, Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, room: str, websocket: WebSocket):
        await websocket.accept()
        if room not in self.active_connections:
            self.active_connections[room] = []
        self.active_connections[room].append(websocket)

    def disconnect(self, room: str, websocket: WebSocket):
        self.active_connections[room].remove(websocket)
        if not self.active_connections[room]:
            del self.active_connections[room]

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, room: str, message: str):
        if room in self.active_connections:
            for connection in self.active_connections[room]:
                await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await manager.connect(room, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(room, f"Message from {room}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(room, websocket)
        await manager.broadcast(room, f"Client left the room {room}")
  • This ConnectionManager now handles connections by room.
  • The WebSocket endpoint /ws/{room} routes messages to the appropriate room.

Conclusion

FastAPI’s WebSocket support is versatile and powerful, enabling real-time, bi-directional communication for your applications. Whether you need simple WebSocket connections or more complex setups like multi-room chats, FastAPI provides the tools to implement these features efficiently.

Resources

With these tools and examples, you can start building robust real-time applications using FastAPI and WebSockets.