1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
4
5import apsw
6import ast
7import gzip
8import os
9import re
10import sys
11import types
12from contextlib import suppress
13from datetime import timedelta
14from enum import Enum, auto
15from http import HTTPStatus
16from importlib import import_module
17from queue import Queue
18from threading import Lock, Thread
19
20from calibre.constants import cache_dir, numeric_version
21from calibre.utils.date import utcnow
22from calibre.utils.https import HTTPError, get_https_resource_securely
23from calibre.utils.iso8601 import parse_iso8601
24
# Module names waiting to be refreshed by download_worker(); a None entry
# tells the worker to exit.
download_queue = Queue()
# Sentinel default for timeout parameters, replaced by DEFAULT_TIMEOUT in
# fetch_module().
default_timeout = object()
DEFAULT_TIMEOUT = 5
# The background download thread, if started, and the lock guarding it.
worker = None
worker_lock = Lock()
# When set, used by latest_data_for_module() instead of fetch_module().
fetcher = None
# When set, used by cache_path() instead of the default cache location.
db_path = None
# download_module() re-checks cached copies older than this.
old_interval = timedelta(days=1)
# parse_metadata() reads the next two values from the first 2048 bytes of
# downloaded source, so they have to stay near the top of a module.
module_version = 1
minimum_calibre_version = (5, 7, 0)
35
36
class Strategy(Enum):
    '''
    How load_module() trades freshness of the module against speed.
    '''

    # Ask the fetcher for the latest version before returning
    download_now = auto()
    # Ask the fetcher only if nothing is cached or the cached copy is older
    # than old_interval
    download_if_old = auto()
    # Use whatever is cached and queue a background refresh
    fast = auto()
42
43
def start_worker():
    '''
    Start the background download thread unless one is already registered.
    '''
    global worker
    with worker_lock:
        if worker is not None:
            return
        worker = thread = Thread(name='LiveDownloader', target=download_worker, daemon=True)
        thread.start()
50
51
def stop_worker(timeout=2*DEFAULT_TIMEOUT):
    '''
    Ask the background download thread to exit and wait up to timeout
    seconds for it to do so. Does nothing if no worker is registered.
    '''
    global worker
    with worker_lock:
        if worker is None:
            return
        # None is the exit marker understood by download_worker()
        download_queue.put(None)
        thread, worker = worker, None
        thread.join(timeout)
60
61
def report_failure(full_name):
    '''
    Print a failure notice for full_name to stderr, followed by the
    traceback of the exception currently being handled.
    '''
    from traceback import print_exc
    print(f'Failed to download live module {full_name}', file=sys.stderr)
    print_exc()
66
67
def download_worker():
    '''
    Thread body: refresh every module name taken from download_queue until
    a None entry is received. Failures are reported and do not stop the loop.
    '''
    for full_name in iter(download_queue.get, None):
        try:
            latest_data_for_module(full_name)
        except Exception:
            report_failure(full_name)
77
78
def queue_for_download(full_name):
    '''
    Add full_name to the queue consumed by download_worker().
    '''
    download_queue.put(full_name)
81
82
def parse_metadata(full_name, raw_bytes):
    '''
    Extract (module_version, minimum_calibre_version) from module source.

    Only the first 2048 bytes of raw_bytes are searched. module_version is
    required and must be an integer literal assigned at the start of a line.
    minimum_calibre_version is optional and defaults to (0, 0, 0); when
    present it must evaluate to a tuple of three ints.

    Raises ValueError if module_version is missing or
    minimum_calibre_version has the wrong shape. full_name is used only in
    the error message.
    '''
    head = raw_bytes[:2048]

    version_match = re.search(br'^module_version\s*=\s*(\d+)', head, flags=re.MULTILINE)
    if version_match is None:
        raise ValueError(f'No module_version in downloaded source of {full_name}')
    version = int(version_match.group(1))

    mcv_match = re.search(br'^minimum_calibre_version\s*=\s*(.+?)$', head, flags=re.MULTILINE)
    if mcv_match is None:
        return version, (0, 0, 0)

    # literal_eval only accepts Python literals, so the downloaded text is
    # never executed here
    mcv = ast.literal_eval(mcv_match.group(1).decode('utf-8'))
    well_formed = isinstance(mcv, tuple) and len(mcv) == 3 and all(isinstance(part, int) for part in mcv)
    if not well_formed:
        raise ValueError(f'minimum_calibre_version invalid: {mcv!r}')
    return version, mcv
