1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2010 Andrew Resch <andrewresch@gmail.com>
4#
5# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
6# the additional special exception to link portions of this program with the OpenSSL library.
7# See LICENSE for more details.
8#
9from __future__ import unicode_literals
10
11import logging
12from time import time
13
14from twisted.internet.defer import maybeDeferred, succeed
15
16import deluge.component as component
17from deluge.ui.client import client
18
19log = logging.getLogger(__name__)
20
21
22class SessionProxy(component.Component):
23    """
24    The SessionProxy component is used to cache session information client-side
25    to reduce the number of RPCs needed to provide a rich user interface.
26
27    It will query the Core for only changes in the status of the torrents
28    and will try to satisfy client requests from the cache.
29
30    """
31
32    def __init__(self):
33        log.debug('SessionProxy init..')
34        component.Component.__init__(self, 'SessionProxy', interval=5)
35
36        # Set the cache time in seconds
37        # This is how long data will be valid before re-fetching from the core
38        self.cache_time = 1.5
39
40        # Hold the torrents' status.. {torrent_id: [time, {status_dict}], ...}
41        self.torrents = {}
42
43        # Holds the time of the last key update.. {torrent_id: {key1, time, ...}, ...}
44        self.cache_times = {}
45
46    def start(self):
47        client.register_event_handler(
48            'TorrentStateChangedEvent', self.on_torrent_state_changed
49        )
50        client.register_event_handler('TorrentRemovedEvent', self.on_torrent_removed)
51        client.register_event_handler('TorrentAddedEvent', self.on_torrent_added)
52
53        def on_get_session_state(torrent_ids):
54            for torrent_id in torrent_ids:
55                # Let's at least store the torrent ids with empty statuses
56                # so that upcoming queries or status updates don't throw errors.
57                self.torrents.setdefault(torrent_id, [time(), {}])
58                self.cache_times.setdefault(torrent_id, {})
59            return torrent_ids
60
61        return client.core.get_session_state().addCallback(on_get_session_state)
62
63    def stop(self):
64        client.deregister_event_handler(
65            'TorrentStateChangedEvent', self.on_torrent_state_changed
66        )
67        client.deregister_event_handler('TorrentRemovedEvent', self.on_torrent_removed)
68        client.deregister_event_handler('TorrentAddedEvent', self.on_torrent_added)
69        self.torrents = {}
70
71    def create_status_dict(self, torrent_ids, keys):
72        """
73        Creates a status dict from the cache.
74
75        :param torrent_ids: the torrent_ids
76        :type torrent_ids: list of strings
77        :param keys: the status keys
78        :type keys: list of strings
79
80        :returns: a dict with the status information for the *torrent_ids*
81        :rtype: dict
82
83        """
84        sd = {}
85        keys = set(keys)
86        keys_len = (
87            -1
88        )  # The number of keys for the current cache (not the len of keys_diff_cached)
89        keys_diff_cached = []
90
91        for torrent_id in torrent_ids:
92            try:
93                if keys:
94                    sd[torrent_id] = self.torrents[torrent_id][1].copy()
95
96                    # Have to remove the keys that weren't requested
97                    if len(sd[torrent_id]) == keys_len:
98                        # If the number of keys are equal they are the same keys
99                        # so we use the cached diff of the keys we need to remove
100                        keys_to_remove = keys_diff_cached
101                    else:
102                        # Not the same keys so create a new diff
103                        keys_to_remove = set(sd[torrent_id]) - keys
104                        # Update the cached diff
105                        keys_diff_cached = keys_to_remove
106                        keys_len = len(sd[torrent_id])
107
108                    # Usually there are no keys to remove, so it's cheaper with
109                    # this if-test than a for-loop with no iterations.
110                    if keys_to_remove:
111                        for k in keys_to_remove:
112                            del sd[torrent_id][k]
113                else:
114                    sd[torrent_id] = dict(self.torrents[torrent_id][1])
115            except KeyError:
116                continue
117        return sd
118
119    def get_torrent_status(self, torrent_id, keys):
120        """
121        Get a status dict for one torrent.
122
123        :param torrent_id: the torrent_id
124        :type torrent_id: string
125        :param keys: the status keys
126        :type keys: list of strings
127
128        :returns: a dict of status information
129        :rtype: dict
130
131        """
132        if torrent_id in self.torrents:
133            # Keep track of keys we need to request from the core
134            keys_to_get = []
135            if not keys:
136                keys = list(self.torrents[torrent_id][1])
137
138            for key in keys:
139                if (
140                    time() - self.cache_times[torrent_id].get(key, 0.0)
141                    > self.cache_time
142                ):
143                    keys_to_get.append(key)
144            if not keys_to_get:
145                return succeed(self.create_status_dict([torrent_id], keys)[torrent_id])
146            else:
147                d = client.core.get_torrent_status(torrent_id, keys_to_get, True)
148
149                def on_status(result, torrent_id):
150                    t = time()
151                    self.torrents[torrent_id][0] = t
152                    self.torrents[torrent_id][1].update(result)
153                    for key in keys_to_get:
154                        self.cache_times[torrent_id][key] = t
155                    return self.create_status_dict([torrent_id], keys)[torrent_id]
156
157                return d.addCallback(on_status, torrent_id)
158        else:
159            d = client.core.get_torrent_status(torrent_id, keys, True)
160
161            def on_status(result):
162                if result:
163                    t = time()
164                    self.torrents[torrent_id] = (t, result)
165                    self.cache_times[torrent_id] = {}
166                    for key in result:
167                        self.cache_times[torrent_id][key] = t
168
169                return result
170
171            return d.addCallback(on_status)
172
173    def get_torrents_status(self, filter_dict, keys):
174        """
175        Get a dict of torrent statuses.
176
177        The filter can take 2 keys, *state* and *id*.  The state filter can be
178        one of the torrent states or the special one *Active*.  The *id* key is
179        simply a list of torrent_ids.
180
181        :param filter_dict: the filter used for this query
182        :type filter_dict: dict
183        :param keys: the status keys
184        :type keys: list of strings
185
186        :returns: a dict of torrent_ids and their status dicts
187        :rtype: dict
188
189        """
190        # Helper functions and callbacks ---------------------------------------
191        def on_status(result, torrent_ids, keys):
192            # Update the internal torrent status dict with the update values
193            t = time()
194            for key, value in result.items():
195                try:
196                    self.torrents[key][0] = t
197                    self.torrents[key][1].update(value)
198                    for k in value:
199                        self.cache_times[key][k] = t
200                except KeyError:
201                    # The torrent was removed
202                    continue
203
204            # Create the status dict
205            if not torrent_ids:
206                torrent_ids = list(result)
207
208            return self.create_status_dict(torrent_ids, keys)
209
210        def find_torrents_to_fetch(torrent_ids):
211            to_fetch = []
212            t = time()
213            for torrent_id in torrent_ids:
214                torrent = self.torrents[torrent_id]
215                if t - torrent[0] > self.cache_time:
216                    to_fetch.append(torrent_id)
217                else:
218                    # We need to check if a key is expired
219                    for key in keys:
220                        if (
221                            t - self.cache_times[torrent_id].get(key, 0.0)
222                            > self.cache_time
223                        ):
224                            to_fetch.append(torrent_id)
225                            break
226
227            return to_fetch
228
229        # -----------------------------------------------------------------------
230
231        if not filter_dict:
232            # This means we want all the torrents status
233            # We get a list of any torrent_ids with expired status dicts
234            torrents_list = list(self.torrents)
235            to_fetch = find_torrents_to_fetch(torrents_list)
236            if to_fetch:
237                d = client.core.get_torrents_status({'id': to_fetch}, keys, True)
238                return d.addCallback(on_status, torrents_list, keys)
239
240            # Don't need to fetch anything
241            return maybeDeferred(self.create_status_dict, torrents_list, keys)
242
243        if len(filter_dict) == 1 and 'id' in filter_dict:
244            # At this point we should have a filter with just "id" in it
245            to_fetch = find_torrents_to_fetch(filter_dict['id'])
246            if to_fetch:
247                d = client.core.get_torrents_status({'id': to_fetch}, keys, True)
248                return d.addCallback(on_status, filter_dict['id'], keys)
249            else:
250                # Don't need to fetch anything, so just return data from the cache
251                return maybeDeferred(self.create_status_dict, filter_dict['id'], keys)
252        else:
253            # This is a keyworded filter so lets just pass it onto the core
254            # XXX: Add more caching here.
255            d = client.core.get_torrents_status(filter_dict, keys, True)
256            return d.addCallback(on_status, None, keys)
257
258    def on_torrent_state_changed(self, torrent_id, state):
259        if torrent_id in self.torrents:
260            self.torrents[torrent_id][1].setdefault('state', state)
261            self.cache_times.setdefault(torrent_id, {}).update(state=time())
262
263    def on_torrent_added(self, torrent_id, from_state):
264        self.torrents[torrent_id] = [time() - self.cache_time - 1, {}]
265        self.cache_times[torrent_id] = {}
266
267        def on_status(status):
268            self.torrents[torrent_id][1].update(status)
269            t = time()
270            for key in status:
271                self.cache_times[torrent_id][key] = t
272
273        client.core.get_torrent_status(torrent_id, []).addCallback(on_status)
274
275    def on_torrent_removed(self, torrent_id):
276        if torrent_id in self.torrents:
277            del self.torrents[torrent_id]
278            del self.cache_times[torrent_id]
279