#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>

import apsw
import ast
import gzip
import os
import re
import sys
import types
from contextlib import suppress
from datetime import timedelta
from enum import Enum, auto
from http import HTTPStatus
from importlib import import_module
from queue import Queue
from threading import Lock, Thread

from calibre.constants import cache_dir, numeric_version
from calibre.utils.date import utcnow
from calibre.utils.https import HTTPError, get_https_resource_securely
from calibre.utils.iso8601 import parse_iso8601

# Names of modules waiting to be refreshed by the background worker; a None
# entry tells the worker to exit.
download_queue = Queue()
# Sentinel meaning "no timeout given, use DEFAULT_TIMEOUT".
default_timeout = object()
DEFAULT_TIMEOUT = 5
# Background download thread (or None) and the lock guarding its start/stop.
worker = None
worker_lock = Lock()
# Optional replacements for fetch_module() and the cache location; the tests
# in find_tests() set these.
fetcher = None
db_path = None
# Cache entries older than this are refreshed by Strategy.download_if_old.
old_interval = timedelta(days=1)
# parse_metadata() looks for these two assignments in the first 2048 bytes of
# a downloaded module's source.
module_version = 1
minimum_calibre_version = 5, 7, 0


class Strategy(Enum):
    ''' How load_module() balances freshness against speed. '''

    # Always ask the server for the latest version before returning.
    download_now = auto()
    # Use the cache unless the entry is missing or older than old_interval.
    download_if_old = auto()
    # Use whatever is cached right now and queue a background refresh.
    fast = auto()


def start_worker():
    ''' Start the background download thread if it is not already running. '''
    global worker
    with worker_lock:
        if worker is None:
            worker = Thread(name='LiveDownloader', target=download_worker, daemon=True)
            worker.start()


def stop_worker(timeout=2*DEFAULT_TIMEOUT):
    '''
    Ask the background download thread to exit and wait up to timeout for
    it to do so. Does nothing if no worker was started.
    '''
    global worker
    with worker_lock:
        if worker is not None:
            # None is the shutdown sentinel understood by download_worker().
            # Items queued before it are still processed first.
            download_queue.put(None)
            w = worker
            worker = None
            w.join(timeout)


def report_failure(full_name):
    '''
    Print a failure message for full_name followed by the traceback of the
    exception currently being handled to stderr.
    '''
    print(f'Failed to download live module {full_name}', file=sys.stderr)
    import traceback
    traceback.print_exc()


def download_worker():
    '''
    Body of the background thread: refresh the cache for every module name
    taken from download_queue until the None sentinel is received. Failures
    are reported and do not stop the loop.
    '''
    while True:
        x = download_queue.get()
        if x is None:
            break
        try:
            latest_data_for_module(x)
        except Exception:
            report_failure(x)


def queue_for_download(full_name):
    ''' Schedule full_name for a refresh by the background worker. '''
    download_queue.put(full_name)


def parse_metadata(full_name, raw_bytes):
    '''
    Extract (module_version, minimum_calibre_version) from the source code
    of a module, given as bytes. Only the first 2048 bytes are inspected.

    module_version is required and must be a top level integer assignment.
    minimum_calibre_version is optional and defaults to (0, 0, 0); when
    present it must be a literal tuple of three integers.

    Raises ValueError if module_version is missing or
    minimum_calibre_version does not have the expected form.
    '''
    q = raw_bytes[:2048]
    m = re.search(br'^module_version\s*=\s*(\d+)', q, flags=re.MULTILINE)
    if m is None:
        raise ValueError(f'No module_version in downloaded source of {full_name}')
    module_version = int(m.group(1))

    m = re.search(br'^minimum_calibre_version\s*=\s*(.+?)$', q, flags=re.MULTILINE)
    minimum_calibre_version = 0, 0, 0
    if m is not None:
        # literal_eval only evaluates literals, so no code from the
        # downloaded source is run here.
        minimum_calibre_version = ast.literal_eval(m.group(1).decode('utf-8'))
        if not isinstance(minimum_calibre_version, tuple) or len(minimum_calibre_version) != 3 or \
                not all(isinstance(x, int) for x in minimum_calibre_version):
            raise ValueError(f'minimum_calibre_version invalid: {minimum_calibre_version!r}')

    return module_version, minimum_calibre_version


