"""
channel - an abstraction layer over streams that works with data frames
(rather than bytes) and supports compression.

Note: in order to avoid problems with all sorts of line-buffered transports,
we deliberately add \\n at the end of each frame.

Note: unlike previous versions, this is no longer thread safe (thread-safety
was moved up to the Connection class).
"""
from rpyc.lib import safe_import
from rpyc.lib.compat import Struct

zlib = safe_import("zlib")

# * 64 bit length field?
# * separate \n into a FlushingChannel subclass?
# * add thread safety as a subclass?


class Channel(object):
    """Frame-oriented wrapper around a stream object.

    Each frame on the wire is laid out as::

        FRAME_HEADER (payload length, compressed flag) + payload + FLUSHER

    The wrapped ``stream`` must provide ``read(count)``, ``write(data)``,
    ``close()``, ``closed``, ``fileno()`` and ``poll(timeout)``; every one of
    these is simply delegated to.
    """

    # Payloads strictly longer than this many bytes are compressed by send().
    COMPRESSION_THRESHOLD = 3000
    # Level handed to zlib.compress().
    COMPRESSION_LEVEL = 1
    # Header format "!LB": payload length followed by a one-byte flag that is
    # non-zero when the payload is zlib-compressed.
    FRAME_HEADER = Struct("!LB")
    # Cause any line-buffered layers below us to flush.  This must be a bytes
    # object: it is concatenated with the packed header in send(), and mixing
    # text and bytes raises TypeError on Python 3.
    FLUSHER = b"\n"
    __slots__ = ["stream", "compress"]

    def __init__(self, stream, compress=True):
        """Wrap ``stream``.

        ``compress`` enables compression of large outgoing frames; it is
        forced to ``False`` when the zlib module could not be imported.
        """
        self.stream = stream
        # safe_import() yields a falsy placeholder when zlib is unavailable.
        self.compress = compress if zlib else False

    def close(self):
        """Close the underlying stream."""
        self.stream.close()

    @property
    def closed(self):
        """The ``closed`` attribute of the underlying stream."""
        return self.stream.closed

    def fileno(self):
        """Return ``fileno()`` of the underlying stream."""
        return self.stream.fileno()

    def poll(self, timeout):
        """Delegate to ``poll(timeout)`` of the underlying stream."""
        return self.stream.poll(timeout)

    def recv(self):
        """Read one frame from the stream and return its payload.

        The payload is decompressed first when the frame header flags it as
        compressed.
        """
        header = self.stream.read(self.FRAME_HEADER.size)
        length, compressed = self.FRAME_HEADER.unpack(header)
        # Consume the payload together with the trailing FLUSHER, then keep
        # only the payload.  Slicing by ``length`` (instead of a negative
        # index built from len(FLUSHER)) stays correct even if a subclass
        # sets FLUSHER to an empty value, where ``[:-0]`` would drop it all.
        data = self.stream.read(length + len(self.FLUSHER))[:length]
        if compressed:
            data = zlib.decompress(data)
        return data

    def send(self, data):
        """Write ``data`` to the stream as a single frame.

        ``data`` is compressed first when compression is enabled and it is
        longer than ``COMPRESSION_THRESHOLD``.  Header, payload and FLUSHER
        are joined and handed to the stream in one ``write`` call.
        """
        if self.compress and len(data) > self.COMPRESSION_THRESHOLD:
            compressed = 1
            data = zlib.compress(data, self.COMPRESSION_LEVEL)
        else:
            compressed = 0
        header = self.FRAME_HEADER.pack(len(data), compressed)
        self.stream.write(header + data + self.FLUSHER)