#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2019, Kovid Goyal <kovid at kovidgoyal.net>


import os
from io import BytesIO
from operator import itemgetter
from threading import Thread

from calibre.db.annotations import merge_annot_lists
from calibre.gui2.viewer.convert_book import update_book
from calibre.gui2.viewer.integration import save_annotations_list_to_library
from calibre.gui2.viewer.web_view import viewer_config_dir
from calibre.srv.render_book import EPUB_FILE_TYPE_MAGIC
from calibre.utils.date import EPOCH
from calibre.utils.iso8601 import parse_iso8601
from calibre.utils.serialize import json_dumps, json_loads
from calibre.utils.zipfile import safe_replace
from polyglot.binary import as_base64_bytes
from polyglot.builtins import iteritems
from polyglot.queue import Queue

# Directory (inside the viewer's config dir) holding one annotations file per book
annotations_dir = os.path.join(viewer_config_dir, 'annots')
parse_annotations = json_loads


def annotations_as_copied_list(annots_map):
    '''
    Flatten a mapping of annotation type -> list of annotations.

    Yields ``(annot, ts)`` pairs where ``annot`` is a copy (made with
    ``.copy()``) of the original annotation with its ``'type'`` key set to
    the annotation type, and ``ts`` is the annotation's ``'timestamp'``
    expressed as seconds elapsed since ``EPOCH``. Timestamps without timezone
    information are parsed as UTC.
    '''
    for atype, annots in iteritems(annots_map):
        for annot in annots:
            ts = (parse_iso8601(annot['timestamp'], assume_utc=True) - EPOCH).total_seconds()
            # Copy so that tagging with the type does not modify the caller's object
            annot = annot.copy()
            annot['type'] = atype
            yield annot, ts


def annot_list_as_bytes(annots):
    '''
    Serialize an iterable of ``(annot, seconds)`` pairs with ``json_dumps``.

    Only the annotation objects are serialized, the seconds values are
    dropped.
    '''
    return json_dumps(tuple(annot for annot, seconds in annots))


def split_lines(chunk, length=80):
    '''
    Yield successive slices of ``chunk``, each at most ``length`` items long.

    Raises ``ValueError`` (on first iteration) if ``length`` is not positive,
    since no progress through ``chunk`` could be made in that case.
    '''
    if length < 1:
        raise ValueError('length must be a positive integer')
    for start in range(0, len(chunk), length):
        yield chunk[start:start + length]


def save_annots_to_epub(path, serialized_annots):
    '''
    Store ``serialized_annots`` inside the EPUB file at ``path``.

    The data is base64 encoded, wrapped into 80 character lines, prefixed with
    ``EPUB_FILE_TYPE_MAGIC`` and written to ``META-INF/calibre_bookmarks.txt``
    in the archive (the entry is added if it does not exist yet). If the file
    cannot be opened for updating, nothing is done.
    '''
    try:
        zf = open(path, 'r+b')
    except OSError:
        # Best effort: a book file that cannot be opened is silently skipped
        return
    with zf:
        serialized_annots = EPUB_FILE_TYPE_MAGIC + b'\n'.join(split_lines(as_base64_bytes(serialized_annots)))
        safe_replace(zf, 'META-INF/calibre_bookmarks.txt', BytesIO(serialized_annots), add_missing=True)


def save_annotations(annotations_list, annotations_path_key, bld, pathtoebook, in_book_file, sync_annots_user):
    '''
    Persist ``annotations_list`` to every configured destination.

    The serialized annotations are always written to the file named
    ``annotations_path_key`` inside ``annotations_dir``. If ``in_book_file``
    is true and ``pathtoebook`` is writable they are also stored in the book
    file itself and passed to ``update_book``. If ``bld`` is truthy the
    annotations are additionally saved to the library via
    ``save_annotations_list_to_library``.
    '''
    annots = annot_list_as_bytes(annotations_list)
    with open(os.path.join(annotations_dir, annotations_path_key), 'wb') as f:
        f.write(annots)
    if in_book_file and os.access(pathtoebook, os.W_OK):
        # The stat must be taken before the book file is modified, it is
        # handed to update_book() together with the new annotations
        before_stat = os.stat(pathtoebook)
        save_annots_to_epub(pathtoebook, annots)
        update_book(pathtoebook, before_stat, {'calibre-book-annotations.json': annots})
    if bld:
        save_annotations_list_to_library(bld, annotations_list, sync_annots_user)


