channel.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. """
  2. channel - an abstraction layer over streams that works with data frames
  3. (rather than bytes) and supports compression.
  4. Note: in order to avoid problems with all sorts of line-buffered transports,
  5. we deliberately add \\n at the end of each frame.
  6. Note: unlike previous versions, this is no longer thread-safe (thread safety was
  7. moved up to the Connection class).
  8. """
  9. from rpyc.lib import safe_import
  10. from rpyc.lib.compat import Struct
  11. zlib = safe_import("zlib")
  12. # * 64 bit length field?
  13. # * separate \n into a FlushingChannel subclass?
  14. # * add thread safety as a subclass?
  15. class Channel(object):
  16. COMPRESSION_THRESHOLD = 3000
  17. COMPRESSION_LEVEL = 1
  18. FRAME_HEADER = Struct("!LB")
  19. FLUSHER = "\n" # cause any line-buffered layers below us to flush
  20. __slots__ = ["stream", "compress"]
  21. def __init__(self, stream, compress = True):
  22. self.stream = stream
  23. if not zlib:
  24. compress = False
  25. self.compress = compress
  26. def close(self):
  27. self.stream.close()
  28. @property
  29. def closed(self):
  30. return self.stream.closed
  31. def fileno(self):
  32. return self.stream.fileno()
  33. def poll(self, timeout):
  34. return self.stream.poll(timeout)
  35. def recv(self):
  36. header = self.stream.read(self.FRAME_HEADER.size)
  37. length, compressed = self.FRAME_HEADER.unpack(header)
  38. data = self.stream.read(length + len(self.FLUSHER))[:-len(self.FLUSHER)]
  39. if compressed:
  40. data = zlib.decompress(data)
  41. return data
  42. def send(self, data):
  43. if self.compress and len(data) > self.COMPRESSION_THRESHOLD:
  44. compressed = 1
  45. data = zlib.compress(data, self.COMPRESSION_LEVEL)
  46. else:
  47. compressed = 0
  48. header = self.FRAME_HEADER.pack(len(data), compressed)
  49. buf = header + data + self.FLUSHER
  50. self.stream.write(buf)