| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- """
- abstraction layer over OS-dependent byte streams
- """
- import sys
- import os
- import socket
- import time
- import errno
- from rpyc.lib import safe_import
- from rpyc.lib.compat import select
- win32file = safe_import("win32file")
- win32pipe = safe_import("win32pipe")
- msvcrt = safe_import("msvcrt")
- ssl = safe_import("ssl")
- tlsapi = safe_import("tlslite.api")
# errno codes for which a failed socket read is simply retried.
# WSAEWOULDBLOCK is not defined by the errno module on every platform,
# so it is added only where it exists.
retry_errnos = set([errno.EAGAIN])
try:
    retry_errnos.add(errno.WSAEWOULDBLOCK)
except AttributeError:
    pass
class Stream(object):
    """base class of all streams; subclasses provide the actual I/O"""

    __slots__ = ()

    def close(self):
        """close the stream (to be implemented by subclasses)"""
        raise NotImplementedError()

    @property
    def closed(self):
        """whether the stream is closed (to be implemented by subclasses)"""
        raise NotImplementedError()

    def fileno(self):
        """the stream's file descriptor (to be implemented by subclasses)"""
        raise NotImplementedError()

    def poll(self, timeout):
        """indicate whether the stream has data to read"""
        # only the first ("readable") list of the select() result matters
        readable = select([self], [], [], timeout)[0]
        return bool(readable)

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError"""
        raise NotImplementedError()

    def write(self, data):
        """write the entire `data`, or raise EOFError"""
        raise NotImplementedError()
class ClosedFile(object):
    """represents a closed file object (singleton)"""

    __slots__ = ()

    @property
    def closed(self):
        """a closed file is always closed"""
        return True

    def close(self):
        """closing an already closed file does nothing"""
        return None

    def fileno(self):
        """a closed file has no descriptor; always raises EOFError"""
        raise EOFError("stream has been closed")

    def __getattr__(self, name):
        # reached only for names that are not defined on the class, i.e.
        # any attribute other than close/closed/fileno
        raise EOFError("stream has been closed")


# the class name is rebound to its single instance
ClosedFile = ClosedFile()
class SocketStream(Stream):
    """a stream over a connected socket (or a socket-like TLS/SSL wrapper)"""

    __slots__ = ("sock",)
    # upper bound on the amount of data passed to a single recv()/send()
    MAX_IO_CHUNK = 8000

    def __init__(self, sock):
        self.sock = sock

    @classmethod
    def _connect(cls, host, port, family = socket.AF_INET, socktype = socket.SOCK_STREAM,
            proto = 0, timeout = 3, nodelay = False):
        """create a socket and connect it to ``(host, port)``

        `family`, `socktype` and `proto` are passed to ``socket.socket()``;
        `timeout` is set on the socket before connecting; `nodelay` enables
        TCP_NODELAY once connected. Returns the connected socket. If any
        step fails the socket is closed and the error is re-raised.
        """
        s = socket.socket(family, socktype, proto)
        try:
            s.settimeout(timeout)
            s.connect((host, port))
            if nodelay:
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except Exception:
            # do not leak the descriptor when the connection cannot be set up
            s.close()
            raise
        return s

    @classmethod
    def connect(cls, host, port, **kwargs):
        """return a stream connected to ``(host, port)``; `kwargs` are
        forwarded to `_connect`"""
        return cls(cls._connect(host, port, **kwargs))

    @classmethod
    def tlslite_connect(cls, host, port, username, password, **kwargs):
        """return a stream wrapped in a tlslite connection, after an SRP
        client handshake with `username` and `password`; `kwargs` are
        forwarded to `_connect`"""
        s = cls._connect(host, port, **kwargs)
        try:
            s2 = tlsapi.TLSConnection(s)
            # give the wrapper a fileno() that reports the descriptor of the
            # underlying socket (captured once, here)
            s2.fileno = lambda fd = s.fileno(): fd
            s2.handshakeClientSRP(username, password)
        except Exception:
            # a failed handshake must not leave the raw socket open
            s.close()
            raise
        return cls(s2)

    @classmethod
    def ssl_connect(cls, host, port, ssl_kwargs, **kwargs):
        """return a stream wrapped by ``ssl.wrap_socket(**ssl_kwargs)``;
        `kwargs` are forwarded to `_connect`"""
        s = cls._connect(host, port, **kwargs)
        try:
            s2 = ssl.wrap_socket(s, **ssl_kwargs)
        except Exception:
            # wrapping failed; do not leave the raw socket open
            s.close()
            raise
        return cls(s2)

    @property
    def closed(self):
        """whether the stream has been closed"""
        return self.sock is ClosedFile

    def close(self):
        """shut down and close the socket; does nothing if already closed"""
        if not self.closed:
            try:
                self.sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                # best effort: the socket is closed regardless
                pass
            self.sock.close()
        self.sock = ClosedFile

    def fileno(self):
        """the descriptor of the underlying socket"""
        return self.sock.fileno()

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError

        Socket timeouts and the "would block" errors listed in
        `retry_errnos` are retried; any other socket error, or the peer
        closing the connection, closes the stream and raises EOFError.
        """
        data = []
        while count > 0:
            try:
                buf = self.sock.recv(min(self.MAX_IO_CHUNK, count))
            except socket.timeout:
                continue
            except socket.error:
                ex = sys.exc_info()[1]
                # look at ex.args rather than indexing the exception itself,
                # which is not supported by every Python version
                if ex.args and ex.args[0] in retry_errnos:
                    # no data available yet; try again
                    continue
                self.close()
                raise EOFError(ex)
            if not buf:
                self.close()
                raise EOFError("connection closed by peer")
            data.append(buf)
            count -= len(buf)
        # NOTE(review): joining with a native str presumes recv() returns
        # str objects (Python 2 byte strings) -- confirm before porting
        return "".join(data)

    def write(self, data):
        """write the entire `data`, or raise EOFError

        A socket error closes the stream and is re-raised as EOFError.
        """
        try:
            while data:
                count = self.sock.send(data[:self.MAX_IO_CHUNK])
                data = data[count:]
        except socket.error:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)
