1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2019, Kovid Goyal <kovid at kovidgoyal.net>
4
5
6import os
7from io import BytesIO
8from operator import itemgetter
9from threading import Thread
10
11from calibre.db.annotations import merge_annot_lists
12from calibre.gui2.viewer.convert_book import update_book
13from calibre.gui2.viewer.integration import save_annotations_list_to_library
14from calibre.gui2.viewer.web_view import viewer_config_dir
15from calibre.srv.render_book import EPUB_FILE_TYPE_MAGIC
16from calibre.utils.date import EPOCH
17from calibre.utils.iso8601 import parse_iso8601
18from calibre.utils.serialize import json_dumps, json_loads
19from calibre.utils.zipfile import safe_replace
20from polyglot.binary import as_base64_bytes
21from polyglot.builtins import iteritems
22from polyglot.queue import Queue
23
# Stored annotation data is parsed with the JSON loader from calibre.utils.serialize.
parse_annotations = json_loads
# Directory inside the viewer's configuration directory; save_annotations()
# writes one file per book here, named after the book's annotations_path_key.
annotations_dir = os.path.join(viewer_config_dir, 'annots')
26
27
def annotations_as_copied_list(annots_map):
    """Flatten a mapping of annotation type -> annotations into (copy, seconds) pairs.

    For every annotation in every list of ``annots_map`` this generator yields
    a 2-tuple:

    * a shallow copy of the annotation with its ``'type'`` key set to the
      mapping key it was stored under (the input annotation is not modified);
    * the annotation's ``'timestamp'`` expressed as seconds relative to EPOCH.

    Each annotation must have a ``'timestamp'`` entry that parse_iso8601 can
    read; it is parsed with ``assume_utc=True``.
    """
    for kind, entries in iteritems(annots_map):
        for original in entries:
            when = parse_iso8601(original['timestamp'], assume_utc=True)
            seconds = (when - EPOCH).total_seconds()
            # Copy before tagging so the caller's annotation stays untouched.
            tagged = original.copy()
            tagged['type'] = kind
            yield tagged, seconds
35
36
def annot_list_as_bytes(annots):
    """Serialize the annotations from ``annots`` with json_dumps.

    ``annots`` is an iterable of 2-item ``(annotation, seconds)`` pairs, such
    as those produced by annotations_as_copied_list(). Only the annotation
    part of each pair is serialized; the seconds value is dropped.
    """
    annotations_only = tuple(entry for entry, _seconds in annots)
    return json_dumps(annotations_only)
39
40
def split_lines(chunk, length=80):
    """Yield consecutive slices of ``chunk``, each at most ``length`` items long.

    :param chunk: Any sliceable sequence with a length (bytes, str, list, ...).
    :param length: Maximum size of each yielded slice. Must be positive.
    :return: A generator over the slices, in order. The last slice may be
        shorter than ``length``; an empty ``chunk`` yields nothing.
    :raises ValueError: If ``length`` is not positive. As this is a generator,
        the error is raised when iteration starts. A non-positive length
        would otherwise never advance through ``chunk`` and loop forever.
    """
    if length < 1:
        raise ValueError('length must be a positive integer, got {!r}'.format(length))
    for start in range(0, len(chunk), length):
        yield chunk[start:start + length]
46
47
def save_annots_to_epub(path, serialized_annots):
    """Store serialized annotations inside the book file at ``path``.

    The data is base64 encoded, broken into lines with split_lines(), joined
    with newlines and prefixed with EPUB_FILE_TYPE_MAGIC. The result is handed
    to safe_replace() under the name ``META-INF/calibre_bookmarks.txt`` with
    ``add_missing=True``.

    If the file cannot be opened for in-place update (``OSError``), nothing is
    written and the function returns silently.
    """
    try:
        book_file = open(path, 'r+b')
    except OSError:
        # Best effort only: an unopenable book simply gets no embedded copy.
        return
    with book_file:
        encoded_lines = split_lines(as_base64_bytes(serialized_annots))
        payload = EPUB_FILE_TYPE_MAGIC + b'\n'.join(encoded_lines)
        safe_replace(book_file, 'META-INF/calibre_bookmarks.txt', BytesIO(payload), add_missing=True)
