server.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. """
  2. rpyc plug-in server (threaded or forking)
  3. """
  4. import sys
  5. import os
  6. import socket
  7. import time
  8. import threading
  9. import errno
  10. import logging
  11. from rpyc.core import SocketStream, Channel, Connection
  12. from rpyc.utils.registry import UDPRegistryClient
  13. from rpyc.utils.authenticators import AuthenticationError
  14. from rpyc.lib import safe_import
  15. signal = safe_import("signal")
  16. class Server(object):
  17. def __init__(self, service, hostname = "0.0.0.0", port = 0, backlog = 10,
  18. reuse_addr = True, authenticator = None, registrar = None,
  19. auto_register = True, protocol_config = {}, logger = None):
  20. self.active = False
  21. self._closed = False
  22. self.service = service
  23. self.authenticator = authenticator
  24. self.backlog = backlog
  25. self.auto_register = auto_register
  26. self.protocol_config = protocol_config
  27. self.clients = set()
  28. self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  29. if reuse_addr and sys.platform != "win32":
  30. # warning: reuseaddr is not what you expect on windows!
  31. self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  32. self.listener.bind((hostname, port))
  33. self.port = self.listener.getsockname()[1]
  34. if logger is None:
  35. logger = self._get_logger()
  36. self.logger = logger
  37. if registrar is None:
  38. registrar = UDPRegistryClient(logger = self.logger)
  39. self.registrar = registrar
  40. def _get_logger(self):
  41. return logging.getLogger("%s/%d" % (self.service.get_service_name(), self.port))
  42. def close(self):
  43. if self._closed:
  44. return
  45. self._closed = True
  46. self.active = False
  47. if self.auto_register:
  48. try:
  49. self.registrar.unregister(self.port)
  50. except Exception:
  51. self.logger.exception("error unregistering services")
  52. self.listener.close()
  53. self.logger.info("listener closed")
  54. for c in set(self.clients):
  55. try:
  56. c.shutdown(socket.SHUT_RDWR)
  57. except Exception:
  58. pass
  59. c.close()
  60. self.clients.clear()
  61. def fileno(self):
  62. return self.listener.fileno()
  63. def accept(self):
  64. while True:
  65. try:
  66. sock, (h, p) = self.listener.accept()
  67. except socket.timeout:
  68. pass
  69. except socket.error:
  70. ex = sys.exc_info()[1]
  71. if ex[0] == errno.EINTR:
  72. pass
  73. else:
  74. raise EOFError()
  75. else:
  76. break
  77. sock.setblocking(True)
  78. self.logger.info("accepted %s:%s", h, p)
  79. self.clients.add(sock)
  80. self._accept_method(sock)
  81. def _accept_method(self, sock):
  82. """this method should start a thread, fork a child process, or
  83. anything else in order to serve the client. once the mechanism has
  84. been created, it should invoke _authenticate_and_serve_client with
  85. `sock` as the argument"""
  86. raise NotImplementedError
  87. def _authenticate_and_serve_client(self, sock):
  88. try:
  89. if self.authenticator:
  90. h, p = sock.getpeername()
  91. try:
  92. sock, credentials = self.authenticator(sock)
  93. except AuthenticationError:
  94. self.logger.info("%s:%s failed to authenticate, rejecting connection", h, p)
  95. return
  96. else:
  97. self.logger.info("%s:%s authenticated successfully", h, p)
  98. else:
  99. credentials = None
  100. try:
  101. self._serve_client(sock, credentials)
  102. except Exception:
  103. self.logger.exception("client connection terminated abruptly")
  104. raise
  105. finally:
  106. try:
  107. sock.shutdown(socket.SHUT_RDWR)
  108. except Exception:
  109. pass
  110. sock.close()
  111. self.clients.discard(sock)
  112. def _serve_client(self, sock, credentials):
  113. h, p = sock.getpeername()
  114. if credentials:
  115. self.logger.info("welcome %s:%s (%r)", h, p, credentials)
  116. else:
  117. self.logger.info("welcome %s:%s", h, p)
  118. try:
  119. config = dict(self.protocol_config, credentials = credentials)
  120. conn = Connection(self.service, Channel(SocketStream(sock)),
  121. config = config, _lazy = True)
  122. conn._init_service()
  123. conn.serve_all()
  124. finally:
  125. self.logger.info("goodbye %s:%s", h, p)
  126. def _bg_register(self):
  127. interval = self.registrar.REREGISTER_INTERVAL
  128. self.logger.info("started background auto-register thread "
  129. "(interval = %s)", interval)
  130. tnext = 0
  131. try:
  132. while self.active:
  133. t = time.time()
  134. if t >= tnext:
  135. tnext = t + interval
  136. try:
  137. self.registrar.register(self.service.get_service_aliases(),
  138. self.port)
  139. except Exception:
  140. self.logger.exception("error registering services")
  141. time.sleep(1)
  142. finally:
  143. if not self._closed:
  144. self.logger.info("background auto-register thread finished")
  145. def start(self):
  146. """starts the server. use close() to stop"""
  147. self.listener.listen(self.backlog)
  148. h, p = self.listener.getsockname()
  149. self.logger.info("server started on %s:%s", h, p)
  150. self.active = True
  151. if self.auto_register:
  152. t = threading.Thread(target = self._bg_register)
  153. t.setDaemon(True)
  154. t.start()
  155. #if sys.platform == "win32":
  156. # hack so we can receive Ctrl+C on windows
  157. self.listener.settimeout(0.5)
  158. try:
  159. try:
  160. while True:
  161. self.accept()
  162. except EOFError:
  163. pass # server closed by another thread
  164. except KeyboardInterrupt:
  165. print("")
  166. self.logger.warn("keyboard interrupt!")
  167. finally:
  168. self.logger.info("server has terminated")
  169. self.close()
  170. class ThreadedServer(Server):
  171. def _accept_method(self, sock):
  172. t = threading.Thread(target = self._authenticate_and_serve_client, args = (sock,))
  173. t.setDaemon(True)
  174. t.start()
  175. class ForkingServer(Server):
  176. def __init__(self, *args, **kwargs):
  177. if not signal:
  178. raise OSError("ForkingServer not supported on this platform")
  179. Server.__init__(self, *args, **kwargs)
  180. # setup sigchld handler
  181. self._prevhandler = signal.signal(signal.SIGCHLD, self._handle_sigchld)
  182. def close(self):
  183. Server.close(self)
  184. signal.signal(signal.SIGCHLD, self._prevhandler)
  185. @classmethod
  186. def _handle_sigchld(cls, signum, unused):
  187. try:
  188. while True:
  189. pid, dummy = os.waitpid(-1, os.WNOHANG)
  190. if pid <= 0:
  191. break
  192. except OSError:
  193. pass
  194. # re-register signal handler (see man signal(2), under Portability)
  195. signal.signal(signal.SIGCHLD, cls._handle_sigchld)
  196. def _accept_method(self, sock):
  197. pid = os.fork()
  198. if pid == 0:
  199. # child
  200. try:
  201. try:
  202. self.logger.debug("child process created")
  203. signal.signal(signal.SIGCHLD, self._prevhandler)
  204. self.listener.close()
  205. self.clients.clear()
  206. self._authenticate_and_serve_client(sock)
  207. except:
  208. self.logger.exception("child process terminated abnormally")
  209. else:
  210. self.logger.debug("child process terminated")
  211. finally:
  212. self.logger.debug("child terminated")
  213. os._exit(0)
  214. else:
  215. # parent
  216. sock.close()