| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- """
- channel - an abstraction layer over streams that works with data frames
- (rather than bytes) and supports compression.
- Note: in order to avoid problems with all sorts of line-buffered transports,
- we deliberately add \\n at the end of each frame.
- Note: unlike previous versions, this class is no longer thread-safe (thread
- safety was moved up to the Connection class).
- """
- from rpyc.lib import safe_import
- from rpyc.lib.compat import Struct
- zlib = safe_import("zlib")
- # * 64 bit length field?
- # * separate \n into a FlushingChannel subclass?
- # * add thread safety as a subclass?
class Channel(object):
    """Frame-oriented wrapper around a stream, with optional zlib compression.

    Each frame is laid out on the wire as::

        header | payload | FLUSHER

    where ``header`` is ``FRAME_HEADER`` (a 4-byte unsigned payload length in
    network byte order followed by a 1-byte "compressed" flag), ``payload`` is
    the possibly-compressed data, and ``FLUSHER`` is a trailing newline that
    makes any line-buffered layer underneath flush the frame.

    The wrapped ``stream`` object must provide ``read(count)``,
    ``write(data)``, ``close()``, ``closed``, ``fileno()`` and
    ``poll(timeout)``; this class simply delegates to them.
    """

    # Payloads longer than this many bytes are compressed (when enabled).
    COMPRESSION_THRESHOLD = 3000
    # Level passed to zlib.compress (1 is zlib's fastest setting).
    COMPRESSION_LEVEL = 1
    # "!LB": network byte order, unsigned 32-bit length, unsigned 8-bit flag.
    FRAME_HEADER = Struct("!LB")
    # Must be a byte string: it is concatenated with the packed header and the
    # payload, both of which are bytes. A text "\n" raises TypeError on
    # Python 3; b"\n" is the same value on Python 2.
    FLUSHER = b"\n"  # cause any line-buffered layers below us to flush
    __slots__ = ["stream", "compress"]

    def __init__(self, stream, compress=True):
        """Wrap ``stream``.

        :param stream: the underlying stream object to read from / write to
        :param compress: whether large outgoing frames may be compressed;
                         forced to ``False`` when ``zlib`` is unavailable
        """
        self.stream = stream
        if not zlib:
            compress = False
        self.compress = compress

    def close(self):
        """Close the underlying stream."""
        self.stream.close()

    @property
    def closed(self):
        """The underlying stream's ``closed`` attribute."""
        return self.stream.closed

    def fileno(self):
        """Return the underlying stream's ``fileno()``."""
        return self.stream.fileno()

    def poll(self, timeout):
        """Delegate to the underlying stream's ``poll(timeout)``."""
        return self.stream.poll(timeout)

    def recv(self):
        """Read one frame from the stream and return its payload.

        The payload is decompressed first if the frame's flag says so.
        """
        # NOTE(review): assumes stream.read(count) returns exactly ``count``
        # bytes (or raises) -- confirm against the stream implementation.
        header = self.stream.read(self.FRAME_HEADER.size)
        length, compressed = self.FRAME_HEADER.unpack(header)
        flusher_len = len(self.FLUSHER)
        # Read payload and trailing flusher in one call, then drop the flusher.
        data = self.stream.read(length + flusher_len)[:-flusher_len]
        if compressed:
            data = zlib.decompress(data)
        return data

    def send(self, data):
        """Write ``data`` (a byte string) to the stream as a single frame.

        The payload is compressed when compression is enabled and ``data`` is
        longer than ``COMPRESSION_THRESHOLD``; the header's length field always
        describes the payload as written (i.e. after compression).
        """
        if self.compress and len(data) > self.COMPRESSION_THRESHOLD:
            compressed = 1
            data = zlib.compress(data, self.COMPRESSION_LEVEL)
        else:
            compressed = 0
        header = self.FRAME_HEADER.pack(len(data), compressed)
        # Single write so the whole frame reaches the stream in one piece.
        self.stream.write(header + data + self.FLUSHER)
|