1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
4#
5# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
6# the additional special exception to link portions of this program with the OpenSSL library.
7# See LICENSE for more details.
8#
9
10"""TorrentManager handles Torrent objects"""
11from __future__ import unicode_literals
12
13import datetime
14import logging
15import operator
16import os
17import time
18from collections import namedtuple
19from tempfile import gettempdir
20
21import six.moves.cPickle as pickle  # noqa: N813
22from twisted.internet import defer, error, reactor, threads
23from twisted.internet.defer import Deferred, DeferredList
24from twisted.internet.task import LoopingCall
25
26import deluge.component as component
27from deluge._libtorrent import lt
28from deluge.common import PY2, archive_files, decode_bytes, get_magnet_info, is_magnet
29from deluge.configmanager import ConfigManager, get_config_dir
30from deluge.core.authmanager import AUTH_LEVEL_ADMIN
31from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath
32from deluge.error import AddTorrentError, InvalidTorrentError
33from deluge.event import (
34    ExternalIPEvent,
35    PreTorrentRemovedEvent,
36    SessionStartedEvent,
37    TorrentAddedEvent,
38    TorrentFileCompletedEvent,
39    TorrentFileRenamedEvent,
40    TorrentFinishedEvent,
41    TorrentRemovedEvent,
42    TorrentResumedEvent,
43)
44
45log = logging.getLogger(__name__)
46
# Base libtorrent add-torrent flags shared by the add paths in this module.
# Callers OR in the extra flags they need and XOR out the ones they do not
# want (e.g. flag_auto_managed) before handing the params to the session.
LT_DEFAULT_ADD_TORRENT_FLAGS = (
    lt.add_torrent_params_flags_t.flag_paused
    | lt.add_torrent_params_flags_t.flag_auto_managed
    | lt.add_torrent_params_flags_t.flag_update_subscribe
    | lt.add_torrent_params_flags_t.flag_apply_ip_filter
)
53
54
class TorrentState:  # pylint: disable=old-style-class
    """Hold the persisted settings of a single torrent.

    Every constructor argument is stored as an instance attribute of the
    same name. Two instances compare equal when all attributes match.

    Note:
        This must be old style class to avoid breaking torrent.state file.

    """

    def __init__(
        self,
        torrent_id=None,
        filename=None,
        trackers=None,
        storage_mode='sparse',
        paused=False,
        save_path=None,
        max_connections=-1,
        max_upload_slots=-1,
        max_upload_speed=-1.0,
        max_download_speed=-1.0,
        prioritize_first_last=False,
        sequential_download=False,
        file_priorities=None,
        queue=None,
        auto_managed=True,
        is_finished=False,
        stop_ratio=2.00,
        stop_at_ratio=False,
        remove_at_ratio=False,
        move_completed=False,
        move_completed_path=None,
        magnet=None,
        owner=None,
        shared=False,
        super_seeding=False,
        name=None,
    ):
        # Snapshot the call arguments before any other local name is bound,
        # then mirror each of them (except self) onto the instance.
        arguments = dict(locals())
        del arguments['self']
        for attr_name in arguments:
            setattr(self, attr_name, arguments[attr_name])

    def __eq__(self, other):
        if not isinstance(other, TorrentState):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        is_equal = self == other
        return not is_equal
103
104
class TorrentManagerState:  # pylint: disable=old-style-class
    """Container for the list of TorrentState objects of a session.

    Two instances compare equal when their ``torrents`` lists are equal.

    Note:
        This must be old style class to avoid breaking torrent.state file.

    """

    def __init__(self):
        self.torrents = []

    def __eq__(self, other):
        if not isinstance(other, TorrentManagerState):
            return False
        return self.torrents == other.torrents

    def __ne__(self, other):
        is_equal = self == other
        return not is_equal
123
124
125class TorrentManager(component.Component):
126    """TorrentManager contains a list of torrents in the current libtorrent session.
127
128    This object is also responsible for saving the state of the session for use on restart.
129
130    """
131
    # Class-level alias of reactor.callLater; prefetch_metadata schedules its
    # timeout through self.callLater.
    # NOTE(review): presumably kept as an attribute so it can be swapped out
    # (e.g. in tests) -- confirm.
    callLater = reactor.callLater  # noqa: N815
