1# -*- coding: utf-8 -*- 2# 3# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> 4# 5# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with 6# the additional special exception to link portions of this program with the OpenSSL library. 7# See LICENSE for more details. 8# 9 10"""TorrentManager handles Torrent objects""" 11from __future__ import unicode_literals 12 13import datetime 14import logging 15import operator 16import os 17import time 18from collections import namedtuple 19from tempfile import gettempdir 20 21import six.moves.cPickle as pickle # noqa: N813 22from twisted.internet import defer, error, reactor, threads 23from twisted.internet.defer import Deferred, DeferredList 24from twisted.internet.task import LoopingCall 25 26import deluge.component as component 27from deluge._libtorrent import lt 28from deluge.common import PY2, archive_files, decode_bytes, get_magnet_info, is_magnet 29from deluge.configmanager import ConfigManager, get_config_dir 30from deluge.core.authmanager import AUTH_LEVEL_ADMIN 31from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath 32from deluge.error import AddTorrentError, InvalidTorrentError 33from deluge.event import ( 34 ExternalIPEvent, 35 PreTorrentRemovedEvent, 36 SessionStartedEvent, 37 TorrentAddedEvent, 38 TorrentFileCompletedEvent, 39 TorrentFileRenamedEvent, 40 TorrentFinishedEvent, 41 TorrentRemovedEvent, 42 TorrentResumedEvent, 43) 44 45log = logging.getLogger(__name__) 46 47LT_DEFAULT_ADD_TORRENT_FLAGS = ( 48 lt.add_torrent_params_flags_t.flag_paused 49 | lt.add_torrent_params_flags_t.flag_auto_managed 50 | lt.add_torrent_params_flags_t.flag_update_subscribe 51 | lt.add_torrent_params_flags_t.flag_apply_ip_filter 52) 53 54 55class TorrentState: # pylint: disable=old-style-class 56 """Create a torrent state. 57 58 Note: 59 This must be old style class to avoid breaking torrent.state file. 60 61 """ 62 63 def __init__( 64 self, 65 torrent_id=None, 66 filename=None, 67 trackers=None, 68 storage_mode='sparse', 69 paused=False, 70 save_path=None, 71 max_connections=-1, 72 max_upload_slots=-1, 73 max_upload_speed=-1.0, 74 max_download_speed=-1.0, 75 prioritize_first_last=False, 76 sequential_download=False, 77 file_priorities=None, 78 queue=None, 79 auto_managed=True, 80 is_finished=False, 81 stop_ratio=2.00, 82 stop_at_ratio=False, 83 remove_at_ratio=False, 84 move_completed=False, 85 move_completed_path=None, 86 magnet=None, 87 owner=None, 88 shared=False, 89 super_seeding=False, 90 name=None, 91 ): 92 # Build the class atrribute list from args 93 for key, value in locals().items(): 94 if key == 'self': 95 continue 96 setattr(self, key, value) 97 98 def __eq__(self, other): 99 return isinstance(other, TorrentState) and self.__dict__ == other.__dict__ 100 101 def __ne__(self, other): 102 return not self == other 103 104 105class TorrentManagerState: # pylint: disable=old-style-class 106 """TorrentManagerState holds a list of TorrentState objects. 107 108 Note: 109 This must be old style class to avoid breaking torrent.state file. 110 111 """ 112 113 def __init__(self): 114 self.torrents = [] 115 116 def __eq__(self, other): 117 return ( 118 isinstance(other, TorrentManagerState) and self.torrents == other.torrents 119 ) 120 121 def __ne__(self, other): 122 return not self == other 123 124 125class TorrentManager(component.Component): 126 """TorrentManager contains a list of torrents in the current libtorrent session. 127 128 This object is also responsible for saving the state of the session for use on restart. 129 130 """ 131 132 callLater = reactor.callLater # noqa: N815 133 134 def __init__(self): 135 component.Component.__init__( 136 self, 137 'TorrentManager', 138 interval=5, 139 depend=['CorePluginManager', 'AlertManager'], 140 ) 141 log.debug('TorrentManager init...') 142 # Set the libtorrent session 143 self.session = component.get('Core').session 144 # Set the alertmanager 145 self.alerts = component.get('AlertManager') 146 # Get the core config 147 self.config = ConfigManager('core.conf') 148 149 # Make sure the state folder has been created 150 self.state_dir = os.path.join(get_config_dir(), 'state') 151 if not os.path.exists(self.state_dir): 152 os.makedirs(self.state_dir) 153 self.temp_file = os.path.join(self.state_dir, '.safe_state_check') 154 155 # Create the torrents dict { torrent_id: Torrent } 156 self.torrents = {} 157 self.queued_torrents = set() 158 self.is_saving_state = False 159 self.save_resume_data_file_lock = defer.DeferredLock() 160 self.torrents_loading = {} 161 self.prefetching_metadata = {} 162 163 # This is a map of torrent_ids to Deferreds used to track needed resume data. 164 # The Deferreds will be completed when resume data has been saved. 165 self.waiting_on_resume_data = {} 166 167 # Keep track of torrents finished but moving storage 168 self.waiting_on_finish_moving = [] 169 170 # Keeps track of resume data 171 self.resume_data = {} 172 173 self.torrents_status_requests = [] 174 self.status_dict = {} 175 self.last_state_update_alert_ts = 0 176 177 # Keep the previous saved state 178 self.prev_saved_state = None 179 180 # Register set functions 181 set_config_keys = [ 182 'max_connections_per_torrent', 183 'max_upload_slots_per_torrent', 184 'max_upload_speed_per_torrent', 185 'max_download_speed_per_torrent', 186 ] 187 188 for config_key in set_config_keys: 189 on_set_func = getattr(self, ''.join(['on_set_', config_key])) 190 self.config.register_set_function(config_key, on_set_func) 191 192 # Register alert functions 193 alert_handles = [ 194 'external_ip_alert', 195 'performance_alert', 196 'add_torrent_alert', 197 'metadata_received_alert', 198 'torrent_finished_alert', 199 'torrent_paused_alert', 200 'torrent_checked_alert', 201 'torrent_resumed_alert', 202 'tracker_reply_alert', 203 'tracker_announce_alert', 204 'tracker_warning_alert', 205 'tracker_error_alert', 206 'file_renamed_alert', 207 'file_error_alert', 208 'file_completed_alert', 209 'storage_moved_alert', 210 'storage_moved_failed_alert', 211 'state_update_alert', 212 'state_changed_alert', 213 'save_resume_data_alert', 214 'save_resume_data_failed_alert', 215 'fastresume_rejected_alert', 216 ] 217 218 for alert_handle in alert_handles: 219 on_alert_func = getattr( 220 self, ''.join(['on_alert_', alert_handle.replace('_alert', '')]) 221 ) 222 self.alerts.register_handler(alert_handle, on_alert_func) 223 224 # Define timers 225 self.save_state_timer = LoopingCall(self.save_state) 226 self.save_resume_data_timer = LoopingCall(self.save_resume_data) 227 self.prev_status_cleanup_loop = LoopingCall(self.cleanup_torrents_prev_status) 228 229 def start(self): 230 # Check for old temp file to verify safe shutdown 231 if os.path.isfile(self.temp_file): 232 self.archive_state('Bad shutdown detected so archiving state files') 233 os.remove(self.temp_file) 234 235 with open(self.temp_file, 'a'): 236 os.utime(self.temp_file, None) 237 238 # Try to load the state from file 239 self.load_state() 240 241 # Save the state periodically 242 self.save_state_timer.start(200, False) 243 self.save_resume_data_timer.start(190, False) 244 self.prev_status_cleanup_loop.start(10) 245 246 @defer.inlineCallbacks 247 def stop(self): 248 # Stop timers 249 if self.save_state_timer.running: 250 self.save_state_timer.stop() 251 252 if self.save_resume_data_timer.running: 253 self.save_resume_data_timer.stop() 254 255 if self.prev_status_cleanup_loop.running: 256 self.prev_status_cleanup_loop.stop() 257 258 # Save state on shutdown 259 yield self.save_state() 260 261 self.session.pause() 262 263 result = yield self.save_resume_data(flush_disk_cache=True) 264 # Remove the temp_file to signify successfully saved state 265 if result and os.path.isfile(self.temp_file): 266 os.remove(self.temp_file) 267 268 def update(self): 269 for torrent_id, torrent in self.torrents.items(): 270 # XXX: Should the state check be those that _can_ be stopped at ratio 271 if torrent.options['stop_at_ratio'] and torrent.state not in ( 272 'Checking', 273 'Allocating', 274 'Paused', 275 'Queued', 276 ): 277 # If the global setting is set, but the per-torrent isn't... 278 # Just skip to the next torrent. 279 # This is so that a user can turn-off the stop at ratio option on a per-torrent basis 280 if not torrent.options['stop_at_ratio']: 281 continue 282 if ( 283 torrent.get_ratio() >= torrent.options['stop_ratio'] 284 and torrent.is_finished 285 ): 286 if torrent.options['remove_at_ratio']: 287 self.remove(torrent_id) 288 break 289 if not torrent.handle.status().paused: 290 torrent.pause() 291 292 def __getitem__(self, torrent_id): 293 """Return the Torrent with torrent_id. 294 295 Args: 296 torrent_id (str): The torrent_id. 297 298 Returns: 299 Torrent: A torrent object. 300 301 """ 302 return self.torrents[torrent_id] 303 304 def get_torrent_list(self): 305 """Creates a list of torrent_ids, owned by current user and any marked shared. 306 307 Returns: 308 list: A list of torrent_ids. 309 310 """ 311 torrent_ids = list(self.torrents) 312 if component.get('RPCServer').get_session_auth_level() == AUTH_LEVEL_ADMIN: 313 return torrent_ids 314 315 current_user = component.get('RPCServer').get_session_user() 316 for torrent_id in torrent_ids[:]: 317 torrent_status = self.torrents[torrent_id].get_status(['owner', 'shared']) 318 if torrent_status['owner'] != current_user and not torrent_status['shared']: 319 torrent_ids.pop(torrent_ids.index(torrent_id)) 320 return torrent_ids 321 322 def get_torrent_info_from_file(self, filepath): 323 """Retrieves torrent_info from the file specified. 324 325 Args: 326 filepath (str): The filepath to extract torrent info from. 327 328 Returns: 329 lt.torrent_info: A libtorrent torrent_info dict or None if invalid file or data. 330 331 """ 332 # Get the torrent data from the torrent file 333 if log.isEnabledFor(logging.DEBUG): 334 log.debug('Attempting to extract torrent_info from %s', filepath) 335 try: 336 torrent_info = lt.torrent_info(filepath) 337 except RuntimeError as ex: 338 log.warning('Unable to open torrent file %s: %s', filepath, ex) 339 else: 340 return torrent_info 341 342 def prefetch_metadata(self, magnet, timeout): 343 """Download the metadata for a magnet uri. 344 345 Args: 346 magnet (str): A magnet uri to download the metadata for. 347 timeout (int): Number of seconds to wait before cancelling. 348 349 Returns: 350 Deferred: A tuple of (torrent_id (str), metadata (dict)) 351 352 """ 353 354 torrent_id = get_magnet_info(magnet)['info_hash'] 355 if torrent_id in self.prefetching_metadata: 356 return self.prefetching_metadata[torrent_id].defer 357 358 add_torrent_params = {} 359 add_torrent_params['save_path'] = gettempdir() 360 add_torrent_params['url'] = magnet.strip().encode('utf8') 361 add_torrent_params['flags'] = ( 362 ( 363 LT_DEFAULT_ADD_TORRENT_FLAGS 364 | lt.add_torrent_params_flags_t.flag_duplicate_is_error 365 | lt.add_torrent_params_flags_t.flag_upload_mode 366 ) 367 ^ lt.add_torrent_params_flags_t.flag_auto_managed 368 ^ lt.add_torrent_params_flags_t.flag_paused 369 ) 370 371 torrent_handle = self.session.add_torrent(add_torrent_params) 372 373 d = Deferred() 374 # Cancel the defer if timeout reached. 375 defer_timeout = self.callLater(timeout, d.cancel) 376 d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout) 377 Prefetch = namedtuple('Prefetch', 'defer handle') 378 self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle) 379 return d 380 381 def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout): 382 # Cancel reactor.callLater. 383 try: 384 defer_timeout.cancel() 385 except error.AlreadyCalled: 386 pass 387 388 log.debug('remove prefetch magnet from session') 389 try: 390 torrent_handle = self.prefetching_metadata.pop(torrent_id).handle 391 except KeyError: 392 pass 393 else: 394 self.session.remove_torrent(torrent_handle, 1) 395 396 metadata = None 397 if isinstance(torrent_info, lt.torrent_info): 398 log.debug('prefetch metadata received') 399 metadata = lt.bdecode(torrent_info.metadata()) 400 401 return torrent_id, metadata 402 403 def _build_torrent_options(self, options): 404 """Load default options and update if needed.""" 405 _options = TorrentOptions() 406 if options: 407 _options.update(options) 408 options = _options 409 410 if not options['owner']: 411 options['owner'] = component.get('RPCServer').get_session_user() 412 if not component.get('AuthManager').has_account(options['owner']): 413 options['owner'] = 'localclient' 414 415 return options 416 417 def _build_torrent_params( 418 self, torrent_info=None, magnet=None, options=None, resume_data=None 419 ): 420 """Create the add_torrent_params dict for adding torrent to libtorrent.""" 421 add_torrent_params = {} 422 if torrent_info: 423 add_torrent_params['ti'] = torrent_info 424 name = torrent_info.name() 425 if not name: 426 name = ( 427 torrent_info.file_at(0).path.replace('\\', '/', 1).split('/', 1)[0] 428 ) 429 add_torrent_params['name'] = name 430 torrent_id = str(torrent_info.info_hash()) 431 elif magnet: 432 magnet_info = get_magnet_info(magnet) 433 if magnet_info: 434 add_torrent_params['url'] = magnet.strip().encode('utf8') 435 add_torrent_params['name'] = magnet_info['name'] 436 torrent_id = magnet_info['info_hash'] 437 else: 438 raise AddTorrentError( 439 'Unable to add magnet, invalid magnet info: %s' % magnet 440 ) 441 442 # Check for existing torrent in session. 443 if torrent_id in self.get_torrent_list(): 444 # Attempt merge trackers before returning. 445 self.torrents[torrent_id].merge_trackers(torrent_info) 446 raise AddTorrentError('Torrent already in session (%s).' % torrent_id) 447 elif torrent_id in self.torrents_loading: 448 raise AddTorrentError('Torrent already being added (%s).' % torrent_id) 449 elif torrent_id in self.prefetching_metadata: 450 # Cancel and remove metadata fetching torrent. 451 self.prefetching_metadata[torrent_id].defer.cancel() 452 453 # Check for renamed files and if so, rename them in the torrent_info before adding. 454 if options['mapped_files'] and torrent_info: 455 for index, fname in options['mapped_files'].items(): 456 fname = sanitize_filepath(decode_bytes(fname)) 457 if log.isEnabledFor(logging.DEBUG): 458 log.debug('renaming file index %s to %s', index, fname) 459 try: 460 torrent_info.rename_file(index, fname.encode('utf8')) 461 except TypeError: 462 torrent_info.rename_file(index, fname) 463 add_torrent_params['ti'] = torrent_info 464 465 if log.isEnabledFor(logging.DEBUG): 466 log.debug('options: %s', options) 467 468 # Fill in the rest of the add_torrent_params dictionary. 469 add_torrent_params['save_path'] = options['download_location'].encode('utf8') 470 if options['name']: 471 add_torrent_params['name'] = options['name'] 472 if options['pre_allocate_storage']: 473 add_torrent_params['storage_mode'] = lt.storage_mode_t.storage_mode_allocate 474 if resume_data: 475 add_torrent_params['resume_data'] = resume_data 476 477 # Set flags: enable duplicate_is_error & override_resume_data, disable auto_managed. 478 add_torrent_params['flags'] = ( 479 LT_DEFAULT_ADD_TORRENT_FLAGS 480 | lt.add_torrent_params_flags_t.flag_duplicate_is_error 481 | lt.add_torrent_params_flags_t.flag_override_resume_data 482 ) ^ lt.add_torrent_params_flags_t.flag_auto_managed 483 if options['seed_mode']: 484 add_torrent_params['flags'] |= lt.add_torrent_params_flags_t.flag_seed_mode 485 if options['super_seeding']: 486 add_torrent_params[ 487 'flags' 488 ] |= lt.add_torrent_params_flags_t.flag_super_seeding 489 490 return torrent_id, add_torrent_params 491 492 def add( 493 self, 494 torrent_info=None, 495 state=None, 496 options=None, 497 save_state=True, 498 filedump=None, 499 filename=None, 500 magnet=None, 501 resume_data=None, 502 ): 503 """Adds a torrent to the torrent manager. 504 505 Args: 506 torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object. 507 state (TorrentState, optional): The torrent state. 508 options (dict, optional): The options to apply to the torrent on adding. 509 save_state (bool, optional): If True save the session state after adding torrent, defaults to True. 510 filedump (str, optional): bencoded filedump of a torrent file. 511 filename (str, optional): The filename of the torrent file. 512 magnet (str, optional): The magnet uri. 513 resume_data (lt.entry, optional): libtorrent fast resume data. 514 515 Returns: 516 str: If successful the torrent_id of the added torrent, None if adding the torrent failed. 517 518 Emits: 519 TorrentAddedEvent: Torrent with torrent_id added to session. 520 521 """ 522 if not torrent_info and not filedump and not magnet: 523 raise AddTorrentError( 524 'You must specify a valid torrent_info, torrent state or magnet.' 525 ) 526 527 if filedump: 528 try: 529 torrent_info = lt.torrent_info(lt.bdecode(filedump)) 530 except RuntimeError as ex: 531 raise AddTorrentError( 532 'Unable to add torrent, decoding filedump failed: %s' % ex 533 ) 534 535 options = self._build_torrent_options(options) 536 __, add_torrent_params = self._build_torrent_params( 537 torrent_info, magnet, options, resume_data 538 ) 539 540 # We need to pause the AlertManager momentarily to prevent alerts 541 # for this torrent being generated before a Torrent object is created. 542 component.pause('AlertManager') 543 544 try: 545 handle = self.session.add_torrent(add_torrent_params) 546 if not handle.is_valid(): 547 raise InvalidTorrentError('Torrent handle is invalid!') 548 except (RuntimeError, InvalidTorrentError) as ex: 549 component.resume('AlertManager') 550 raise AddTorrentError('Unable to add torrent to session: %s' % ex) 551 552 torrent = self._add_torrent_obj( 553 handle, options, state, filename, magnet, resume_data, filedump, save_state 554 ) 555 return torrent.torrent_id 556 557 def add_async( 558 self, 559 torrent_info=None, 560 state=None, 561 options=None, 562 save_state=True, 563 filedump=None, 564 filename=None, 565 magnet=None, 566 resume_data=None, 567 ): 568 """Adds a torrent to the torrent manager using libtorrent async add torrent method. 569 570 Args: 571 torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object. 572 state (TorrentState, optional): The torrent state. 573 options (dict, optional): The options to apply to the torrent on adding. 574 save_state (bool, optional): If True save the session state after adding torrent, defaults to True. 575 filedump (str, optional): bencoded filedump of a torrent file. 576 filename (str, optional): The filename of the torrent file. 577 magnet (str, optional): The magnet uri. 578 resume_data (lt.entry, optional): libtorrent fast resume data. 579 580 Returns: 581 Deferred: If successful the torrent_id of the added torrent, None if adding the torrent failed. 582 583 Emits: 584 TorrentAddedEvent: Torrent with torrent_id added to session. 585 586 """ 587 if not torrent_info and not filedump and not magnet: 588 raise AddTorrentError( 589 'You must specify a valid torrent_info, torrent state or magnet.' 590 ) 591 592 if filedump: 593 try: 594 torrent_info = lt.torrent_info(lt.bdecode(filedump)) 595 except RuntimeError as ex: 596 raise AddTorrentError( 597 'Unable to add torrent, decoding filedump failed: %s' % ex 598 ) 599 600 options = self._build_torrent_options(options) 601 torrent_id, add_torrent_params = self._build_torrent_params( 602 torrent_info, magnet, options, resume_data 603 ) 604 605 d = Deferred() 606 self.torrents_loading[torrent_id] = ( 607 d, 608 options, 609 state, 610 filename, 611 magnet, 612 resume_data, 613 filedump, 614 save_state, 615 ) 616 try: 617 self.session.async_add_torrent(add_torrent_params) 618 except RuntimeError as ex: 619 raise AddTorrentError('Unable to add torrent to session: %s' % ex) 620 return d 621 622 def _add_torrent_obj( 623 self, 624 handle, 625 options, 626 state, 627 filename, 628 magnet, 629 resume_data, 630 filedump, 631 save_state, 632 ): 633 # For magnets added with metadata, filename is used so set as magnet. 634 if not magnet and is_magnet(filename): 635 magnet = filename 636 filename = None 637 638 # Create a Torrent object and add to the dictionary. 639 torrent = Torrent(handle, options, state, filename, magnet) 640 self.torrents[torrent.torrent_id] = torrent 641 642 # Resume AlertManager if paused for adding torrent to libtorrent. 643 component.resume('AlertManager') 644 645 # Store the orignal resume_data, in case of errors. 646 if resume_data: 647 self.resume_data[torrent.torrent_id] = resume_data 648 649 # Add to queued torrents set. 650 self.queued_torrents.add(torrent.torrent_id) 651 if self.config['queue_new_to_top']: 652 self.queue_top(torrent.torrent_id) 653 654 # Resume the torrent if needed. 655 if not options['add_paused']: 656 torrent.resume() 657 658 # Emit torrent_added signal. 659 from_state = state is not None 660 component.get('EventManager').emit( 661 TorrentAddedEvent(torrent.torrent_id, from_state) 662 ) 663 664 if log.isEnabledFor(logging.DEBUG): 665 log.debug('Torrent added: %s', str(handle.info_hash())) 666 if log.isEnabledFor(logging.INFO): 667 name_and_owner = torrent.get_status(['name', 'owner']) 668 log.info( 669 'Torrent %s from user "%s" %s', 670 name_and_owner['name'], 671 name_and_owner['owner'], 672 from_state and 'loaded' or 'added', 673 ) 674 675 # Write the .torrent file to the state directory. 676 if filedump: 677 torrent.write_torrentfile(filedump) 678 679 # Save the session state. 680 if save_state: 681 self.save_state() 682 683 return torrent 684 685 def add_async_callback( 686 self, 687 handle, 688 d, 689 options, 690 state, 691 filename, 692 magnet, 693 resume_data, 694 filedump, 695 save_state, 696 ): 697 torrent = self._add_torrent_obj( 698 handle, options, state, filename, magnet, resume_data, filedump, save_state 699 ) 700 701 d.callback(torrent.torrent_id) 702 703 def remove(self, torrent_id, remove_data=False, save_state=True): 704 """Remove a torrent from the session. 705 706 Args: 707 torrent_id (str): The torrent ID to remove. 708 remove_data (bool, optional): If True, remove the downloaded data, defaults to False. 709 save_state (bool, optional): If True, save the session state after removal, defaults to True. 710 711 Returns: 712 bool: True if removed successfully, False if not. 713 714 Emits: 715 PreTorrentRemovedEvent: Torrent is about to be removed from session. 716 TorrentRemovedEvent: Torrent with torrent_id removed from session. 717 718 Raises: 719 InvalidTorrentError: If the torrent_id is not in the session. 720 721 """ 722 try: 723 torrent = self.torrents[torrent_id] 724 except KeyError: 725 raise InvalidTorrentError('torrent_id %s not in session.' % torrent_id) 726 727 torrent_name = torrent.get_status(['name'])['name'] 728 729 # Emit the signal to the clients 730 component.get('EventManager').emit(PreTorrentRemovedEvent(torrent_id)) 731 732 try: 733 self.session.remove_torrent(torrent.handle, 1 if remove_data else 0) 734 except RuntimeError as ex: 735 log.warning('Error removing torrent: %s', ex) 736 return False 737 738 # Remove fastresume data if it is exists 739 self.resume_data.pop(torrent_id, None) 740 741 # Remove the .torrent file in the state and copy location, if user requested. 742 delete_copies = ( 743 self.config['copy_torrent_file'] and self.config['del_copy_torrent_file'] 744 ) 745 torrent.delete_torrentfile(delete_copies) 746 747 # Remove from set if it wasn't finished 748 if not torrent.is_finished: 749 try: 750 self.queued_torrents.remove(torrent_id) 751 except KeyError: 752 log.debug('%s is not in queued torrents set.', torrent_id) 753 raise InvalidTorrentError( 754 '%s is not in queued torrents set.' % torrent_id 755 ) 756 757 # Remove the torrent from deluge's session 758 del self.torrents[torrent_id] 759 760 if save_state: 761 self.save_state() 762 763 # Emit the signal to the clients 764 component.get('EventManager').emit(TorrentRemovedEvent(torrent_id)) 765 log.info( 766 'Torrent %s removed by user: %s', 767 torrent_name, 768 component.get('RPCServer').get_session_user(), 769 ) 770 return True 771 772 def fixup_state(self, state): 773 """Fixup an old state by adding missing TorrentState options and assigning default values. 774 775 Args: 776 state (TorrentManagerState): A torrentmanager state containing torrent details. 777 778 Returns: 779 TorrentManagerState: A fixedup TorrentManager state. 780 781 """ 782 if state.torrents: 783 t_state_tmp = TorrentState() 784 if dir(state.torrents[0]) != dir(t_state_tmp): 785 self.archive_state('Migration of TorrentState required.') 786 try: 787 for attr in set(dir(t_state_tmp)) - set(dir(state.torrents[0])): 788 for t_state in state.torrents: 789 setattr(t_state, attr, getattr(t_state_tmp, attr, None)) 790 except AttributeError as ex: 791 log.error( 792 'Unable to update state file to a compatible version: %s', ex 793 ) 794 return state 795 796 def open_state(self): 797 """Open the torrents.state file containing a TorrentManager state with session torrents. 798 799 Returns: 800 TorrentManagerState: The TorrentManager state. 801 802 """ 803 torrents_state = os.path.join(self.state_dir, 'torrents.state') 804 state = None 805 for filepath in (torrents_state, torrents_state + '.bak'): 806 log.info('Loading torrent state: %s', filepath) 807 if not os.path.isfile(filepath): 808 continue 809 810 try: 811 with open(filepath, 'rb') as _file: 812 if PY2: 813 state = pickle.load(_file) 814 else: 815 state = pickle.load(_file, encoding='utf8') 816 except (IOError, EOFError, pickle.UnpicklingError) as ex: 817 message = 'Unable to load {}: {}'.format(filepath, ex) 818 log.error(message) 819 if not filepath.endswith('.bak'): 820 self.archive_state(message) 821 else: 822 log.info('Successfully loaded %s', filepath) 823 break 824 825 return state if state else TorrentManagerState() 826 827 def load_state(self): 828 """Load all the torrents from TorrentManager state into session. 829 830 Emits: 831 SessionStartedEvent: Emitted after all torrents are added to the session. 832 833 """ 834 start = datetime.datetime.now() 835 state = self.open_state() 836 state = self.fixup_state(state) 837 838 # Reorder the state.torrents list to add torrents in the correct queue order. 839 state.torrents.sort( 840 key=operator.attrgetter('queue'), reverse=self.config['queue_new_to_top'] 841 ) 842 resume_data = self.load_resume_data_file() 843 844 deferreds = [] 845 for t_state in state.torrents: 846 # Populate the options dict from state 847 options = TorrentOptions() 848 for option in options: 849 try: 850 options[option] = getattr(t_state, option) 851 except AttributeError: 852 pass 853 # Manually update unmatched attributes 854 options['download_location'] = t_state.save_path 855 options['pre_allocate_storage'] = t_state.storage_mode == 'allocate' 856 options['prioritize_first_last_pieces'] = t_state.prioritize_first_last 857 options['add_paused'] = t_state.paused 858 859 magnet = t_state.magnet 860 torrent_info = self.get_torrent_info_from_file( 861 os.path.join(self.state_dir, t_state.torrent_id + '.torrent') 862 ) 863 864 try: 865 d = self.add_async( 866 torrent_info=torrent_info, 867 state=t_state, 868 options=options, 869 save_state=False, 870 magnet=magnet, 871 resume_data=resume_data.get(t_state.torrent_id), 872 ) 873 except AddTorrentError as ex: 874 log.warning( 875 'Error when adding torrent "%s" to session: %s', 876 t_state.torrent_id, 877 ex, 878 ) 879 else: 880 deferreds.append(d) 881 882 deferred_list = DeferredList(deferreds, consumeErrors=False) 883 884 def on_complete(result): 885 log.info( 886 'Finished loading %d torrents in %s', 887 len(state.torrents), 888 str(datetime.datetime.now() - start), 889 ) 890 component.get('EventManager').emit(SessionStartedEvent()) 891 892 deferred_list.addCallback(on_complete) 893 894 def create_state(self): 895 """Create a state of all the torrents in TorrentManager. 896 897 Returns: 898 TorrentManagerState: The TorrentManager state. 899 900 """ 901 state = TorrentManagerState() 902 # Create the state for each Torrent and append to the list 903 for torrent in self.torrents.values(): 904 if self.session.is_paused(): 905 paused = torrent.handle.is_paused() 906 elif torrent.forced_error: 907 paused = torrent.forced_error.was_paused 908 elif torrent.state == 'Paused': 909 paused = True 910 else: 911 paused = False 912 913 torrent_state = TorrentState( 914 torrent.torrent_id, 915 torrent.filename, 916 torrent.trackers, 917 torrent.get_status(['storage_mode'])['storage_mode'], 918 paused, 919 torrent.options['download_location'], 920 torrent.options['max_connections'], 921 torrent.options['max_upload_slots'], 922 torrent.options['max_upload_speed'], 923 torrent.options['max_download_speed'], 924 torrent.options['prioritize_first_last_pieces'], 925 torrent.options['sequential_download'], 926 torrent.options['file_priorities'], 927 torrent.get_queue_position(), 928 torrent.options['auto_managed'], 929 torrent.is_finished, 930 torrent.options['stop_ratio'], 931 torrent.options['stop_at_ratio'], 932 torrent.options['remove_at_ratio'], 933 torrent.options['move_completed'], 934 torrent.options['move_completed_path'], 935 torrent.magnet, 936 torrent.options['owner'], 937 torrent.options['shared'], 938 torrent.options['super_seeding'], 939 torrent.options['name'], 940 ) 941 state.torrents.append(torrent_state) 942 return state 943 944 def save_state(self): 945 """Run the save state task in a separate thread to avoid blocking main thread. 946 947 Note: 948 If a save task is already running, this call is ignored. 949 950 """ 951 if self.is_saving_state: 952 return defer.succeed(None) 953 self.is_saving_state = True 954 d = threads.deferToThread(self._save_state) 955 956 def on_state_saved(arg): 957 self.is_saving_state = False 958 if self.save_state_timer.running: 959 self.save_state_timer.reset() 960 961 d.addBoth(on_state_saved) 962 return d 963 964 def _save_state(self): 965 """Save the state of the TorrentManager to the torrents.state file.""" 966 state = self.create_state() 967 968 # If the state hasn't changed, no need to save it 969 if self.prev_saved_state == state: 970 return 971 972 filename = 'torrents.state' 973 filepath = os.path.join(self.state_dir, filename) 974 filepath_bak = filepath + '.bak' 975 filepath_tmp = filepath + '.tmp' 976 977 try: 978 log.debug('Creating the temporary file: %s', filepath_tmp) 979 with open(filepath_tmp, 'wb', 0) as _file: 980 pickle.dump(state, _file, protocol=2) 981 _file.flush() 982 os.fsync(_file.fileno()) 983 except (OSError, pickle.PicklingError) as ex: 984 log.error('Unable to save %s: %s', filename, ex) 985 return 986 987 try: 988 log.debug('Creating backup of %s at: %s', filename, filepath_bak) 989 if os.path.isfile(filepath_bak): 990 os.remove(filepath_bak) 991 if os.path.isfile(filepath): 992 os.rename(filepath, filepath_bak) 993 except OSError as ex: 994 log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex) 995 return 996 997 try: 998 log.debug('Saving %s to: %s', filename, filepath) 999 os.rename(filepath_tmp, filepath) 1000 self.prev_saved_state = state 1001 except OSError as ex: 1002 log.error('Failed to set new state file %s: %s', filepath, ex) 1003 if os.path.isfile(filepath_bak): 1004 log.info('Restoring backup of state from: %s', filepath_bak) 1005 os.rename(filepath_bak, filepath) 1006 1007 def save_resume_data(self, torrent_ids=None, flush_disk_cache=False): 1008 """Saves torrents resume data. 1009 1010 Args: 1011 torrent_ids (list of str): A list of torrents to save the resume data for, defaults 1012 to None which saves all torrents resume data. 1013 flush_disk_cache (bool, optional): If True flushes the disk cache which avoids potential 1014 issue with file timestamps, defaults to False. This is only needed when stopping the session. 1015 1016 Returns: 1017 t.i.d.DeferredList: A list of twisted Deferred callbacks to be invoked when save is complete. 1018 1019 """ 1020 if torrent_ids is None: 1021 torrent_ids = ( 1022 tid 1023 for tid, t in self.torrents.items() 1024 if t.handle.need_save_resume_data() 1025 ) 1026 1027 def on_torrent_resume_save(dummy_result, torrent_id): 1028 """Recieved torrent resume_data alert so remove from waiting list""" 1029 self.waiting_on_resume_data.pop(torrent_id, None) 1030 1031 deferreds = [] 1032 for torrent_id in torrent_ids: 1033 d = self.waiting_on_resume_data.get(torrent_id) 1034 if not d: 1035 d = Deferred().addBoth(on_torrent_resume_save, torrent_id) 1036 self.waiting_on_resume_data[torrent_id] = d 1037 deferreds.append(d) 1038 self.torrents[torrent_id].save_resume_data(flush_disk_cache) 1039 1040 def on_all_resume_data_finished(dummy_result): 1041 """Saves resume data file when no more torrents waiting for resume data. 1042 1043 Returns: 1044 bool: True if fastresume file is saved. 1045 1046 This return value determines removal of `self.temp_file` in `self.stop()`. 1047 1048 """ 1049 # Use flush_disk_cache as a marker for shutdown so fastresume is 1050 # saved even if torrents are waiting. 1051 if not self.waiting_on_resume_data or flush_disk_cache: 1052 return self.save_resume_data_file(queue_task=flush_disk_cache) 1053 1054 return DeferredList(deferreds).addBoth(on_all_resume_data_finished) 1055 1056 def load_resume_data_file(self): 1057 """Load the resume data from file for all torrents. 1058 1059 Returns: 1060 dict: A dict of torrents and their resume_data. 1061 1062 """ 1063 filename = 'torrents.fastresume' 1064 filepath = os.path.join(self.state_dir, filename) 1065 filepath_bak = filepath + '.bak' 1066 old_data_filepath = os.path.join(get_config_dir(), filename) 1067 1068 for _filepath in (filepath, filepath_bak, old_data_filepath): 1069 log.info('Opening %s for load: %s', filename, _filepath) 1070 try: 1071 with open(_filepath, 'rb') as _file: 1072 resume_data = lt.bdecode(_file.read()) 1073 except (IOError, EOFError, RuntimeError) as ex: 1074 if self.torrents: 1075 log.warning('Unable to load %s: %s', _filepath, ex) 1076 resume_data = None 1077 else: 1078 # lt.bdecode returns the dict keys as bytes so decode them. 1079 resume_data = {k.decode(): v for k, v in resume_data.items()} 1080 log.info('Successfully loaded %s: %s', filename, _filepath) 1081 break 1082 1083 # If the libtorrent bdecode doesn't happen properly, it will return None 1084 # so we need to make sure we return a {} 1085 if resume_data is None: 1086 return {} 1087 else: 1088 return resume_data 1089 1090 def save_resume_data_file(self, queue_task=False): 1091 """Save resume data to file in a separate thread to avoid blocking main thread. 1092 1093 Args: 1094 queue_task (bool): If True and a save task is already running then queue 1095 this save task to run next. Default is to not queue save tasks. 1096 1097 Returns: 1098 Deferred: Fires with arg, True if save task was successful, False if 1099 not and None if task was not performed. 1100 1101 """ 1102 if not queue_task and self.save_resume_data_file_lock.locked: 1103 return defer.succeed(None) 1104 1105 def on_lock_aquired(): 1106 d = threads.deferToThread(self._save_resume_data_file) 1107 1108 def on_resume_data_file_saved(arg): 1109 if self.save_resume_data_timer.running: 1110 self.save_resume_data_timer.reset() 1111 return arg 1112 1113 d.addBoth(on_resume_data_file_saved) 1114 return d 1115 1116 return self.save_resume_data_file_lock.run(on_lock_aquired) 1117 1118 def _save_resume_data_file(self): 1119 """Saves the resume data file with the contents of self.resume_data""" 1120 if not self.resume_data: 1121 return True 1122 1123 filename = 'torrents.fastresume' 1124 filepath = os.path.join(self.state_dir, filename) 1125 filepath_bak = filepath + '.bak' 1126 filepath_tmp = filepath + '.tmp' 1127 1128 try: 1129 log.debug('Creating the temporary file: %s', filepath_tmp) 1130 with open(filepath_tmp, 'wb', 0) as _file: 1131 _file.write(lt.bencode(self.resume_data)) 1132 _file.flush() 1133 os.fsync(_file.fileno()) 1134 except (OSError, EOFError) as ex: 1135 log.error('Unable to save %s: %s', filename, ex) 1136 return False 1137 1138 try: 1139 log.debug('Creating backup of %s at: %s', filename, filepath_bak) 1140 if os.path.isfile(filepath_bak): 1141 os.remove(filepath_bak) 1142 if os.path.isfile(filepath): 1143 os.rename(filepath, filepath_bak) 1144 except OSError as ex: 1145 log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex) 1146 return False 1147 1148 try: 1149 log.debug('Saving %s to: %s', filename, filepath) 1150 os.rename(filepath_tmp, filepath) 1151 except OSError as ex: 1152 log.error('Failed to set new file %s: %s', filepath, ex) 1153 if os.path.isfile(filepath_bak): 1154 log.info('Restoring backup from: %s', filepath_bak) 1155 os.rename(filepath_bak, filepath) 1156 else: 1157 # Sync the rename operations for the directory 1158 if hasattr(os, 'O_DIRECTORY'): 1159 dirfd = os.open(os.path.dirname(filepath), os.O_DIRECTORY) 1160 os.fsync(dirfd) 1161 os.close(dirfd) 1162 return True 1163 1164 def archive_state(self, message): 1165 log.warning(message) 1166 arc_filepaths = [] 1167 for filename in ('torrents.fastresume', 'torrents.state'): 1168 filepath = os.path.join(self.state_dir, filename) 1169 arc_filepaths.extend([filepath, filepath + '.bak']) 1170 1171 archive_files('state', arc_filepaths, message=message) 1172 1173 def get_queue_position(self, torrent_id): 1174 """Get queue position of torrent""" 1175 return self.torrents[torrent_id].get_queue_position() 1176 1177 def queue_top(self, torrent_id): 1178 """Queue torrent to top""" 1179 if self.torrents[torrent_id].get_queue_position() == 0: 1180 return False 1181 1182 self.torrents[torrent_id].handle.queue_position_top() 1183 return True 1184 1185 def queue_up(self, torrent_id): 1186 """Queue torrent up one position""" 1187 if self.torrents[torrent_id].get_queue_position() == 0: 1188 return False 1189 1190 self.torrents[torrent_id].handle.queue_position_up() 1191 return True 1192 1193 def queue_down(self, torrent_id): 1194 """Queue torrent down one position""" 1195 if self.torrents[torrent_id].get_queue_position() == ( 1196 len(self.queued_torrents) - 1 1197 ): 1198 return False 1199 1200 self.torrents[torrent_id].handle.queue_position_down() 1201 return True 1202 1203 def queue_bottom(self, torrent_id): 1204 """Queue torrent to bottom""" 1205 if self.torrents[torrent_id].get_queue_position() == ( 1206 len(self.queued_torrents) - 1 1207 ): 1208 return False 1209 1210 self.torrents[torrent_id].handle.queue_position_bottom() 1211 return True 1212 1213 def cleanup_torrents_prev_status(self): 1214 """Run cleanup_prev_status for each registered torrent""" 1215 for torrent in self.torrents.values(): 1216 torrent.cleanup_prev_status() 1217 1218 def on_set_max_connections_per_torrent(self, key, value): 1219 """Sets the per-torrent connection limit""" 1220 log.debug('max_connections_per_torrent set to %s...', value) 1221 for key in self.torrents: 1222 self.torrents[key].set_max_connections(value) 1223 1224 def on_set_max_upload_slots_per_torrent(self, key, value): 1225 """Sets the per-torrent upload slot limit""" 1226 log.debug('max_upload_slots_per_torrent set to %s...', value) 1227 for key in self.torrents: 1228 self.torrents[key].set_max_upload_slots(value) 1229 1230 def on_set_max_upload_speed_per_torrent(self, key, value): 1231 """Sets the per-torrent upload speed limit""" 1232 log.debug('max_upload_speed_per_torrent set to %s...', value) 1233 for key in self.torrents: 1234 self.torrents[key].set_max_upload_speed(value) 1235 1236 def on_set_max_download_speed_per_torrent(self, key, value): 1237 """Sets the per-torrent download speed limit""" 1238 log.debug('max_download_speed_per_torrent set to %s...', value) 1239 for key in self.torrents: 1240 self.torrents[key].set_max_download_speed(value) 1241 1242 # --- Alert handlers --- 1243 def on_alert_add_torrent(self, alert): 1244 """Alert handler for libtorrent add_torrent_alert""" 1245 if not alert.handle.is_valid(): 1246 log.warning('Torrent handle is invalid!') 1247 return 1248 1249 try: 1250 torrent_id = str(alert.handle.info_hash()) 1251 except RuntimeError as ex: 1252 log.warning('Failed to get torrent id from handle: %s', ex) 1253 return 1254 1255 try: 1256 add_async_params = self.torrents_loading.pop(torrent_id) 1257 except KeyError as ex: 1258 log.warning('Torrent id not in torrents loading list: %s', ex) 1259 return 1260 1261 self.add_async_callback(alert.handle, *add_async_params) 1262 1263 def on_alert_torrent_finished(self, alert): 1264 """Alert handler for libtorrent torrent_finished_alert""" 1265 try: 1266 torrent_id = str(alert.handle.info_hash()) 1267 torrent = self.torrents[torrent_id] 1268 except (RuntimeError, KeyError): 1269 return 1270 1271 # If total_download is 0, do not move, it's likely the torrent wasn't downloaded, but just added. 1272 # Get fresh data from libtorrent, the cache isn't always up to date 1273 total_download = torrent.get_status(['total_payload_download'], update=True)[ 1274 'total_payload_download' 1275 ] 1276 1277 if log.isEnabledFor(logging.DEBUG): 1278 log.debug('Finished %s ', torrent_id) 1279 log.debug( 1280 'Torrent settings: is_finished: %s, total_download: %s, move_completed: %s, move_path: %s', 1281 torrent.is_finished, 1282 total_download, 1283 torrent.options['move_completed'], 1284 torrent.options['move_completed_path'], 1285 ) 1286 1287 torrent.update_state() 1288 if not torrent.is_finished and total_download: 1289 # Move completed download to completed folder if needed 1290 if ( 1291 torrent.options['move_completed'] 1292 and torrent.options['download_location'] 1293 != torrent.options['move_completed_path'] 1294 ): 1295 self.waiting_on_finish_moving.append(torrent_id) 1296 torrent.move_storage(torrent.options['move_completed_path']) 1297 else: 1298 torrent.is_finished = True 1299 component.get('EventManager').emit(TorrentFinishedEvent(torrent_id)) 1300 else: 1301 torrent.is_finished = True 1302 1303 # Torrent is no longer part of the queue 1304 try: 1305 self.queued_torrents.remove(torrent_id) 1306 except KeyError: 1307 # Sometimes libtorrent fires a TorrentFinishedEvent twice 1308 if log.isEnabledFor(logging.DEBUG): 1309 log.debug('%s is not in queued torrents set.', torrent_id) 1310 1311 # Only save resume data if it was actually downloaded something. Helps 1312 # on startup with big queues with lots of seeding torrents. Libtorrent 1313 # emits alert_torrent_finished for them, but there seems like nothing 1314 # worth really to save in resume data, we just read it up in 1315 # self.load_state(). 1316 if total_download: 1317 self.save_resume_data((torrent_id,)) 1318 1319 def on_alert_torrent_paused(self, alert): 1320 """Alert handler for libtorrent torrent_paused_alert""" 1321 try: 1322 torrent_id = str(alert.handle.info_hash()) 1323 torrent = self.torrents[torrent_id] 1324 except (RuntimeError, KeyError): 1325 return 1326 torrent.update_state() 1327 # Write the fastresume file if we are not waiting on a bulk write 1328 if torrent_id not in self.waiting_on_resume_data: 1329 self.save_resume_data((torrent_id,)) 1330 1331 def on_alert_torrent_checked(self, alert): 1332 """Alert handler for libtorrent torrent_checked_alert""" 1333 try: 1334 torrent = self.torrents[str(alert.handle.info_hash())] 1335 except (RuntimeError, KeyError): 1336 return 1337 1338 # Check to see if we're forcing a recheck and set it back to paused if necessary. 1339 if torrent.forcing_recheck: 1340 torrent.forcing_recheck = False 1341 if torrent.forcing_recheck_paused: 1342 torrent.handle.pause() 1343 1344 torrent.update_state() 1345 1346 def on_alert_tracker_reply(self, alert): 1347 """Alert handler for libtorrent tracker_reply_alert""" 1348 try: 1349 torrent = self.torrents[str(alert.handle.info_hash())] 1350 except (RuntimeError, KeyError): 1351 return 1352 1353 # Set the tracker status for the torrent 1354 torrent.set_tracker_status('Announce OK') 1355 1356 # Check for peer information from the tracker, if none then send a scrape request. 1357 if ( 1358 alert.handle.status().num_complete == -1 1359 or alert.handle.status().num_incomplete == -1 1360 ): 1361 torrent.scrape_tracker() 1362 1363 def on_alert_tracker_announce(self, alert): 1364 """Alert handler for libtorrent tracker_announce_alert""" 1365 try: 1366 torrent = self.torrents[str(alert.handle.info_hash())] 1367 except (RuntimeError, KeyError): 1368 return 1369 1370 # Set the tracker status for the torrent 1371 torrent.set_tracker_status('Announce Sent') 1372 1373 def on_alert_tracker_warning(self, alert): 1374 """Alert handler for libtorrent tracker_warning_alert""" 1375 try: 1376 torrent = self.torrents[str(alert.handle.info_hash())] 1377 except (RuntimeError, KeyError): 1378 return 1379 # Set the tracker status for the torrent 1380 torrent.set_tracker_status('Warning: %s' % decode_bytes(alert.message())) 1381 1382 def on_alert_tracker_error(self, alert): 1383 """Alert handler for libtorrent tracker_error_alert""" 1384 try: 1385 torrent = self.torrents[str(alert.handle.info_hash())] 1386 except (RuntimeError, KeyError): 1387 return 1388 1389 error_message = decode_bytes(alert.error_message()) 1390 if not error_message: 1391 error_message = decode_bytes(alert.error.message()) 1392 log.debug( 1393 'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message 1394 ) 1395 torrent.set_tracker_status('Error: ' + error_message) 1396 1397 def on_alert_storage_moved(self, alert): 1398 """Alert handler for libtorrent storage_moved_alert""" 1399 try: 1400 torrent_id = str(alert.handle.info_hash()) 1401 torrent = self.torrents[torrent_id] 1402 except (RuntimeError, KeyError): 1403 return 1404 1405 torrent.set_download_location(os.path.normpath(alert.storage_path())) 1406 torrent.set_move_completed(False) 1407 torrent.update_state() 1408 1409 if torrent_id in self.waiting_on_finish_moving: 1410 self.waiting_on_finish_moving.remove(torrent_id) 1411 torrent.is_finished = True 1412 component.get('EventManager').emit(TorrentFinishedEvent(torrent_id)) 1413 1414 def on_alert_storage_moved_failed(self, alert): 1415 """Alert handler for libtorrent storage_moved_failed_alert""" 1416 try: 1417 torrent_id = str(alert.handle.info_hash()) 1418 torrent = self.torrents[torrent_id] 1419 except (RuntimeError, KeyError): 1420 return 1421 1422 log.warning('on_alert_storage_moved_failed: %s', decode_bytes(alert.message())) 1423 # Set an Error message and pause the torrent 1424 alert_msg = decode_bytes(alert.message()).split(':', 1)[1].strip() 1425 torrent.force_error_state('Failed to move download folder: %s' % alert_msg) 1426 1427 if torrent_id in self.waiting_on_finish_moving: 1428 self.waiting_on_finish_moving.remove(torrent_id) 1429 torrent.is_finished = True 1430 component.get('EventManager').emit(TorrentFinishedEvent(torrent_id)) 1431 1432 def on_alert_torrent_resumed(self, alert): 1433 """Alert handler for libtorrent torrent_resumed_alert""" 1434 try: 1435 torrent_id = str(alert.handle.info_hash()) 1436 torrent = self.torrents[torrent_id] 1437 except (RuntimeError, KeyError): 1438 return 1439 torrent.update_state() 1440 component.get('EventManager').emit(TorrentResumedEvent(torrent_id)) 1441 1442 def on_alert_state_changed(self, alert): 1443 """Alert handler for libtorrent state_changed_alert. 1444 1445 Emits: 1446 TorrentStateChangedEvent: The state has changed. 1447 1448 """ 1449 try: 1450 torrent_id = str(alert.handle.info_hash()) 1451 torrent = self.torrents[torrent_id] 1452 except (RuntimeError, KeyError): 1453 return 1454 1455 torrent.update_state() 1456 # Torrent may need to download data after checking. 1457 if torrent.state in ('Checking', 'Downloading'): 1458 torrent.is_finished = False 1459 self.queued_torrents.add(torrent_id) 1460 1461 def on_alert_save_resume_data(self, alert): 1462 """Alert handler for libtorrent save_resume_data_alert""" 1463 try: 1464 torrent_id = str(alert.handle.info_hash()) 1465 except RuntimeError: 1466 return 1467 if torrent_id in self.torrents: 1468 # libtorrent add_torrent expects bencoded resume_data. 1469 self.resume_data[torrent_id] = lt.bencode(alert.resume_data) 1470 1471 if torrent_id in self.waiting_on_resume_data: 1472 self.waiting_on_resume_data[torrent_id].callback(None) 1473 1474 def on_alert_save_resume_data_failed(self, alert): 1475 """Alert handler for libtorrent save_resume_data_failed_alert""" 1476 try: 1477 torrent_id = str(alert.handle.info_hash()) 1478 except RuntimeError: 1479 return 1480 1481 if torrent_id in self.waiting_on_resume_data: 1482 self.waiting_on_resume_data[torrent_id].errback( 1483 Exception(decode_bytes(alert.message())) 1484 ) 1485 1486 def on_alert_fastresume_rejected(self, alert): 1487 """Alert handler for libtorrent fastresume_rejected_alert""" 1488 try: 1489 torrent_id = str(alert.handle.info_hash()) 1490 torrent = self.torrents[torrent_id] 1491 except (RuntimeError, KeyError): 1492 return 1493 1494 alert_msg = decode_bytes(alert.message()) 1495 log.error('on_alert_fastresume_rejected: %s', alert_msg) 1496 if alert.error.value() == 134: 1497 if not os.path.isdir(torrent.options['download_location']): 1498 error_msg = 'Unable to locate Download Folder!' 1499 else: 1500 error_msg = 'Missing or invalid torrent data!' 1501 else: 1502 error_msg = ( 1503 'Problem with resume data: %s' % alert_msg.split(':', 1)[1].strip() 1504 ) 1505 torrent.force_error_state(error_msg, restart_to_resume=True) 1506 1507 def on_alert_file_renamed(self, alert): 1508 """Alert handler for libtorrent file_renamed_alert. 1509 1510 Emits: 1511 TorrentFileRenamedEvent: Files in the torrent have been renamed. 1512 1513 """ 1514 try: 1515 torrent_id = str(alert.handle.info_hash()) 1516 torrent = self.torrents[torrent_id] 1517 except (RuntimeError, KeyError): 1518 return 1519 1520 new_name = decode_bytes(alert.new_name()) 1521 log.debug('index: %s name: %s', alert.index, new_name) 1522 1523 # We need to see if this file index is in a waiting_on_folder dict 1524 for wait_on_folder in torrent.waiting_on_folder_rename: 1525 if alert.index in wait_on_folder: 1526 wait_on_folder[alert.index].callback(None) 1527 break 1528 else: 1529 # This is just a regular file rename so send the signal 1530 component.get('EventManager').emit( 1531 TorrentFileRenamedEvent(torrent_id, alert.index, new_name) 1532 ) 1533 self.save_resume_data((torrent_id,)) 1534 1535 def on_alert_metadata_received(self, alert): 1536 """Alert handler for libtorrent metadata_received_alert""" 1537 try: 1538 torrent_id = str(alert.handle.info_hash()) 1539 except RuntimeError: 1540 return 1541 1542 try: 1543 torrent = self.torrents[torrent_id] 1544 except KeyError: 1545 pass 1546 else: 1547 return torrent.on_metadata_received() 1548 1549 # Try callback to prefetch_metadata method. 1550 try: 1551 d = self.prefetching_metadata[torrent_id].defer 1552 except KeyError: 1553 pass 1554 else: 1555 torrent_info = alert.handle.get_torrent_info() 1556 return d.callback(torrent_info) 1557 1558 def on_alert_file_error(self, alert): 1559 """Alert handler for libtorrent file_error_alert""" 1560 try: 1561 torrent = self.torrents[str(alert.handle.info_hash())] 1562 except (RuntimeError, KeyError): 1563 return 1564 torrent.update_state() 1565 1566 def on_alert_file_completed(self, alert): 1567 """Alert handler for libtorrent file_completed_alert 1568 1569 Emits: 1570 TorrentFileCompletedEvent: When an individual file completes downloading. 1571 1572 """ 1573 try: 1574 torrent_id = str(alert.handle.info_hash()) 1575 except RuntimeError: 1576 return 1577 if torrent_id in self.torrents: 1578 component.get('EventManager').emit( 1579 TorrentFileCompletedEvent(torrent_id, alert.index) 1580 ) 1581 1582 def on_alert_state_update(self, alert): 1583 """Alert handler for libtorrent state_update_alert 1584 1585 Result of a session.post_torrent_updates() call and contains the torrent status 1586 of all torrents that changed since last time this was posted. 1587 1588 """ 1589 self.last_state_update_alert_ts = time.time() 1590 1591 for t_status in alert.status: 1592 try: 1593 torrent_id = str(t_status.info_hash) 1594 except RuntimeError: 1595 continue 1596 if torrent_id in self.torrents: 1597 self.torrents[torrent_id].update_status(t_status) 1598 1599 self.handle_torrents_status_callback(self.torrents_status_requests.pop()) 1600 1601 def on_alert_external_ip(self, alert): 1602 """Alert handler for libtorrent external_ip_alert 1603 1604 Note: 1605 The alert.message IPv4 address format is: 1606 'external IP received: 0.0.0.0' 1607 and IPv6 address format is: 1608 'external IP received: 0:0:0:0:0:0:0:0' 1609 """ 1610 1611 external_ip = decode_bytes(alert.message()).split(' ')[-1] 1612 log.info('on_alert_external_ip: %s', external_ip) 1613 component.get('EventManager').emit(ExternalIPEvent(external_ip)) 1614 1615 def on_alert_performance(self, alert): 1616 """Alert handler for libtorrent performance_alert""" 1617 log.warning( 1618 'on_alert_performance: %s, %s', 1619 decode_bytes(alert.message()), 1620 alert.warning_code, 1621 ) 1622 if alert.warning_code == lt.performance_warning_t.send_buffer_watermark_too_low: 1623 max_send_buffer_watermark = 3 * 1024 * 1024 # 3MiB 1624 settings = self.session.get_settings() 1625 send_buffer_watermark = settings['send_buffer_watermark'] 1626 1627 # If send buffer is too small, try increasing its size by 512KiB (up to max_send_buffer_watermark) 1628 if send_buffer_watermark < max_send_buffer_watermark: 1629 value = send_buffer_watermark + (500 * 1024) 1630 log.info( 1631 'Increasing send_buffer_watermark from %s to %s Bytes', 1632 send_buffer_watermark, 1633 value, 1634 ) 1635 component.get('Core').apply_session_setting( 1636 'send_buffer_watermark', value 1637 ) 1638 else: 1639 log.warning( 1640 'send_buffer_watermark reached maximum value: %s Bytes', 1641 max_send_buffer_watermark, 1642 ) 1643 1644 def separate_keys(self, keys, torrent_ids): 1645 """Separates the input keys into torrent class keys and plugins keys""" 1646 if self.torrents: 1647 for torrent_id in torrent_ids: 1648 if torrent_id in self.torrents: 1649 status_keys = list(self.torrents[torrent_id].status_funcs) 1650 leftover_keys = list(set(keys) - set(status_keys)) 1651 torrent_keys = list(set(keys) - set(leftover_keys)) 1652 return torrent_keys, leftover_keys 1653 return [], [] 1654 1655 def handle_torrents_status_callback(self, status_request): 1656 """Build the status dictionary with torrent values""" 1657 d, torrent_ids, keys, diff = status_request 1658 status_dict = {}.fromkeys(torrent_ids) 1659 torrent_keys, plugin_keys = self.separate_keys(keys, torrent_ids) 1660 1661 # Get the torrent status for each torrent_id 1662 for torrent_id in torrent_ids: 1663 if torrent_id not in self.torrents: 1664 # The torrent_id does not exist in the dict. 1665 # Could be the clients cache (sessionproxy) isn't up to speed. 1666 del status_dict[torrent_id] 1667 else: 1668 status_dict[torrent_id] = self.torrents[torrent_id].get_status( 1669 torrent_keys, diff, all_keys=not keys 1670 ) 1671 self.status_dict = status_dict 1672 d.callback((status_dict, plugin_keys)) 1673 1674 def torrents_status_update(self, torrent_ids, keys, diff=False): 1675 """Returns status dict for the supplied torrent_ids async. 1676 1677 Note: 1678 If torrent states was updated recently post_torrent_updates is not called and 1679 instead cached state is used. 1680 1681 Args: 1682 torrent_ids (list of str): The torrent IDs to get the status of. 1683 keys (list of str): The keys to get the status on. 1684 diff (bool, optional): If True, will return a diff of the changes since the 1685 last call to get_status based on the session_id, defaults to False. 1686 1687 Returns: 1688 dict: A status dictionary for the requested torrents. 1689 1690 """ 1691 d = Deferred() 1692 now = time.time() 1693 # If last update was recent, use cached data instead of request updates from libtorrent 1694 if (now - self.last_state_update_alert_ts) < 1.5: 1695 reactor.callLater( 1696 0, self.handle_torrents_status_callback, (d, torrent_ids, keys, diff) 1697 ) 1698 else: 1699 # Ask libtorrent for status update 1700 self.torrents_status_requests.insert(0, (d, torrent_ids, keys, diff)) 1701 self.session.post_torrent_updates() 1702 return d 1703