100
101
def fetch_module(full_name, etag=None, timeout=default_timeout, url=None):
    '''
    Download the source of full_name over HTTPS.

    :param full_name: Dotted module name; used to build the URL when url is
        not given.
    :param etag: ETag of the copy already held, without quotes. When truthy
        it is sent as an if-none-match header.
    :param timeout: Passed to get_https_resource_securely(); the
        default_timeout sentinel selects DEFAULT_TIMEOUT.
    :param url: Full URL to fetch instead of the one derived from full_name.
    :return: (etag, data). (None, None) when the server answers 304 Not
        Modified. etag has any weak prefix and surrounding quotes removed,
        and is '' if the response carries no ETag header.
    :raises HTTPError: For any HTTP error other than 304.
    '''
    if timeout is default_timeout:
        timeout = DEFAULT_TIMEOUT
    if url is None:
        url = 'https://code.calibre-ebook.com/src/' + full_name.replace('.', '/') + '.py'
    headers = {'accept-encoding': 'gzip'}
    if etag:
        headers['if-none-match'] = f'"{etag}"'
    try:
        res = get_https_resource_securely(url, headers=headers, get_response=True, timeout=timeout)
    except HTTPError as e:
        if e.code == HTTPStatus.NOT_MODIFIED:
            return None, None
        raise
    # A missing header must not crash the download: fall back to an empty
    # etag, which callers treat as "no validator" because it is falsy.
    etag = res.headers.get('etag') or ''
    if etag.startswith('W/'):
        etag = etag[2:]
    # Strip the surrounding quotes only when they are actually there, so an
    # unquoted tag is not mangled.
    if len(etag) > 1 and etag[0] == etag[-1] == '"':
        etag = etag[1:-1]
    if res.headers.get('content-encoding') == 'gzip':
        data = gzip.GzipFile(fileobj=res).read()
    else:
        data = res.read()
    return etag, data
126
127
def cache_path():
    '''
    Return the path of the cache database: db_path when it is set, otherwise
    live.sqlite inside cache_dir().
    '''
    if db_path:
        return db_path
    return os.path.join(cache_dir(), 'live.sqlite')
130
131
def db():
    '''
    Open and return a new apsw connection to the cache database.
    '''
    location = cache_path()
    return apsw.Connection(location)
134
135
def table_definition():
    '''
    Return the SQL statement that creates the modules table if it does not
    already exist. The statement ends with a semicolon so callers can append
    a further statement to it.
    '''
    # The text default is single-quoted: in SQL, double quotes denote
    # identifiers, and accepting them as string literals is only a
    # compatibility quirk.
    return '''
    CREATE TABLE IF NOT EXISTS modules (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            atime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            full_name TEXT NOT NULL UNIQUE,
            etag TEXT NOT NULL,
            module_version INTEGER NOT NULL DEFAULT 1,
            minimum_calibre_version TEXT NOT NULL DEFAULT '0,0,0',
            data BLOB NOT NULL
    );
    '''
149
150
def write_to_cache(full_name, etag, data):
    '''
    Store data for full_name in the cache, replacing any existing row.

    The module version and minimum calibre version recorded in the row are
    parsed out of data by parse_metadata(), which raises ValueError when the
    metadata is missing or malformed.
    '''
    version, min_version = parse_metadata(full_name, data)
    sql = table_definition() + (
        'INSERT OR REPLACE INTO modules (full_name, etag, data, date, atime, module_version, minimum_calibre_version)'
        ' VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?, ?)')
    # The version tuple is stored as comma separated text
    bindings = (full_name, etag, data, version, ','.join(str(part) for part in min_version))
    db().cursor().execute(sql, bindings)