def fetch_module(full_name, etag=None, timeout=default_timeout, url=None):
    '''
    Download the source of full_name from the calibre code server (or from
    url, if given).

    If etag is supplied it is sent as an If-None-Match header. Returns
    (None, None) when the server answers 304 Not Modified, otherwise
    (etag, data) where data is the, possibly gzip decoded, response body.
    The returned etag is '' if the server did not send one. Any HTTPError
    other than 304 is re-raised.
    '''
    if timeout is default_timeout:
        timeout = DEFAULT_TIMEOUT
    if url is None:
        path = '/'.join(full_name.split('.')) + '.py'
        url = 'https://code.calibre-ebook.com/src/' + path
    headers = {'accept-encoding': 'gzip'}
    if etag:
        headers['if-none-match'] = f'"{etag}"'
    try:
        res = get_https_resource_securely(url, headers=headers, get_response=True, timeout=timeout)
    except HTTPError as e:
        if e.code == HTTPStatus.NOT_MODIFIED:
            return None, None
        raise
    # NOTE(review): assumes res.headers behaves like the http.client message
    # object, which yields None for an absent header - confirm against
    # get_https_resource_securely(). A missing ETag is stored as '' so the
    # next request is simply made unconditionally.
    etag = res.headers['etag'] or ''
    if etag.startswith('W/'):
        # Weak validator prefix, only the opaque tag itself is stored
        etag = etag[2:]
    # Strip the surrounding quotes, but only if they are actually there
    if len(etag) >= 2 and etag[0] == '"' and etag[-1] == '"':
        etag = etag[1:-1]
    if res.headers['content-encoding'] == 'gzip':
        data = gzip.GzipFile(fileobj=res).read()
    else:
        data = res.read()
    return etag, data


def cache_path():
    ''' Path to the cache database: db_path if set, else live.sqlite in the calibre cache dir. '''
    return db_path or os.path.join(cache_dir(), 'live.sqlite')


def db():
    ''' Open a new connection to the cache database. '''
    return apsw.Connection(cache_path())


def table_definition():
    '''
    SQL that creates the modules table if it does not exist. It is prepended
    to the statements run against the cache so that they also work on a
    fresh database.
    '''
    return '''
    CREATE TABLE IF NOT EXISTS modules (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        atime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        full_name TEXT NOT NULL UNIQUE,
        etag TEXT NOT NULL,
        module_version INTEGER NOT NULL DEFAULT 1,
        minimum_calibre_version TEXT NOT NULL DEFAULT "0,0,0",
        data BLOB NOT NULL
    );
    '''


def write_to_cache(full_name, etag, data):
    '''
    Store data (module source as bytes) and its etag in the cache under
    full_name, replacing any existing entry. Raises ValueError, via
    parse_metadata(), if data has no valid version metadata.
    '''
    module_version, minimum_calibre_version = parse_metadata(full_name, data)
    mcv = ','.join(map(str, minimum_calibre_version))
    db().cursor().execute(
        table_definition() +
        'INSERT OR REPLACE INTO modules (full_name, etag, data, date, atime, module_version, minimum_calibre_version)'
        ' VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?, ?)',
        (full_name, etag, data, module_version, mcv)
    )


