Core API Reference

Controls

Server

class rsocket.rsocket_server.RSocketServer(transport, handler_factory=<class 'rsocket.request_handler.BaseRequestHandler'>, honor_lease=False, lease_publisher=None, request_queue_size=0, data_encoding=WellKnownMimeTypes.APPLICATION_JSON, metadata_encoding=WellKnownMimeTypes.APPLICATION_JSON, keep_alive_period=datetime.timedelta(microseconds=500000), max_lifetime_period=datetime.timedelta(seconds=600), setup_payload=None, fragment_size_bytes=None, on_ready=None)[source]

Server side instance of an RSocket connection.

Parameters:
  • transport (Transport) – Transport to use with this instance. See Transport class implementations.

  • request_queue_size (int) – Number of frames which can be queued while waiting for a lease.

  • fragment_size_bytes (int | None) – Minimum 64, Maximum depends on transport.

  • on_ready (Callable[[RSocketBase], None] | None) – Called after the RSocket server internals setup is done.

  • handler_factory (Callable[[], RequestHandler]) – Callable which returns the implemented application logic endpoints. See RequestRouter

  • lease_publisher (Publisher | None)

  • data_encoding (str | bytes | WellKnownMimeTypes)

  • metadata_encoding (str | bytes | WellKnownMimeTypes)

  • keep_alive_period (timedelta)

  • max_lifetime_period (timedelta)

  • setup_payload (Payload | None)

fire_and_forget(payload)

Initiate a fire-and-forget interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[None]

metadata_push(metadata)

Initiate a metadata-push interaction.

Parameters:

metadata (bytes)

Return type:

Awaitable[None]

request_channel(payload, publisher=None, sending_done=None)

Initiate a request-channel interaction.

Parameters:
Return type:

BackpressureApi | Publisher

request_response(payload)

Initiate a request-response interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[Payload]

request_stream(payload)

Initiate a request-stream interaction.

Parameters:

payload (Payload)

Return type:

BackpressureApi | Publisher

Client

class rsocket.rsocket_client.RSocketClient(transport_provider, handler_factory=<class 'rsocket.request_handler.BaseRequestHandler'>, honor_lease=False, lease_publisher=None, request_queue_size=0, data_encoding=WellKnownMimeTypes.APPLICATION_JSON, metadata_encoding=WellKnownMimeTypes.APPLICATION_JSON, keep_alive_period=datetime.timedelta(microseconds=500000), max_lifetime_period=datetime.timedelta(seconds=600), setup_payload=None, fragment_size_bytes=None)[source]

Client side instance of an RSocket connection.

Parameters:
  • transport_provider (AsyncGenerator[Transport, Any]) – Async generator which returns Transport to use with this instance.

  • request_queue_size (int) – Number of frames which can be queued while waiting for a lease.

  • fragment_size_bytes (int | None) – Minimum 64, Maximum depends on transport.

  • handler_factory (Callable[[RSocketBase], RequestHandler]) – Callable which returns the implemented application logic endpoints. See also RequestRouter

  • lease_publisher (Publisher | None)

  • data_encoding (str | bytes | WellKnownMimeTypes)

  • metadata_encoding (str | bytes | WellKnownMimeTypes)

  • keep_alive_period (timedelta)

  • max_lifetime_period (timedelta)

  • setup_payload (Payload | None)

fire_and_forget(payload)

Initiate a fire-and-forget interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[None]

metadata_push(metadata)

Initiate a metadata-push interaction.

Parameters:

metadata (bytes)

Return type:

Awaitable[None]

request_channel(payload, publisher=None, sending_done=None)

Initiate a request-channel interaction.

Parameters:
Return type:

BackpressureApi | Publisher

request_response(payload)

Initiate a request-response interaction.

Parameters:

payload (Payload)

Return type:

Awaitable[Payload]

request_stream(payload)

Initiate a request-stream interaction.

Parameters:

payload (Payload)

Return type:

BackpressureApi | Publisher

Handler

class rsocket.request_handler.RequestHandler[source]

An interface which defines handler for all rsocket interactions, and some other events (e.g. on_setup).

abstract async on_metadata_push(metadata)[source]

Handle metadata-push request

Parameters:

metadata (Payload)

abstract async request_channel(payload)[source]

Bi-Directional communication. A publisher on each end is connected to a subscriber on the other end. Note that the first payload sent to the handler is passed as an argument to this method and not to the local subscriber.

Parameters:

payload (Payload)

Return type:

Tuple[Publisher | None, Subscriber | None]

abstract async request_fire_and_forget(payload)[source]

Handle fire-and-forget request

Parameters:

payload (Payload)

abstract async request_response(payload)[source]

Handle request-response interaction

Parameters:

payload (Payload)

Return type:

Awaitable[Payload]

abstract async request_stream(payload)[source]

Handle request-stream interaction

Parameters:

payload (Payload)

Return type:

Publisher

abstract async on_error(error_code, payload)[source]

Handle errors received from the remote side

Parameters:
  • error_code (ErrorCode)

  • payload (Payload)

abstract async on_keepalive_timeout(time_since_last_keepalive, rsocket)[source]

Handle keepalive timeout

Parameters:

time_since_last_keepalive (timedelta)

abstract async on_connection_error(rsocket, exception)[source]

Handle connection error

Parameters:

exception (Exception)

abstract async on_close(rsocket, exception=None)[source]

Handle connection closed

Parameters:

exception (Exception | None)

class rsocket.request_handler.BaseRequestHandler[source]

Default implementation of RequestHandler to simplify implementing handlers.

For each request handler, the implementation will raise a RuntimeError. For request_fire_and_forget() and on_metadata_push() the request will be ignored.

async on_setup(data_encoding, metadata_encoding, payload)[source]

Nothing to do on setup by default

Parameters:
  • data_encoding (bytes)

  • metadata_encoding (bytes)

  • payload (Payload)

async request_channel(payload)[source]

Raise RuntimeError by default if not implemented.

Parameters:

payload (Payload)

Return type:

Tuple[Publisher | None, Subscriber | None]

async request_fire_and_forget(payload)[source]

Ignored by default

Parameters:

payload (Payload)

async on_metadata_push(payload)[source]

Nothing by default

Parameters:

payload (Payload)

Enums

class rsocket.extensions.mimetypes.WellKnownMimeTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Known mime types for data and metadata included in payloads.

Models

class rsocket.payload.Payload(data=None, metadata=None)[source]

A response/stream message (upstream or downstream). Contains data and metadata, both bytes.

Parameters:
  • data (bytes | bytearray | None) – data segment of payload

  • metadata (bytes | bytearray | None) – metadata segment of payload

Interfaces

Publisher

class reactivestreams.publisher.Publisher[source]

Handles event for subscription to a subscriber

Subscriber

class reactivestreams.subscriber.Subscriber[source]

Handles stream events.

Subscription

class reactivestreams.subscription.Subscription[source]

Backpressure stream control.

Transports

class rsocket.transports.transport.Transport[source]

Base class for all transports: