Source code for rsocket.transports.asyncwebsockets_transport

import asyncio
from contextlib import asynccontextmanager

from rsocket.exceptions import RSocketTransportError
from rsocket.frame import Frame
from rsocket.helpers import wrap_transport_exception, single_transport_provider
from rsocket.logger import logger
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.abstract_messaging import AbstractMessagingTransport


[docs] @asynccontextmanager async def websocket_client(url: str, **kwargs) -> RSocketClient: """ Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client. """ from asyncwebsockets import open_websocket async with open_websocket(url) as websocket: async with RSocketClient(single_transport_provider(TransportAsyncWebsocketsClient(websocket)), **kwargs) as client: yield client
[docs] class TransportAsyncWebsocketsClient(AbstractMessagingTransport): """ RSocket transport over client side asyncwebsockets. """ def __init__(self, websocket): super().__init__() self._ws = websocket self._message_handler = None async def connect(self): self._message_handler = asyncio.create_task(self.handle_incoming_ws_messages()) async def handle_incoming_ws_messages(self): from wsproto.events import BytesMessage try: async for message in self._ws: if isinstance(message, BytesMessage): async for frame in self._frame_parser.receive_data(message.data, 0): self._incoming_frame_queue.put_nowait(frame) except asyncio.CancelledError: logger().debug('Asyncio task canceled: incoming_data_listener') except Exception: self._incoming_frame_queue.put_nowait(RSocketTransportError()) async def send_frame(self, frame: Frame): with wrap_transport_exception(): await self._ws.send(frame.serialize()) async def close(self): self._message_handler.cancel() await self._message_handler