def read_from_cache(full_name):
    '''
    Return (etag, data, date) for the cached copy of full_name, or
    (None, None, None) if it is not cached. date is the stored timestamp
    parsed with parse_iso8601(assume_utc=True). Reading an entry also
    updates its atime.
    '''
    rowid = etag = data = date = None
    database = db()
    # next() raises StopIteration when there is no matching row
    with suppress(StopIteration):
        rowid, etag, data, date = next(database.cursor().execute(
            table_definition() + 'SELECT id, etag, data, date FROM modules WHERE full_name=? LIMIT 1', (full_name,)))
    if rowid is not None:
        database.cursor().execute('UPDATE modules SET atime=CURRENT_TIMESTAMP WHERE id=?', (rowid,))
    if date is not None:
        date = parse_iso8601(date, assume_utc=True)
    return etag, data, date


def clear_cache():
    ''' Remove all entries from the cache. '''
    db().cursor().execute(table_definition() + 'DELETE FROM modules')


def load_module_from_data(full_name, data):
    '''
    Build a new module object from data, the source code for full_name.

    The installed module is imported only to copy its __name__, __package__
    and __file__; it is not modified and the new module is not placed in
    sys.modules by this function.
    '''
    m = import_module(full_name)
    ans = types.ModuleType(m.__name__)
    ans.__package__ = m.__package__
    ans.__file__ = m.__file__
    compiled = compile(data, full_name, 'exec', dont_inherit=True)
    # NOTE(review): this runs whatever source was passed in. For downloaded
    # data, trust presumably rests on get_https_resource_securely() - verify.
    exec(compiled, ans.__dict__)
    return ans


def latest_data_for_module(full_name, timeout=default_timeout):
    '''
    Return the newest available source for full_name, checking the server
    (via fetcher, if set, else fetch_module) using the cached etag. Newly
    downloaded data is written to the cache; if the server reports no
    change the cached data is returned.
    '''
    cached_etag, cached_data = read_from_cache(full_name)[:2]
    downloaded_etag, downloaded_data = (fetcher or fetch_module)(full_name, etag=cached_etag, timeout=timeout)
    if downloaded_data is not None:
        write_to_cache(full_name, downloaded_etag, downloaded_data)
        cached_etag, cached_data = downloaded_etag, downloaded_data
    return cached_data


def download_module(full_name, timeout=default_timeout, strategy=Strategy.download_now):
    '''
    Return a module object for full_name built from live data.

    With Strategy.download_now the server is always checked. With any other
    strategy the server is checked only if there is no cache entry or the
    entry is older than old_interval, otherwise the cached data is used.
    '''
    if strategy is Strategy.download_now:
        return load_module_from_data(full_name, latest_data_for_module(full_name, timeout=timeout))
    cached_etag, cached_data, date = read_from_cache(full_name)
    if date is None or (utcnow() - date) > old_interval:
        return load_module_from_data(full_name, latest_data_for_module(full_name, timeout=timeout))
    if cached_data is not None:
        return load_module_from_data(full_name, cached_data)
    return None


def get_cached_module(full_name):
    ''' Return a module object built from the cached data for full_name, or None if nothing is cached. '''
    cached_etag, cached_data = read_from_cache(full_name)[:2]
    if cached_data:
        return load_module_from_data(full_name, cached_data)
    return None


def cached_is_suitable(cached, installed):
    '''
    Return True if the cached module should be used instead of the
    installed one: its minimum_calibre_version must not exceed the running
    calibre version and its module_version must be greater than that of
    installed.

    A cached module without module_version is never suitable; one without
    minimum_calibre_version is treated as compatible.
    '''
    try:
        v = cached.module_version
    except Exception:
        v = -1
    try:
        cv = cached.minimum_calibre_version
    except Exception:
        cv = numeric_version
    return cv <= numeric_version and v > installed.module_version


def load_module(full_name, strategy=Strategy.download_now, timeout=default_timeout):
    '''
    Load the specified module from the calibre servers. strategy controls
    whether to check for the latest version immediately or eventually
    (strategies other than download_now). Note that you must call
    start_worker() for eventual checking to work. Remember to call
    stop_worker() at exit as well.

    Returns the live module if one is available and suitable (see
    cached_is_suitable()), otherwise the installed module. Failures while
    obtaining the live module are reported to stderr and the installed
    module is returned.
    '''
    installed = import_module(full_name)
    try:
        if strategy is Strategy.fast:
            cached = get_cached_module(full_name)
            queue_for_download(full_name)
        else:
            cached = download_module(full_name, timeout=timeout, strategy=strategy)
        # cached is None when nothing has been cached yet (Strategy.fast)
        if cached is not None and cached_is_suitable(cached, installed):
            installed = cached
    except Exception:
        report_failure(full_name)
    return installed