160
161
def read_from_cache(full_name):
    '''
    Look up full_name in the cache.

    Returns (etag, data, date), where date is the stored timestamp parsed as
    UTC, or (None, None, None) when there is no row for full_name. A
    successful lookup also updates the row's atime.
    '''
    conn = db()
    query = table_definition() + 'SELECT id, etag, data, date FROM modules WHERE full_name=? LIMIT 1'
    row = next(conn.cursor().execute(query, (full_name,)), None)
    if row is None:
        return None, None, None
    rowid, etag, data, date = row
    conn.cursor().execute('UPDATE modules SET atime=CURRENT_TIMESTAMP WHERE id=?', (rowid,))
    if date is not None:
        date = parse_iso8601(date, assume_utc=True)
    return etag, data, date
173
174
def clear_cache():
    '''
    Delete every row from the modules table, creating the table first if it
    does not exist.
    '''
    cursor = db().cursor()
    cursor.execute(table_definition() + 'DELETE FROM modules')
177
178
def load_module_from_data(full_name, data):
    '''
    Execute data as the source of a fresh module object and return it.

    The installed module named full_name is imported only to copy its
    __name__, __package__ and __file__ onto the new module; the returned
    module is a separate object and is not registered in sys.modules here.
    '''
    installed = import_module(full_name)
    module = types.ModuleType(installed.__name__)
    for attr in ('__package__', '__file__'):
        setattr(module, attr, getattr(installed, attr))
    code = compile(data, full_name, 'exec', dont_inherit=True)
    exec(code, module.__dict__)
    return module
187
188
def latest_data_for_module(full_name, timeout=default_timeout):
    '''
    Return the newest available source for full_name.

    The fetcher (the module-level fetcher if set, otherwise fetch_module) is
    called with the cached etag. If it returns new data, that data is written
    to the cache and returned; otherwise the cached data is returned.
    '''
    etag, data = read_from_cache(full_name)[:2]
    fetch = fetcher or fetch_module
    new_etag, new_data = fetch(full_name, etag=etag, timeout=timeout)
    if new_data is None:
        return data
    write_to_cache(full_name, new_etag, new_data)
    return new_data
196
197
def download_module(full_name, timeout=default_timeout, strategy=Strategy.download_now):
    '''
    Return a module object for full_name built from downloaded or cached
    source.

    With Strategy.download_now the fetcher is always consulted. With any
    other strategy it is consulted only when nothing is cached or the cached
    copy is older than old_interval; otherwise the cached source is used.
    '''
    def fresh():
        return load_module_from_data(full_name, latest_data_for_module(full_name, timeout=timeout))

    if strategy is Strategy.download_now:
        return fresh()
    _, cached_data, date = read_from_cache(full_name)
    if date is None or (utcnow() - date) > old_interval:
        return fresh()
    if cached_data is None:
        return None
    return load_module_from_data(full_name, cached_data)
206
207
def get_cached_module(full_name):
    '''
    Return a module object built from the cached source of full_name, or
    None when no (non-empty) source is cached.
    '''
    data = read_from_cache(full_name)[1]
    if not data:
        return None
    return load_module_from_data(full_name, data)
212
213
def cached_is_suitable(cached, installed):
    '''
    Decide whether cached should be used in place of installed.

    True only when cached's minimum_calibre_version is not newer than the
    running calibre and its module_version is greater than installed's.
    A cached object without module_version (including None) counts as
    version -1; one without minimum_calibre_version is treated as requiring
    exactly the running version.
    '''
    def read(name, fallback):
        try:
            return getattr(cached, name)
        except Exception:
            return fallback

    cached_version = read('module_version', -1)
    required = read('minimum_calibre_version', numeric_version)
    if not (required <= numeric_version):
        return False
    return cached_version > installed.module_version