56
57
def save_annotations(annotations_list, annotations_path_key, bld, pathtoebook, in_book_file, sync_annots_user):
    """Persist a list of annotations to every applicable destination.

    Steps, in order:

    1. Serialize ``annotations_list`` with annot_list_as_bytes() and write the
       result to the file named ``annotations_path_key`` in annotations_dir.
    2. If ``in_book_file`` is true and ``pathtoebook`` is writable, embed the
       same bytes in the book via save_annots_to_epub() and then call
       update_book() with the book's pre-modification stat result and the
       data under the name ``calibre-book-annotations.json``.
    3. If ``bld`` is truthy, pass ``bld``, the annotations list and
       ``sync_annots_user`` to save_annotations_list_to_library().
    """
    payload = annot_list_as_bytes(annotations_list)
    destination = os.path.join(annotations_dir, annotations_path_key)
    with open(destination, 'wb') as out:
        out.write(payload)

    book_is_writable = in_book_file and os.access(pathtoebook, os.W_OK)
    if book_is_writable:
        # Stat must be captured before the book is modified; update_book()
        # receives it alongside the new annotations data.
        stat_before_write = os.stat(pathtoebook)
        save_annots_to_epub(pathtoebook, payload)
        update_book(pathtoebook, stat_before_write, {'calibre-book-annotations.json': payload})

    if bld:
        save_annotations_list_to_library(bld, annotations_list, sync_annots_user)
68
69
class AnnotationsSaveWorker(Thread):
    """Daemon thread that saves annotations for requests taken from a queue.

    Requests are dictionaries placed on ``self.queue`` by save_annotations();
    run() hands each one to the module-level save_annotations() function.
    Putting ``None`` on the queue (see shutdown()) ends the thread.
    """

    def __init__(self):
        super().__init__(name='AnnotSaveWorker', daemon=True)
        self.queue = Queue()

    def shutdown(self):
        """Ask a running worker to stop and wait for it to finish.

        Does nothing if the thread is not alive.
        """
        if not self.is_alive():
            return
        self.queue.put(None)
        self.join()

    def run(self):
        """Process queued requests until the ``None`` sentinel is received.

        Exceptions raised while saving are printed with their traceback and
        do not stop the worker.
        """
        # iter() with a sentinel keeps calling queue.get() until it returns None.
        for request in iter(self.queue.get, None):
            # Unpacked outside the try block: a request with missing keys
            # still raises KeyError rather than being reported and skipped.
            args = (
                request['annotations_list'],
                request['annotations_path_key'],
                request['book_library_details'],
                request['pathtoebook'],
                request['in_book_file'],
                request['sync_annots_user'],
            )
            try:
                save_annotations(*args)
            except Exception:
                import traceback
                traceback.print_exc()

    def save_annotations(self, current_book_data, in_book_file=True, sync_annots_user=''):
        """Queue a request to save the annotations of the current book.

        The annotations in ``current_book_data['annotations_map']`` are copied
        immediately via annotations_as_copied_list(). Saving inside the book
        file is only requested when ``in_book_file`` is true and the book path
        ends with ``.epub`` (case-insensitive).
        """
        copied = tuple(annotations_as_copied_list(current_book_data['annotations_map']))
        book_path = current_book_data['pathtoebook']
        is_epub = book_path.lower().endswith('.epub')
        request = {
            'annotations_list': copied,
            'annotations_path_key': current_book_data['annotations_path_key'],
            'book_library_details': current_book_data['book_library_details'],
            'pathtoebook': current_book_data['pathtoebook'],
            'in_book_file': in_book_file and is_epub,
            'sync_annots_user': sync_annots_user,
        }
        self.queue.put(request)
111
112
def find_tests():
    """Build and return the unittest suite for annotation merging.

    The suite holds one test case class, ``AnnotationsTest``, which checks the
    order of ids returned by merge_annot_lists() for bookmarks and highlights.
    """
    import unittest

    def timestamp(year):
        # Same day and time for every fixture; only the two-digit year varies.
        return '20{}-06-29T03:21:48.895323+00:00'.format(year)

    def cfi(first_cfi_number):
        return 'epubcfi(/{}/4/8)'.format(first_cfi_number)

    def bm(title, bmid, year=20, first_cfi_number=1):
        # Bookmark fixture.
        bookmark = {'title': title, 'id': bmid, 'timestamp': timestamp(year)}
        bookmark['pos_type'] = 'epubcfi'
        bookmark['pos'] = cfi(first_cfi_number)
        return bookmark

    def hl(uuid, hlid, year=20, first_cfi_number=1):
        # Highlight fixture.
        highlight = {'uuid': uuid, 'id': hlid, 'timestamp': timestamp(year)}
        highlight['start_cfi'] = cfi(first_cfi_number)
        return highlight

    class AnnotationsTest(unittest.TestCase):

        def test_merge_annotations(self):
            factories = (('bookmark', bm), ('highlight', hl))
            for atype, make in factories:
                first = [make('one', 1, 20, 2), make('two', 2, 20, 4), make('a', 3, 20, 16)]
                second = [make('one', 10, 30, 2), make('two', 20, 10, 4), make('b', 30, 20, 8)]
                merged = merge_annot_lists(first, second, atype)
                merged_ids = tuple(annot['id'] for annot in merged)
                self.assertEqual(merged_ids, (10, 2, 30, 3))

    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(AnnotationsTest)
139