async.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import time
  2. class AsyncResultTimeout(Exception):
  3. pass
  4. class AsyncResult(object):
  5. """AsyncResult is an object that represent a computation that occurs in
  6. the background and will eventually have a result. Use the .value property
  7. to access the result (which will block if the result has not yet arrived)
  8. """
  9. __slots__ = ["_conn", "_is_ready", "_is_exc", "_callbacks", "_obj", "_ttl"]
  10. def __init__(self, conn):
  11. self._conn = conn
  12. self._is_ready = False
  13. self._is_exc = None
  14. self._obj = None
  15. self._callbacks = []
  16. self._ttl = None
  17. def __repr__(self):
  18. if self._is_ready:
  19. state = "ready"
  20. elif self._is_exc:
  21. state = "error"
  22. elif self.expired:
  23. state = "expired"
  24. else:
  25. state = "pending"
  26. return "<AsyncResult object (%s) at 0x%08x>" % (state, id(self))
  27. def __call__(self, is_exc, obj):
  28. if self.expired:
  29. return
  30. self._is_ready = True
  31. self._is_exc = is_exc
  32. self._obj = obj
  33. for cb in self._callbacks:
  34. cb(self)
  35. del self._callbacks[:]
  36. def wait(self):
  37. """wait for the result to arrive. if the AsyncResult object has an
  38. expiry set, and the result does not arrive within that timeout,
  39. an AsyncResultTimeout exception is raised"""
  40. if self._is_ready:
  41. return
  42. if self._ttl is None:
  43. while not self._is_ready:
  44. self._conn.serve()
  45. else:
  46. while True:
  47. timeout = self._ttl - time.time()
  48. self._conn.poll(timeout = max(timeout, 0))
  49. if self._is_ready:
  50. break
  51. if timeout <= 0:
  52. raise AsyncResultTimeout("result expired")
  53. def add_callback(self, func):
  54. """adds a callback to be invoked when the result arrives. the
  55. callback function takes a single argument, which is the current
  56. AsyncResult (self)"""
  57. if self._is_ready:
  58. func(self)
  59. else:
  60. self._callbacks.append(func)
  61. def set_expiry(self, timeout):
  62. """set the expiry time (in seconds, relative to now) or None for
  63. unlimited time"""
  64. if timeout is None:
  65. self._ttl = None
  66. else:
  67. self._ttl = time.time() + timeout
  68. @property
  69. def ready(self):
  70. """a predicate of whether the result has arrived"""
  71. if self.expired:
  72. return False
  73. if not self._is_ready:
  74. self._conn.poll_all()
  75. return self._is_ready
  76. @property
  77. def error(self):
  78. """a predicate of whether the returned result is an exception"""
  79. if self.ready:
  80. return self._is_exc
  81. return False
  82. @property
  83. def expired(self):
  84. """a predicate of whether the async result has expired"""
  85. if self._is_ready or self._ttl is None:
  86. return False
  87. else:
  88. return time.time() > self._ttl
  89. @property
  90. def value(self):
  91. """returns the result of the operation. if the result has not yet
  92. arrived, accessing this property will wait for it. if the result does
  93. not arrive before the expiry time elapses, AsyncResultTimeout is
  94. raised. if the returned result is an exception, it will be raised here.
  95. otherwise, the result is returned directly."""
  96. self.wait()
  97. if self._is_exc:
  98. raise self._obj
  99. else:
  100. return self._obj