133
134    def __init__(self):
135        component.Component.__init__(
136            self,
137            'TorrentManager',
138            interval=5,
139            depend=['CorePluginManager', 'AlertManager'],
140        )
141        log.debug('TorrentManager init...')
142        # Set the libtorrent session
143        self.session = component.get('Core').session
144        # Set the alertmanager
145        self.alerts = component.get('AlertManager')
146        # Get the core config
147        self.config = ConfigManager('core.conf')
148
149        # Make sure the state folder has been created
150        self.state_dir = os.path.join(get_config_dir(), 'state')
151        if not os.path.exists(self.state_dir):
152            os.makedirs(self.state_dir)
153        self.temp_file = os.path.join(self.state_dir, '.safe_state_check')
154
155        # Create the torrents dict { torrent_id: Torrent }
156        self.torrents = {}
157        self.queued_torrents = set()
158        self.is_saving_state = False
159        self.save_resume_data_file_lock = defer.DeferredLock()
160        self.torrents_loading = {}
161        self.prefetching_metadata = {}
162
163        # This is a map of torrent_ids to Deferreds used to track needed resume data.
164        # The Deferreds will be completed when resume data has been saved.
165        self.waiting_on_resume_data = {}
166
167        # Keep track of torrents finished but moving storage
168        self.waiting_on_finish_moving = []
169
170        # Keeps track of resume data
171        self.resume_data = {}
172
173        self.torrents_status_requests = []
174        self.status_dict = {}
175        self.last_state_update_alert_ts = 0
176
177        # Keep the previous saved state
178        self.prev_saved_state = None
179
180        # Register set functions
181        set_config_keys = [
182            'max_connections_per_torrent',
183            'max_upload_slots_per_torrent',
184            'max_upload_speed_per_torrent',
185            'max_download_speed_per_torrent',
186        ]
187
188        for config_key in set_config_keys:
189            on_set_func = getattr(self, ''.join(['on_set_', config_key]))
190            self.config.register_set_function(config_key, on_set_func)
191
192        # Register alert functions
193        alert_handles = [
194            'external_ip_alert',
195            'performance_alert',
196            'add_torrent_alert',
197            'metadata_received_alert',
198            'torrent_finished_alert',
199            'torrent_paused_alert',
200            'torrent_checked_alert',
201            'torrent_resumed_alert',
202            'tracker_reply_alert',
203            'tracker_announce_alert',
204            'tracker_warning_alert',
205            'tracker_error_alert',
206            'file_renamed_alert',
207            'file_error_alert',
208            'file_completed_alert',
209            'storage_moved_alert',
210            'storage_moved_failed_alert',
211            'state_update_alert',
212            'state_changed_alert',
213            'save_resume_data_alert',
214            'save_resume_data_failed_alert',
215            'fastresume_rejected_alert',
216        ]
217
218        for alert_handle in alert_handles:
219            on_alert_func = getattr(
220                self, ''.join(['on_alert_', alert_handle.replace('_alert', '')])
221            )
222            self.alerts.register_handler(alert_handle, on_alert_func)
223
224        # Define timers
225        self.save_state_timer = LoopingCall(self.save_state)
226        self.save_resume_data_timer = LoopingCall(self.save_resume_data)
227        self.prev_status_cleanup_loop = LoopingCall(self.cleanup_torrents_prev_status)
228
229    def start(self):
230        # Check for old temp file to verify safe shutdown
231        if os.path.isfile(self.temp_file):
232            self.archive_state('Bad shutdown detected so archiving state files')
233            os.remove(self.temp_file)
234
235        with open(self.temp_file, 'a'):
236            os.utime(self.temp_file, None)
237
238        # Try to load the state from file
239        self.load_state()
240
241        # Save the state periodically
242        self.save_state_timer.start(200, False)
243        self.save_resume_data_timer.start(190, False)
244        self.prev_status_cleanup_loop.start(10)
245
246    @defer.inlineCallbacks
247    def stop(self):
248        # Stop timers
249        if self.save_state_timer.running:
250            self.save_state_timer.stop()
251
252        if self.save_resume_data_timer.running:
253            self.save_resume_data_timer.stop()
254
255        if self.prev_status_cleanup_loop.running:
256            self.prev_status_cleanup_loop.stop()
257
258        # Save state on shutdown
259        yield self.save_state()
260
261        self.session.pause()
262
263        result = yield self.save_resume_data(flush_disk_cache=True)
264        # Remove the temp_file to signify successfully saved state
265        if result and os.path.isfile(self.temp_file):
266            os.remove(self.temp_file)
267
268    def update(self):
269        for torrent_id, torrent in self.torrents.items():
270            # XXX: Should the state check be those that _can_ be stopped at ratio
271            if torrent.options['stop_at_ratio'] and torrent.state not in (
272                'Checking',
273                'Allocating',
274                'Paused',
275                'Queued',
276            ):
277                # If the global setting is set, but the per-torrent isn't...
278                # Just skip to the next torrent.
279                # This is so that a user can turn-off the stop at ratio option on a per-torrent basis
280                if not torrent.options['stop_at_ratio']:
281                    continue
282                if (
283                    torrent.get_ratio() >= torrent.options['stop_ratio']
284                    and torrent.is_finished
285                ):
286                    if torrent.options['remove_at_ratio']:
287                        self.remove(torrent_id)
288                        break
289                    if not torrent.handle.status().paused:
290                        torrent.pause()
291
292    def __getitem__(self, torrent_id):
293        """Return the Torrent with torrent_id.
294
295        Args:
296            torrent_id (str): The torrent_id.
297
298        Returns:
299            Torrent: A torrent object.
300
301        """
302        return self.torrents[torrent_id]
303
304    def get_torrent_list(self):
305        """Creates a list of torrent_ids, owned by current user and any marked shared.
306
307        Returns:
308            list: A list of torrent_ids.
309
310        """
311        torrent_ids = list(self.torrents)
312        if component.get('RPCServer').get_session_auth_level() == AUTH_LEVEL_ADMIN:
313            return torrent_ids
314
315        current_user = component.get('RPCServer').get_session_user()
316        for torrent_id in torrent_ids[:]:
317            torrent_status = self.torrents[torrent_id].get_status(['owner', 'shared'])
318            if torrent_status['owner'] != current_user and not torrent_status['shared']:
319                torrent_ids.pop(torrent_ids.index(torrent_id))
320        return torrent_ids
321
322    def get_torrent_info_from_file(self, filepath):
323        """Retrieves torrent_info from the file specified.
324
325        Args:
326            filepath (str): The filepath to extract torrent info from.
327
328        Returns:
329            lt.torrent_info: A libtorrent torrent_info dict or None if invalid file or data.
330
331        """
332        # Get the torrent data from the torrent file
333        if log.isEnabledFor(logging.DEBUG):
334            log.debug('Attempting to extract torrent_info from %s', filepath)
335        try:
336            torrent_info = lt.torrent_info(filepath)
337        except RuntimeError as ex:
338            log.warning('Unable to open torrent file %s: %s', filepath, ex)
339        else:
340            return torrent_info
341
342    def prefetch_metadata(self, magnet, timeout):
343        """Download the metadata for a magnet uri.
344
345        Args:
346            magnet (str): A magnet uri to download the metadata for.
347            timeout (int): Number of seconds to wait before cancelling.
348
349        Returns:
350            Deferred: A tuple of (torrent_id (str), metadata (dict))
351
352        """
353
354        torrent_id = get_magnet_info(magnet)['info_hash']
355        if torrent_id in self.prefetching_metadata:
356            return self.prefetching_metadata[torrent_id].defer
357
358        add_torrent_params = {}
359        add_torrent_params['save_path'] = gettempdir()
360        add_torrent_params['url'] = magnet.strip().encode('utf8')
361        add_torrent_params['flags'] = (
362            (
363                LT_DEFAULT_ADD_TORRENT_FLAGS
364                | lt.add_torrent_params_flags_t.flag_duplicate_is_error
365                | lt.add_torrent_params_flags_t.flag_upload_mode
366            )
367            ^ lt.add_torrent_params_flags_t.flag_auto_managed
368            ^ lt.add_torrent_params_flags_t.flag_paused
369        )
370
371        torrent_handle = self.session.add_torrent(add_torrent_params)
372
373        d = Deferred()
374        # Cancel the defer if timeout reached.
375        defer_timeout = self.callLater(timeout, d.cancel)
376        d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout)
377        Prefetch = namedtuple('Prefetch', 'defer handle')
378        self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle)
379        return d
380
381    def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout):
382        # Cancel reactor.callLater.
383        try:
384            defer_timeout.cancel()
385        except error.AlreadyCalled:
386            pass
387
388        log.debug('remove prefetch magnet from session')
389        try:
390            torrent_handle = self.prefetching_metadata.pop(torrent_id).handle
391        except KeyError:
392            pass
393        else:
394            self.session.remove_torrent(torrent_handle, 1)
395
396        metadata = None
397        if isinstance(torrent_info, lt.torrent_info):
398            log.debug('prefetch metadata received')
399            metadata = lt.bdecode(torrent_info.metadata())
400
401        return torrent_id, metadata
402
403    def _build_torrent_options(self, options):
404        """Load default options and update if needed."""
405        _options = TorrentOptions()
406        if options:
407            _options.update(options)
408        options = _options
409
410        if not options['owner']:
411            options['owner'] = component.get('RPCServer').get_session_user()
412        if not component.get('AuthManager').has_account(options['owner']):
413            options['owner'] = 'localclient'
414
415        return options
416
417    def _build_torrent_params(
418        self, torrent_info=None, magnet=None, options=None, resume_data=None
419    ):
420        """Create the add_torrent_params dict for adding torrent to libtorrent."""
421        add_torrent_params = {}
422        if torrent_info:
423            add_torrent_params['ti'] = torrent_info
424            name = torrent_info.name()
425            if not name:
426                name = (
427                    torrent_info.file_at(0).path.replace('\\', '/', 1).split('/', 1)[0]
428                )
429            add_torrent_params['name'] = name
430            torrent_id = str(torrent_info.info_hash())
431        elif magnet:
432            magnet_info = get_magnet_info(magnet)
433            if magnet_info:
434                add_torrent_params['url'] = magnet.strip().encode('utf8')
435                add_torrent_params['name'] = magnet_info['name']
436                torrent_id = magnet_info['info_hash']
437            else:
438                raise AddTorrentError(
439                    'Unable to add magnet, invalid magnet info: %s' % magnet
440                )
441
442        # Check for existing torrent in session.
443        if torrent_id in self.get_torrent_list():
444            # Attempt merge trackers before returning.
445            self.torrents[torrent_id].merge_trackers(torrent_info)
446            raise AddTorrentError('Torrent already in session (%s).' % torrent_id)
447        elif torrent_id in self.torrents_loading:
448            raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
449        elif torrent_id in self.prefetching_metadata:
450            # Cancel and remove metadata fetching torrent.
451            self.prefetching_metadata[torrent_id].defer.cancel()
452
453        # Check for renamed files and if so, rename them in the torrent_info before adding.
454        if options['mapped_files'] and torrent_info:
455            for index, fname in options['mapped_files'].items():
456                fname = sanitize_filepath(decode_bytes(fname))
457                if log.isEnabledFor(logging.DEBUG):
458                    log.debug('renaming file index %s to %s', index, fname)
459                try:
460                    torrent_info.rename_file(index, fname.encode('utf8'))
461                except TypeError:
462                    torrent_info.rename_file(index, fname)
463            add_torrent_params['ti'] = torrent_info
464
465        if log.isEnabledFor(logging.DEBUG):
466            log.debug('options: %s', options)
467
468        # Fill in the rest of the add_torrent_params dictionary.
469        add_torrent_params['save_path'] = options['download_location'].encode('utf8')
470        if options['name']:
471            add_torrent_params['name'] = options['name']
472        if options['pre_allocate_storage']:
473            add_torrent_params['storage_mode'] = lt.storage_mode_t.storage_mode_allocate
474        if resume_data:
475            add_torrent_params['resume_data'] = resume_data
476
477        # Set flags: enable duplicate_is_error & override_resume_data, disable auto_managed.
478        add_torrent_params['flags'] = (
479            LT_DEFAULT_ADD_TORRENT_FLAGS
480            | lt.add_torrent_params_flags_t.flag_duplicate_is_error
481            | lt.add_torrent_params_flags_t.flag_override_resume_data
482        ) ^ lt.add_torrent_params_flags_t.flag_auto_managed
483        if options['seed_mode']:
484            add_torrent_params['flags'] |= lt.add_torrent_params_flags_t.flag_seed_mode
485        if options['super_seeding']:
486            add_torrent_params[
487                'flags'
488            ] |= lt.add_torrent_params_flags_t.flag_super_seeding
489
490        return torrent_id, add_torrent_params
491
492    def add(
493        self,
494        torrent_info=None,
495        state=None,
496        options=None,
497        save_state=True,
498        filedump=None,
499        filename=None,
500        magnet=None,
501        resume_data=None,
502    ):
503        """Adds a torrent to the torrent manager.
504
505        Args:
506            torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object.
507            state (TorrentState, optional): The torrent state.
508            options (dict, optional): The options to apply to the torrent on adding.
509            save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
510            filedump (str, optional): bencoded filedump of a torrent file.
511            filename (str, optional): The filename of the torrent file.
512            magnet (str, optional): The magnet uri.
513            resume_data (lt.entry, optional): libtorrent fast resume data.
514
515        Returns:
516            str: If successful the torrent_id of the added torrent, None if adding the torrent failed.
517
518        Emits:
519            TorrentAddedEvent: Torrent with torrent_id added to session.
520
521        """
522        if not torrent_info and not filedump and not magnet:
523            raise AddTorrentError(
524                'You must specify a valid torrent_info, torrent state or magnet.'
525            )
526
527        if filedump:
528            try:
529                torrent_info = lt.torrent_info(lt.bdecode(filedump))
530            except RuntimeError as ex:
531                raise AddTorrentError(
532                    'Unable to add torrent, decoding filedump failed: %s' % ex
533                )
534
535        options = self._build_torrent_options(options)
536        __, add_torrent_params = self._build_torrent_params(
537            torrent_info, magnet, options, resume_data
538        )
539
540        # We need to pause the AlertManager momentarily to prevent alerts
541        # for this torrent being generated before a Torrent object is created.
542        component.pause('AlertManager')
543
544        try:
545            handle = self.session.add_torrent(add_torrent_params)
546            if not handle.is_valid():
547                raise InvalidTorrentError('Torrent handle is invalid!')
548        except (RuntimeError, InvalidTorrentError) as ex:
549            component.resume('AlertManager')
550            raise AddTorrentError('Unable to add torrent to session: %s' % ex)
551
552        torrent = self._add_torrent_obj(
553            handle, options, state, filename, magnet, resume_data, filedump, save_state
554        )
555        return torrent.torrent_id
556
557    def add_async(
558        self,
559        torrent_info=None,
560        state=None,
561        options=None,
562        save_state=True,
563        filedump=None,
564        filename=None,
565        magnet=None,
566        resume_data=None,
567    ):
568        """Adds a torrent to the torrent manager using libtorrent async add torrent method.
569
570        Args:
571            torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object.
572            state (TorrentState, optional): The torrent state.
573            options (dict, optional): The options to apply to the torrent on adding.
574            save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
575            filedump (str, optional): bencoded filedump of a torrent file.
576            filename (str, optional): The filename of the torrent file.
577            magnet (str, optional): The magnet uri.
578            resume_data (lt.entry, optional): libtorrent fast resume data.
579
580        Returns:
581            Deferred: If successful the torrent_id of the added torrent, None if adding the torrent failed.
582
583        Emits:
584            TorrentAddedEvent: Torrent with torrent_id added to session.
585
586        """
587        if not torrent_info and not filedump and not magnet:
588            raise AddTorrentError(
589                'You must specify a valid torrent_info, torrent state or magnet.'
590            )
591
592        if filedump:
593            try:
594                torrent_info = lt.torrent_info(lt.bdecode(filedump))
595            except RuntimeError as ex:
596                raise AddTorrentError(
597                    'Unable to add torrent, decoding filedump failed: %s' % ex
598                )
599
600        options = self._build_torrent_options(options)
601        torrent_id, add_torrent_params = self._build_torrent_params(
602            torrent_info, magnet, options, resume_data
603        )
604
605        d = Deferred()
606        self.torrents_loading[torrent_id] = (
607            d,
608            options,
609            state,
610            filename,
611            magnet,
612            resume_data,
613            filedump,
614            save_state,
615        )
616        try:
617            self.session.async_add_torrent(add_torrent_params)
618        except RuntimeError as ex:
619            raise AddTorrentError('Unable to add torrent to session: %s' % ex)
620        return d
621
622    def _add_torrent_obj(
623        self,
624        handle,
625        options,
626        state,
627        filename,
628        magnet,
629        resume_data,
630        filedump,
631        save_state,
632    ):
633        # For magnets added with metadata, filename is used so set as magnet.
634        if not magnet and is_magnet(filename):
635            magnet = filename
636            filename = None
637
638        # Create a Torrent object and add to the dictionary.
639        torrent = Torrent(handle, options, state, filename, magnet)
640        self.torrents[torrent.torrent_id] = torrent
641
642        # Resume AlertManager if paused for adding torrent to libtorrent.
643        component.resume('AlertManager')
644
645        # Store the orignal resume_data, in case of errors.
646        if resume_data:
647            self.resume_data[torrent.torrent_id] = resume_data
648
649        # Add to queued torrents set.
650        self.queued_torrents.add(torrent.torrent_id)
651        if self.config['queue_new_to_top']:
652            self.queue_top(torrent.torrent_id)
653
654        # Resume the torrent if needed.
655        if not options['add_paused']:
656            torrent.resume()
657
658        # Emit torrent_added signal.
659        from_state = state is not None
660        component.get('EventManager').emit(
661            TorrentAddedEvent(torrent.torrent_id, from_state)
662        )
663
664        if log.isEnabledFor(logging.DEBUG):
665            log.debug('Torrent added: %s', str(handle.info_hash()))
666        if log.isEnabledFor(logging.INFO):
667            name_and_owner = torrent.get_status(['name', 'owner'])
668            log.info(
669                'Torrent %s from user "%s" %s',
670                name_and_owner['name'],
671                name_and_owner['owner'],
672                from_state and 'loaded' or 'added',
673            )
674
675        # Write the .torrent file to the state directory.
676        if filedump:
677            torrent.write_torrentfile(filedump)
678
679        # Save the session state.
680        if save_state:
681            self.save_state()
682
683        return torrent
684
685    def add_async_callback(
686        self,
687        handle,
688        d,
689        options,
690        state,
691        filename,
692        magnet,
693        resume_data,
694        filedump,
695        save_state,
696    ):
697        torrent = self._add_torrent_obj(
698            handle, options, state, filename, magnet, resume_data, filedump, save_state
699        )
700
701        d.callback(torrent.torrent_id)
702
703    def remove(self, torrent_id, remove_data=False, save_state=True):
704        """Remove a torrent from the session.
705
706        Args:
707            torrent_id (str): The torrent ID to remove.
708            remove_data (bool, optional): If True, remove the downloaded data, defaults to False.
709            save_state (bool, optional): If True, save the session state after removal, defaults to True.
710
711        Returns:
712            bool: True if removed successfully, False if not.
713
714        Emits:
715            PreTorrentRemovedEvent: Torrent is about to be removed from session.
716            TorrentRemovedEvent: Torrent with torrent_id removed from session.
717
718        Raises:
719            InvalidTorrentError: If the torrent_id is not in the session.
720
721        """
722        try:
723            torrent = self.torrents[torrent_id]
724        except KeyError:
725            raise InvalidTorrentError('torrent_id %s not in session.' % torrent_id)
726
727        torrent_name = torrent.get_status(['name'])['name']
728
729        # Emit the signal to the clients
730        component.get('EventManager').emit(PreTorrentRemovedEvent(torrent_id))
731
732        try:
733            self.session.remove_torrent(torrent.handle, 1 if remove_data else 0)
734        except RuntimeError as ex:
735            log.warning('Error removing torrent: %s', ex)
736            return False
737
738        # Remove fastresume data if it is exists
739        self.resume_data.pop(torrent_id, None)
740
741        # Remove the .torrent file in the state and copy location, if user requested.
742        delete_copies = (
743            self.config['copy_torrent_file'] and self.config['del_copy_torrent_file']
744        )
745        torrent.delete_torrentfile(delete_copies)
746
747        # Remove from set if it wasn't finished
748        if not torrent.is_finished:
749            try:
750                self.queued_torrents.remove(torrent_id)
751            except KeyError:
752                log.debug('%s is not in queued torrents set.', torrent_id)
753                raise InvalidTorrentError(
754                    '%s is not in queued torrents set.' % torrent_id
755                )
756
757        # Remove the torrent from deluge's session
758        del self.torrents[torrent_id]
759
760        if save_state:
761            self.save_state()
762
763        # Emit the signal to the clients
764        component.get('EventManager').emit(TorrentRemovedEvent(torrent_id))
765        log.info(
766            'Torrent %s removed by user: %s',
767            torrent_name,
768            component.get('RPCServer').get_session_user(),
769        )
770        return True
771
772    def fixup_state(self, state):
773        """Fixup an old state by adding missing TorrentState options and assigning default values.
774
775        Args:
776            state (TorrentManagerState): A torrentmanager state containing torrent details.
777
778        Returns:
779            TorrentManagerState: A fixedup TorrentManager state.
780
781        """
782        if state.torrents:
783            t_state_tmp = TorrentState()
784            if dir(state.torrents[0]) != dir(t_state_tmp):
785                self.archive_state('Migration of TorrentState required.')
786                try:
787                    for attr in set(dir(t_state_tmp)) - set(dir(state.torrents[0])):
788                        for t_state in state.torrents:
789                            setattr(t_state, attr, getattr(t_state_tmp, attr, None))
790                except AttributeError as ex:
791                    log.error(
792                        'Unable to update state file to a compatible version: %s', ex
793                    )
794        return state
795
796    def open_state(self):
797        """Open the torrents.state file containing a TorrentManager state with session torrents.
798
799        Returns:
800            TorrentManagerState: The TorrentManager state.
801
802        """
803        torrents_state = os.path.join(self.state_dir, 'torrents.state')
804        state = None
805        for filepath in (torrents_state, torrents_state + '.bak'):
806            log.info('Loading torrent state: %s', filepath)
807            if not os.path.isfile(filepath):
808                continue
809
810            try:
811                with open(filepath, 'rb') as _file:
812                    if PY2:
813                        state = pickle.load(_file)
814                    else:
815                        state = pickle.load(_file, encoding='utf8')
816            except (IOError, EOFError, pickle.UnpicklingError) as ex:
817                message = 'Unable to load {}: {}'.format(filepath, ex)
818                log.error(message)
819                if not filepath.endswith('.bak'):
820                    self.archive_state(message)
821            else:
822                log.info('Successfully loaded %s', filepath)
823                break
824
825        return state if state else TorrentManagerState()
826
827    def load_state(self):
828        """Load all the torrents from TorrentManager state into session.
829
830        Emits:
831            SessionStartedEvent: Emitted after all torrents are added to the session.
832
833        """
834        start = datetime.datetime.now()
835        state = self.open_state()
836        state = self.fixup_state(state)
837
838        # Reorder the state.torrents list to add torrents in the correct queue order.
839        state.torrents.sort(
840            key=operator.attrgetter('queue'), reverse=self.config['queue_new_to_top']
841        )
842        resume_data = self.load_resume_data_file()
843
844        deferreds = []
845        for t_state in state.torrents:
846            # Populate the options dict from state
847            options = TorrentOptions()
848            for option in options:
849                try:
850                    options[option] = getattr(t_state, option)
851                except AttributeError:
852                    pass
853            # Manually update unmatched attributes
854            options['download_location'] = t_state.save_path
855            options['pre_allocate_storage'] = t_state.storage_mode == 'allocate'
856            options['prioritize_first_last_pieces'] = t_state.prioritize_first_last
857            options['add_paused'] = t_state.paused
858
859            magnet = t_state.magnet
860            torrent_info = self.get_torrent_info_from_file(
861                os.path.join(self.state_dir, t_state.torrent_id + '.torrent')
862            )
863
864            try:
865                d = self.add_async(
866                    torrent_info=torrent_info,
867                    state=t_state,
868                    options=options,
869                    save_state=False,
870                    magnet=magnet,
871                    resume_data=resume_data.get(t_state.torrent_id),
872                )
873            except AddTorrentError as ex:
874                log.warning(
875                    'Error when adding torrent "%s" to session: %s',
876                    t_state.torrent_id,
877                    ex,
878                )
879            else:
880                deferreds.append(d)
881
882        deferred_list = DeferredList(deferreds, consumeErrors=False)
883
884        def on_complete(result):
885            log.info(
886                'Finished loading %d torrents in %s',
887                len(state.torrents),
888                str(datetime.datetime.now() - start),
889            )
890            component.get('EventManager').emit(SessionStartedEvent())
891
892        deferred_list.addCallback(on_complete)
893
894    def create_state(self):
895        """Create a state of all the torrents in TorrentManager.
896
897        Returns:
898            TorrentManagerState: The TorrentManager state.
899
900        """
901        state = TorrentManagerState()
902        # Create the state for each Torrent and append to the list
903        for torrent in self.torrents.values():
904            if self.session.is_paused():
905                paused = torrent.handle.is_paused()
906            elif torrent.forced_error:
907                paused = torrent.forced_error.was_paused
908            elif torrent.state == 'Paused':
909                paused = True
910            else:
911                paused = False
912
913            torrent_state = TorrentState(
914                torrent.torrent_id,
915                torrent.filename,
916                torrent.trackers,
917                torrent.get_status(['storage_mode'])['storage_mode'],
918                paused,
919                torrent.options['download_location'],
920                torrent.options['max_connections'],
921                torrent.options['max_upload_slots'],
922                torrent.options['max_upload_speed'],
923                torrent.options['max_download_speed'],
924                torrent.options['prioritize_first_last_pieces'],
925                torrent.options['sequential_download'],
926                torrent.options['file_priorities'],
927                torrent.get_queue_position(),
928                torrent.options['auto_managed'],
929                torrent.is_finished,
930                torrent.options['stop_ratio'],
931                torrent.options['stop_at_ratio'],
932                torrent.options['remove_at_ratio'],
933                torrent.options['move_completed'],
934                torrent.options['move_completed_path'],
935                torrent.magnet,
936                torrent.options['owner'],
937                torrent.options['shared'],
938                torrent.options['super_seeding'],
939                torrent.options['name'],
940            )
941            state.torrents.append(torrent_state)
942        return state
943
944    def save_state(self):
945        """Run the save state task in a separate thread to avoid blocking main thread.
946
947        Note:
948            If a save task is already running, this call is ignored.
949
950        """
951        if self.is_saving_state:
952            return defer.succeed(None)
953        self.is_saving_state = True
954        d = threads.deferToThread(self._save_state)
955
956        def on_state_saved(arg):
957            self.is_saving_state = False
958            if self.save_state_timer.running:
959                self.save_state_timer.reset()
960
961        d.addBoth(on_state_saved)
962        return d
963
964    def _save_state(self):
965        """Save the state of the TorrentManager to the torrents.state file."""
966        state = self.create_state()
967
968        # If the state hasn't changed, no need to save it
969        if self.prev_saved_state == state:
970            return
971
972        filename = 'torrents.state'
973        filepath = os.path.join(self.state_dir, filename)
974        filepath_bak = filepath + '.bak'
975        filepath_tmp = filepath + '.tmp'
976
977        try:
978            log.debug('Creating the temporary file: %s', filepath_tmp)
979            with open(filepath_tmp, 'wb', 0) as _file:
980                pickle.dump(state, _file, protocol=2)
981                _file.flush()
982                os.fsync(_file.fileno())
983        except (OSError, pickle.PicklingError) as ex:
984            log.error('Unable to save %s: %s', filename, ex)
985            return
986
987        try:
988            log.debug('Creating backup of %s at: %s', filename, filepath_bak)
989            if os.path.isfile(filepath_bak):
990                os.remove(filepath_bak)
991            if os.path.isfile(filepath):
992                os.rename(filepath, filepath_bak)
993        except OSError as ex:
994            log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
995            return
996
997        try:
998            log.debug('Saving %s to: %s', filename, filepath)
999            os.rename(filepath_tmp, filepath)
1000            self.prev_saved_state = state
1001        except OSError as ex:
1002            log.error('Failed to set new state file %s: %s', filepath, ex)
1003            if os.path.isfile(filepath_bak):
1004                log.info('Restoring backup of state from: %s', filepath_bak)
1005                os.rename(filepath_bak, filepath)
1006
1007    def save_resume_data(self, torrent_ids=None, flush_disk_cache=False):
1008        """Saves torrents resume data.
1009
1010        Args:
1011            torrent_ids (list of str): A list of torrents to save the resume data for, defaults
1012                to None which saves all torrents resume data.
1013            flush_disk_cache (bool, optional): If True flushes the disk cache which avoids potential
1014                issue with file timestamps, defaults to False. This is only needed when stopping the session.
1015
1016        Returns:
1017            t.i.d.DeferredList: A list of twisted Deferred callbacks to be invoked when save is complete.
1018
1019        """
1020        if torrent_ids is None:
1021            torrent_ids = (
1022                tid
1023                for tid, t in self.torrents.items()
1024                if t.handle.need_save_resume_data()
1025            )
1026
1027        def on_torrent_resume_save(dummy_result, torrent_id):
1028            """Recieved torrent resume_data alert so remove from waiting list"""
1029            self.waiting_on_resume_data.pop(torrent_id, None)
1030
1031        deferreds = []
1032        for torrent_id in torrent_ids:
1033            d = self.waiting_on_resume_data.get(torrent_id)
1034            if not d:
1035                d = Deferred().addBoth(on_torrent_resume_save, torrent_id)
1036                self.waiting_on_resume_data[torrent_id] = d
1037            deferreds.append(d)
1038            self.torrents[torrent_id].save_resume_data(flush_disk_cache)
1039
1040        def on_all_resume_data_finished(dummy_result):
1041            """Saves resume data file when no more torrents waiting for resume data.
1042
1043            Returns:
1044                bool: True if fastresume file is saved.
1045
1046                This return value determines removal of `self.temp_file` in `self.stop()`.
1047
1048            """
1049            # Use flush_disk_cache as a marker for shutdown so fastresume is
1050            # saved even if torrents are waiting.
1051            if not self.waiting_on_resume_data or flush_disk_cache:
1052                return self.save_resume_data_file(queue_task=flush_disk_cache)
1053
1054        return DeferredList(deferreds).addBoth(on_all_resume_data_finished)
1055
1056    def load_resume_data_file(self):
1057        """Load the resume data from file for all torrents.
1058
1059        Returns:
1060            dict: A dict of torrents and their resume_data.
1061
1062        """
1063        filename = 'torrents.fastresume'
1064        filepath = os.path.join(self.state_dir, filename)
1065        filepath_bak = filepath + '.bak'
1066        old_data_filepath = os.path.join(get_config_dir(), filename)
1067
1068        for _filepath in (filepath, filepath_bak, old_data_filepath):
1069            log.info('Opening %s for load: %s', filename, _filepath)
1070            try:
1071                with open(_filepath, 'rb') as _file:
1072                    resume_data = lt.bdecode(_file.read())
1073            except (IOError, EOFError, RuntimeError) as ex:
1074                if self.torrents:
1075                    log.warning('Unable to load %s: %s', _filepath, ex)
1076                resume_data = None
1077            else:
1078                # lt.bdecode returns the dict keys as bytes so decode them.
1079                resume_data = {k.decode(): v for k, v in resume_data.items()}
1080                log.info('Successfully loaded %s: %s', filename, _filepath)
1081                break
1082
1083        # If the libtorrent bdecode doesn't happen properly, it will return None
1084        # so we need to make sure we return a {}
1085        if resume_data is None:
1086            return {}
1087        else:
1088            return resume_data
1089
1090    def save_resume_data_file(self, queue_task=False):
1091        """Save resume data to file in a separate thread to avoid blocking main thread.
1092
1093        Args:
1094            queue_task (bool): If True and a save task is already running then queue
1095                this save task to run next. Default is to not queue save tasks.
1096
1097        Returns:
1098            Deferred: Fires with arg, True if save task was successful, False if
1099                not and None if task was not performed.
1100
1101        """
1102        if not queue_task and self.save_resume_data_file_lock.locked:
1103            return defer.succeed(None)
1104
1105        def on_lock_aquired():
1106            d = threads.deferToThread(self._save_resume_data_file)
1107
1108            def on_resume_data_file_saved(arg):
1109                if self.save_resume_data_timer.running:
1110                    self.save_resume_data_timer.reset()
1111                return arg
1112
1113            d.addBoth(on_resume_data_file_saved)
1114            return d
1115
1116        return self.save_resume_data_file_lock.run(on_lock_aquired)
1117
1118    def _save_resume_data_file(self):
1119        """Saves the resume data file with the contents of self.resume_data"""
1120        if not self.resume_data:
1121            return True
1122
1123        filename = 'torrents.fastresume'
1124        filepath = os.path.join(self.state_dir, filename)
1125        filepath_bak = filepath + '.bak'
1126        filepath_tmp = filepath + '.tmp'
1127
1128        try:
1129            log.debug('Creating the temporary file: %s', filepath_tmp)
1130            with open(filepath_tmp, 'wb', 0) as _file:
1131                _file.write(lt.bencode(self.resume_data))
1132                _file.flush()
1133                os.fsync(_file.fileno())
1134        except (OSError, EOFError) as ex:
1135            log.error('Unable to save %s: %s', filename, ex)
1136            return False
1137
1138        try:
1139            log.debug('Creating backup of %s at: %s', filename, filepath_bak)
1140            if os.path.isfile(filepath_bak):
1141                os.remove(filepath_bak)
1142            if os.path.isfile(filepath):
1143                os.rename(filepath, filepath_bak)
1144        except OSError as ex:
1145            log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
1146            return False
1147
1148        try:
1149            log.debug('Saving %s to: %s', filename, filepath)
1150            os.rename(filepath_tmp, filepath)
1151        except OSError as ex:
1152            log.error('Failed to set new file %s: %s', filepath, ex)
1153            if os.path.isfile(filepath_bak):
1154                log.info('Restoring backup from: %s', filepath_bak)
1155                os.rename(filepath_bak, filepath)
1156        else:
1157            # Sync the rename operations for the directory
1158            if hasattr(os, 'O_DIRECTORY'):
1159                dirfd = os.open(os.path.dirname(filepath), os.O_DIRECTORY)
1160                os.fsync(dirfd)
1161                os.close(dirfd)
1162            return True
1163
1164    def archive_state(self, message):
1165        log.warning(message)
1166        arc_filepaths = []
1167        for filename in ('torrents.fastresume', 'torrents.state'):
1168            filepath = os.path.join(self.state_dir, filename)
1169            arc_filepaths.extend([filepath, filepath + '.bak'])
1170
1171        archive_files('state', arc_filepaths, message=message)
1172
1173    def get_queue_position(self, torrent_id):
1174        """Get queue position of torrent"""
1175        return self.torrents[torrent_id].get_queue_position()
1176
1177    def queue_top(self, torrent_id):
1178        """Queue torrent to top"""
1179        if self.torrents[torrent_id].get_queue_position() == 0:
1180            return False
1181
1182        self.torrents[torrent_id].handle.queue_position_top()
1183        return True
1184
1185    def queue_up(self, torrent_id):
1186        """Queue torrent up one position"""
1187        if self.torrents[torrent_id].get_queue_position() == 0:
1188            return False
1189
1190        self.torrents[torrent_id].handle.queue_position_up()
1191        return True
1192
1193    def queue_down(self, torrent_id):
1194        """Queue torrent down one position"""
1195        if self.torrents[torrent_id].get_queue_position() == (
1196            len(self.queued_torrents) - 1
1197        ):
1198            return False
1199
1200        self.torrents[torrent_id].handle.queue_position_down()
1201        return True
1202
1203    def queue_bottom(self, torrent_id):
1204        """Queue torrent to bottom"""
1205        if self.torrents[torrent_id].get_queue_position() == (
1206            len(self.queued_torrents) - 1
1207        ):
1208            return False
1209
1210        self.torrents[torrent_id].handle.queue_position_bottom()
1211        return True
1212
1213    def cleanup_torrents_prev_status(self):
1214        """Run cleanup_prev_status for each registered torrent"""
1215        for torrent in self.torrents.values():
1216            torrent.cleanup_prev_status()
1217
1218    def on_set_max_connections_per_torrent(self, key, value):
1219        """Sets the per-torrent connection limit"""
1220        log.debug('max_connections_per_torrent set to %s...', value)
1221        for key in self.torrents:
1222            self.torrents[key].set_max_connections(value)
1223
1224    def on_set_max_upload_slots_per_torrent(self, key, value):
1225        """Sets the per-torrent upload slot limit"""
1226        log.debug('max_upload_slots_per_torrent set to %s...', value)
1227        for key in self.torrents:
1228            self.torrents[key].set_max_upload_slots(value)
1229
1230    def on_set_max_upload_speed_per_torrent(self, key, value):
1231        """Sets the per-torrent upload speed limit"""
1232        log.debug('max_upload_speed_per_torrent set to %s...', value)
1233        for key in self.torrents:
1234            self.torrents[key].set_max_upload_speed(value)
1235
1236    def on_set_max_download_speed_per_torrent(self, key, value):
1237        """Sets the per-torrent download speed limit"""
1238        log.debug('max_download_speed_per_torrent set to %s...', value)
1239        for key in self.torrents:
1240            self.torrents[key].set_max_download_speed(value)
1241
1242    # --- Alert handlers ---
1243    def on_alert_add_torrent(self, alert):
1244        """Alert handler for libtorrent add_torrent_alert"""
1245        if not alert.handle.is_valid():
1246            log.warning('Torrent handle is invalid!')
1247            return
1248
1249        try:
1250            torrent_id = str(alert.handle.info_hash())
1251        except RuntimeError as ex:
1252            log.warning('Failed to get torrent id from handle: %s', ex)
1253            return
1254
1255        try:
1256            add_async_params = self.torrents_loading.pop(torrent_id)
1257        except KeyError as ex:
1258            log.warning('Torrent id not in torrents loading list: %s', ex)
1259            return
1260
1261        self.add_async_callback(alert.handle, *add_async_params)
1262
1263    def on_alert_torrent_finished(self, alert):
1264        """Alert handler for libtorrent torrent_finished_alert"""
1265        try:
1266            torrent_id = str(alert.handle.info_hash())
1267            torrent = self.torrents[torrent_id]
1268        except (RuntimeError, KeyError):
1269            return
1270
1271        # If total_download is 0, do not move, it's likely the torrent wasn't downloaded, but just added.
1272        # Get fresh data from libtorrent, the cache isn't always up to date
1273        total_download = torrent.get_status(['total_payload_download'], update=True)[
1274            'total_payload_download'
1275        ]
1276
1277        if log.isEnabledFor(logging.DEBUG):
1278            log.debug('Finished %s ', torrent_id)
1279            log.debug(
1280                'Torrent settings: is_finished: %s, total_download: %s, move_completed: %s, move_path: %s',
1281                torrent.is_finished,
1282                total_download,
1283                torrent.options['move_completed'],
1284                torrent.options['move_completed_path'],
1285            )
1286
1287        torrent.update_state()
1288        if not torrent.is_finished and total_download:
1289            # Move completed download to completed folder if needed
1290            if (
1291                torrent.options['move_completed']
1292                and torrent.options['download_location']
1293                != torrent.options['move_completed_path']
1294            ):
1295                self.waiting_on_finish_moving.append(torrent_id)
1296                torrent.move_storage(torrent.options['move_completed_path'])
1297            else:
1298                torrent.is_finished = True
1299                component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
1300        else:
1301            torrent.is_finished = True
1302
1303        # Torrent is no longer part of the queue
1304        try:
1305            self.queued_torrents.remove(torrent_id)
1306        except KeyError:
1307            # Sometimes libtorrent fires a TorrentFinishedEvent twice
1308            if log.isEnabledFor(logging.DEBUG):
1309                log.debug('%s is not in queued torrents set.', torrent_id)
1310
1311        # Only save resume data if it was actually downloaded something. Helps
1312        # on startup with big queues with lots of seeding torrents. Libtorrent
1313        # emits alert_torrent_finished for them, but there seems like nothing
1314        # worth really to save in resume data, we just read it up in
1315        # self.load_state().
1316        if total_download:
1317            self.save_resume_data((torrent_id,))
1318
1319    def on_alert_torrent_paused(self, alert):
1320        """Alert handler for libtorrent torrent_paused_alert"""
1321        try:
1322            torrent_id = str(alert.handle.info_hash())
1323            torrent = self.torrents[torrent_id]
1324        except (RuntimeError, KeyError):
1325            return
1326        torrent.update_state()
1327        # Write the fastresume file if we are not waiting on a bulk write
1328        if torrent_id not in self.waiting_on_resume_data:
1329            self.save_resume_data((torrent_id,))
1330
1331    def on_alert_torrent_checked(self, alert):
1332        """Alert handler for libtorrent torrent_checked_alert"""
1333        try:
1334            torrent = self.torrents[str(alert.handle.info_hash())]
1335        except (RuntimeError, KeyError):
1336            return
1337
1338        # Check to see if we're forcing a recheck and set it back to paused if necessary.
1339        if torrent.forcing_recheck:
1340            torrent.forcing_recheck = False
1341            if torrent.forcing_recheck_paused:
1342                torrent.handle.pause()
1343
1344        torrent.update_state()
1345
1346    def on_alert_tracker_reply(self, alert):
1347        """Alert handler for libtorrent tracker_reply_alert"""
1348        try:
1349            torrent = self.torrents[str(alert.handle.info_hash())]
1350        except (RuntimeError, KeyError):
1351            return
1352
1353        # Set the tracker status for the torrent
1354        torrent.set_tracker_status('Announce OK')
1355
1356        # Check for peer information from the tracker, if none then send a scrape request.
1357        if (
1358            alert.handle.status().num_complete == -1
1359            or alert.handle.status().num_incomplete == -1
1360        ):
1361            torrent.scrape_tracker()
1362
1363    def on_alert_tracker_announce(self, alert):
1364        """Alert handler for libtorrent tracker_announce_alert"""
1365        try:
1366            torrent = self.torrents[str(alert.handle.info_hash())]
1367        except (RuntimeError, KeyError):
1368            return
1369
1370        # Set the tracker status for the torrent
1371        torrent.set_tracker_status('Announce Sent')
1372
1373    def on_alert_tracker_warning(self, alert):
1374        """Alert handler for libtorrent tracker_warning_alert"""
1375        try:
1376            torrent = self.torrents[str(alert.handle.info_hash())]
1377        except (RuntimeError, KeyError):
1378            return
1379        # Set the tracker status for the torrent
1380        torrent.set_tracker_status('Warning: %s' % decode_bytes(alert.message()))
1381
1382    def on_alert_tracker_error(self, alert):
1383        """Alert handler for libtorrent tracker_error_alert"""
1384        try:
1385            torrent = self.torrents[str(alert.handle.info_hash())]
1386        except (RuntimeError, KeyError):
1387            return
1388
1389        error_message = decode_bytes(alert.error_message())
1390        if not error_message:
1391            error_message = decode_bytes(alert.error.message())
1392        log.debug(
1393            'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message
1394        )
1395        torrent.set_tracker_status('Error: ' + error_message)
1396
1397    def on_alert_storage_moved(self, alert):
1398        """Alert handler for libtorrent storage_moved_alert"""
1399        try:
1400            torrent_id = str(alert.handle.info_hash())
1401            torrent = self.torrents[torrent_id]
1402        except (RuntimeError, KeyError):
1403            return
1404
1405        torrent.set_download_location(os.path.normpath(alert.storage_path()))
1406        torrent.set_move_completed(False)
1407        torrent.update_state()
1408
1409        if torrent_id in self.waiting_on_finish_moving:
1410            self.waiting_on_finish_moving.remove(torrent_id)
1411            torrent.is_finished = True
1412            component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
1413
1414    def on_alert_storage_moved_failed(self, alert):
1415        """Alert handler for libtorrent storage_moved_failed_alert"""
1416        try:
1417            torrent_id = str(alert.handle.info_hash())
1418            torrent = self.torrents[torrent_id]
1419        except (RuntimeError, KeyError):
1420            return
1421
1422        log.warning('on_alert_storage_moved_failed: %s', decode_bytes(alert.message()))
1423        # Set an Error message and pause the torrent
1424        alert_msg = decode_bytes(alert.message()).split(':', 1)[1].strip()
1425        torrent.force_error_state('Failed to move download folder: %s' % alert_msg)
1426
1427        if torrent_id in self.waiting_on_finish_moving:
1428            self.waiting_on_finish_moving.remove(torrent_id)
1429            torrent.is_finished = True
1430            component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
1431
1432    def on_alert_torrent_resumed(self, alert):
1433        """Alert handler for libtorrent torrent_resumed_alert"""
1434        try:
1435            torrent_id = str(alert.handle.info_hash())
1436            torrent = self.torrents[torrent_id]
1437        except (RuntimeError, KeyError):
1438            return
1439        torrent.update_state()
1440        component.get('EventManager').emit(TorrentResumedEvent(torrent_id))
1441
1442    def on_alert_state_changed(self, alert):
1443        """Alert handler for libtorrent state_changed_alert.
1444
1445        Emits:
1446            TorrentStateChangedEvent: The state has changed.
1447
1448        """
1449        try:
1450            torrent_id = str(alert.handle.info_hash())
1451            torrent = self.torrents[torrent_id]
1452        except (RuntimeError, KeyError):
1453            return
1454
1455        torrent.update_state()
1456        # Torrent may need to download data after checking.
1457        if torrent.state in ('Checking', 'Downloading'):
1458            torrent.is_finished = False
1459            self.queued_torrents.add(torrent_id)
1460
1461    def on_alert_save_resume_data(self, alert):
1462        """Alert handler for libtorrent save_resume_data_alert"""
1463        try:
1464            torrent_id = str(alert.handle.info_hash())
1465        except RuntimeError:
1466            return
1467        if torrent_id in self.torrents:
1468            # libtorrent add_torrent expects bencoded resume_data.
1469            self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
1470
1471        if torrent_id in self.waiting_on_resume_data:
1472            self.waiting_on_resume_data[torrent_id].callback(None)
1473
1474    def on_alert_save_resume_data_failed(self, alert):
1475        """Alert handler for libtorrent save_resume_data_failed_alert"""
1476        try:
1477            torrent_id = str(alert.handle.info_hash())
1478        except RuntimeError:
1479            return
1480
1481        if torrent_id in self.waiting_on_resume_data:
1482            self.waiting_on_resume_data[torrent_id].errback(
1483                Exception(decode_bytes(alert.message()))
1484            )
1485
1486    def on_alert_fastresume_rejected(self, alert):
1487        """Alert handler for libtorrent fastresume_rejected_alert"""
1488        try:
1489            torrent_id = str(alert.handle.info_hash())
1490            torrent = self.torrents[torrent_id]
1491        except (RuntimeError, KeyError):
1492            return
1493
1494        alert_msg = decode_bytes(alert.message())
1495        log.error('on_alert_fastresume_rejected: %s', alert_msg)
1496        if alert.error.value() == 134:
1497            if not os.path.isdir(torrent.options['download_location']):
1498                error_msg = 'Unable to locate Download Folder!'
1499            else:
1500                error_msg = 'Missing or invalid torrent data!'
1501        else:
1502            error_msg = (
1503                'Problem with resume data: %s' % alert_msg.split(':', 1)[1].strip()
1504            )
1505        torrent.force_error_state(error_msg, restart_to_resume=True)
1506
1507    def on_alert_file_renamed(self, alert):
1508        """Alert handler for libtorrent file_renamed_alert.
1509
1510        Emits:
1511            TorrentFileRenamedEvent: Files in the torrent have been renamed.
1512
1513        """
1514        try:
1515            torrent_id = str(alert.handle.info_hash())
1516            torrent = self.torrents[torrent_id]
1517        except (RuntimeError, KeyError):
1518            return
1519
1520        new_name = decode_bytes(alert.new_name())
1521        log.debug('index: %s name: %s', alert.index, new_name)
1522
1523        # We need to see if this file index is in a waiting_on_folder dict
1524        for wait_on_folder in torrent.waiting_on_folder_rename:
1525            if alert.index in wait_on_folder:
1526                wait_on_folder[alert.index].callback(None)
1527                break
1528        else:
1529            # This is just a regular file rename so send the signal
1530            component.get('EventManager').emit(
1531                TorrentFileRenamedEvent(torrent_id, alert.index, new_name)
1532            )
1533            self.save_resume_data((torrent_id,))
1534
1535    def on_alert_metadata_received(self, alert):
1536        """Alert handler for libtorrent metadata_received_alert"""
1537        try:
1538            torrent_id = str(alert.handle.info_hash())
1539        except RuntimeError:
1540            return
1541
1542        try:
1543            torrent = self.torrents[torrent_id]
1544        except KeyError:
1545            pass
1546        else:
1547            return torrent.on_metadata_received()
1548
1549        # Try callback to prefetch_metadata method.
1550        try:
1551            d = self.prefetching_metadata[torrent_id].defer
1552        except KeyError:
1553            pass
1554        else:
1555            torrent_info = alert.handle.get_torrent_info()
1556            return d.callback(torrent_info)
1557
1558    def on_alert_file_error(self, alert):
1559        """Alert handler for libtorrent file_error_alert"""
1560        try:
1561            torrent = self.torrents[str(alert.handle.info_hash())]
1562        except (RuntimeError, KeyError):
1563            return
1564        torrent.update_state()
1565
1566    def on_alert_file_completed(self, alert):
1567        """Alert handler for libtorrent file_completed_alert
1568
1569        Emits:
1570            TorrentFileCompletedEvent: When an individual file completes downloading.
1571
1572        """
1573        try:
1574            torrent_id = str(alert.handle.info_hash())
1575        except RuntimeError:
1576            return
1577        if torrent_id in self.torrents:
1578            component.get('EventManager').emit(
1579                TorrentFileCompletedEvent(torrent_id, alert.index)
1580            )
1581
1582    def on_alert_state_update(self, alert):
1583        """Alert handler for libtorrent state_update_alert
1584
1585        Result of a session.post_torrent_updates() call and contains the torrent status
1586        of all torrents that changed since last time this was posted.
1587
1588        """
1589        self.last_state_update_alert_ts = time.time()
1590
1591        for t_status in alert.status:
1592            try:
1593                torrent_id = str(t_status.info_hash)
1594            except RuntimeError:
1595                continue
1596            if torrent_id in self.torrents:
1597                self.torrents[torrent_id].update_status(t_status)
1598
1599        self.handle_torrents_status_callback(self.torrents_status_requests.pop())
1600
1601    def on_alert_external_ip(self, alert):
1602        """Alert handler for libtorrent external_ip_alert
1603
1604        Note:
1605            The alert.message IPv4 address format is:
1606                'external IP received: 0.0.0.0'
1607            and IPv6 address format is:
1608                'external IP received: 0:0:0:0:0:0:0:0'
1609        """
1610
1611        external_ip = decode_bytes(alert.message()).split(' ')[-1]
1612        log.info('on_alert_external_ip: %s', external_ip)
1613        component.get('EventManager').emit(ExternalIPEvent(external_ip))
1614
1615    def on_alert_performance(self, alert):
1616        """Alert handler for libtorrent performance_alert"""
1617        log.warning(
1618            'on_alert_performance: %s, %s',
1619            decode_bytes(alert.message()),
1620            alert.warning_code,
1621        )
1622        if alert.warning_code == lt.performance_warning_t.send_buffer_watermark_too_low:
1623            max_send_buffer_watermark = 3 * 1024 * 1024  # 3MiB
1624            settings = self.session.get_settings()
1625            send_buffer_watermark = settings['send_buffer_watermark']
1626
1627            # If send buffer is too small, try increasing its size by 512KiB (up to max_send_buffer_watermark)
1628            if send_buffer_watermark < max_send_buffer_watermark:
1629                value = send_buffer_watermark + (500 * 1024)
1630                log.info(
1631                    'Increasing send_buffer_watermark from %s to %s Bytes',
1632                    send_buffer_watermark,
1633                    value,
1634                )
1635                component.get('Core').apply_session_setting(
1636                    'send_buffer_watermark', value
1637                )
1638            else:
1639                log.warning(
1640                    'send_buffer_watermark reached maximum value: %s Bytes',
1641                    max_send_buffer_watermark,
1642                )
1643
1644    def separate_keys(self, keys, torrent_ids):
1645        """Separates the input keys into torrent class keys and plugins keys"""
1646        if self.torrents:
1647            for torrent_id in torrent_ids:
1648                if torrent_id in self.torrents:
1649                    status_keys = list(self.torrents[torrent_id].status_funcs)
1650                    leftover_keys = list(set(keys) - set(status_keys))
1651                    torrent_keys = list(set(keys) - set(leftover_keys))
1652                    return torrent_keys, leftover_keys
1653        return [], []
1654
1655    def handle_torrents_status_callback(self, status_request):
1656        """Build the status dictionary with torrent values"""
1657        d, torrent_ids, keys, diff = status_request
1658        status_dict = {}.fromkeys(torrent_ids)
1659        torrent_keys, plugin_keys = self.separate_keys(keys, torrent_ids)
1660
1661        # Get the torrent status for each torrent_id
1662        for torrent_id in torrent_ids:
1663            if torrent_id not in self.torrents:
1664                # The torrent_id does not exist in the dict.
1665                # Could be the clients cache (sessionproxy) isn't up to speed.
1666                del status_dict[torrent_id]
1667            else:
1668                status_dict[torrent_id] = self.torrents[torrent_id].get_status(
1669                    torrent_keys, diff, all_keys=not keys
1670                )
1671        self.status_dict = status_dict
1672        d.callback((status_dict, plugin_keys))
1673
1674    def torrents_status_update(self, torrent_ids, keys, diff=False):
1675        """Returns status dict for the supplied torrent_ids async.
1676
1677        Note:
1678            If torrent states was updated recently post_torrent_updates is not called and
1679            instead cached state is used.
1680
1681        Args:
1682            torrent_ids (list of str): The torrent IDs to get the status of.
1683            keys (list of str): The keys to get the status on.
1684            diff (bool, optional): If True, will return a diff of the changes since the
1685                last call to get_status based on the session_id, defaults to False.
1686
1687        Returns:
1688            dict: A status dictionary for the requested torrents.
1689
1690        """
1691        d = Deferred()
1692        now = time.time()
1693        # If last update was recent, use cached data instead of request updates from libtorrent
1694        if (now - self.last_state_update_alert_ts) < 1.5:
1695            reactor.callLater(
1696                0, self.handle_torrents_status_callback, (d, torrent_ids, keys, diff)
1697            )
1698        else:
1699            # Ask libtorrent for status update
1700            self.torrents_status_requests.insert(0, (d, torrent_ids, keys, diff))
1701            self.session.post_torrent_updates()
1702        return d
1703