1"""
RPyC connection factories: ease the creation of a connection for the common
cases.
4"""
5from __future__ import with_statement
6import socket
7from contextlib import closing
8from functools import partial
9import threading
10try:
11    from thread import interrupt_main
12except ImportError:
13    try:
14        from _thread import interrupt_main
15    except ImportError:
16        # assume jython (#83)
17        from java.lang import System
18        interrupt_main = System.exit
19
20from rpyc.core.channel import Channel
21from rpyc.core.stream import SocketStream, TunneledSocketStream, PipeStream
22from rpyc.core.service import VoidService, MasterService, SlaveService
23from rpyc.utils.registry import UDPRegistryClient
24from rpyc.lib import safe_import, spawn
25ssl = safe_import("ssl")
26
27
class DiscoveryError(Exception):
    """Raised by the discovery helpers of this module when no usable server
    exposing the requested service could be found."""
30
31
32# ------------------------------------------------------------------------------
33# API
34# ------------------------------------------------------------------------------
def connect_channel(channel, service=VoidService, config={}):
    """builds a connection on top of an already-constructed channel

    :param channel: the channel the connection will run over
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict (only forwarded, never modified here)

    :returns: an RPyC connection
    """
    # the service object/class is responsible for creating the connection
    connection = service._connect(channel, config)
    return connection
45
46
def connect_stream(stream, service=VoidService, config={}):
    """builds a connection on top of the given stream

    The stream is wrapped in a :class:`Channel` and handed over to
    :func:`connect_channel`.

    :param stream: the stream to use
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :returns: an RPyC connection
    """
    channel = Channel(stream)
    return connect_channel(channel, service=service, config=config)
57
58
def connect_pipes(input, output, service=VoidService, config={}):
    """
    builds a connection that talks over a pair of pipes

    :param input: the input pipe
    :param output: the output pipe
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :returns: an RPyC connection
    """
    pipe_stream = PipeStream(input, output)
    return connect_stream(pipe_stream, service=service, config=config)
71
72
def connect_stdpipes(service=VoidService, config={}):
    """
    builds a connection that talks over the standard input/output pipes

    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :returns: an RPyC connection
    """
    std_stream = PipeStream.from_std()
    return connect_stream(std_stream, service=service, config=config)
83
84
def connect(host, port, service=VoidService, config={}, ipv6=False, keepalive=False):
    """
    opens a socket to ``host:port`` and builds a connection over it

    :param host: the hostname to connect to
    :param port: the TCP port to use
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict
    :param ipv6: whether to create an IPv6 socket (defaults to ``False``)
    :param keepalive: whether to set TCP keepalive on the socket (defaults to ``False``)

    :returns: an RPyC connection
    """
    sock_stream = SocketStream.connect(host, port, ipv6=ipv6, keepalive=keepalive)
    return connect_stream(sock_stream, service=service, config=config)
100
101
def unix_connect(path, service=VoidService, config={}):
    """
    opens a unix domain socket at ``path`` and builds a connection over it

    :param path: the path to the unix domain socket
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :returns: an RPyC connection
    """
    sock_stream = SocketStream.unix_connect(path)
    return connect_stream(sock_stream, service=service, config=config)
114
115
def ssl_connect(host, port, keyfile=None, certfile=None, ca_certs=None,
                cert_reqs=None, ssl_version=None, ciphers=None,
                service=VoidService, config={}, ipv6=False, keepalive=False):
    """
    opens an SSL-wrapped socket to the given host and builds a connection
    over it.

    :param host: the hostname to connect to
    :param port: the TCP port to use
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict
    :param ipv6: whether to create an IPv6 socket or an IPv4 one (defaults to ``False``)
    :param keepalive: whether to set TCP keepalive on the socket (defaults to ``False``)

    The remaining arguments are collected into a keyword dict (together with
    ``server_side=False``) and forwarded to ``SocketStream.ssl_connect``; they
    are named after the arguments of
    `ssl.wrap_socket <http://docs.python.org/dev/library/ssl.html#ssl.wrap_socket>`_.
    Apart from ``ssl_version``, an argument left as ``None`` is omitted from
    that dict:

    :param keyfile: see ``ssl.wrap_socket``. May be ``None``
    :param certfile: see ``ssl.wrap_socket``. May be ``None``
    :param ca_certs: see ``ssl.wrap_socket``. May be ``None``
    :param cert_reqs: see ``ssl.wrap_socket``. When ``None`` and ``ca_certs``
                      is given, ``CERT_REQUIRED`` is used; when both are
                      ``None`` no ``cert_reqs`` entry is passed at all. An
                      explicit value always wins.
    :param ssl_version: see ``ssl.wrap_socket``. The default is ``PROTOCOL_TLSv1``
    :param ciphers: see ``ssl.wrap_socket``. May be ``None``

    :returns: an RPyC connection
    """
    ssl_kwargs = {"server_side": False}

    # copy over only the file-related options the caller actually supplied
    for option, value in (("keyfile", keyfile), ("certfile", certfile), ("ca_certs", ca_certs)):
        if value is not None:
            ssl_kwargs[option] = value

    # supplying CA certificates implies verification, unless overridden below
    if ca_certs is not None:
        ssl_kwargs["cert_reqs"] = ssl.CERT_REQUIRED
    if cert_reqs is not None:
        ssl_kwargs["cert_reqs"] = cert_reqs

    # NOTE(review): the fallback protocol is pinned to TLS 1.0; consider whether
    # callers should be nudged towards passing a newer ``ssl_version``.
    ssl_kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1 if ssl_version is None else ssl_version

    if ciphers is not None:
        ssl_kwargs["ciphers"] = ciphers

    ssl_stream = SocketStream.ssl_connect(host, port, ssl_kwargs, ipv6=ipv6, keepalive=keepalive)
    return connect_stream(ssl_stream, service=service, config=config)
