When defining WebSockets, you normally declare a parameter of type WebSocket and with it you can read data from the client and send data to it.
It is provided directly by Starlette, but you can import it from fastapi:
from fastapi import WebSocket
Tip
When you want to define dependencies that should be compatible with both HTTP and WebSockets, you can define a parameter that takes an HTTPConnection instead of a Request or a WebSocket.
async def receive(self) -> Message:
    """
    Fetch the next ASGI websocket message and advance ``client_state``.

    While the client is CONNECTING only "websocket.connect" is accepted, and
    seeing it moves the client to CONNECTED. While CONNECTED, either
    "websocket.receive" or "websocket.disconnect" is accepted; the latter
    moves the client to DISCONNECTED.

    Returns:
        The raw message obtained from ``self._receive()``.

    Raises:
        RuntimeError: If the message type is not valid for the current
            state, or if the client is in neither the CONNECTING nor the
            CONNECTED state.
    """
    if self.client_state == WebSocketState.CONNECTING:
        event = await self._receive()
        kind = event["type"]
        if kind != "websocket.connect":
            raise RuntimeError(f'Expected ASGI message "websocket.connect", but got {kind!r}')
        self.client_state = WebSocketState.CONNECTED
        return event

    if self.client_state == WebSocketState.CONNECTED:
        event = await self._receive()
        kind = event["type"]
        if kind not in {"websocket.receive", "websocket.disconnect"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.receive" or "websocket.disconnect", but got {kind!r}'
            )
        if kind == "websocket.disconnect":
            # The disconnect message is still handed back to the caller.
            self.client_state = WebSocketState.DISCONNECTED
        return event

    # Any other client state: nothing further may be received.
    raise RuntimeError('Cannot call "receive" once a disconnect message has been received.')
async def send(self, message: Message) -> None:
    """
    Send ASGI websocket messages, ensuring valid state transitions.

    The message type is validated against ``application_state`` and the
    state is advanced before the message is handed to ``self._send()``:

    * CONNECTING: "websocket.accept" -> CONNECTED, "websocket.close" ->
      DISCONNECTED, "websocket.http.response.start" -> RESPONSE.
    * CONNECTED: "websocket.send" keeps the state, "websocket.close" ->
      DISCONNECTED.
    * RESPONSE: only "websocket.http.response.body" is allowed; the state
      becomes DISCONNECTED once a body message without ``more_body`` is sent.

    Args:
        message: The ASGI message to send; its ``"type"`` key selects the
            transition.

    Raises:
        RuntimeError: If the message type is not valid for the current
            state, or if the application is in none of the states above.
        WebSocketDisconnect: With code 1006, if ``self._send()`` raises
            ``OSError`` while the application is CONNECTED.
    """
    if self.application_state == WebSocketState.CONNECTING:
        message_type = message["type"]
        if message_type not in {
            "websocket.accept",
            "websocket.close",
            "websocket.http.response.start",
        }:
            raise RuntimeError(
                'Expected ASGI message "websocket.accept", "websocket.close" or '
                f'"websocket.http.response.start", but got {message_type!r}'
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        elif message_type == "websocket.http.response.start":
            self.application_state = WebSocketState.RESPONSE
        else:
            self.application_state = WebSocketState.CONNECTED
        await self._send(message)
    elif self.application_state == WebSocketState.CONNECTED:
        message_type = message["type"]
        if message_type not in {"websocket.send", "websocket.close"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.send" or "websocket.close", but got {message_type!r}'
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        try:
            await self._send(message)
        except OSError:
            # A transport-level failure is reported to the caller as a
            # disconnect; 1006 is the RFC 6455 "abnormal closure" code.
            self.application_state = WebSocketState.DISCONNECTED
            raise WebSocketDisconnect(code=1006)
    elif self.application_state == WebSocketState.RESPONSE:
        message_type = message["type"]
        if message_type != "websocket.http.response.body":
            raise RuntimeError(
                f'Expected ASGI message "websocket.http.response.body", but got {message_type!r}'
            )
        if not message.get("more_body", False):
            # Last body chunk: no further sends are allowed after this one.
            self.application_state = WebSocketState.DISCONNECTED
        await self._send(message)
    else:
        raise RuntimeError('Cannot call "send" once a close message has been sent.')
async def accept(
    self,
    subprotocol: str | None = None,
    headers: typing.Iterable[tuple[bytes, bytes]] | None = None,
) -> None:
    """
    Accept the connection by sending a "websocket.accept" message.

    Args:
        subprotocol: Value placed under the ``"subprotocol"`` key of the
            accept message.
        headers: Header pairs placed under the ``"headers"`` key; a falsy
            value is replaced by an empty list.
    """
    accept_message = {
        "type": "websocket.accept",
        "subprotocol": subprotocol,
        "headers": headers if headers else [],
    }

    # The 'connect' message has to be consumed before accepting, so wait for
    # it here if it has not been seen yet.
    if self.client_state == WebSocketState.CONNECTING:
        await self.receive()

    await self.send(accept_message)
async def receive_text(self) -> str:
    """
    Receive one message and return its ``"text"`` field.

    Raises:
        RuntimeError: If ``application_state`` is not CONNECTED.
    """
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')

    frame = await self.receive()
    # NOTE(review): presumably raises when the frame is a disconnect message;
    # confirm against the definition of _raise_on_disconnect.
    self._raise_on_disconnect(frame)

    text = frame["text"]
    return typing.cast(str, text)
async def receive_bytes(self) -> bytes:
    """
    Receive one message and return its ``"bytes"`` field.

    Raises:
        RuntimeError: If ``application_state`` is not CONNECTED.
    """
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')

    frame = await self.receive()
    # NOTE(review): presumably raises when the frame is a disconnect message;
    # confirm against the definition of _raise_on_disconnect.
    self._raise_on_disconnect(frame)

    payload = frame["bytes"]
    return typing.cast(bytes, payload)
async def receive_json(self, mode: str = "text") -> typing.Any:
    """
    Receive one message and parse its payload as JSON.

    Args:
        mode: ``"text"`` reads the message's ``"text"`` field; ``"binary"``
            reads the ``"bytes"`` field and decodes it as UTF-8.

    Returns:
        The object produced by ``json.loads`` on the payload.

    Raises:
        RuntimeError: If ``mode`` is not ``"text"`` or ``"binary"``, or if
            ``application_state`` is not CONNECTED.
    """
    # The mode is validated first, before any connection state is consulted.
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')

    frame = await self.receive()
    # NOTE(review): presumably raises when the frame is a disconnect message;
    # confirm against the definition of _raise_on_disconnect.
    self._raise_on_disconnect(frame)

    document = frame["text"] if mode == "text" else frame["bytes"].decode("utf-8")
    return json.loads(document)
async def send_json(self, data: typing.Any, mode: str = "text") -> None:
    """
    Serialize ``data`` as compact JSON and send it in a "websocket.send" message.

    Args:
        data: The object to serialize with ``json.dumps``.
        mode: ``"text"`` sends the JSON string under the ``"text"`` key;
            ``"binary"`` sends its UTF-8 encoding under the ``"bytes"`` key.

    Raises:
        RuntimeError: If ``mode`` is not ``"text"`` or ``"binary"``.
    """
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')

    # Compact separators; non-ASCII characters are emitted unescaped.
    serialized = json.dumps(data, separators=(",", ":"), ensure_ascii=False)

    if mode == "text":
        frame = {"type": "websocket.send", "text": serialized}
    else:
        frame = {"type": "websocket.send", "bytes": serialized.encode("utf-8")}
    await self.send(frame)