| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- """
- rpyc registry server implementation
- """
- import sys
- import socket
- import time
- import logging
- from rpyc.core import brine
- DEFAULT_PRUNING_TIMEOUT = 4 * 60
- MAX_DGRAM_SIZE = 1500
- REGISTRY_PORT = 18811
- #------------------------------------------------------------------------------
- # servers
- #------------------------------------------------------------------------------
- class RegistryServer(object):
- def __init__(self, listenersock, pruning_timeout = None, logger = None):
- self.sock = listenersock
- self.port = self.sock.getsockname()[1]
- self.active = False
- self.services = {}
- if pruning_timeout is None:
- pruning_timeout = DEFAULT_PRUNING_TIMEOUT
- self.pruning_timeout = pruning_timeout
- if logger is None:
- logger = self._get_logger()
- self.logger = logger
-
- def _get_logger(self):
- raise NotImplementedError()
-
- def on_service_added(self, name, addrinfo):
- """called when a new service joins the registry (but on keepalives).
- override this to add custom logic"""
-
- def on_service_removed(self, name, addrinfo):
- """called when a service unregisters or is pruned.
- override this to add custom logic"""
-
- def add_service(self, name, addrinfo):
- if name not in self.services:
- self.services[name] = {}
- is_new = addrinfo not in self.services
- self.services[name][addrinfo] = time.time()
- if is_new:
- try:
- self.on_service_added(name, addrinfo)
- except Exception:
- self.logger.exception('error executing service add callback')
-
- def remove_service(self, name, addrinfo):
- self.services[name].pop(addrinfo, None)
- if not self.services[name]:
- del self.services[name]
- try:
- self.on_service_removed(name, addrinfo)
- except Exception:
- self.logger.exception('error executing service remove callback')
-
- def cmd_query(self, host, name):
- name = name.upper()
- self.logger.debug("querying for %r", name)
- if name not in self.services:
- self.logger.debug("no such service")
- return ()
-
- oldest = time.time() - self.pruning_timeout
- all_servers = sorted(self.services[name].items(), key = lambda x: x[1])
- servers = []
- for addrinfo, t in all_servers:
- if t < oldest:
- self.logger.debug("discarding stale %s:%s", *addrinfo)
- self.remove_service(name, addrinfo)
- else:
- servers.append(addrinfo)
-
- self.logger.debug("replying with %r", servers)
- return tuple(servers)
-
- def cmd_register(self, host, names, port):
- self.logger.debug("registering %s:%s as %s", host, port, ", ".join(names))
- for name in names:
- self.add_service(name.upper(), (host, port))
- return "OK"
-
- def cmd_unregister(self, host, port):
- self.logger.debug("unregistering %s:%s", host, port)
- for name in self.services.keys():
- self.remove_service(name, (host, port))
- return "OK"
-
- def _recv(self):
- raise NotImplementedError()
-
- def _send(self, data, addrinfo):
- raise NotImplementedError()
-
- def _work(self):
- while self.active:
- try:
- data, addrinfo = self._recv()
- except (socket.error, socket.timeout):
- continue
- try:
- magic, cmd, args = brine.load(data)
- except Exception:
- continue
- if magic != "RPYC":
- self.logger.warn("invalid magic: %r", magic)
- continue
- cmdfunc = getattr(self, "cmd_%s" % (cmd.lower(),), None)
- if not cmdfunc:
- self.logger.warn("unknown command: %r", cmd)
- continue
-
- try:
- reply = cmdfunc(addrinfo[0], *args)
- except Exception:
- self.logger.exception('error executing function')
- else:
- self._send(brine.dump(reply), addrinfo)
-
- def start(self):
- if self.active:
- raise ValueError("server is already running")
- if self.sock is None:
- raise ValueError("object disposed")
- self.logger.debug("server started on %s:%s", *self.sock.getsockname())
- try:
- try:
- self.active = True
- self._work()
- except KeyboardInterrupt:
- self.logger.warn("User interrupt!")
- finally:
- self.active = False
- self.logger.debug("server closed")
- self.sock.close()
- self.sock = None
-
- def close(self):
- if not self.active:
- raise ValueError("server is not running")
- self.logger.debug("stopping server...")
- self.active = False
- class UDPRegistryServer(RegistryServer):
- def __init__(self, host = "0.0.0.0", port = REGISTRY_PORT,
- pruning_timeout = None, logger = None):
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.bind((host, port))
- sock.settimeout(0.5)
- RegistryServer.__init__(self, sock, pruning_timeout = pruning_timeout,
- logger = logger)
-
- def _get_logger(self):
- return logging.getLogger("REGSRV/UDP/%d" % (self.port,))
-
- def _recv(self):
- return self.sock.recvfrom(MAX_DGRAM_SIZE)
-
- def _send(self, data, addrinfo):
- try:
- self.sock.sendto(data, addrinfo)
- except (socket.error, socket.timeout):
- pass
- class TCPRegistryServer(RegistryServer):
- def __init__(self, host = "0.0.0.0", port = REGISTRY_PORT,
- pruning_timeout = None, logger = None, reuse_addr = True):
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if reuse_addr and sys.platform != "win32":
- # warning: reuseaddr is not what you expect on windows!
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((host, port))
- sock.listen(10)
- sock.settimeout(0.5)
- RegistryServer.__init__(self, sock, pruning_timeout = pruning_timeout,
- logger = logger)
- self._connected_sockets = {}
- def _get_logger(self):
- return logging.getLogger("REGSRV/TCP/%d" % (self.port,))
-
- def _recv(self):
- sock2 = self.sock.accept()[0]
- addrinfo = sock2.getpeername()
- data = sock2.recv(MAX_DGRAM_SIZE)
- self._connected_sockets[addrinfo] = sock2
- return data, addrinfo
-
- def _send(self, data, addrinfo):
- sock2 = self._connected_sockets.pop(addrinfo)
- try:
- sock2.send(data)
- except (socket.error, socket.timeout):
- pass
- #------------------------------------------------------------------------------
- # clients (registrars)
- #------------------------------------------------------------------------------
- class RegistryClient(object):
- REREGISTER_INTERVAL = 60
-
- def __init__(self, ip, port, timeout, logger = None):
- self.ip = ip
- self.port = port
- self.timeout = timeout
- if logger is None:
- logger = self._get_logger()
- self.logger = logger
- def _get_logger(self):
- raise NotImplementedError()
- def discover(self, name):
- raise NotImplementedError()
-
- def register(self, aliases, port):
- raise NotImplementedError()
-
- def unregister(self, port):
- raise NotImplementedError()
- class UDPRegistryClient(RegistryClient):
- def __init__(self, ip = "255.255.255.255", port = REGISTRY_PORT, timeout = 2,
- bcast = None, logger = None):
- RegistryClient.__init__(self, ip = ip, port = port, timeout = timeout,
- logger = logger)
- if bcast is None:
- bcast = "255" in ip.split(".")
- self.bcast = bcast
- def _get_logger(self):
- return logging.getLogger('REGCLNT/UDP')
-
- def discover(self, name):
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- if self.bcast:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
- data = brine.dump(("RPYC", "QUERY", (name,)))
- sock.sendto(data, (self.ip, self.port))
- sock.settimeout(self.timeout)
-
- try:
- try:
- data, _ = sock.recvfrom(MAX_DGRAM_SIZE)
- except (socket.error, socket.timeout):
- servers = ()
- else:
- servers = brine.load(data)
- finally:
- sock.close()
- return servers
-
- def register(self, aliases, port):
- self.logger.info("registering on %s:%s", self.ip, self.port)
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- if self.bcast:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
- data = brine.dump(("RPYC", "REGISTER", (aliases, port)))
- sock.sendto(data, (self.ip, self.port))
-
- tmax = time.time() + self.timeout
- while time.time() < tmax:
- sock.settimeout(tmax - time.time())
- try:
- data, (rip, rport) = sock.recvfrom(MAX_DGRAM_SIZE)
- except socket.timeout:
- self.logger.warn("no registry acknowledged")
- break
- if rport != self.port:
- continue
- try:
- reply = brine.load(data)
- except Exception:
- continue
- if reply == "OK":
- self.logger.info("registry %s:%s acknowledged", rip, rport)
- break
- else:
- self.logger.warn("no registry acknowledged")
- sock.close()
-
- def unregister(self, port):
- self.logger.info("unregistering from %s:%s", self.ip, self.port)
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- if self.bcast:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
- data = brine.dump(("RPYC", "UNREGISTER", (port,)))
- sock.sendto(data, (self.ip, self.port))
- sock.close()
- class TCPRegistryClient(RegistryClient):
- def __init__(self, ip, port = REGISTRY_PORT, timeout = 2, logger = None):
- RegistryClient.__init__(self, ip = ip, port = port, timeout = timeout,
- logger = logger)
- def _get_logger(self):
- return logging.getLogger('REGCLNT/TCP')
-
- def discover(self, name):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
- data = brine.dump(("RPYC", "QUERY", (name,)))
- sock.connect((self.ip, self.port))
- sock.send(data)
-
- try:
- try:
- data = sock.recv(MAX_DGRAM_SIZE)
- except (socket.error, socket.timeout):
- servers = ()
- else:
- servers = brine.load(data)
- finally:
- sock.close()
- return servers
-
- def register(self, aliases, port):
- self.logger.info("registering on %s:%s", self.ip, self.port)
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
- data = brine.dump(("RPYC", "REGISTER", (aliases, port)))
-
- try:
- try:
- sock.connect((self.ip, self.port))
- sock.send(data)
- except (socket.error, socket.timeout):
- self.logger.warn("could not connect to registry")
- return
- try:
- data = sock.recv(MAX_DGRAM_SIZE)
- except socket.timeout:
- self.logger.warn("registry did not acknowledge")
- return
- try:
- reply = brine.load(data)
- except Exception:
- self.logger.warn("received corrupted data from registry")
- return
- if reply == "OK":
- self.logger.info("registry %s:%s acknowledged", self.ip, self.port)
- finally:
- sock.close()
-
- def unregister(self, port):
- self.logger.info("unregistering from %s:%s", self.ip, self.port)
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
- data = brine.dump(("RPYC", "UNREGISTER", (port,)))
- try:
- sock.connect((self.ip, self.port))
- sock.send(data)
- except (socket.error, socket.timeout):
- self.logger.warn("could not connect to registry")
- sock.close()
|