| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- """
- rpyc connection factories
- """
- import socket
- import thread, threading
- from rpyc import Connection, Channel, SocketStream, PipeStream, VoidService
- from rpyc.utils.registry import UDPRegistryClient
- from rpyc.lib import safe_import
- ssl = safe_import("ssl")
- class DiscoveryError(Exception):
- pass
- #------------------------------------------------------------------------------
- # API
- #------------------------------------------------------------------------------
- def connect_channel(channel, service = VoidService, config = {}):
- """creates a connection over a given channel
- channel - the channel to use
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- return Connection(service, channel, config = config)
- def connect_stream(stream, service = VoidService, config = {}):
- """creates a connection over a given stream
- stream - the stream to use
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- return connect_channel(Channel(stream), service = service, config = config)
- def connect_pipes(input, output, service = VoidService, config = {}):
- """creates a connection over the given input/output pipes
- input - the input pipe
- output - the output pipe
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- return connect_stream(PipeStream(input, output), service = service, config = config)
- def connect_stdpipes(service = VoidService, config = {}):
- """creates a connection over the standard input/output pipes
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- return connect_stream(PipeStream.from_std(), service = service, config = config)
- def connect(host, port, service = VoidService, config = {}):
- """creates a socket-connection to the given host
- host - the hostname to connect to
- port - the TCP port to use
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- return Connection(service, Channel(SocketStream.connect(host, port)), config = config)
- def tlslite_connect(host, port, username, password, service = VoidService, config = {}):
- """creates a TLS-connection to the given host (encrypted and authenticated)
- username - the username used to authenticate the client
- password - the password used to authenticate the client
- host - the hostname to connect to
- port - the TCP port to use
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- s = SocketStream.tlslite_connect(host, port, username, password)
- return Connection(service, Channel(s), config = config)
- def ssl_connect(host, port, keyfile = None, certfile = None, ca_certs = None,
- ssl_version = None, service = VoidService, config = {}):
- """creates an SSL-wrapped connection to the given host (encrypted and
- authenticated).
- host - the hostname to connect to
- port - the TCP port to use
- service - the local service to expose (defaults to Void)
- config - configuration dict
-
- keyfile, certfile, ca_certs, ssl_version -- arguments to ssl.wrap_socket.
- see that module's documentation for further info."""
- kwargs = {"server_side" : False}
- if keyfile:
- kwargs["keyfile"] = keyfile
- if certfile:
- kwargs["certfile"] = certfile
- if ca_certs:
- kwargs["ca_certs"] = ca_certs
- if ssl_version:
- kwargs["ssl_version"] = ssl_version
- else:
- kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1
- s = SocketStream.ssl_connect(host, port, kwargs)
- return Connection(service, Channel(s), config = config)
- def discover(service_name, host = None, registrar = None, timeout = 2):
- """discovers hosts running the given service
- service_name - the service to look for
- host - limit the discovery to the given host only (None means any host)
- registrar - use this registry client to discover services. if None,
- use the default UDPRegistryClient with the default settings.
- timeout - the number of seconds to wait for a reply from the registry
- if no hosts are found, raises DiscoveryError
- returns a list of (ip, port) pairs
- """
- if registrar is None:
- registrar = UDPRegistryClient(timeout = timeout)
- addrs = registrar.discover(service_name)
- if not addrs:
- raise DiscoveryError("no servers exposing %r were found" % (service_name,))
- if host:
- ips = socket.gethostbyname_ex(host)[2]
- addrs = [(h, p) for h, p in addrs if h in ips]
- if not addrs:
- raise DiscoveryError("no servers exposing %r were found on %r" % (service_name, host))
- return addrs
- def connect_by_service(service_name, host = None, service = VoidService, config = {}):
- """create a connection to an arbitrary server that exposes the requested service
- service_name - the service to discover
- host - limit discovery to the given host only (None means any host)
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- host, port = discover(service_name, host = host)[0]
- return connect(host, port, service, config = config)
- def connect_subproc(args, service = VoidService, config = {}):
- """runs an rpyc server on a child process that and connects to it over
- the stdio pipes. uses the subprocess module.
- args - the args to Popen, e.g., ["python", "-u", "myfile.py"]
- service - the local service to expose (defaults to Void)
- config - configuration dict"""
- from subprocess import Popen, PIPE
- proc = Popen(args, stdin = PIPE, stdout = PIPE)
- conn = connect_pipes(proc.stdout, proc.stdin, service = service, config = config)
- conn.proc = proc # just so you can have control over the processs
- return conn
- def connect_thread(service = VoidService, config = {}, remote_service = VoidService, remote_config = {}):
- """starts an rpyc server on a thread and connects to it over a socket.
- service - the local service to expose (defaults to Void)
- config - configuration dict
- server_service - the remote service to expose (of the server; defaults to Void)
- server_config - remote configuration dict (of the server)
- """
- listener = socket.socket()
- listener.bind(("localhost", 0))
- listener.listen(1)
-
- def server(listener = listener):
- client = listener.accept()[0]
- listener.close()
- conn = connect_stream(SocketStream(client), service = remote_service,
- config = remote_config)
- try:
- conn.serve_all()
- except KeyboardInterrupt:
- thread.interrupt_main()
-
- t = threading.Thread(target = server)
- t.setDaemon(True)
- t.start()
- host, port = listener.getsockname()
- return connect(host, port, service = service, config = config)
|