Source code for rsocket.transports.quart_websocket

import asyncio

from quart import websocket

from rsocket.frame import Frame
from rsocket.helpers import wrap_transport_exception
from rsocket.logger import logger
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.abstract_messaging import AbstractMessagingTransport


[docs] async def websocket_handler(on_server_create=None, **kwargs): """ Helper method to instantiate an RSocket server using a quart websocket connection. :param on_server_create: callback to be called when the server is created :param kwargs: parameters passed to the server """ transport = TransportQuartWebsocket() server = RSocketServer(transport, **kwargs) if on_server_create is not None: on_server_create(server) await transport.handle_incoming_ws_messages()
[docs] class TransportQuartWebsocket(AbstractMessagingTransport): """ RSocket transport over server side quart websocket. Use the `websocket_handler <rsocket.transports.quart_websocket.websocket_handler>` helper method to instantiate. """ async def handle_incoming_ws_messages(self): try: while True: data = await websocket.receive() async for frame in self._frame_parser.receive_data(data, 0): self._incoming_frame_queue.put_nowait(frame) except asyncio.CancelledError: logger().debug('Asyncio task canceled: quart_handle_incoming_ws_messages') async def send_frame(self, frame: Frame): with wrap_transport_exception(): await websocket.send(frame.serialize()) async def close(self): pass