Extensions

Transports

TCP

class rsocket.transports.tcp.TransportTCP(reader, writer, read_buffer_size=1024)

RSocket transport over asyncio TCP connection.

Parameters:
  • reader (StreamReader) – asyncio connection reader stream

  • writer (StreamWriter) – asyncio connection writer stream
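
The reader and writer arguments are the pair returned by asyncio's stream API. The following is a minimal sketch, using only the standard library, of how such a pair can be obtained over a loopback connection. The helper name open_local_stream_pair and the throwaway local server are assumptions made for illustration; the commented line shows where the pair would presumably be handed to TransportTCP.

```python
import asyncio


async def open_local_stream_pair():
    """Open a loopback TCP connection and report the types of its stream pair."""

    async def handle_peer(reader, writer):
        # Server side of the loopback connection: nothing to do, close at once.
        writer.close()

    # Port 0 lets the operating system pick a free port.
    server = await asyncio.start_server(handle_peer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    # With rsocket installed, the pair would be passed to the transport here:
    #   transport = TransportTCP(reader, writer)
    kinds = (type(reader).__name__, type(writer).__name__)

    # Release the client connection and stop accepting new ones.
    writer.close()
    await writer.wait_closed()
    server.close()
    return kinds


stream_kinds = asyncio.run(open_local_stream_pair())
print(stream_kinds)
```

In a real client the host and port would point at an RSocket server rather than at a local placeholder.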

Websocket

aiohttp

rsocket.transports.aiohttp_websocket.websocket_client(url=None, websocket=None, **kwargs)

Helper method to instantiate an RSocket client using a websocket url over aiohttp client.

Parameters:
  • url (str | None)

  • websocket (ClientWebSocketResponse | None)

Return type:

RSocketClient

class rsocket.transports.aiohttp_websocket.TransportAioHttpClient(url=None, websocket=None)

RSocket transport over client side aiohttp websocket.

Parameters:
  • url (str | None)

  • websocket (ClientWebSocketResponse | None)

quart

async rsocket.transports.quart_websocket.websocket_handler(on_server_create=None, **kwargs)

Helper method to instantiate an RSocket server using a quart websocket connection.

Parameters:
  • on_server_create – callback to be called when the server is created

  • kwargs – parameters passed to the server

class rsocket.transports.quart_websocket.TransportQuartWebsocket

RSocket transport over server side quart websocket. Use the websocket_handler helper method (rsocket.transports.quart_websocket.websocket_handler) to instantiate it.

websockets

asyncwebsockets

rsocket.transports.asyncwebsockets_transport.websocket_client(url, **kwargs)

Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client.

Parameters:
  • url (str) – websocket url

  • kwargs – parameters passed to the client

Return type:

RSocketClient

class rsocket.transports.asyncwebsockets_transport.TransportAsyncWebsocketsClient(websocket)

RSocket transport over client side asyncwebsockets.

Parameters:

websocket – websocket connection

quic

class rsocket.transports.aioquic_transport.RSocketQuicProtocol(*args, **kwargs)

RSocket transport over server side quic.

class rsocket.transports.aioquic_transport.RSocketQuicTransport(quic_protocol)

RSocket transport over server/client side quic connection.

Parameters:

quic_protocol (RSocketQuicProtocol)

http3

class rsocket.transports.http3_transport.RSocketHttp3ClientProtocol(*args, **kwargs)

RSocket transport over client side http3 connection.

class rsocket.transports.http3_transport.Http3TransportWebsocket(websocket)

RSocket transport over server side http3 connection.

Parameters:

websocket (WebSocket | ClientWebSocket)

Routing

RequestRouter

class rsocket.routing.request_router.RequestRouter(payload_deserializer=<function RequestRouter.<lambda>>, payload_serializer=<function RequestRouter.<lambda>>)

Used to define routes for RSocket endpoints.

Pass this to RoutingRequestHandler to instantiate a handler using these routes.

RoutingRequestHandler

class rsocket.routing.routing_request_handler.RoutingRequestHandler(router, authentication_verifier=None)

Handler implementation which uses a RequestRouter to handle requests based on route information provided in the payload metadata.

Parameters:
  • router (RequestRouter)

  • authentication_verifier (Callable[[str, Authentication], Coroutine[None, None, None]] | None)
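
Route-based handling, as described above, amounts to looking up a handler by the route name carried in the request metadata. The sketch below illustrates that idea in plain Python. RouteTable, its response decorator, and dispatch are hypothetical names invented for this example; they are not the library's RequestRouter or RoutingRequestHandler API, and payloads are reduced to raw bytes.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[bytes], Awaitable[bytes]]


class RouteTable:
    """Hypothetical stand-in for a router: maps route names to async handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def response(self, route: str) -> Callable[[Handler], Handler]:
        """Return a decorator that registers a handler under the route name."""

        def register(handler: Handler) -> Handler:
            self._handlers[route] = handler
            return handler

        return register

    async def dispatch(self, route: str, data: bytes) -> bytes:
        """Find the handler for the given route and await its result."""
        try:
            handler = self._handlers[route]
        except KeyError:
            raise LookupError(f"no handler registered for route {route!r}") from None
        return await handler(data)


routes = RouteTable()


@routes.response("echo")
async def echo(data: bytes) -> bytes:
    # Return the request data unchanged.
    return data


@routes.response("upper")
async def upper(data: bytes) -> bytes:
    # Return the request data upper-cased.
    return data.upper()


echo_result = asyncio.run(routes.dispatch("echo", b"ping"))
upper_result = asyncio.run(routes.dispatch("upper", b"ping"))
print(echo_result, upper_result)
```

Keeping registration (the decorator) separate from dispatch mirrors the split between a router object and the handler that consumes it.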

Load Balancer

Strategies

class rsocket.load_balancer.round_robin.LoadBalancerRoundRobin(pool, auto_connect=True, auto_close=True)

Round Robin Load Balancer

Parameters:

pool (List[RSocket])

class rsocket.load_balancer.random_client.LoadBalancerRandom(pool, auto_connect=True, auto_close=True)

Random Load Balancer

Parameters:

pool (List[RSocket])
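
The two strategies differ only in how the next member of the pool is chosen. The following sketch shows the two selection policies on a pool of placeholder strings. RoundRobinPicker and RandomPicker are hypothetical names for illustration, not the library's classes, and connection handling (auto_connect, auto_close) is left out entirely.

```python
import itertools
import random
from typing import Generic, Optional, Sequence, TypeVar

T = TypeVar("T")


class RoundRobinPicker(Generic[T]):
    """Hypothetical picker that cycles through the pool in order."""

    def __init__(self, pool: Sequence[T]) -> None:
        if not pool:
            raise ValueError("pool must not be empty")
        self._cycle = itertools.cycle(pool)

    def pick(self) -> T:
        return next(self._cycle)


class RandomPicker(Generic[T]):
    """Hypothetical picker that chooses a pool member uniformly at random."""

    def __init__(self, pool: Sequence[T], seed: Optional[int] = None) -> None:
        if not pool:
            raise ValueError("pool must not be empty")
        self._pool = list(pool)
        self._rng = random.Random(seed)

    def pick(self) -> T:
        return self._rng.choice(self._pool)


pool = ["client-a", "client-b", "client-c"]

round_robin = RoundRobinPicker(pool)
round_robin_order = [round_robin.pick() for _ in range(5)]

random_picker = RandomPicker(pool, seed=0)
random_order = [random_picker.pick() for _ in range(5)]

print(round_robin_order)
print(random_order)
```

Round robin spreads requests evenly and predictably, while random selection needs no shared position counter.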

ReactiveX

ReactiveX 4

class rsocket.reactivex.reactivex_handler.ReactivexHandler

Variant of RequestHandler which uses ReactiveX (4.0). Wrap with reactivex_handler_factory to pass it as a request handler.

abstract async on_setup(data_encoding, metadata_encoding, payload)

Handle setup request

Parameters:
  • data_encoding (bytes)

  • metadata_encoding (bytes)

  • payload (Payload)

abstract async on_metadata_push(metadata)

Handle metadata-push request

Parameters:

metadata (Payload)

abstract async request_channel(payload)

Handle request-channel interaction

Parameters:

payload (Payload)

Return type:

ReactivexChannel

abstract async request_fire_and_forget(payload)

Handle request-fire-and-forget interaction

Parameters:

payload (Payload)

abstract async request_response(payload)

Handle request-response interaction

Parameters:

payload (Payload)

Return type:

Observable

abstract async request_stream(payload)

Handle request-stream interaction

Parameters:

payload (Payload)

Return type:

Observable | Callable[[Subject], Observable]

abstract async on_error(error_code, payload)

Handle errors received from the remote side

Parameters:
  • error_code (ErrorCode)

  • payload (Payload)

abstract async on_connection_error(rsocket, exception)

Handle connection error

Parameters:

exception (Exception)

abstract async on_close(rsocket, exception=None)

Handle connection closed

Parameters:

exception (Exception | None)

rsocket.reactivex.reactivex_handler_adapter.reactivex_handler_factory(handler_factory)

Wraps a reactivex handler factory into a basic request handler adapter.

Parameters:

handler_factory (Callable[[], ReactivexHandler])

ReactiveX 3

class rsocket.rx_support.rx_handler.RxHandler

Variant of RequestHandler which uses Rx (3.0). Wrap with rx_handler_factory to pass it as a request handler.

abstract async on_setup(data_encoding, metadata_encoding, payload)

Handle setup request

Parameters:
  • data_encoding (bytes)

  • metadata_encoding (bytes)

  • payload (Payload)

abstract async on_metadata_push(metadata)

Handle metadata-push request

Parameters:

metadata (Payload)

abstract async request_channel(payload)

Handle request-channel interaction

Parameters:

payload (Payload)

Return type:

RxChannel

abstract async request_fire_and_forget(payload)

Handle request-fire-and-forget interaction

Parameters:

payload (Payload)

abstract async request_response(payload)

Handle request-response interaction

Parameters:

payload (Payload)

Return type:

Observable

abstract async request_stream(payload)

Handle request-stream interaction

Parameters:

payload (Payload)

Return type:

Observable | Callable[[Subject], Observable]

abstract async on_error(error_code, payload)

Handle errors received from the remote side

Parameters:
  • error_code (ErrorCode)

  • payload (Payload)

abstract async on_connection_error(rsocket, exception)

Handle connection error

Parameters:

exception (Exception)

abstract async on_close(rsocket, exception=None)

Handle connection closed

Parameters:

exception (Exception | None)

rsocket.rx_support.rx_handler_adapter.rx_handler_factory(handler_factory)

Wraps an Rx handler factory into a basic request handler adapter.

Parameters:

handler_factory (Callable[[], RxHandler])