1# -*- coding: utf-8 -*- 2# 3# Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com> 4# 5# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with 6# the additional special exception to link portions of this program with the OpenSSL library. 7# See LICENSE for more details. 8# 9 10"""RPCServer Module""" 11from __future__ import unicode_literals 12 13import logging 14import os 15import stat 16import sys 17import traceback 18from collections import namedtuple 19from types import FunctionType 20 21from OpenSSL import crypto 22from twisted.internet import defer, reactor 23from twisted.internet.protocol import Factory, connectionDone 24 25import deluge.component as component 26import deluge.configmanager 27from deluge.core.authmanager import ( 28 AUTH_LEVEL_ADMIN, 29 AUTH_LEVEL_DEFAULT, 30 AUTH_LEVEL_NONE, 31) 32from deluge.crypto_utils import get_context_factory 33from deluge.error import ( 34 DelugeError, 35 IncompatibleClient, 36 NotAuthorizedError, 37 WrappedException, 38 _ClientSideRecreateError, 39) 40from deluge.event import ClientDisconnectedEvent 41from deluge.transfer import DelugeTransferProtocol 42 43RPC_RESPONSE = 1 44RPC_ERROR = 2 45RPC_EVENT = 3 46 47log = logging.getLogger(__name__) 48 49 50def export(auth_level=AUTH_LEVEL_DEFAULT): 51 """ 52 Decorator function to register an object's method as an RPC. The object 53 will need to be registered with an :class:`RPCServer` to be effective. 54 55 :param func: the function to export 56 :type func: function 57 :param auth_level: the auth level required to call this method 58 :type auth_level: int 59 60 """ 61 62 def wrap(func, *args, **kwargs): 63 func._rpcserver_export = True 64 func._rpcserver_auth_level = auth_level 65 66 rpc_text = '**RPC exported method** (*Auth level: %s*)' % auth_level 67 68 # Append the RPC text while ensuring correct docstring formatting. 69 if func.__doc__: 70 if func.__doc__.endswith(' '): 71 indent = func.__doc__.split('\n')[-1] 72 func.__doc__ += '\n{}'.format(indent) 73 else: 74 func.__doc__ += '\n\n' 75 func.__doc__ += rpc_text 76 else: 77 func.__doc__ = rpc_text 78 79 return func 80 81 if isinstance(auth_level, FunctionType): 82 func = auth_level 83 auth_level = AUTH_LEVEL_DEFAULT 84 return wrap(func) 85 else: 86 return wrap 87 88 89def format_request(call): 90 """ 91 Format the RPCRequest message for debug printing 92 93 :param call: the request 94 :type call: a RPCRequest 95 96 :returns: a formatted string for printing 97 :rtype: str 98 99 """ 100 try: 101 s = call[1] + '(' 102 if call[2]: 103 s += ', '.join([str(x) for x in call[2]]) 104 if call[3]: 105 if call[2]: 106 s += ', ' 107 s += ', '.join([key + '=' + str(value) for key, value in call[3].items()]) 108 s += ')' 109 except UnicodeEncodeError: 110 return 'UnicodeEncodeError, call: %s' % call 111 else: 112 return s 113 114 115class DelugeRPCProtocol(DelugeTransferProtocol): 116 def __init__(self): 117 super(DelugeRPCProtocol, self).__init__() 118 # namedtuple subclass with auth_level, username for the connected session. 119 self.AuthLevel = namedtuple('SessionAuthlevel', 'auth_level, username') 120 121 def message_received(self, request): 122 """ 123 This method is called whenever a message is received from a client. The 124 only message that a client sends to the server is a RPC Request message. 125 If the RPC Request message is valid, then the method is called in 126 :meth:`dispatch`. 127 128 :param request: the request from the client. 129 :type data: tuple 130 131 """ 132 if not isinstance(request, tuple): 133 log.debug('Received invalid message: type is not tuple') 134 return 135 136 if len(request) < 1: 137 log.debug('Received invalid message: there are no items') 138 return 139 140 for call in request: 141 if len(call) != 4: 142 log.debug( 143 'Received invalid rpc request: number of items ' 'in request is %s', 144 len(call), 145 ) 146 continue 147 # log.debug('RPCRequest: %s', format_request(call)) 148 reactor.callLater(0, self.dispatch, *call) 149 150 def sendData(self, data): # NOQA: N802 151 """ 152 Sends the data to the client. 153 154 :param data: the object that is to be sent to the client. This should 155 be one of the RPC message types. 156 :type data: object 157 158 """ 159 try: 160 self.transfer_message(data) 161 except Exception as ex: 162 log.warning('Error occurred when sending message: %s.', ex) 163 log.exception(ex) 164 raise 165 166 def connectionMade(self): # NOQA: N802 167 """ 168 This method is called when a new client connects. 169 """ 170 peer = self.transport.getPeer() 171 log.info('Deluge Client connection made from: %s:%s', peer.host, peer.port) 172 # Set the initial auth level of this session to AUTH_LEVEL_NONE 173 self.factory.authorized_sessions[self.transport.sessionno] = self.AuthLevel( 174 AUTH_LEVEL_NONE, '' 175 ) 176 177 def connectionLost(self, reason=connectionDone): # NOQA: N802 178 """ 179 This method is called when the client is disconnected. 180 181 :param reason: the reason the client disconnected. 182 :type reason: str 183 184 """ 185 186 # We need to remove this session from various dicts 187 del self.factory.authorized_sessions[self.transport.sessionno] 188 if self.transport.sessionno in self.factory.session_protocols: 189 del self.factory.session_protocols[self.transport.sessionno] 190 if self.transport.sessionno in self.factory.interested_events: 191 del self.factory.interested_events[self.transport.sessionno] 192 193 if self.factory.state == 'running': 194 component.get('EventManager').emit( 195 ClientDisconnectedEvent(self.factory.session_id) 196 ) 197 log.info('Deluge client disconnected: %s', reason.value) 198 199 def valid_session(self): 200 return self.transport.sessionno in self.factory.authorized_sessions 201 202 def dispatch(self, request_id, method, args, kwargs): 203 """ 204 This method is run when a RPC Request is made. It will run the local method 205 and will send either a RPC Response or RPC Error back to the client. 206 207 :param request_id: the request_id from the client (sent in the RPC Request) 208 :type request_id: int 209 :param method: the local method to call. It must be registered with 210 the :class:`RPCServer`. 211 :type method: str 212 :param args: the arguments to pass to `method` 213 :type args: list 214 :param kwargs: the keyword-arguments to pass to `method` 215 :type kwargs: dict 216 217 """ 218 219 def send_error(): 220 """ 221 Sends an error response with the contents of the exception that was raised. 222 """ 223 exc_type, exc_value, dummy_exc_trace = sys.exc_info() 224 formated_tb = traceback.format_exc() 225 try: 226 self.sendData( 227 ( 228 RPC_ERROR, 229 request_id, 230 exc_type.__name__, 231 exc_value._args, 232 exc_value._kwargs, 233 formated_tb, 234 ) 235 ) 236 except AttributeError: 237 # This is not a deluge exception (object has no attribute '_args), let's wrap it 238 log.warning( 239 'An exception occurred while sending RPC_ERROR to ' 240 'client. Wrapping it and resending. Error to ' 241 'send(causing exception goes next):\n%s', 242 formated_tb, 243 ) 244 try: 245 raise WrappedException( 246 str(exc_value), exc_type.__name__, formated_tb 247 ) 248 except WrappedException: 249 send_error() 250 except Exception as ex: 251 log.error( 252 'An exception occurred while sending RPC_ERROR to client: %s', ex 253 ) 254 255 if method == 'daemon.info': 256 # This is a special case and used in the initial connection process 257 self.sendData((RPC_RESPONSE, request_id, deluge.common.get_version())) 258 return 259 elif method == 'daemon.login': 260 # This is a special case and used in the initial connection process 261 # We need to authenticate the user here 262 log.debug('RPC dispatch daemon.login') 263 try: 264 client_version = kwargs.pop('client_version', None) 265 if client_version is None: 266 raise IncompatibleClient(deluge.common.get_version()) 267 ret = component.get('AuthManager').authorize(*args, **kwargs) 268 if ret: 269 self.factory.authorized_sessions[ 270 self.transport.sessionno 271 ] = self.AuthLevel(ret, args[0]) 272 self.factory.session_protocols[self.transport.sessionno] = self 273 except Exception as ex: 274 send_error() 275 if not isinstance(ex, _ClientSideRecreateError): 276 log.exception(ex) 277 else: 278 self.sendData((RPC_RESPONSE, request_id, (ret))) 279 if not ret: 280 self.transport.loseConnection() 281 return 282 283 # Anything below requires a valid session 284 if not self.valid_session(): 285 return 286 287 if method == 'daemon.set_event_interest': 288 log.debug('RPC dispatch daemon.set_event_interest') 289 # This special case is to allow clients to set which events they are 290 # interested in receiving. 291 # We are expecting a sequence from the client. 292 try: 293 if self.transport.sessionno not in self.factory.interested_events: 294 self.factory.interested_events[self.transport.sessionno] = [] 295 self.factory.interested_events[self.transport.sessionno].extend(args[0]) 296 except Exception: 297 send_error() 298 else: 299 self.sendData((RPC_RESPONSE, request_id, (True))) 300 return 301 302 if method not in self.factory.methods: 303 try: 304 # Raise exception to be sent back to client 305 raise AttributeError('RPC call on invalid function: %s' % method) 306 except AttributeError: 307 send_error() 308 return 309 310 log.debug('RPC dispatch %s', method) 311 try: 312 method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level 313 auth_level = self.factory.authorized_sessions[ 314 self.transport.sessionno 315 ].auth_level 316 if auth_level < method_auth_requirement: 317 # This session is not allowed to call this method 318 log.debug( 319 'Session %s is attempting an unauthorized method call!', 320 self.transport.sessionno, 321 ) 322 raise NotAuthorizedError(auth_level, method_auth_requirement) 323 # Set the session_id in the factory so that methods can know 324 # which session is calling it. 325 self.factory.session_id = self.transport.sessionno 326 ret = self.factory.methods[method](*args, **kwargs) 327 except Exception as ex: 328 send_error() 329 # Don't bother printing out DelugeErrors, because they are just 330 # for the client 331 if not isinstance(ex, DelugeError): 332 log.exception('Exception calling RPC request: %s', ex) 333 else: 334 # Check if the return value is a deferred, since we'll need to 335 # wait for it to fire before sending the RPC_RESPONSE 336 if isinstance(ret, defer.Deferred): 337 338 def on_success(result): 339 try: 340 self.sendData((RPC_RESPONSE, request_id, result)) 341 except Exception: 342 send_error() 343 return result 344 345 def on_fail(failure): 346 try: 347 failure.raiseException() 348 except Exception: 349 send_error() 350 return failure 351 352 ret.addCallbacks(on_success, on_fail) 353 else: 354 self.sendData((RPC_RESPONSE, request_id, ret)) 355 356 357class RPCServer(component.Component): 358 """ 359 This class is used to handle rpc requests from the client. Objects are 360 registered with this class and their methods are exported using the export 361 decorator. 362 363 :param port: the port the RPCServer will listen on 364 :type port: int 365 :param interface: the interface to listen on, this may override the `allow_remote` setting 366 :type interface: str 367 :param allow_remote: set True if the server should allow remote connections 368 :type allow_remote: bool 369 :param listen: if False, will not start listening.. This is only useful in Classic Mode 370 :type listen: bool 371 """ 372 373 def __init__(self, port=58846, interface='', allow_remote=False, listen=True): 374 component.Component.__init__(self, 'RPCServer') 375 376 self.factory = Factory() 377 self.factory.protocol = DelugeRPCProtocol 378 self.factory.session_id = -1 379 self.factory.state = 'running' 380 381 # Holds the registered methods 382 self.factory.methods = {} 383 # Holds the session_ids and auth levels 384 self.factory.authorized_sessions = {} 385 # Holds the protocol objects with the session_id as key 386 self.factory.session_protocols = {} 387 # Holds the interested event list for the sessions 388 self.factory.interested_events = {} 389 390 self.listen = listen 391 if not listen: 392 return 393 394 if allow_remote: 395 hostname = '' 396 else: 397 hostname = 'localhost' 398 399 if interface: 400 hostname = interface 401 402 log.info('Starting DelugeRPC server %s:%s', hostname, port) 403 404 # Check for SSL keys and generate some if needed 405 check_ssl_keys() 406 407 cert = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.cert') 408 pkey = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.pkey') 409 410 try: 411 reactor.listenSSL( 412 port, self.factory, get_context_factory(cert, pkey), interface=hostname 413 ) 414 except Exception as ex: 415 log.debug('Daemon already running or port not available.: %s', ex) 416 raise 417 418 def register_object(self, obj, name=None): 419 """ 420 Registers an object to export it's rpc methods. These methods should 421 be exported with the export decorator prior to registering the object. 422 423 :param obj: the object that we want to export 424 :type obj: object 425 :param name: the name to use, if None, it will be the class name of the object 426 :type name: str 427 """ 428 if not name: 429 name = obj.__class__.__name__.lower() 430 431 for d in dir(obj): 432 if d[0] == '_': 433 continue 434 if getattr(getattr(obj, d), '_rpcserver_export', False): 435 log.debug('Registering method: %s', name + '.' + d) 436 self.factory.methods[name + '.' + d] = getattr(obj, d) 437 438 def deregister_object(self, obj): 439 """ 440 Deregisters an objects exported rpc methods. 441 442 :param obj: the object that was previously registered 443 444 """ 445 for key, value in self.factory.methods.items(): 446 if value.__self__ == obj: 447 del self.factory.methods[key] 448 449 def get_object_method(self, name): 450 """ 451 Returns a registered method. 452 453 :param name: the name of the method, usually in the form of 'object.method' 454 :type name: str 455 456 :returns: method 457 458 :raises KeyError: if `name` is not registered 459 460 """ 461 return self.factory.methods[name] 462 463 def get_method_list(self): 464 """ 465 Returns a list of the exported methods. 466 467 :returns: the exported methods 468 :rtype: list 469 """ 470 return list(self.factory.methods) 471 472 def get_session_id(self): 473 """ 474 Returns the session id of the current RPC. 475 476 :returns: the session id, this will be -1 if no connections have been made 477 :rtype: int 478 479 """ 480 return self.factory.session_id 481 482 def get_session_user(self): 483 """ 484 Returns the username calling the current RPC. 485 486 :returns: the username of the user calling the current RPC 487 :rtype: string 488 489 """ 490 if not self.listen: 491 return 'localclient' 492 session_id = self.get_session_id() 493 if session_id > -1 and session_id in self.factory.authorized_sessions: 494 return self.factory.authorized_sessions[session_id].username 495 else: 496 # No connections made yet 497 return '' 498 499 def get_session_auth_level(self): 500 """ 501 Returns the auth level of the user calling the current RPC. 502 503 :returns: the auth level 504 :rtype: int 505 """ 506 if not self.listen or not self.is_session_valid(self.get_session_id()): 507 return AUTH_LEVEL_ADMIN 508 return self.factory.authorized_sessions[self.get_session_id()].auth_level 509 510 def get_rpc_auth_level(self, rpc): 511 """ 512 Returns the auth level requirement for an exported rpc. 513 514 :returns: the auth level 515 :rtype: int 516 """ 517 return self.factory.methods[rpc]._rpcserver_auth_level 518 519 def is_session_valid(self, session_id): 520 """ 521 Checks if the session is still valid, eg, if the client is still connected. 522 523 :param session_id: the session id 524 :type session_id: int 525 526 :returns: True if the session is valid 527 :rtype: bool 528 529 """ 530 return session_id in self.factory.authorized_sessions 531 532 def emit_event(self, event): 533 """ 534 Emits the event to interested clients. 535 536 :param event: the event to emit 537 :type event: :class:`deluge.event.DelugeEvent` 538 """ 539 log.debug('intevents: %s', self.factory.interested_events) 540 # Find sessions interested in this event 541 for session_id, interest in self.factory.interested_events.items(): 542 if event.name in interest: 543 log.debug('Emit Event: %s %s', event.name, event.args) 544 # This session is interested so send a RPC_EVENT 545 self.factory.session_protocols[session_id].sendData( 546 (RPC_EVENT, event.name, event.args) 547 ) 548 549 def emit_event_for_session_id(self, session_id, event): 550 """ 551 Emits the event to specified session_id. 552 553 :param session_id: the event to emit 554 :type session_id: int 555 :param event: the event to emit 556 :type event: :class:`deluge.event.DelugeEvent` 557 """ 558 if not self.is_session_valid(session_id): 559 log.debug( 560 'Session ID %s is not valid. Not sending event "%s".', 561 session_id, 562 event.name, 563 ) 564 return 565 if session_id not in self.factory.interested_events: 566 log.debug( 567 'Session ID %s is not interested in any events. Not sending event "%s".', 568 session_id, 569 event.name, 570 ) 571 return 572 if event.name not in self.factory.interested_events[session_id]: 573 log.debug( 574 'Session ID %s is not interested in event "%s". Not sending it.', 575 session_id, 576 event.name, 577 ) 578 return 579 log.debug( 580 'Sending event "%s" with args "%s" to session id "%s".', 581 event.name, 582 event.args, 583 session_id, 584 ) 585 self.factory.session_protocols[session_id].sendData( 586 (RPC_EVENT, event.name, event.args) 587 ) 588 589 def stop(self): 590 self.factory.state = 'stopping' 591 592 593def check_ssl_keys(): 594 """ 595 Check for SSL cert/key and create them if necessary 596 """ 597 ssl_dir = deluge.configmanager.get_config_dir('ssl') 598 if not os.path.exists(ssl_dir): 599 # The ssl folder doesn't exist so we need to create it 600 os.makedirs(ssl_dir) 601 generate_ssl_keys() 602 else: 603 for f in ('daemon.pkey', 'daemon.cert'): 604 if not os.path.exists(os.path.join(ssl_dir, f)): 605 generate_ssl_keys() 606 break 607 608 609def generate_ssl_keys(): 610 """ 611 This method generates a new SSL key/cert. 612 """ 613 from deluge.common import PY2 614 615 digest = 'sha256' if not PY2 else b'sha256' 616 617 # Generate key pair 618 pkey = crypto.PKey() 619 pkey.generate_key(crypto.TYPE_RSA, 2048) 620 621 # Generate cert request 622 req = crypto.X509Req() 623 subj = req.get_subject() 624 setattr(subj, 'CN', 'Deluge Daemon') 625 req.set_pubkey(pkey) 626 req.sign(pkey, digest) 627 628 # Generate certificate 629 cert = crypto.X509() 630 cert.set_serial_number(0) 631 cert.gmtime_adj_notBefore(0) 632 cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 3) # Three Years 633 cert.set_issuer(req.get_subject()) 634 cert.set_subject(req.get_subject()) 635 cert.set_pubkey(req.get_pubkey()) 636 cert.sign(pkey, digest) 637 638 # Write out files 639 ssl_dir = deluge.configmanager.get_config_dir('ssl') 640 with open(os.path.join(ssl_dir, 'daemon.pkey'), 'wb') as _file: 641 _file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) 642 with open(os.path.join(ssl_dir, 'daemon.cert'), 'wb') as _file: 643 _file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) 644 # Make the files only readable by this user 645 for f in ('daemon.pkey', 'daemon.cert'): 646 os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE) 647