def find_tests():
    ''' Return the unittest suite for this module. '''
    import tempfile
    import unittest
    import hashlib

    class LiveTest(unittest.TestCase):
        ae = unittest.TestCase.assertEqual

        def setUp(self):
            # Redirect the cache to a temporary file and replace the network
            # fetcher with self.fetch_module
            global db_path, fetcher
            fd, db_path = tempfile.mkstemp()
            os.close(fd)
            fetcher = self.fetch_module
            self.fetched_module_version = 99999
            self.sentinel_value = 1
            self.fetch_counter = 0
            self.orig_old_interval = old_interval

        @property
        def live_data(self):
            # Source of the fake live module served by fetch_module()
            data = f'module_version = {self.fetched_module_version}\nminimum_calibre_version = (1, 2, 3)\nsentinel = {self.sentinel_value}'
            return data.encode('ascii')

        def fetch_module(self, full_name, etag=None, timeout=default_timeout):
            # Stand-in for the module level fetch_module(): the etag is the
            # md5 of the data and (None, None) signals "not modified"
            self.fetch_counter += 1
            data = self.live_data
            q = hashlib.md5(data).hexdigest()
            if etag and q == etag:
                return None, None
            return q, data

        def tearDown(self):
            global db_path, fetcher, old_interval
            os.remove(db_path)
            db_path = fetcher = None
            old_interval = self.orig_old_interval

        def assert_cache_empty(self):
            self.ae(read_from_cache('live.test'), (None, None, None))

        def test_live_cache(self):
            self.assert_cache_empty()
            data = self.live_data
            write_to_cache('live.test', 'etag', data)
            self.ae(read_from_cache('live.test')[:2], ('etag', data))

        def test_module_loading(self):
            global old_interval
            self.assert_cache_empty()
            # Nothing cached: fast returns the installed module and only
            # queues a download
            m = load_module('calibre.live', strategy=Strategy.fast)
            self.assertEqual(m.module_version, module_version)
            self.assert_cache_empty()
            self.ae(self.fetch_counter, 0)
            # Running the worker processes the queued download
            start_worker()
            stop_worker()
            self.ae(self.fetch_counter, 1)
            m = load_module('calibre.live', strategy=Strategy.fast)
            self.assertEqual(m.module_version, self.fetched_module_version)
            self.ae(self.fetch_counter, 1)
            # Cache entry is fresh, so no fetch happens
            m = load_module('calibre.live', strategy=Strategy.download_if_old)
            self.assertEqual(m.module_version, self.fetched_module_version)
            self.ae(self.fetch_counter, 1)
            m = load_module('calibre.live', strategy=Strategy.download_now)
            self.assertEqual(m.module_version, self.fetched_module_version)
            self.ae(self.fetch_counter, 2)
            # A negative interval makes every cache entry count as old
            old_interval = timedelta(days=-1)
            m = load_module('calibre.live', strategy=Strategy.download_if_old)
            self.assertEqual(m.module_version, self.fetched_module_version)
            self.ae(self.fetch_counter, 3)
            old_interval = self.orig_old_interval
            # An empty cache forces a fetch as well
            clear_cache()
            m = load_module('calibre.live', strategy=Strategy.download_if_old)
            self.assertEqual(m.module_version, self.fetched_module_version)
            self.ae(self.fetch_counter, 4)

    return unittest.defaultTestLoader.loadTestsFromTestCase(LiveTest)


if __name__ == '__main__':
    from calibre.utils.run_tests import run_cli
    run_cli(find_tests())