Source code for rsocket.reactivex.reactivex_handler

from abc import abstractmethod
from datetime import timedelta
from typing import Optional, Union, Callable

from reactivex import Observable, Subject

from rsocket.error_codes import ErrorCode
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_channel import ReactivexChannel


[docs] class ReactivexHandler: """ Variant of :class:`RequestHandler <rsocket.request_handler.RequestHandler>` which uses Reactivex (4.0). Wrap with :meth:`reactivex_handler_factory <rsocket.reactivex.reactivex_handler_adapter.reactivex_handler_factory>` to pass as a request handler """
[docs] @abstractmethod async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload): """ Handle setup request """
[docs] @abstractmethod async def on_metadata_push(self, metadata: Payload): """ Handle metadata-push request """
[docs] @abstractmethod async def request_channel(self, payload: Payload) -> ReactivexChannel: """ Handle request-channel interaction """
[docs] @abstractmethod async def request_fire_and_forget(self, payload: Payload): """ Handle request-fire-and-forget interaction """
[docs] @abstractmethod async def request_response(self, payload: Payload) -> Observable: """ Handle request-response interaction """
[docs] @abstractmethod async def request_stream(self, payload: Payload) -> Union[Observable, Callable[[Subject], Observable]]: """ Handle request-stream interaction """
[docs] @abstractmethod async def on_error(self, error_code: ErrorCode, payload: Payload): """ Handle errors received from the remote side """
@abstractmethod async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket): ...
[docs] @abstractmethod async def on_connection_error(self, rsocket, exception: Exception): """ Handle connection error """
[docs] @abstractmethod async def on_close(self, rsocket, exception: Optional[Exception] = None): """ Handle connection closed """
# noinspection PyMethodMayBeStatic def _parse_composite_metadata(self, metadata: bytes) -> CompositeMetadata: return CompositeMetadata().parse(metadata)
class BaseReactivexHandler(ReactivexHandler): async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload): """Nothing to do on setup by default""" async def on_metadata_push(self, metadata: Payload): """Nothing by default""" async def request_channel(self, payload: Payload) -> ReactivexChannel: raise RuntimeError('Not implemented') async def request_fire_and_forget(self, payload: Payload): """The requester isn't listening for errors. Nothing to do.""" async def request_response(self, payload: Payload) -> Observable: raise RuntimeError('Not implemented') async def request_stream(self, payload: Payload) -> Observable: raise RuntimeError('Not implemented') async def on_error(self, error_code: ErrorCode, payload: Payload): logger().error('Error handler: %s, %s', error_code.name, payload) async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket): pass async def on_close(self, rsocket, exception: Optional[Exception] = None): pass async def on_connection_error(self, rsocket, exception: Exception): pass