1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com>
4#
5# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
6# the additional special exception to link portions of this program with the OpenSSL library.
7# See LICENSE for more details.
8#
9
10"""RPCServer Module"""
11from __future__ import unicode_literals
12
13import logging
14import os
15import stat
16import sys
17import traceback
18from collections import namedtuple
19from types import FunctionType
20
21from OpenSSL import crypto
22from twisted.internet import defer, reactor
23from twisted.internet.protocol import Factory, connectionDone
24
25import deluge.component as component
26import deluge.configmanager
27from deluge.core.authmanager import (
28    AUTH_LEVEL_ADMIN,
29    AUTH_LEVEL_DEFAULT,
30    AUTH_LEVEL_NONE,
31)
32from deluge.crypto_utils import get_context_factory
33from deluge.error import (
34    DelugeError,
35    IncompatibleClient,
36    NotAuthorizedError,
37    WrappedException,
38    _ClientSideRecreateError,
39)
40from deluge.event import ClientDisconnectedEvent
41from deluge.transfer import DelugeTransferProtocol
42
43RPC_RESPONSE = 1
44RPC_ERROR = 2
45RPC_EVENT = 3
46
47log = logging.getLogger(__name__)
48
49
50def export(auth_level=AUTH_LEVEL_DEFAULT):
51    """
52    Decorator function to register an object's method as an RPC.  The object
53    will need to be registered with an :class:`RPCServer` to be effective.
54
55    :param func: the function to export
56    :type func: function
57    :param auth_level: the auth level required to call this method
58    :type auth_level: int
59
60    """
61
62    def wrap(func, *args, **kwargs):
63        func._rpcserver_export = True
64        func._rpcserver_auth_level = auth_level
65
66        rpc_text = '**RPC exported method** (*Auth level: %s*)' % auth_level
67
68        # Append the RPC text while ensuring correct docstring formatting.
69        if func.__doc__:
70            if func.__doc__.endswith('    '):
71                indent = func.__doc__.split('\n')[-1]
72                func.__doc__ += '\n{}'.format(indent)
73            else:
74                func.__doc__ += '\n\n'
75            func.__doc__ += rpc_text
76        else:
77            func.__doc__ = rpc_text
78
79        return func
80
81    if isinstance(auth_level, FunctionType):
82        func = auth_level
83        auth_level = AUTH_LEVEL_DEFAULT
84        return wrap(func)
85    else:
86        return wrap
87
88
89def format_request(call):
90    """
91    Format the RPCRequest message for debug printing
92
93    :param call: the request
94    :type call: a RPCRequest
95
96    :returns: a formatted string for printing
97    :rtype: str
98
99    """
100    try:
101        s = call[1] + '('
102        if call[2]:
103            s += ', '.join([str(x) for x in call[2]])
104        if call[3]:
105            if call[2]:
106                s += ', '
107            s += ', '.join([key + '=' + str(value) for key, value in call[3].items()])
108        s += ')'
109    except UnicodeEncodeError:
110        return 'UnicodeEncodeError, call: %s' % call
111    else:
112        return s
113
114
115class DelugeRPCProtocol(DelugeTransferProtocol):
116    def __init__(self):
117        super(DelugeRPCProtocol, self).__init__()
118        # namedtuple subclass with auth_level, username for the connected session.
119        self.AuthLevel = namedtuple('SessionAuthlevel', 'auth_level, username')
120
121    def message_received(self, request):
122        """
123        This method is called whenever a message is received from a client.  The
124        only message that a client sends to the server is a RPC Request message.
125        If the RPC Request message is valid, then the method is called in
126        :meth:`dispatch`.
127
128        :param request: the request from the client.
129        :type data: tuple
130
131        """
132        if not isinstance(request, tuple):
133            log.debug('Received invalid message: type is not tuple')
134            return
135
136        if len(request) < 1:
137            log.debug('Received invalid message: there are no items')
138            return
139
140        for call in request:
141            if len(call) != 4:
142                log.debug(
143                    'Received invalid rpc request: number of items ' 'in request is %s',
144                    len(call),
145                )
146                continue
147            # log.debug('RPCRequest: %s', format_request(call))
148            reactor.callLater(0, self.dispatch, *call)
149
150    def sendData(self, data):  # NOQA: N802
151        """
152        Sends the data to the client.
153
154        :param data: the object that is to be sent to the client.  This should
155            be one of the RPC message types.
156        :type data: object
157
158        """
159        try:
160            self.transfer_message(data)
161        except Exception as ex:
162            log.warning('Error occurred when sending message: %s.', ex)
163            log.exception(ex)
164            raise
165
166    def connectionMade(self):  # NOQA: N802
167        """
168        This method is called when a new client connects.
169        """
170        peer = self.transport.getPeer()
171        log.info('Deluge Client connection made from: %s:%s', peer.host, peer.port)
172        # Set the initial auth level of this session to AUTH_LEVEL_NONE
173        self.factory.authorized_sessions[self.transport.sessionno] = self.AuthLevel(
174            AUTH_LEVEL_NONE, ''
175        )
176
177    def connectionLost(self, reason=connectionDone):  # NOQA: N802
178        """
179        This method is called when the client is disconnected.
180
181        :param reason: the reason the client disconnected.
182        :type reason: str
183
184        """
185
186        # We need to remove this session from various dicts
187        del self.factory.authorized_sessions[self.transport.sessionno]
188        if self.transport.sessionno in self.factory.session_protocols:
189            del self.factory.session_protocols[self.transport.sessionno]
190        if self.transport.sessionno in self.factory.interested_events:
191            del self.factory.interested_events[self.transport.sessionno]
192
193        if self.factory.state == 'running':
194            component.get('EventManager').emit(
195                ClientDisconnectedEvent(self.factory.session_id)
196            )
197        log.info('Deluge client disconnected: %s', reason.value)
198
199    def valid_session(self):
200        return self.transport.sessionno in self.factory.authorized_sessions
201
202    def dispatch(self, request_id, method, args, kwargs):
203        """
204        This method is run when a RPC Request is made.  It will run the local method
205        and will send either a RPC Response or RPC Error back to the client.
206
207        :param request_id: the request_id from the client (sent in the RPC Request)
208        :type request_id: int
209        :param method: the local method to call. It must be registered with
210            the :class:`RPCServer`.
211        :type method: str
212        :param args: the arguments to pass to `method`
213        :type args: list
214        :param kwargs: the keyword-arguments to pass to `method`
215        :type kwargs: dict
216
217        """
218
219        def send_error():
220            """
221            Sends an error response with the contents of the exception that was raised.
222            """
223            exc_type, exc_value, dummy_exc_trace = sys.exc_info()
224            formated_tb = traceback.format_exc()
225            try:
226                self.sendData(
227                    (
228                        RPC_ERROR,
229                        request_id,
230                        exc_type.__name__,
231                        exc_value._args,
232                        exc_value._kwargs,
233                        formated_tb,
234                    )
235                )
236            except AttributeError:
237                # This is not a deluge exception (object has no attribute '_args), let's wrap it
238                log.warning(
239                    'An exception occurred while sending RPC_ERROR to '
240                    'client. Wrapping it and resending. Error to '
241                    'send(causing exception goes next):\n%s',
242                    formated_tb,
243                )
244                try:
245                    raise WrappedException(
246                        str(exc_value), exc_type.__name__, formated_tb
247                    )
248                except WrappedException:
249                    send_error()
250            except Exception as ex:
251                log.error(
252                    'An exception occurred while sending RPC_ERROR to client: %s', ex
253                )
254
255        if method == 'daemon.info':
256            # This is a special case and used in the initial connection process
257            self.sendData((RPC_RESPONSE, request_id, deluge.common.get_version()))
258            return
259        elif method == 'daemon.login':
260            # This is a special case and used in the initial connection process
261            # We need to authenticate the user here
262            log.debug('RPC dispatch daemon.login')
263            try:
264                client_version = kwargs.pop('client_version', None)
265                if client_version is None:
266                    raise IncompatibleClient(deluge.common.get_version())
267                ret = component.get('AuthManager').authorize(*args, **kwargs)
268                if ret:
269                    self.factory.authorized_sessions[
270                        self.transport.sessionno
271                    ] = self.AuthLevel(ret, args[0])
272                    self.factory.session_protocols[self.transport.sessionno] = self
273            except Exception as ex:
274                send_error()
275                if not isinstance(ex, _ClientSideRecreateError):
276                    log.exception(ex)
277            else:
278                self.sendData((RPC_RESPONSE, request_id, (ret)))
279                if not ret:
280                    self.transport.loseConnection()
281            return
282
283        # Anything below requires a valid session
284        if not self.valid_session():
285            return
286
287        if method == 'daemon.set_event_interest':
288            log.debug('RPC dispatch daemon.set_event_interest')
289            # This special case is to allow clients to set which events they are
290            # interested in receiving.
291            # We are expecting a sequence from the client.
292            try:
293                if self.transport.sessionno not in self.factory.interested_events:
294                    self.factory.interested_events[self.transport.sessionno] = []
295                self.factory.interested_events[self.transport.sessionno].extend(args[0])
296            except Exception:
297                send_error()
298            else:
299                self.sendData((RPC_RESPONSE, request_id, (True)))
300            return
301
302        if method not in self.factory.methods:
303            try:
304                # Raise exception to be sent back to client
305                raise AttributeError('RPC call on invalid function: %s' % method)
306            except AttributeError:
307                send_error()
308                return
309
310        log.debug('RPC dispatch %s', method)
311        try:
312            method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level
313            auth_level = self.factory.authorized_sessions[
314                self.transport.sessionno
315            ].auth_level
316            if auth_level < method_auth_requirement:
317                # This session is not allowed to call this method
318                log.debug(
319                    'Session %s is attempting an unauthorized method call!',
320                    self.transport.sessionno,
321                )
322                raise NotAuthorizedError(auth_level, method_auth_requirement)
323            # Set the session_id in the factory so that methods can know
324            # which session is calling it.
325            self.factory.session_id = self.transport.sessionno
326            ret = self.factory.methods[method](*args, **kwargs)
327        except Exception as ex:
328            send_error()
329            # Don't bother printing out DelugeErrors, because they are just
330            # for the client
331            if not isinstance(ex, DelugeError):
332                log.exception('Exception calling RPC request: %s', ex)
333        else:
334            # Check if the return value is a deferred, since we'll need to
335            # wait for it to fire before sending the RPC_RESPONSE
336            if isinstance(ret, defer.Deferred):
337
338                def on_success(result):
339                    try:
340                        self.sendData((RPC_RESPONSE, request_id, result))
341                    except Exception:
342                        send_error()
343                    return result
344
345                def on_fail(failure):
346                    try:
347                        failure.raiseException()
348                    except Exception:
349                        send_error()
350                    return failure
351
352                ret.addCallbacks(on_success, on_fail)
353            else:
354                self.sendData((RPC_RESPONSE, request_id, ret))
355
356
357class RPCServer(component.Component):
358    """
359    This class is used to handle rpc requests from the client.  Objects are
360    registered with this class and their methods are exported using the export
361    decorator.
362
363    :param port: the port the RPCServer will listen on
364    :type port: int
365    :param interface: the interface to listen on, this may override the `allow_remote` setting
366    :type interface: str
367    :param allow_remote: set True if the server should allow remote connections
368    :type allow_remote: bool
369    :param listen: if False, will not start listening.. This is only useful in Classic Mode
370    :type listen: bool
371    """
372
373    def __init__(self, port=58846, interface='', allow_remote=False, listen=True):
374        component.Component.__init__(self, 'RPCServer')
375
376        self.factory = Factory()
377        self.factory.protocol = DelugeRPCProtocol
378        self.factory.session_id = -1
379        self.factory.state = 'running'
380
381        # Holds the registered methods
382        self.factory.methods = {}
383        # Holds the session_ids and auth levels
384        self.factory.authorized_sessions = {}
385        # Holds the protocol objects with the session_id as key
386        self.factory.session_protocols = {}
387        # Holds the interested event list for the sessions
388        self.factory.interested_events = {}
389
390        self.listen = listen
391        if not listen:
392            return
393
394        if allow_remote:
395            hostname = ''
396        else:
397            hostname = 'localhost'
398
399        if interface:
400            hostname = interface
401
402        log.info('Starting DelugeRPC server %s:%s', hostname, port)
403
404        # Check for SSL keys and generate some if needed
405        check_ssl_keys()
406
407        cert = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.cert')
408        pkey = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.pkey')
409
410        try:
411            reactor.listenSSL(
412                port, self.factory, get_context_factory(cert, pkey), interface=hostname
413            )
414        except Exception as ex:
415            log.debug('Daemon already running or port not available.: %s', ex)
416            raise
417
418    def register_object(self, obj, name=None):
419        """
420        Registers an object to export it's rpc methods.  These methods should
421        be exported with the export decorator prior to registering the object.
422
423        :param obj: the object that we want to export
424        :type obj: object
425        :param name: the name to use, if None, it will be the class name of the object
426        :type name: str
427        """
428        if not name:
429            name = obj.__class__.__name__.lower()
430
431        for d in dir(obj):
432            if d[0] == '_':
433                continue
434            if getattr(getattr(obj, d), '_rpcserver_export', False):
435                log.debug('Registering method: %s', name + '.' + d)
436                self.factory.methods[name + '.' + d] = getattr(obj, d)
437
438    def deregister_object(self, obj):
439        """
440        Deregisters an objects exported rpc methods.
441
442        :param obj: the object that was previously registered
443
444        """
445        for key, value in self.factory.methods.items():
446            if value.__self__ == obj:
447                del self.factory.methods[key]
448
449    def get_object_method(self, name):
450        """
451        Returns a registered method.
452
453        :param name: the name of the method, usually in the form of 'object.method'
454        :type name: str
455
456        :returns: method
457
458        :raises KeyError: if `name` is not registered
459
460        """
461        return self.factory.methods[name]
462
463    def get_method_list(self):
464        """
465        Returns a list of the exported methods.
466
467        :returns: the exported methods
468        :rtype: list
469        """
470        return list(self.factory.methods)
471
472    def get_session_id(self):
473        """
474        Returns the session id of the current RPC.
475
476        :returns: the session id, this will be -1 if no connections have been made
477        :rtype: int
478
479        """
480        return self.factory.session_id
481
482    def get_session_user(self):
483        """
484        Returns the username calling the current RPC.
485
486        :returns: the username of the user calling the current RPC
487        :rtype: string
488
489        """
490        if not self.listen:
491            return 'localclient'
492        session_id = self.get_session_id()
493        if session_id > -1 and session_id in self.factory.authorized_sessions:
494            return self.factory.authorized_sessions[session_id].username
495        else:
496            # No connections made yet
497            return ''
498
499    def get_session_auth_level(self):
500        """
501        Returns the auth level of the user calling the current RPC.
502
503        :returns: the auth level
504        :rtype: int
505        """
506        if not self.listen or not self.is_session_valid(self.get_session_id()):
507            return AUTH_LEVEL_ADMIN
508        return self.factory.authorized_sessions[self.get_session_id()].auth_level
509
510    def get_rpc_auth_level(self, rpc):
511        """
512        Returns the auth level requirement for an exported rpc.
513
514        :returns: the auth level
515        :rtype: int
516        """
517        return self.factory.methods[rpc]._rpcserver_auth_level
518
519    def is_session_valid(self, session_id):
520        """
521        Checks if the session is still valid, eg, if the client is still connected.
522
523        :param session_id: the session id
524        :type session_id: int
525
526        :returns: True if the session is valid
527        :rtype: bool
528
529        """
530        return session_id in self.factory.authorized_sessions
531
532    def emit_event(self, event):
533        """
534        Emits the event to interested clients.
535
536        :param event: the event to emit
537        :type event: :class:`deluge.event.DelugeEvent`
538        """
539        log.debug('intevents: %s', self.factory.interested_events)
540        # Find sessions interested in this event
541        for session_id, interest in self.factory.interested_events.items():
542            if event.name in interest:
543                log.debug('Emit Event: %s %s', event.name, event.args)
544                # This session is interested so send a RPC_EVENT
545                self.factory.session_protocols[session_id].sendData(
546                    (RPC_EVENT, event.name, event.args)
547                )
548
549    def emit_event_for_session_id(self, session_id, event):
550        """
551        Emits the event to specified session_id.
552
553        :param session_id: the event to emit
554        :type session_id: int
555        :param event: the event to emit
556        :type event: :class:`deluge.event.DelugeEvent`
557        """
558        if not self.is_session_valid(session_id):
559            log.debug(
560                'Session ID %s is not valid. Not sending event "%s".',
561                session_id,
562                event.name,
563            )
564            return
565        if session_id not in self.factory.interested_events:
566            log.debug(
567                'Session ID %s is not interested in any events. Not sending event "%s".',
568                session_id,
569                event.name,
570            )
571            return
572        if event.name not in self.factory.interested_events[session_id]:
573            log.debug(
574                'Session ID %s is not interested in event "%s". Not sending it.',
575                session_id,
576                event.name,
577            )
578            return
579        log.debug(
580            'Sending event "%s" with args "%s" to session id "%s".',
581            event.name,
582            event.args,
583            session_id,
584        )
585        self.factory.session_protocols[session_id].sendData(
586            (RPC_EVENT, event.name, event.args)
587        )
588
589    def stop(self):
590        self.factory.state = 'stopping'
591
592
593def check_ssl_keys():
594    """
595    Check for SSL cert/key and create them if necessary
596    """
597    ssl_dir = deluge.configmanager.get_config_dir('ssl')
598    if not os.path.exists(ssl_dir):
599        # The ssl folder doesn't exist so we need to create it
600        os.makedirs(ssl_dir)
601        generate_ssl_keys()
602    else:
603        for f in ('daemon.pkey', 'daemon.cert'):
604            if not os.path.exists(os.path.join(ssl_dir, f)):
605                generate_ssl_keys()
606                break
607
608
609def generate_ssl_keys():
610    """
611    This method generates a new SSL key/cert.
612    """
613    from deluge.common import PY2
614
615    digest = 'sha256' if not PY2 else b'sha256'
616
617    # Generate key pair
618    pkey = crypto.PKey()
619    pkey.generate_key(crypto.TYPE_RSA, 2048)
620
621    # Generate cert request
622    req = crypto.X509Req()
623    subj = req.get_subject()
624    setattr(subj, 'CN', 'Deluge Daemon')
625    req.set_pubkey(pkey)
626    req.sign(pkey, digest)
627
628    # Generate certificate
629    cert = crypto.X509()
630    cert.set_serial_number(0)
631    cert.gmtime_adj_notBefore(0)
632    cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 3)  # Three Years
633    cert.set_issuer(req.get_subject())
634    cert.set_subject(req.get_subject())
635    cert.set_pubkey(req.get_pubkey())
636    cert.sign(pkey, digest)
637
638    # Write out files
639    ssl_dir = deluge.configmanager.get_config_dir('ssl')
640    with open(os.path.join(ssl_dir, 'daemon.pkey'), 'wb') as _file:
641        _file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
642    with open(os.path.join(ssl_dir, 'daemon.cert'), 'wb') as _file:
643        _file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
644    # Make the files only readable by this user
645    for f in ('daemon.pkey', 'daemon.cert'):
646        os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE)
647