Source code for reactivestreams.subscriber

from abc import ABCMeta, abstractmethod
from typing import Optional

from reactivestreams.subscription import Subscription


[docs] class Subscriber(metaclass=ABCMeta): """ Handles stream events. """ @abstractmethod def on_subscribe(self, subscription: Subscription): ... @abstractmethod def on_next(self, value, is_complete=False): ... @abstractmethod def on_error(self, exception: Exception): ... @abstractmethod def on_complete(self): ...
class DefaultSubscriber(Subscriber): def __init__(self, on_next=None, on_error=None, on_complete=None, on_subscribe=None): self._on_subscribe = on_subscribe self._on_complete = on_complete self._on_error = on_error self._on_next = on_next self.subscription: Optional[Subscription] = None def on_next(self, value, is_complete=False): if self._on_next is not None: self._on_next(value, is_complete) def on_error(self, exception: Exception): if self._on_error is not None: self._on_error(exception) def on_subscribe(self, subscription: Subscription): self.subscription = subscription if self._on_subscribe is not None: self._on_subscribe(subscription) def on_complete(self): if self._on_complete is not None: self._on_complete()