224
225
def load_module(full_name, strategy=Strategy.download_now, timeout=default_timeout):
    '''
    Load the specified module from the calibre servers. strategy controls
    whether to check for the latest version immediately or eventually
    (strategies other than download_now).  Note that you must call
    start_worker() for eventual checking to work. Remember to call
    stop_worker() at exit as well.

    The installed module is always imported first and is what gets returned
    unless the cached/downloaded copy passes cached_is_suitable(). Any
    exception while obtaining that copy is reported via report_failure() and
    the installed module is returned.
    '''
    installed = import_module(full_name)
    try:
        if strategy is Strategy.fast:
            try:
                cached = get_cached_module(full_name)
            finally:
                # Queue the refresh even when the cached copy cannot be read
                # or executed, otherwise a broken cache entry would never be
                # replaced under this strategy.
                queue_for_download(full_name)
        else:
            cached = download_module(full_name, timeout=timeout, strategy=strategy)
        if cached_is_suitable(cached, installed):
            installed = cached
    except Exception:
        report_failure(full_name)
    return installed
246
247
def find_tests():
    '''
    Build and return the unittest suite for this module. The tests run
    against a temporary database file and a fake fetcher, so no network
    access is made by the fetcher itself.
    '''
    import hashlib
    import tempfile
    import unittest

    class LiveTest(unittest.TestCase):
        ae = unittest.TestCase.assertEqual

        def setUp(self):
            # Redirect the cache to a throwaway file and install the fake
            # fetcher defined below
            global db_path, fetcher
            handle, db_path = tempfile.mkstemp()
            os.close(handle)
            fetcher = self.fetch_module
            self.fetched_module_version = 99999
            self.sentinel_value = 1
            self.fetch_counter = 0
            self.orig_old_interval = old_interval

        def tearDown(self):
            # Undo everything setUp and the tests changed at module level
            global db_path, fetcher, old_interval
            os.remove(db_path)
            db_path = fetcher = None
            old_interval = self.orig_old_interval

        @property
        def live_data(self):
            # Source served by the fake fetcher
            lines = (
                f'module_version = {self.fetched_module_version}',
                'minimum_calibre_version = (1, 2, 3)',
                f'sentinel = {self.sentinel_value}',
            )
            return '\n'.join(lines).encode('ascii')

        def fetch_module(self, full_name, etag=None, timeout=default_timeout):
            # Stand-in for the real fetch_module: counts calls and uses the
            # md5 of the payload as its etag
            self.fetch_counter += 1
            payload = self.live_data
            digest = hashlib.md5(payload).hexdigest()
            if etag and digest == etag:
                return None, None
            return digest, payload

        def assert_cache_empty(self):
            self.ae(read_from_cache('live.test'), (None, None, None))

        def test_live_cache(self):
            self.assert_cache_empty()
            payload = self.live_data
            write_to_cache('live.test', 'etag', payload)
            self.ae(read_from_cache('live.test')[:2], ('etag', payload))

        def test_module_loading(self):
            global old_interval

            def check(strategy, expected_fetches):
                # Load with the given strategy, then verify that the fetched
                # version was returned and how often the fetcher has run
                loaded = load_module('calibre.live', strategy=strategy)
                self.assertEqual(loaded.module_version, self.fetched_module_version)
                self.ae(self.fetch_counter, expected_fetches)

            # Nothing cached and no worker running: the installed module is
            # returned and nothing is fetched
            self.assert_cache_empty()
            loaded = load_module('calibre.live', strategy=Strategy.fast)
            self.assertEqual(loaded.module_version, module_version)
            self.assert_cache_empty()
            self.ae(self.fetch_counter, 0)

            # Running the worker processes the request queued above
            start_worker()
            stop_worker()
            self.ae(self.fetch_counter, 1)

            check(Strategy.fast, 1)
            check(Strategy.download_if_old, 1)
            check(Strategy.download_now, 2)

            # A negative interval makes every cached copy count as old
            old_interval = timedelta(days=-1)
            check(Strategy.download_if_old, 3)

            old_interval = self.orig_old_interval
            clear_cache()
            check(Strategy.download_if_old, 4)

    return unittest.defaultTestLoader.loadTestsFromTestCase(LiveTest)
324
325
if __name__ == '__main__':
    # Run this module's own tests when it is executed as a script
    from calibre.utils.run_tests import run_cli
    suite = find_tests()
    run_cli(suite)
329