class PipeStream(Stream):
    """a stream over a pair of file objects: one to read from (`incoming`)
    and one to write to (`outgoing`)"""

    __slots__ = ("incoming", "outgoing")
    # upper bound on the amount of data passed to a single os.read()/os.write()
    MAX_IO_CHUNK = 32000

    def __init__(self, incoming, outgoing):
        # `outgoing` is flushed up front; writes then go through os.write()
        # on its descriptor rather than through the file object
        outgoing.flush()
        self.incoming = incoming
        self.outgoing = outgoing

    @classmethod
    def from_std(cls):
        """return a stream that reads from sys.stdin and writes to sys.stdout"""
        return cls(sys.stdin, sys.stdout)

    @classmethod
    def create_pair(cls):
        """return two streams connected to each other through a pair of
        OS pipes: whatever one side writes, the other side reads"""
        r1, w1 = os.pipe()
        try:
            r2, w2 = os.pipe()
        except Exception:
            # do not leak the first pipe if the second cannot be created
            os.close(r1)
            os.close(w1)
            raise
        side1 = cls(os.fdopen(r1, "rb"), os.fdopen(w2, "wb"))
        side2 = cls(os.fdopen(r2, "rb"), os.fdopen(w1, "wb"))
        return side1, side2

    @property
    def closed(self):
        """whether the stream has been closed"""
        return self.incoming is ClosedFile

    def close(self):
        """close both file objects and mark the stream as closed

        An error raised while closing either file still propagates, but
        the other file is closed and the stream is marked closed anyway,
        so it is never left half-closed.
        """
        try:
            self.incoming.close()
        finally:
            try:
                self.outgoing.close()
            finally:
                self.incoming = ClosedFile
                self.outgoing = ClosedFile

    def fileno(self):
        """the descriptor of the incoming (read) side"""
        return self.incoming.fileno()

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError

        End of file or an OS-level error closes the stream and raises
        EOFError.
        """
        data = []
        try:
            while count > 0:
                buf = os.read(self.incoming.fileno(), min(self.MAX_IO_CHUNK, count))
                if not buf:
                    raise EOFError("connection closed by peer")
                data.append(buf)
                count -= len(buf)
        except EOFError:
            self.close()
            raise
        except EnvironmentError:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)
        # NOTE(review): joining with a native str presumes os.read() returns
        # str objects (Python 2 byte strings) -- confirm before porting
        return "".join(data)

    def write(self, data):
        """write the entire `data`, or raise EOFError

        An OS-level error closes the stream and is re-raised as EOFError.
        """
        try:
            while data:
                chunk = data[:self.MAX_IO_CHUNK]
                written = os.write(self.outgoing.fileno(), chunk)
                data = data[written:]
        except EnvironmentError:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)
class Win32PipeStream(Stream):
    """a pipe stream implemented directly on win32 handles (through the
    win32file/win32pipe modules) instead of file descriptors"""

    __slots__ = ("incoming", "outgoing", "_fileno", "_keepalive")
    # buffer size requested when creating pipes
    PIPE_BUFFER_SIZE = 130000
    # upper bound on the amount of data passed to a single ReadFile/WriteFile
    MAX_IO_CHUNK = 32000

    def __init__(self, incoming, outgoing):
        """`incoming` and `outgoing` may be file-like objects (anything with
        a ``fileno()``) or raw win32 handles"""
        # keep references to the objects as given
        # (presumably so that file objects are not garbage-collected, and
        # thereby closed, while their OS handles are in use -- TODO confirm)
        self._keepalive = (incoming, outgoing)
        # NOTE: `_fileno` is set only when `incoming` is file-like; for raw
        # handles it stays unset and fileno() raises AttributeError
        if hasattr(incoming, "fileno"):
            self._fileno = incoming.fileno()
            incoming = msvcrt.get_osfhandle(incoming.fileno())
        if hasattr(outgoing, "fileno"):
            outgoing = msvcrt.get_osfhandle(outgoing.fileno())
        self.incoming = incoming
        self.outgoing = outgoing

    @classmethod
    def from_std(cls):
        """return a stream that reads from sys.stdin and writes to sys.stdout"""
        return cls(sys.stdin, sys.stdout)

    @classmethod
    def create_pair(cls):
        """return two streams connected to each other through a pair of
        anonymous pipes: whatever one side writes, the other side reads"""
        r1, w1 = win32pipe.CreatePipe(None, cls.PIPE_BUFFER_SIZE)
        r2, w2 = win32pipe.CreatePipe(None, cls.PIPE_BUFFER_SIZE)
        return cls(r1, w2), cls(r2, w1)

    def fileno(self):
        """the descriptor of the incoming side, as recorded by __init__"""
        return self._fileno

    @property
    def closed(self):
        """whether the stream has been closed"""
        return self.incoming is ClosedFile

    def close(self):
        """close both handles; does nothing if already closed"""
        if self.closed:
            return
        # closing is best effort: a failing CloseHandle is ignored and the
        # stream is marked closed regardless
        try:
            win32file.CloseHandle(self.incoming)
        except Exception:
            pass
        self.incoming = ClosedFile
        try:
            win32file.CloseHandle(self.outgoing)
        except Exception:
            pass
        self.outgoing = ClosedFile

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError

        A win32 error closes the stream and raises EOFError. A TypeError
        is converted to EOFError only when the stream is already closed
        (the handle has been replaced by ClosedFile); otherwise it is
        re-raised unchanged.
        """
        try:
            data = []
            while count > 0:
                dummy, buf = win32file.ReadFile(self.incoming, int(min(self.MAX_IO_CHUNK, count)))
                count -= len(buf)
                data.append(buf)
        except TypeError:
            ex = sys.exc_info()[1]
            if not self.closed:
                raise
            raise EOFError(ex)
        except win32file.error:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)
        # NOTE(review): joining with a native str presumes ReadFile returns
        # str objects (Python 2 byte strings) -- confirm before porting
        return "".join(data)

    def write(self, data):
        """write the entire `data`, or raise EOFError

        Errors are handled the same way as in `read`.
        """
        try:
            while data:
                dummy, count = win32file.WriteFile(self.outgoing, data[:self.MAX_IO_CHUNK])
                data = data[count:]
        except TypeError:
            ex = sys.exc_info()[1]
            if not self.closed:
                raise
            raise EOFError(ex)
        except win32file.error:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)

    def poll(self, timeout, interval = 0.1):
        """a poor man's version of select()

        Peeks at the pipe every `interval` seconds until data is available
        or `timeout` seconds have passed; a `timeout` of None waits
        indefinitely. The pipe is always checked at least once. Returns
        True if there is data to read, False otherwise. Raises EOFError if
        the stream is closed.
        """
        if timeout is None:
            # no deadline at all
            tmax = None
        else:
            tmax = time.time() + timeout
        try:
            while True:
                # the second item of the result is used as the number of
                # bytes waiting in the pipe
                length = win32pipe.PeekNamedPipe(self.incoming, 0)[1]
                if length != 0:
                    # report data right away instead of sleeping once more
                    return True
                if tmax is not None and time.time() >= tmax:
                    return False
                time.sleep(interval)
        except TypeError:
            ex = sys.exc_info()[1]
            if not self.closed:
                raise
            raise EOFError(ex)