162
163
164def _get_free_port():
165    """attempts to find a free port"""
166    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
167    with closing(s):
168        s.bind(("localhost", 0))
169        return s.getsockname()[1]
170
171
# Held by ssh_connect() for the whole "pick a free local port, open the tunnel
# on it, connect through it" sequence, so that sequence runs for one caller of
# this process at a time.
_ssh_connect_lock = threading.Lock()


def ssh_connect(remote_machine, remote_port, service=VoidService, config={}):
    """
    Connects to an RPyC server over an SSH tunnel (created by plumbum).
    See `Plumbum tunneling <http://plumbum.readthedocs.org/en/latest/remote.html#tunneling>`_
    for further details.

    .. note::
       This function attempts to allocate a free TCP port for the underlying tunnel, but doing
       so is inherently prone to a race condition with other processes who might bind the
       same port before sshd does. Albeit unlikely, there is no sure way around it.

    If connecting through the freshly created tunnel fails, the tunnel is
    closed again before the error is propagated.

    :param remote_machine: an :class:`plumbum.remote.RemoteMachine` instance
    :param remote_port: the port of the remote server
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :returns: an RPyC connection
    """
    with _ssh_connect_lock:
        loc_port = _get_free_port()
        tun = remote_machine.tunnel(loc_port, remote_port)
        try:
            stream = TunneledSocketStream.connect("localhost", loc_port)
        except BaseException:
            # no stream holds the tunnel yet, so nobody else would ever close it
            tun.close()
            raise
        # hand the tunnel over to the stream (kept as ``stream.tun``)
        stream.tun = tun
    return service._connect(Channel(stream), config=config)
199
200
def discover(service_name, host=None, registrar=None, timeout=2):
    """
    looks up the servers that expose the given service

    :param service_name: the service to look for
    :param host: limit the discovery to the given host only (None means any host)
    :param registrar: use this registry client to discover services. if None,
                      a UDPRegistryClient is created with the given ``timeout``.
    :param timeout: the number of seconds to wait for a reply from the registry;
                    only used when ``registrar`` is None

    :raises: ``DiscoveryError`` if no server is found (at all, or on ``host``)
    :returns: a list of (ip, port) pairs
    """
    if registrar is None:
        registrar = UDPRegistryClient(timeout=timeout)

    servers = registrar.discover(service_name)
    if not servers:
        raise DiscoveryError("no servers exposing %r were found" % (service_name,))

    # no host restriction: hand back whatever the registry reported
    if not host:
        return servers

    # keep only the servers whose address is one of the host's IP addresses
    host_ips = socket.gethostbyname_ex(host)[2]
    matching = [(addr, port) for addr, port in servers if addr in host_ips]
    if not matching:
        raise DiscoveryError("no servers exposing %r were found on %r" % (service_name, host))
    return matching
226
227
def connect_by_service(service_name, host=None, service=VoidService, config={}):
    """connects to the first reachable server that exposes the requested service

    :param service_name: the service to discover
    :param host: limit discovery to the given host only (None means any host)
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :raises: ``DiscoveryError`` if no server is found, or if none of the
             discovered servers accepted the connection
    :returns: an RPyC connection
    """
    # Several servers may be registered under the same service name, and some
    # of them could be dead. Try them in the order they were reported and use
    # the first one that accepts a connection; socket errors just move us on
    # to the next candidate. If every candidate fails, a DiscoveryError is raised.
    candidates = discover(service_name, host=host)
    for addr, port in candidates:
        try:
            conn = connect(addr, port, service, config=config)
        except socket.error:
            continue
        return conn
    raise DiscoveryError("All services are down: %s" % (candidates,))
