Source code for rsocket.transports.tcp

from asyncio import StreamReader, StreamWriter

from rsocket.frame import Frame, serialize_prefix_with_frame_size_header
from rsocket.helpers import wrap_transport_exception
from rsocket.transports.transport import Transport


[docs] class TransportTCP(Transport): """ RSocket transport over asyncio TCP connection. :param reader: asyncio connection reader stream :param writer: asyncio connection writer stream """ def __init__(self, reader: StreamReader, writer: StreamWriter, read_buffer_size=1024): super().__init__() self._read_buffer_size = read_buffer_size self._writer = writer self._reader = reader async def send_frame(self, frame: Frame): await self.serialize_partial(frame) async def serialize_partial(self, frame: Frame): with wrap_transport_exception(): self._writer.write(serialize_prefix_with_frame_size_header(frame)) frame.write_data_metadata(self._writer.write) await self._writer.drain() async def on_send_queue_empty(self): with wrap_transport_exception(): await self._writer.drain() async def close(self): self._writer.close() await self._writer.wait_closed() async def next_frame_generator(self): with wrap_transport_exception(): data = await self._reader.read(self._read_buffer_size) if not data: self._writer.close() return return self._frame_parser.receive_data(data) def requires_length_header(self) -> bool: return True