"""
abstraction layer over OS-dependent byte streams
"""
import sys
import os
import socket
import time
import errno
from rpyc.lib import safe_import
from rpyc.lib.compat import select

# optional modules, loaded through the project's `safe_import` helper
win32file = safe_import("win32file")
win32pipe = safe_import("win32pipe")
msvcrt = safe_import("msvcrt")
ssl = safe_import("ssl")
tlsapi = safe_import("tlslite.api")

# errno values for which a failed recv() is simply retried
retry_errnos = set([errno.EAGAIN])
if hasattr(errno, "WSAEWOULDBLOCK"):
    retry_errnos.add(errno.WSAEWOULDBLOCK)


class Stream(object):
    """base class of all streams; subclasses implement the actual I/O"""
    __slots__ = ()

    def close(self):
        """close the stream"""
        raise NotImplementedError()

    @property
    def closed(self):
        """tell whether the stream has been closed"""
        raise NotImplementedError()

    def fileno(self):
        """return the file descriptor used to wait on the stream"""
        raise NotImplementedError()

    def poll(self, timeout):
        """indicate whether the stream has data to read"""
        rl, _, _ = select([self], [], [], timeout)
        return bool(rl)

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError"""
        raise NotImplementedError()

    def write(self, data):
        """write the entire `data`, or raise EOFError"""
        raise NotImplementedError()


class ClosedFile(object):
    """represents a closed file object (singleton)"""
    __slots__ = ()

    def __getattr__(self, name):
        # any attribute not defined on this class means "use after close"
        raise EOFError("stream has been closed")

    def close(self):
        """closing an already-closed file is a no-op"""
        pass

    @property
    def closed(self):
        return True

    def fileno(self):
        raise EOFError("stream has been closed")


# rebind the name to the sole instance, so streams can test `x is ClosedFile`
ClosedFile = ClosedFile()


class SocketStream(Stream):
    """a stream over a socket (or a socket-like object)"""
    __slots__ = ("sock",)
    MAX_IO_CHUNK = 8000

    def __init__(self, sock):
        self.sock = sock

    @classmethod
    def _connect(cls, host, port, family=socket.AF_INET,
                 socktype=socket.SOCK_STREAM, proto=0, timeout=3,
                 nodelay=False):
        """create a socket, apply `timeout` to it and connect it to
        ``(host, port)``; TCP_NODELAY is set when `nodelay` is true.
        Returns the connected socket."""
        s = socket.socket(family, socktype, proto)
        s.settimeout(timeout)
        s.connect((host, port))
        if nodelay:
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return s

    @classmethod
    def connect(cls, host, port, **kwargs):
        """return a stream connected to ``(host, port)``; `kwargs` are
        passed on to `_connect`"""
        return cls(cls._connect(host, port, **kwargs))

    @classmethod
    def tlslite_connect(cls, host, port, username, password, **kwargs):
        """return a stream over a tlslite connection, after performing a
        client-side SRP handshake with `username` and `password`"""
        s = cls._connect(host, port, **kwargs)
        s2 = tlsapi.TLSConnection(s)
        # give the TLS connection a fileno(), bound to the raw socket's
        # descriptor at this point, so the stream's fileno() keeps working
        s2.fileno = lambda fd=s.fileno(): fd
        s2.handshakeClientSRP(username, password)
        return cls(s2)

    @classmethod
    def ssl_connect(cls, host, port, ssl_kwargs, **kwargs):
        """return a stream over an SSL-wrapped socket; `ssl_kwargs` are
        passed to ``ssl.wrap_socket``, `kwargs` to `_connect`"""
        s = cls._connect(host, port, **kwargs)
        s2 = ssl.wrap_socket(s, **ssl_kwargs)
        return cls(s2)

    @property
    def closed(self):
        return self.sock is ClosedFile

    def close(self):
        """shut down and close the socket; safe to call repeatedly"""
        if not self.closed:
            try:
                self.sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                # best effort: a failed shutdown must not prevent close()
                pass
        self.sock.close()
        self.sock = ClosedFile

    def fileno(self):
        return self.sock.fileno()

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError (the stream is
        closed before EOFError is raised)"""
        data = []
        while count > 0:
            try:
                buf = self.sock.recv(min(self.MAX_IO_CHUNK, count))
            except socket.timeout:
                # the socket timeout is not an error here; keep waiting
                continue
            except socket.error:
                ex = sys.exc_info()[1]
                # use `args` rather than indexing the exception itself:
                # exception indexing is deprecated and gone in python 3
                if ex.args and ex.args[0] in retry_errnos:
                    # transient "would block" condition; retry the recv
                    continue
                self.close()
                raise EOFError(ex)
            if not buf:
                self.close()
                raise EOFError("connection closed by peer")
            data.append(buf)
            count -= len(buf)
        return "".join(data)

    def write(self, data):
        """write the entire `data`, or raise EOFError (the stream is
        closed before EOFError is raised)"""
        try:
            while data:
                # send() may accept only part of the chunk
                count = self.sock.send(data[:self.MAX_IO_CHUNK])
                data = data[count:]
        except socket.error:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)