250
251
def connect_subproc(args, service=VoidService, config={}):
    """starts a child process (expected to run an rpyc server) and connects to
    it over the child's stdio pipes. uses the subprocess module.

    :param args: the args to Popen, e.g., ["python", "-u", "myfile.py"]
    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict

    :returns: an RPyC connection; the ``Popen`` object of the child is
              attached to it as ``conn.proc``
    """
    import subprocess

    child = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # we read from the child's stdout and write to its stdin
    conn = connect_pipes(child.stdout, child.stdin, service=service, config=config)
    # expose the process handle so the caller keeps control over the child
    conn.proc = child
    return conn
265
266
def _server(listener, remote_service, remote_config, args=None):
    """server side of :func:`connect_thread` / :func:`connect_multiprocess`

    Accepts a single client on ``listener`` (closing the listener right after),
    builds a connection exposing ``remote_service`` over the accepted socket and
    calls ``serve_all()`` on it. A ``KeyboardInterrupt`` raised along the way is
    turned into a call to ``interrupt_main()``.

    :param listener: a listening socket; closed once one client was accepted
    :param remote_service: the service (class or instance) to expose
    :param remote_config: configuration dict for the connection
    :param args: if a dict, its items are merged into the namespace of the
                 connection's local root (skipped for Void services)
    """
    try:
        with closing(listener):
            client_sock, _peer = listener.accept()
        conn = connect_stream(SocketStream(client_sock), service=remote_service, config=remote_config)

        if isinstance(args, dict):
            # the service may be given either as a class or as an instance;
            # pick the matching predicate once and use it for both checks
            matches = issubclass if isinstance(remote_service, type) else isinstance
            is_newstyle = not matches(remote_service, (MasterService, SlaveService))
            is_voidservice = matches(remote_service, VoidService)
            if not is_voidservice:
                root = conn._local_root
                # new-style services keep the namespace under ``exposed_namespace``
                target = root.exposed_namespace if is_newstyle else root.namespace
                target.update(args)

        conn.serve_all()
    except KeyboardInterrupt:
        interrupt_main()
287
def connect_thread(service=VoidService, config={}, remote_service=VoidService, remote_config={}):
    """starts an rpyc server on a new thread, bound to an arbitrary port,
    and connects to it over a socket.

    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict
    :param remote_service: the remote service to expose (of the server; defaults to Void)
    :param remote_config: remote configuration dict (of the server)

    :returns: an RPyC connection (the client side)
    """
    # port 0 lets the OS choose the port; it is read back from the socket
    listener = socket.socket()
    listener.bind(("localhost", 0))
    listener.listen(1)
    host, port = listener.getsockname()

    # the spawned server accepts exactly one client and closes the listener
    spawn(partial(_server, listener, remote_service, remote_config))
    return connect(host, port, service=service, config=config)
304
305
def connect_multiprocess(service=VoidService, config={}, remote_service=VoidService, remote_config={}, args={}):
    """starts an rpyc server on a new process, bound to an arbitrary port,
    and connects to it over a socket. Basically a copy of connect_thread().
    However if args is used and if these are shared memory then changes
    will be bi-directional. That is we now have access to shared memory.

    :param service: the local service to expose (defaults to Void)
    :param config: configuration dict
    :param remote_service: the remote service to expose (of the server; defaults to Void)
    :param remote_config: remote configuration dict (of the server)
    :param args: dict of local vars to pass to new connection, form {'name':var}

    :returns: an RPyC connection (the client side)

    Contributed by *@tvanzyl*
    """
    from multiprocessing import Process

    # port 0 lets the OS choose the port; it is read back from the socket below
    listener = socket.socket()
    listener.bind(("localhost", 0))
    listener.listen(1)
    remote_server = partial(_server, listener, remote_service, remote_config, args)
    t = Process(target=remote_server)
    t.start()
    try:
        host, port = listener.getsockname()
        return connect(host, port, service=service, config=config)
    finally:
        # The server process accepts on its own duplicate of the listening
        # socket (and closes that one itself); this process no longer needs
        # its copy, so release it explicitly instead of leaving it to the
        # garbage collector.
        listener.close()
330