Core API Reference

Controls

Server

class rsocket.rsocket_server.RSocketServer(transport, handler_factory=<class 'rsocket.request_handler.BaseRequestHandler'>, honor_lease=False, lease_publisher=None, request_queue_size=0, data_encoding=WellKnownMimeTypes.APPLICATION_JSON, metadata_encoding=WellKnownMimeTypes.APPLICATION_JSON, keep_alive_period=datetime.timedelta(microseconds=500000), max_lifetime_period=datetime.timedelta(seconds=600), setup_payload=None, fragment_size_bytes=None, on_ready=None)[source]

Server side instance of an RSocket connection.

Parameters:
  • transport (Transport) – Transport to use with this instance. See Transport class implementations.

  • request_queue_size (int) – Number of frames which can be queued while waiting for a lease.

  • fragment_size_bytes (int | None) – Minimum 64, Maximum depends on transport.

  • on_ready (Callable[[RSocketBase], None] | None) – Called after the RSocket server internals setup is done.

  • handler_factory (Callable[[], RequestHandler]) – Callable which returns the implemented application logic endpoints. See also RequestRouter

  • lease_publisher (Publisher | None)

  • data_encoding (str | bytes | WellKnownMimeTypes)

  • metadata_encoding (str | bytes | WellKnownMimeTypes)

  • keep_alive_period (timedelta)

  • max_lifetime_period (timedelta)

  • setup_payload (Payload | None)

fire_and_forget(payload)

Initiate a fire-and-forget interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[None]

metadata_push(metadata)

Initiate a metadata-push interaction.

Parameters:

metadata (bytes)

Return type:

Awaitable[None]

request_channel(payload, publisher=None, sending_done=None)

Initiate a request-channel interaction.

Parameters:
Return type:

BackpressureApi | Publisher

request_response(payload)

Initiate a request-response interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[Payload]

request_stream(payload)

Initiate a request-stream interaction.

Parameters:

payload (Payload)

Return type:

BackpressureApi | Publisher

Client

class rsocket.rsocket_client.RSocketClient(transport_provider, handler_factory=<class 'rsocket.request_handler.BaseRequestHandler'>, honor_lease=False, lease_publisher=None, request_queue_size=0, data_encoding=WellKnownMimeTypes.APPLICATION_JSON, metadata_encoding=WellKnownMimeTypes.APPLICATION_JSON, keep_alive_period=datetime.timedelta(microseconds=500000), max_lifetime_period=datetime.timedelta(seconds=600), setup_payload=None, fragment_size_bytes=None)[source]

Client side instance of an RSocket connection.

Parameters:
  • transport_provider (AsyncGenerator[Transport, Any]) – Async generator which returns Transport to use with this instance.

  • request_queue_size (int) – Number of frames which can be queued while waiting for a lease.

  • fragment_size_bytes (int | None) – Minimum 64, Maximum depends on transport.

  • handler_factory (Callable[[RSocketBase], RequestHandler]) – Callable which returns the implemented application logic endpoints. See also RequestRouter

  • lease_publisher (Publisher | None)

  • data_encoding (str | bytes | WellKnownMimeTypes)

  • metadata_encoding (str | bytes | WellKnownMimeTypes)

  • keep_alive_period (timedelta)

  • max_lifetime_period (timedelta)

  • setup_payload (Payload | None)

fire_and_forget(payload)

Initiate a fire-and-forget interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[None]

metadata_push(metadata)

Initiate a metadata-push interaction.

Parameters:

metadata (bytes)

Return type:

Awaitable[None]

request_channel(payload, publisher=None, sending_done=None)

Initiate a request-channel interaction.

Parameters:
Return type:

BackpressureApi | Publisher

request_response(payload)

Initiate a request-response interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[Payload]

request_stream(payload)

Initiate a request-stream interaction.

Parameters:

payload (Payload)

Return type:

BackpressureApi | Publisher

Handler

class rsocket.request_handler.RequestHandler[source]

An interface which defines handler for all rsocket interactions, and some other events (e.g. on_setup).

abstract async request_channel(payload)[source]

Bi-Directional communication. A publisher on each end is connected to a subscriber on the other end. Note that the first payload sent to the handler is passed as an argument to this method and not to the local subscriber.

Parameters:

payload (Payload)

Return type:

Tuple[Publisher | None, Subscriber | None]

abstract async request_response(payload)[source]

Handle request-response interaction

Parameters:

payload (Payload)

Return type:

Awaitable[Payload]

abstract async request_stream(payload)[source]

Handle request-stream interaction

Parameters:

payload (Payload)

Return type:

Publisher

class rsocket.request_handler.BaseRequestHandler[source]

Default implementation of RequestHandler to simplify implementing handlers.

For each request handler, the implementation will raise a RuntimeError. For request_fire_and_forget() and on_metadata_push() the request will be ignored.

async on_setup(data_encoding, metadata_encoding, payload)[source]

Nothing to do on setup by default

Parameters:
  • data_encoding (bytes)

  • metadata_encoding (bytes)

  • payload (Payload)

async request_channel(payload)[source]

Raise RuntimeError by default if not implemented.

Parameters:

payload (Payload)

Return type:

Tuple[Publisher | None, Subscriber | None]

async request_fire_and_forget(payload)[source]

Ignored by default

Parameters:

payload (Payload)

async on_metadata_push(payload)[source]

Nothing by default

Parameters:

payload (Payload)

Enums

class rsocket.extensions.mimetypes.WellKnownMimeTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Known mime types for data and metadata included in payloads.

Models

class rsocket.payload.Payload(data=None, metadata=None)[source]

A response/stream message (upstream or downstream). Contains data and metadata, both bytes.

Parameters:
  • data (bytes | bytearray | None) – data segment of payload

  • metadata (bytes | bytearray | None) – metadata segment of payload

Interfaces

Publisher

class reactivestreams.publisher.Publisher[source]

Handles event for subscription to a subscriber

Subscriber

class reactivestreams.subscriber.Subscriber[source]

Handles stream events.

Subscription

class reactivestreams.subscription.Subscription[source]

Backpressure stream control.

Transports

class rsocket.transports.transport.Transport[source]

Base class for all transports:

TCP

class rsocket.transports.tcp.TransportTCP(reader, writer, read_buffer_size=1024)[source]

RSocket transport over asyncio TCP connection.

Parameters:
  • reader (StreamReader) – asyncio connection reader stream

  • writer (StreamWriter) – asyncio connection writer stream

Quart

AIOHttp

AIOQuic

HTTP3

Websockets

rsocket.transports.asyncwebsockets_transport.websocket_client(url, **kwargs)[source]

Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client.

Parameters:

url (str)

Return type:

RSocketClient

class rsocket.transports.asyncwebsockets_transport.TransportAsyncWebsocketsClient(websocket)[source]

RSocket transport over client side asyncwebsockets.