from abc import ABCMeta, abstractmethod
from datetime import timedelta
from typing import Tuple, Optional
from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber
from rsocket.error_codes import ErrorCode
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.local_typing import Awaitable
from rsocket.logger import logger
from rsocket.payload import Payload
# NOTE: a Sphinx "[docs]" source-link marker (HTML extraction artifact, not code) stood here.
class RequestHandler(metaclass=ABCMeta):
    """
    Abstract contract covering every rsocket interaction, along with a few
    additional events (for example ``on_setup``).

    All hooks except :meth:`_parse_composite_metadata` are abstract coroutines;
    a concrete handler has to provide each of them.
    """

    @abstractmethod
    async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
        """Setup hook; receives the data encoding, the metadata encoding and a payload."""

    @abstractmethod
    async def on_metadata_push(self, metadata: Payload):
        """Metadata-push hook; receives the pushed item as ``metadata``."""

    @abstractmethod
    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
        """
        Two-way communication: the publisher at each end is wired to a
        subscriber at the opposite end.

        The first payload sent to the handler arrives as the ``payload``
        argument of this method; it is not delivered to the local subscriber.

        :return: a ``(publisher, subscriber)`` pair; either element may be ``None``.
        """

    @abstractmethod
    async def request_fire_and_forget(self, payload: Payload):
        """Fire-and-forget hook; no return value is declared."""

    @abstractmethod
    async def request_response(self, payload: Payload) -> Awaitable[Payload]:
        """
        Request-response interaction hook.

        :return: an awaitable that yields the response payload.
        """

    @abstractmethod
    async def request_stream(self, payload: Payload) -> Publisher:
        """
        Request-stream interaction hook.

        :return: a publisher for the stream.
        """

    @abstractmethod
    async def on_error(self, error_code: ErrorCode, payload: Payload):
        """Error hook; receives an error code and a payload."""

    @abstractmethod
    async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket):
        """Keepalive-timeout hook; receives a time delta and the ``rsocket`` argument."""

    @abstractmethod
    async def on_connection_error(self, rsocket, exception: Exception):
        """Connection-error hook; receives the ``rsocket`` argument and the exception."""

    @abstractmethod
    async def on_close(self, rsocket, exception: Optional[Exception] = None):
        """Close hook; ``exception`` is optional and defaults to ``None``."""

    # noinspection PyMethodMayBeStatic
    def _parse_composite_metadata(self, metadata: bytes) -> CompositeMetadata:
        """Create a new ``CompositeMetadata``, let it parse ``metadata`` and return it."""
        parsed = CompositeMetadata()
        parsed.parse(metadata)
        return parsed
# NOTE: a Sphinx "[docs]" source-link marker (HTML extraction artifact, not code) stood here.
class BaseRequestHandler(RequestHandler):
    """
    Default implementation of :class:`RequestHandler <rsocket.request_handler.RequestHandler>` to simplify
    implementing handlers.

    For each request handler, the implementation will raise a RuntimeError. For :meth:`request_fire_and_forget` and
    :meth:`on_metadata_push` the request will be ignored.
    """

    async def on_setup(self,
                       data_encoding: bytes,
                       metadata_encoding: bytes,
                       payload: Payload):
        """Nothing to do on setup by default"""

    async def on_metadata_push(self, metadata: Payload):
        """Ignored by default"""
        # This override is required: the method is abstract in RequestHandler, so
        # without it this class (and any subclass that does not define it) could
        # not be instantiated.

    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
        """
        Raise RuntimeError by default if not implemented.

        :raises RuntimeError: always, unless overridden.
        """
        raise RuntimeError('Not implemented')

    async def request_fire_and_forget(self, payload: Payload):
        """Ignored by default"""

    async def request_response(self, payload: Payload) -> Awaitable[Payload]:
        """
        Raise RuntimeError by default if not implemented.

        :raises RuntimeError: always, unless overridden.
        """
        raise RuntimeError('Not implemented')

    async def request_stream(self, payload: Payload) -> Publisher:
        """
        Raise RuntimeError by default if not implemented.

        :raises RuntimeError: always, unless overridden.
        """
        raise RuntimeError('Not implemented')

    async def on_error(self, error_code: ErrorCode, payload: Payload):
        """Log the error code name and the payload at error level."""
        logger().error('Error handler: %s, %s', error_code.name, payload)

    async def on_connection_error(self, rsocket, exception: Exception):
        """Nothing to do on connection error by default"""

    async def on_close(self, rsocket, exception: Optional[Exception] = None):
        """Nothing to do on close by default"""

    async def on_keepalive_timeout(self,
                                   time_since_last_keepalive: timedelta,
                                   rsocket):
        """Nothing to do on keepalive timeout by default"""