class PipeStream(Stream):
    """a stream over a pair of file objects: one to read from (`incoming`)
    and one to write to (`outgoing`)"""
    __slots__ = ("incoming", "outgoing")
    MAX_IO_CHUNK = 32000

    def __init__(self, incoming, outgoing):
        outgoing.flush()
        self.incoming = incoming
        self.outgoing = outgoing

    @classmethod
    def from_std(cls):
        """return a stream over ``sys.stdin`` / ``sys.stdout``"""
        return cls(sys.stdin, sys.stdout)

    @classmethod
    def create_pair(cls):
        """return two streams connected to each other by two OS pipes:
        what one side writes, the other side reads"""
        r1, w1 = os.pipe()
        r2, w2 = os.pipe()
        side1 = cls(os.fdopen(r1, "rb"), os.fdopen(w2, "wb"))
        side2 = cls(os.fdopen(r2, "rb"), os.fdopen(w1, "wb"))
        return side1, side2

    @property
    def closed(self):
        return self.incoming is ClosedFile

    def close(self):
        """close both ends. Even if closing one end raises, the other end
        is still closed and the stream is marked as closed; the error
        propagates to the caller."""
        try:
            self.incoming.close()
        finally:
            try:
                self.outgoing.close()
            finally:
                self.incoming = ClosedFile
                self.outgoing = ClosedFile

    def fileno(self):
        return self.incoming.fileno()

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError (the stream is
        closed before EOFError is raised)"""
        data = []
        try:
            while count > 0:
                buf = os.read(self.incoming.fileno(),
                              min(self.MAX_IO_CHUNK, count))
                if not buf:
                    raise EOFError("connection closed by peer")
                data.append(buf)
                count -= len(buf)
        except EOFError:
            self.close()
            raise
        except EnvironmentError:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)
        return "".join(data)

    def write(self, data):
        """write the entire `data`, or raise EOFError (the stream is
        closed before EOFError is raised)"""
        try:
            while data:
                chunk = data[:self.MAX_IO_CHUNK]
                # os.write() may write only part of the chunk
                written = os.write(self.outgoing.fileno(), chunk)
                data = data[written:]
        except EnvironmentError:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)


class Win32PipeStream(Stream):
    """a pipe stream for win32, implemented over win32file/win32pipe
    handles instead of file descriptors"""
    __slots__ = ("incoming", "outgoing", "_fileno", "_keepalive")
    PIPE_BUFFER_SIZE = 130000
    MAX_IO_CHUNK = 32000

    def __init__(self, incoming, outgoing):
        # hold on to the objects as given, for as long as the stream lives
        self._keepalive = (incoming, outgoing)
        # file-like arguments are converted to OS handles; anything else
        # is used as a handle directly.
        # NOTE: `_fileno` is only set when `incoming` has a fileno();
        # otherwise fileno() raises AttributeError
        if hasattr(incoming, "fileno"):
            self._fileno = incoming.fileno()
            incoming = msvcrt.get_osfhandle(incoming.fileno())
        if hasattr(outgoing, "fileno"):
            outgoing = msvcrt.get_osfhandle(outgoing.fileno())
        self.incoming = incoming
        self.outgoing = outgoing

    @classmethod
    def from_std(cls):
        """return a stream over ``sys.stdin`` / ``sys.stdout``"""
        return cls(sys.stdin, sys.stdout)

    @classmethod
    def create_pair(cls):
        """return two streams connected to each other by two pipes:
        what one side writes, the other side reads"""
        r1, w1 = win32pipe.CreatePipe(None, cls.PIPE_BUFFER_SIZE)
        r2, w2 = win32pipe.CreatePipe(None, cls.PIPE_BUFFER_SIZE)
        return cls(r1, w2), cls(r2, w1)

    def fileno(self):
        return self._fileno

    @property
    def closed(self):
        return self.incoming is ClosedFile

    def close(self):
        """close both handles (best effort); safe to call repeatedly"""
        if self.closed:
            return
        try:
            win32file.CloseHandle(self.incoming)
        except Exception:
            pass
        self.incoming = ClosedFile
        try:
            win32file.CloseHandle(self.outgoing)
        except Exception:
            pass
        self.outgoing = ClosedFile

    def read(self, count):
        """read exactly `count` bytes, or raise EOFError"""
        try:
            data = []
            while count > 0:
                dummy, buf = win32file.ReadFile(
                    self.incoming, int(min(self.MAX_IO_CHUNK, count)))
                count -= len(buf)
                data.append(buf)
        except TypeError:
            # a TypeError on a closed stream is reported as EOF (presumably
            # it comes from handing the ClosedFile sentinel to pywin32 in
            # place of a handle); on an open stream it is a genuine error
            ex = sys.exc_info()[1]
            if not self.closed:
                raise
            raise EOFError(ex)
        except win32file.error:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)
        return "".join(data)

    def write(self, data):
        """write the entire `data`, or raise EOFError"""
        try:
            while data:
                dummy, count = win32file.WriteFile(
                    self.outgoing, data[:self.MAX_IO_CHUNK])
                data = data[count:]
        except TypeError:
            # same convention as in read(): EOF only if the stream is closed
            ex = sys.exc_info()[1]
            if not self.closed:
                raise
            raise EOFError(ex)
        except win32file.error:
            ex = sys.exc_info()[1]
            self.close()
            raise EOFError(ex)

    def poll(self, timeout, interval=0.1):
        """a poor man's version of select(): peek at the pipe every
        `interval` seconds until it has data or `timeout` seconds have
        passed (``None`` means wait indefinitely). The pipe is always
        peeked at least once. Returns whether there is data to read."""
        if timeout is None:
            # no deadline at all (avoids sys.maxint, which python 3 lacks)
            deadline = None
        else:
            deadline = time.time() + timeout
        try:
            while True:
                length = win32pipe.PeekNamedPipe(self.incoming, 0)[1]
                if length != 0:
                    # return right away instead of sleeping one more
                    # interval after the data has already been seen
                    return True
                if deadline is not None and time.time() >= deadline:
                    return False
                time.sleep(interval)
        except TypeError:
            # same convention as in read(): EOF only if the stream is closed
            ex = sys.exc_info()[1]
            if not self.closed:
                raise
            raise EOFError(ex)


class NamedPipeStream(Win32PipeStream):
    """a stream over a win32 named pipe; a single handle serves as both
    the incoming and the outgoing side"""
    NAMED_PIPE_PREFIX = r'\\.\pipe\rpyc_'
    PIPE_IO_TIMEOUT = 3
    CONNECT_TIMEOUT = 3
    __slots__ = ("is_server_side",)

    def __init__(self, handle, is_server_side):
        Win32PipeStream.__init__(self, handle, handle)
        self.is_server_side = is_server_side

    @classmethod
    def from_std(cls):
        raise NotImplementedError()

    @classmethod
    def create_pair(cls):
        raise NotImplementedError()

    @classmethod
    def create_server(cls, pipename, connect=True):
        """create the server side of the named pipe `pipename` (prefixed
        with NAMED_PIPE_PREFIX unless it already starts with ``\\\\.``).
        If `connect` is true, connect_server() is called before returning."""
        if not pipename.startswith("\\\\."):
            pipename = cls.NAMED_PIPE_PREFIX + pipename
        handle = win32pipe.CreateNamedPipe(
            pipename,
            win32pipe.PIPE_ACCESS_DUPLEX,
            win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
            1,
            cls.PIPE_BUFFER_SIZE,
            cls.PIPE_BUFFER_SIZE,
            cls.PIPE_IO_TIMEOUT * 1000,
            None
        )
        inst = cls(handle, True)
        if connect:
            inst.connect_server()
        return inst

    def connect_server(self):
        """call ConnectNamedPipe on the pipe handle; raises ValueError
        when invoked on the client side"""
        if not self.is_server_side:
            raise ValueError("this must be the server side")
        win32pipe.ConnectNamedPipe(self.incoming, None)

    @classmethod
    def create_client(cls, pipename, timeout=CONNECT_TIMEOUT):
        """open the client side of the named pipe `pipename` (prefixed
        with NAMED_PIPE_PREFIX unless it already starts with ``\\\\.``).

        NOTE: `timeout` is accepted but currently not used."""
        if not pipename.startswith("\\\\."):
            pipename = cls.NAMED_PIPE_PREFIX + pipename
        handle = win32file.CreateFile(
            pipename,
            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
            0,
            None,
            win32file.OPEN_EXISTING,
            0,
            None
        )
        return cls(handle, False)

    def close(self):
        """close the pipe; the server side first flushes and disconnects
        it. The handles are released even if flushing or disconnecting
        fails (the error still propagates to the caller)."""
        if self.closed:
            return
        try:
            if self.is_server_side:
                win32file.FlushFileBuffers(self.outgoing)
                win32pipe.DisconnectNamedPipe(self.outgoing)
        finally:
            Win32PipeStream.close(self)


# on win32, the handle-based implementation replaces the generic one
if sys.platform == "win32":
    PipeStream = Win32PipeStream