helpers.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. """
  2. helpers and wrappers for common rpyc tasks
  3. """
  4. import time
  5. import threading
  6. from rpyc.lib.colls import WeakValueDict
  7. from rpyc.lib.compat import callable
  8. from rpyc.core.consts import HANDLE_BUFFITER, HANDLE_CALL
  9. from rpyc.core.netref import syncreq, asyncreq
  10. def buffiter(obj, chunk = 10, max_chunk = 1000, factor = 2):
  11. """buffering iterator - reads the remote iterator in chunks starting with
  12. `chunk` up to `max_chunk`, multiplying by `factor` as an exponential
  13. backoff"""
  14. if factor < 1:
  15. raise ValueError("factor must be >= 1, got %r" % (factor,))
  16. it = iter(obj)
  17. count = chunk
  18. while True:
  19. items = syncreq(it, HANDLE_BUFFITER, count)
  20. count = min(count * factor, max_chunk)
  21. if not items:
  22. break
  23. for elem in items:
  24. yield elem
  25. class _Async(object):
  26. """creates an async proxy wrapper over an existing proxy. async proxies
  27. are cached. invoking an async proxy will return an AsyncResult instead of
  28. blocking"""
  29. __slots__ = ("proxy", "__weakref__")
  30. def __init__(self, proxy):
  31. self.proxy = proxy
  32. def __call__(self, *args, **kwargs):
  33. return asyncreq(self.proxy, HANDLE_CALL, args, tuple(kwargs.items()))
  34. def __repr__(self):
  35. return "async(%r)" % (self.proxy,)
  36. _async_proxies_cache = WeakValueDict()
  37. def async(proxy):
  38. pid = id(proxy)
  39. if pid in _async_proxies_cache:
  40. return _async_proxies_cache[pid]
  41. if not hasattr(proxy, "____conn__") or not hasattr(proxy, "____oid__"):
  42. raise TypeError("'proxy' must be a Netref: %r", (proxy,))
  43. if not callable(proxy):
  44. raise TypeError("'proxy' must be callable: %r" % (proxy,))
  45. caller = _Async(proxy)
  46. _async_proxies_cache[id(caller)] = _async_proxies_cache[pid] = caller
  47. return caller
  48. async.__doc__ = _Async.__doc__
  49. class timed(object):
  50. """creates a timed asynchronous proxy. invoking the timed proxy will
  51. run in the background and will raise an AsyncResultTimeout exception
  52. if the computation does not terminate within the given timeout"""
  53. __slots__ = ("__weakref__", "proxy", "timeout")
  54. def __init__(self, proxy, timeout):
  55. self.proxy = async(proxy)
  56. self.timeout = timeout
  57. def __call__(self, *args, **kwargs):
  58. res = self.proxy(*args, **kwargs)
  59. res.set_expiry(self.timeout)
  60. return res
  61. def __repr__(self):
  62. return "timed(%r, %r)" % (self.proxy.proxy, self.timeout)
  63. class BgServingThread(object):
  64. """runs an RPyC server in the background to serve all requests and replies
  65. that arrive on the given RPyC connection. the thread is created along with
  66. the object; you can use the stop() method to stop the server thread"""
  67. # these numbers are magical...
  68. SERVE_INTERVAL = 0.0
  69. SLEEP_INTERVAL = 0.1
  70. def __init__(self, conn):
  71. self._conn = conn
  72. self._thread = threading.Thread(target = self._bg_server)
  73. self._thread.setDaemon(True)
  74. self._active = True
  75. self._thread.start()
  76. def __del__(self):
  77. if self._active:
  78. self.stop()
  79. def _bg_server(self):
  80. try:
  81. while self._active:
  82. self._conn.serve(self.SERVE_INTERVAL)
  83. time.sleep(self.SLEEP_INTERVAL) # to reduce contention
  84. except Exception:
  85. if self._active:
  86. raise
  87. def stop(self):
  88. """stop the server thread. once stopped, it cannot be resumed. you will
  89. have to create a new BgServingThread object later."""
  90. assert self._active
  91. self._active = False
  92. self._thread.join()
  93. self._conn = None