registry.py 12 KB


  1. """
  2. rpyc registry server implementation
  3. """
  4. import sys
  5. import socket
  6. import time
  7. import logging
  8. from rpyc.core import brine
  9. DEFAULT_PRUNING_TIMEOUT = 4 * 60
  10. MAX_DGRAM_SIZE = 1500
  11. REGISTRY_PORT = 18811
  12. #------------------------------------------------------------------------------
  13. # servers
  14. #------------------------------------------------------------------------------
  15. class RegistryServer(object):
  16. def __init__(self, listenersock, pruning_timeout = None, logger = None):
  17. self.sock = listenersock
  18. self.port = self.sock.getsockname()[1]
  19. self.active = False
  20. self.services = {}
  21. if pruning_timeout is None:
  22. pruning_timeout = DEFAULT_PRUNING_TIMEOUT
  23. self.pruning_timeout = pruning_timeout
  24. if logger is None:
  25. logger = self._get_logger()
  26. self.logger = logger
  27. def _get_logger(self):
  28. raise NotImplementedError()
  29. def on_service_added(self, name, addrinfo):
  30. """called when a new service joins the registry (but on keepalives).
  31. override this to add custom logic"""
  32. def on_service_removed(self, name, addrinfo):
  33. """called when a service unregisters or is pruned.
  34. override this to add custom logic"""
  35. def add_service(self, name, addrinfo):
  36. if name not in self.services:
  37. self.services[name] = {}
  38. is_new = addrinfo not in self.services
  39. self.services[name][addrinfo] = time.time()
  40. if is_new:
  41. try:
  42. self.on_service_added(name, addrinfo)
  43. except Exception:
  44. self.logger.exception('error executing service add callback')
  45. def remove_service(self, name, addrinfo):
  46. self.services[name].pop(addrinfo, None)
  47. if not self.services[name]:
  48. del self.services[name]
  49. try:
  50. self.on_service_removed(name, addrinfo)
  51. except Exception:
  52. self.logger.exception('error executing service remove callback')
  53. def cmd_query(self, host, name):
  54. name = name.upper()
  55. self.logger.debug("querying for %r", name)
  56. if name not in self.services:
  57. self.logger.debug("no such service")
  58. return ()
  59. oldest = time.time() - self.pruning_timeout
  60. all_servers = sorted(self.services[name].items(), key = lambda x: x[1])
  61. servers = []
  62. for addrinfo, t in all_servers:
  63. if t < oldest:
  64. self.logger.debug("discarding stale %s:%s", *addrinfo)
  65. self.remove_service(name, addrinfo)
  66. else:
  67. servers.append(addrinfo)
  68. self.logger.debug("replying with %r", servers)
  69. return tuple(servers)
  70. def cmd_register(self, host, names, port):
  71. self.logger.debug("registering %s:%s as %s", host, port, ", ".join(names))
  72. for name in names:
  73. self.add_service(name.upper(), (host, port))
  74. return "OK"
  75. def cmd_unregister(self, host, port):
  76. self.logger.debug("unregistering %s:%s", host, port)
  77. for name in self.services.keys():
  78. self.remove_service(name, (host, port))
  79. return "OK"
  80. def _recv(self):
  81. raise NotImplementedError()
  82. def _send(self, data, addrinfo):
  83. raise NotImplementedError()
  84. def _work(self):
  85. while self.active:
  86. try:
  87. data, addrinfo = self._recv()
  88. except (socket.error, socket.timeout):
  89. continue
  90. try:
  91. magic, cmd, args = brine.load(data)
  92. except Exception:
  93. continue
  94. if magic != "RPYC":
  95. self.logger.warn("invalid magic: %r", magic)
  96. continue
  97. cmdfunc = getattr(self, "cmd_%s" % (cmd.lower(),), None)
  98. if not cmdfunc:
  99. self.logger.warn("unknown command: %r", cmd)
  100. continue
  101. try:
  102. reply = cmdfunc(addrinfo[0], *args)
  103. except Exception:
  104. self.logger.exception('error executing function')
  105. else:
  106. self._send(brine.dump(reply), addrinfo)
  107. def start(self):
  108. if self.active:
  109. raise ValueError("server is already running")
  110. if self.sock is None:
  111. raise ValueError("object disposed")
  112. self.logger.debug("server started on %s:%s", *self.sock.getsockname())
  113. try:
  114. try:
  115. self.active = True
  116. self._work()
  117. except KeyboardInterrupt:
  118. self.logger.warn("User interrupt!")
  119. finally:
  120. self.active = False
  121. self.logger.debug("server closed")
  122. self.sock.close()
  123. self.sock = None
  124. def close(self):
  125. if not self.active:
  126. raise ValueError("server is not running")
  127. self.logger.debug("stopping server...")
  128. self.active = False
  129. class UDPRegistryServer(RegistryServer):
  130. def __init__(self, host = "0.0.0.0", port = REGISTRY_PORT,
  131. pruning_timeout = None, logger = None):
  132. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  133. sock.bind((host, port))
  134. sock.settimeout(0.5)
  135. RegistryServer.__init__(self, sock, pruning_timeout = pruning_timeout,
  136. logger = logger)
  137. def _get_logger(self):
  138. return logging.getLogger("REGSRV/UDP/%d" % (self.port,))
  139. def _recv(self):
  140. return self.sock.recvfrom(MAX_DGRAM_SIZE)
  141. def _send(self, data, addrinfo):
  142. try:
  143. self.sock.sendto(data, addrinfo)
  144. except (socket.error, socket.timeout):
  145. pass
  146. class TCPRegistryServer(RegistryServer):
  147. def __init__(self, host = "0.0.0.0", port = REGISTRY_PORT,
  148. pruning_timeout = None, logger = None, reuse_addr = True):
  149. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  150. if reuse_addr and sys.platform != "win32":
  151. # warning: reuseaddr is not what you expect on windows!
  152. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  153. sock.bind((host, port))
  154. sock.listen(10)
  155. sock.settimeout(0.5)
  156. RegistryServer.__init__(self, sock, pruning_timeout = pruning_timeout,
  157. logger = logger)
  158. self._connected_sockets = {}
  159. def _get_logger(self):
  160. return logging.getLogger("REGSRV/TCP/%d" % (self.port,))
  161. def _recv(self):
  162. sock2 = self.sock.accept()[0]
  163. addrinfo = sock2.getpeername()
  164. data = sock2.recv(MAX_DGRAM_SIZE)
  165. self._connected_sockets[addrinfo] = sock2
  166. return data, addrinfo
  167. def _send(self, data, addrinfo):
  168. sock2 = self._connected_sockets.pop(addrinfo)
  169. try:
  170. sock2.send(data)
  171. except (socket.error, socket.timeout):
  172. pass
  173. #------------------------------------------------------------------------------
  174. # clients (registrars)
  175. #------------------------------------------------------------------------------
  176. class RegistryClient(object):
  177. REREGISTER_INTERVAL = 60
  178. def __init__(self, ip, port, timeout, logger = None):
  179. self.ip = ip
  180. self.port = port
  181. self.timeout = timeout
  182. if logger is None:
  183. logger = self._get_logger()
  184. self.logger = logger
  185. def _get_logger(self):
  186. raise NotImplementedError()
  187. def discover(self, name):
  188. raise NotImplementedError()
  189. def register(self, aliases, port):
  190. raise NotImplementedError()
  191. def unregister(self, port):
  192. raise NotImplementedError()
  193. class UDPRegistryClient(RegistryClient):
  194. def __init__(self, ip = "255.255.255.255", port = REGISTRY_PORT, timeout = 2,
  195. bcast = None, logger = None):
  196. RegistryClient.__init__(self, ip = ip, port = port, timeout = timeout,
  197. logger = logger)
  198. if bcast is None:
  199. bcast = "255" in ip.split(".")
  200. self.bcast = bcast
  201. def _get_logger(self):
  202. return logging.getLogger('REGCLNT/UDP')
  203. def discover(self, name):
  204. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  205. if self.bcast:
  206. sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
  207. data = brine.dump(("RPYC", "QUERY", (name,)))
  208. sock.sendto(data, (self.ip, self.port))
  209. sock.settimeout(self.timeout)
  210. try:
  211. try:
  212. data, _ = sock.recvfrom(MAX_DGRAM_SIZE)
  213. except (socket.error, socket.timeout):
  214. servers = ()
  215. else:
  216. servers = brine.load(data)
  217. finally:
  218. sock.close()
  219. return servers
  220. def register(self, aliases, port):
  221. self.logger.info("registering on %s:%s", self.ip, self.port)
  222. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  223. if self.bcast:
  224. sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
  225. data = brine.dump(("RPYC", "REGISTER", (aliases, port)))
  226. sock.sendto(data, (self.ip, self.port))
  227. tmax = time.time() + self.timeout
  228. while time.time() < tmax:
  229. sock.settimeout(tmax - time.time())
  230. try:
  231. data, (rip, rport) = sock.recvfrom(MAX_DGRAM_SIZE)
  232. except socket.timeout:
  233. self.logger.warn("no registry acknowledged")
  234. break
  235. if rport != self.port:
  236. continue
  237. try:
  238. reply = brine.load(data)
  239. except Exception:
  240. continue
  241. if reply == "OK":
  242. self.logger.info("registry %s:%s acknowledged", rip, rport)
  243. break
  244. else:
  245. self.logger.warn("no registry acknowledged")
  246. sock.close()
  247. def unregister(self, port):
  248. self.logger.info("unregistering from %s:%s", self.ip, self.port)
  249. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  250. if self.bcast:
  251. sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
  252. data = brine.dump(("RPYC", "UNREGISTER", (port,)))
  253. sock.sendto(data, (self.ip, self.port))
  254. sock.close()
  255. class TCPRegistryClient(RegistryClient):
  256. def __init__(self, ip, port = REGISTRY_PORT, timeout = 2, logger = None):
  257. RegistryClient.__init__(self, ip = ip, port = port, timeout = timeout,
  258. logger = logger)
  259. def _get_logger(self):
  260. return logging.getLogger('REGCLNT/TCP')
  261. def discover(self, name):
  262. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  263. sock.settimeout(self.timeout)
  264. data = brine.dump(("RPYC", "QUERY", (name,)))
  265. sock.connect((self.ip, self.port))
  266. sock.send(data)
  267. try:
  268. try:
  269. data = sock.recv(MAX_DGRAM_SIZE)
  270. except (socket.error, socket.timeout):
  271. servers = ()
  272. else:
  273. servers = brine.load(data)
  274. finally:
  275. sock.close()
  276. return servers
  277. def register(self, aliases, port):
  278. self.logger.info("registering on %s:%s", self.ip, self.port)
  279. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  280. sock.settimeout(self.timeout)
  281. data = brine.dump(("RPYC", "REGISTER", (aliases, port)))
  282. try:
  283. try:
  284. sock.connect((self.ip, self.port))
  285. sock.send(data)
  286. except (socket.error, socket.timeout):
  287. self.logger.warn("could not connect to registry")
  288. return
  289. try:
  290. data = sock.recv(MAX_DGRAM_SIZE)
  291. except socket.timeout:
  292. self.logger.warn("registry did not acknowledge")
  293. return
  294. try:
  295. reply = brine.load(data)
  296. except Exception:
  297. self.logger.warn("received corrupted data from registry")
  298. return
  299. if reply == "OK":
  300. self.logger.info("registry %s:%s acknowledged", self.ip, self.port)
  301. finally:
  302. sock.close()
  303. def unregister(self, port):
  304. self.logger.info("unregistering from %s:%s", self.ip, self.port)
  305. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  306. sock.settimeout(self.timeout)
  307. data = brine.dump(("RPYC", "UNREGISTER", (port,)))
  308. try:
  309. sock.connect((self.ip, self.port))
  310. sock.send(data)
  311. except (socket.error, socket.timeout):
  312. self.logger.warn("could not connect to registry")
  313. sock.close()