class NamedPipeStream(Win32PipeStream):
    """a stream over a single duplex win32 named pipe; the same handle
    serves as both the incoming and the outgoing side"""

    # prepended to pipe names that do not already start with "\\."
    NAMED_PIPE_PREFIX = r'\\.\pipe\rpyc_'
    # in seconds; converted to milliseconds when the server pipe is created
    PIPE_IO_TIMEOUT = 3
    CONNECT_TIMEOUT = 3
    __slots__ = ("is_server_side",)

    def __init__(self, handle, is_server_side):
        Win32PipeStream.__init__(self, handle, handle)
        self.is_server_side = is_server_side

    @classmethod
    def from_std(cls):
        """not supported for named pipes"""
        raise NotImplementedError()

    @classmethod
    def create_pair(cls):
        """not supported for named pipes"""
        raise NotImplementedError()

    @classmethod
    def create_server(cls, pipename, connect = True):
        """create the server side of the named pipe `pipename`

        If `connect` is true, this also waits for a client to connect
        (see `connect_server`). Returns the server-side stream.
        """
        if not pipename.startswith("\\\\."):
            pipename = cls.NAMED_PIPE_PREFIX + pipename
        handle = win32pipe.CreateNamedPipe(
            pipename,
            win32pipe.PIPE_ACCESS_DUPLEX,
            win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
            1,                              # maximum number of instances
            cls.PIPE_BUFFER_SIZE,           # output buffer size
            cls.PIPE_BUFFER_SIZE,           # input buffer size
            cls.PIPE_IO_TIMEOUT * 1000,     # default timeout, in milliseconds
            None
        )
        inst = cls(handle, True)
        if connect:
            inst.connect_server()
        return inst

    def connect_server(self):
        """wait for a client to connect to this (server-side) pipe; raises
        ValueError when called on a client-side stream"""
        if not self.is_server_side:
            raise ValueError("this must be the server side")
        win32pipe.ConnectNamedPipe(self.incoming, None)

    @classmethod
    def create_client(cls, pipename, timeout = CONNECT_TIMEOUT):
        """open the client side of the existing named pipe `pipename`

        NOTE: `timeout` is accepted but not used by this implementation.
        """
        if not pipename.startswith("\\\\."):
            pipename = cls.NAMED_PIPE_PREFIX + pipename
        handle = win32file.CreateFile(
            pipename,
            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
            0,
            None,
            win32file.OPEN_EXISTING,
            0,
            None
        )
        return cls(handle, False)

    def close(self):
        """close the pipe; does nothing if already closed

        The server side first flushes its buffers and disconnects the
        pipe. An error raised by either step still propagates, but the
        handle is released and the stream marked closed regardless.
        """
        if self.closed:
            return
        try:
            if self.is_server_side:
                win32file.FlushFileBuffers(self.outgoing)
                win32pipe.DisconnectNamedPipe(self.outgoing)
        finally:
            # always release the handle, even if flushing/disconnecting failed
            Win32PipeStream.close(self)
# on win32, the generic PipeStream name refers to the handle-based
# implementation
if sys.platform == "win32":
    PipeStream = Win32PipeStream
|