1""" 2RPyC connection factories: ease the creation of a connection for the common 3cases) 4""" 5from __future__ import with_statement 6import socket 7from contextlib import closing 8from functools import partial 9import threading 10try: 11 from thread import interrupt_main 12except ImportError: 13 try: 14 from _thread import interrupt_main 15 except ImportError: 16 # assume jython (#83) 17 from java.lang import System 18 interrupt_main = System.exit 19 20from rpyc.core.channel import Channel 21from rpyc.core.stream import SocketStream, TunneledSocketStream, PipeStream 22from rpyc.core.service import VoidService, MasterService, SlaveService 23from rpyc.utils.registry import UDPRegistryClient 24from rpyc.lib import safe_import, spawn 25ssl = safe_import("ssl") 26 27 28class DiscoveryError(Exception): 29 pass 30 31 32# ------------------------------------------------------------------------------ 33# API 34# ------------------------------------------------------------------------------ 35def connect_channel(channel, service=VoidService, config={}): 36 """creates a connection over a given channel 37 38 :param channel: the channel to use 39 :param service: the local service to expose (defaults to Void) 40 :param config: configuration dict 41 42 :returns: an RPyC connection 43 """ 44 return service._connect(channel, config) 45 46 47def connect_stream(stream, service=VoidService, config={}): 48 """creates a connection over a given stream 49 50 :param stream: the stream to use 51 :param service: the local service to expose (defaults to Void) 52 :param config: configuration dict 53 54 :returns: an RPyC connection 55 """ 56 return connect_channel(Channel(stream), service=service, config=config) 57 58 59def connect_pipes(input, output, service=VoidService, config={}): 60 """ 61 creates a connection over the given input/output pipes 62 63 :param input: the input pipe 64 :param output: the output pipe 65 :param service: the local service to expose (defaults to Void) 66 :param config: configuration dict 67 68 :returns: an RPyC connection 69 """ 70 return connect_stream(PipeStream(input, output), service=service, config=config) 71 72 73def connect_stdpipes(service=VoidService, config={}): 74 """ 75 creates a connection over the standard input/output pipes 76 77 :param service: the local service to expose (defaults to Void) 78 :param config: configuration dict 79 80 :returns: an RPyC connection 81 """ 82 return connect_stream(PipeStream.from_std(), service=service, config=config) 83 84 85def connect(host, port, service=VoidService, config={}, ipv6=False, keepalive=False): 86 """ 87 creates a socket-connection to the given host and port 88 89 :param host: the hostname to connect to 90 :param port: the TCP port to use 91 :param service: the local service to expose (defaults to Void) 92 :param config: configuration dict 93 :param ipv6: whether to create an IPv6 socket (defaults to ``False``) 94 :param keepalive: whether to set TCP keepalive on the socket (defaults to ``False``) 95 96 :returns: an RPyC connection 97 """ 98 s = SocketStream.connect(host, port, ipv6=ipv6, keepalive=keepalive) 99 return connect_stream(s, service, config) 100 101 102def unix_connect(path, service=VoidService, config={}): 103 """ 104 creates a socket-connection to the given unix domain socket 105 106 :param path: the path to the unix domain socket 107 :param service: the local service to expose (defaults to Void) 108 :param config: configuration dict 109 110 :returns: an RPyC connection 111 """ 112 s = SocketStream.unix_connect(path) 113 return connect_stream(s, service, config) 114 115 116def ssl_connect(host, port, keyfile=None, certfile=None, ca_certs=None, 117 cert_reqs=None, ssl_version=None, ciphers=None, 118 service=VoidService, config={}, ipv6=False, keepalive=False): 119 """ 120 creates an SSL-wrapped connection to the given host (encrypted and 121 authenticated). 122 123 :param host: the hostname to connect to 124 :param port: the TCP port to use 125 :param service: the local service to expose (defaults to Void) 126 :param config: configuration dict 127 :param ipv6: whether to create an IPv6 socket or an IPv4 one(defaults to ``False``) 128 :param keepalive: whether to set TCP keepalive on the socket (defaults to ``False``) 129 130 The following arguments are passed directly to 131 `ssl.wrap_socket <http://docs.python.org/dev/library/ssl.html#ssl.wrap_socket>`_: 132 133 :param keyfile: see ``ssl.wrap_socket``. May be ``None`` 134 :param certfile: see ``ssl.wrap_socket``. May be ``None`` 135 :param ca_certs: see ``ssl.wrap_socket``. May be ``None`` 136 :param cert_reqs: see ``ssl.wrap_socket``. By default, if ``ca_cert`` is specified, 137 the requirement is set to ``CERT_REQUIRED``; otherwise it is 138 set to ``CERT_NONE`` 139 :param ssl_version: see ``ssl.wrap_socket``. The default is ``PROTOCOL_TLSv1`` 140 :param ciphers: see ``ssl.wrap_socket``. May be ``None``. New in Python 2.7/3.2 141 142 :returns: an RPyC connection 143 """ 144 ssl_kwargs = {"server_side": False} 145 if keyfile is not None: 146 ssl_kwargs["keyfile"] = keyfile 147 if certfile is not None: 148 ssl_kwargs["certfile"] = certfile 149 if ca_certs is not None: 150 ssl_kwargs["ca_certs"] = ca_certs 151 ssl_kwargs["cert_reqs"] = ssl.CERT_REQUIRED 152 if cert_reqs is not None: 153 ssl_kwargs["cert_reqs"] = cert_reqs 154 if ssl_version is None: 155 ssl_kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1 156 else: 157 ssl_kwargs["ssl_version"] = ssl_version 158 if ciphers is not None: 159 ssl_kwargs["ciphers"] = ciphers 160 s = SocketStream.ssl_connect(host, port, ssl_kwargs, ipv6=ipv6, keepalive=keepalive) 161 return connect_stream(s, service, config) 162 163 164def _get_free_port(): 165 """attempts to find a free port""" 166 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 167 with closing(s): 168 s.bind(("localhost", 0)) 169 return s.getsockname()[1] 170 171 172_ssh_connect_lock = threading.Lock() 173 174 175def ssh_connect(remote_machine, remote_port, service=VoidService, config={}): 176 """ 177 Connects to an RPyC server over an SSH tunnel (created by plumbum). 178 See `Plumbum tunneling <http://plumbum.readthedocs.org/en/latest/remote.html#tunneling>`_ 179 for further details. 180 181 .. note:: 182 This function attempts to allocate a free TCP port for the underlying tunnel, but doing 183 so is inherently prone to a race condition with other processes who might bind the 184 same port before sshd does. Albeit unlikely, there is no sure way around it. 185 186 :param remote_machine: an :class:`plumbum.remote.RemoteMachine` instance 187 :param remote_port: the port of the remote server 188 :param service: the local service to expose (defaults to Void) 189 :param config: configuration dict 190 191 :returns: an RPyC connection 192 """ 193 with _ssh_connect_lock: 194 loc_port = _get_free_port() 195 tun = remote_machine.tunnel(loc_port, remote_port) 196 stream = TunneledSocketStream.connect("localhost", loc_port) 197 stream.tun = tun 198 return service._connect(Channel(stream), config=config) 199 200 201def discover(service_name, host=None, registrar=None, timeout=2): 202 """ 203 discovers hosts running the given service 204 205 :param service_name: the service to look for 206 :param host: limit the discovery to the given host only (None means any host) 207 :param registrar: use this registry client to discover services. if None, 208 use the default UDPRegistryClient with the default settings. 209 :param timeout: the number of seconds to wait for a reply from the registry 210 if no hosts are found, raises DiscoveryError 211 212 :raises: ``DiscoveryError`` if no server is found 213 :returns: a list of (ip, port) pairs 214 """ 215 if registrar is None: 216 registrar = UDPRegistryClient(timeout=timeout) 217 addrs = registrar.discover(service_name) 218 if not addrs: 219 raise DiscoveryError("no servers exposing %r were found" % (service_name,)) 220 if host: 221 ips = socket.gethostbyname_ex(host)[2] 222 addrs = [(h, p) for h, p in addrs if h in ips] 223 if not addrs: 224 raise DiscoveryError("no servers exposing %r were found on %r" % (service_name, host)) 225 return addrs 226 227 228def connect_by_service(service_name, host=None, service=VoidService, config={}): 229 """create a connection to an arbitrary server that exposes the requested service 230 231 :param service_name: the service to discover 232 :param host: limit discovery to the given host only (None means any host) 233 :param service: the local service to expose (defaults to Void) 234 :param config: configuration dict 235 236 :raises: ``DiscoveryError`` if no server is found 237 :returns: an RPyC connection 238 """ 239 # The registry server may have multiple services registered for the same service name, 240 # some of which could be dead. We iterate over the list returned and return the first 241 # one we could connect to. If none of the registered servers is responsive we re-throw 242 # the exception 243 addrs = discover(service_name, host=host) 244 for host, port in addrs: 245 try: 246 return connect(host, port, service, config=config) 247 except socket.error: 248 pass 249 raise DiscoveryError("All services are down: %s" % (addrs,)) 250 251 252def connect_subproc(args, service=VoidService, config={}): 253 """runs an rpyc server on a child process that and connects to it over 254 the stdio pipes. uses the subprocess module. 255 256 :param args: the args to Popen, e.g., ["python", "-u", "myfile.py"] 257 :param service: the local service to expose (defaults to Void) 258 :param config: configuration dict 259 """ 260 from subprocess import Popen, PIPE 261 proc = Popen(args, stdin=PIPE, stdout=PIPE) 262 conn = connect_pipes(proc.stdout, proc.stdin, service=service, config=config) 263 conn.proc = proc # just so you can have control over the processs 264 return conn 265 266 267def _server(listener, remote_service, remote_config, args=None): 268 try: 269 with closing(listener): 270 client = listener.accept()[0] 271 conn = connect_stream(SocketStream(client), service=remote_service, config=remote_config) 272 if isinstance(args, dict): 273 _oldstyle = (MasterService, SlaveService) 274 is_newstyle = isinstance(remote_service, type) and not issubclass(remote_service, _oldstyle) 275 is_newstyle |= not isinstance(remote_service, type) and not isinstance(remote_service, _oldstyle) 276 is_voidservice = isinstance(remote_service, type) and issubclass(remote_service, VoidService) 277 is_voidservice |= not isinstance(remote_service, type) and isinstance(remote_service, VoidService) 278 if is_newstyle and not is_voidservice: 279 conn._local_root.exposed_namespace.update(args) 280 elif not is_voidservice: 281 conn._local_root.namespace.update(args) 282 283 conn.serve_all() 284 except KeyboardInterrupt: 285 interrupt_main() 286 287 288def connect_thread(service=VoidService, config={}, remote_service=VoidService, remote_config={}): 289 """starts an rpyc server on a new thread, bound to an arbitrary port, 290 and connects to it over a socket. 291 292 :param service: the local service to expose (defaults to Void) 293 :param config: configuration dict 294 :param remote_service: the remote service to expose (of the server; defaults to Void) 295 :param remote_config: remote configuration dict (of the server) 296 """ 297 listener = socket.socket() 298 listener.bind(("localhost", 0)) 299 listener.listen(1) 300 remote_server = partial(_server, listener, remote_service, remote_config) 301 spawn(remote_server) 302 host, port = listener.getsockname() 303 return connect(host, port, service=service, config=config) 304 305 306def connect_multiprocess(service=VoidService, config={}, remote_service=VoidService, remote_config={}, args={}): 307 """starts an rpyc server on a new process, bound to an arbitrary port, 308 and connects to it over a socket. Basically a copy of connect_thread(). 309 However if args is used and if these are shared memory then changes 310 will be bi-directional. That is we now have access to shared memmory. 311 312 :param service: the local service to expose (defaults to Void) 313 :param config: configuration dict 314 :param remote_service: the remote service to expose (of the server; defaults to Void) 315 :param remote_config: remote configuration dict (of the server) 316 :param args: dict of local vars to pass to new connection, form {'name':var} 317 318 Contributed by *@tvanzyl* 319 """ 320 from multiprocessing import Process 321 322 listener = socket.socket() 323 listener.bind(("localhost", 0)) 324 listener.listen(1) 325 remote_server = partial(_server, listener, remote_service, remote_config, args) 326 t = Process(target=remote_server) 327 t.start() 328 host, port = listener.getsockname() 329 return connect(host, port, service=service, config=config) 330