class AnnotationsSaveWorker(Thread):
    '''
    Daemon thread that saves annotations in the background.

    Save requests are dictionaries placed on ``self.queue`` by
    :meth:`save_annotations`; a ``None`` item tells the thread to exit.
    '''

    def __init__(self):
        Thread.__init__(self, name='AnnotSaveWorker')
        self.daemon = True
        self.queue = Queue()

    def shutdown(self):
        ''' Ask the worker to exit and wait for it, if it is running. '''
        if self.is_alive():
            self.queue.put(None)
            self.join()

    def run(self):
        ''' Process queued save requests until the ``None`` sentinel is received. '''
        while True:
            x = self.queue.get()
            if x is None:
                return
            try:
                # The key lookups are inside the try block so that a malformed
                # request is reported like any other failure instead of
                # terminating the worker thread
                save_annotations(
                    x['annotations_list'],
                    x['annotations_path_key'],
                    x['book_library_details'],
                    x['pathtoebook'],
                    x['in_book_file'],
                    x['sync_annots_user'],
                )
            except Exception:
                import traceback
                traceback.print_exc()

    def save_annotations(self, current_book_data, in_book_file=True, sync_annots_user=''):
        '''
        Queue a request to save the annotations of the current book.

        The annotations in ``current_book_data['annotations_map']`` are copied
        into a tuple here, before the request is queued. Saving into the book
        file is only requested when ``in_book_file`` is true and the book's
        path ends with ``.epub`` (case insensitive).
        '''
        alist = tuple(annotations_as_copied_list(current_book_data['annotations_map']))
        ebp = current_book_data['pathtoebook']
        can_save_in_book_file = ebp.lower().endswith('.epub')
        self.queue.put({
            'annotations_list': alist,
            'annotations_path_key': current_book_data['annotations_path_key'],
            'book_library_details': current_book_data['book_library_details'],
            'pathtoebook': ebp,
            'in_book_file': in_book_file and can_save_in_book_file,
            'sync_annots_user': sync_annots_user,
        })


def find_tests():
    ''' Return a test suite containing the annotation merging tests. '''
    import unittest

    def bm(title, bmid, year=20, first_cfi_number=1):
        # Build a bookmark style annotation
        return {
            'title': title, 'id': bmid, 'timestamp': '20{}-06-29T03:21:48.895323+00:00'.format(year),
            'pos_type': 'epubcfi', 'pos': 'epubcfi(/{}/4/8)'.format(first_cfi_number)
        }

    def hl(uuid, hlid, year=20, first_cfi_number=1):
        # Build a highlight style annotation
        return {
            'uuid': uuid, 'id': hlid, 'timestamp': '20{}-06-29T03:21:48.895323+00:00'.format(year),
            'start_cfi': 'epubcfi(/{}/4/8)'.format(first_cfi_number)
        }

    class AnnotationsTest(unittest.TestCase):

        def test_merge_annotations(self):
            for atype in 'bookmark highlight'.split():
                f = bm if atype == 'bookmark' else hl
                a = [f('one', 1, 20, 2), f('two', 2, 20, 4), f('a', 3, 20, 16)]
                b = [f('one', 10, 30, 2), f('two', 20, 10, 4), f('b', 30, 20, 8)]
                c = merge_annot_lists(a, b, atype)
                self.assertEqual(tuple(map(itemgetter('id'), c)), (10, 2, 30, 3))

    return unittest.TestLoader().loadTestsFromTestCase(AnnotationsTest)