1#!/usr/local/bin/python3.8
2# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
3
4
5__license__   = 'GPL v3'
6__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
7__docformat__ = 'restructuredtext en'
8
9from collections import namedtuple
10from functools import partial
11from io import BytesIO
12
13from calibre.ebooks.metadata import author_to_author_sort, title_sort
14from calibre.ebooks.metadata.book.base import Metadata
15from calibre.utils.date import UNDEFINED_DATE
16from calibre.db.tests.base import BaseTest, IMG
17from calibre.db.backend import FTSQueryError
18from polyglot.builtins import iteritems, itervalues
19
20
21class WritingTest(BaseTest):
22
23    # Utils {{{
24    def create_getter(self, name, getter=None):
25        if getter is None:
26            if name.endswith('_index'):
27                ans = lambda db:partial(db.get_custom_extra, index_is_id=True,
28                                        label=name[1:].replace('_index', ''))
29            else:
30                ans = lambda db:partial(db.get_custom, label=name[1:],
31                                       index_is_id=True)
32        else:
33            ans = lambda db:partial(getattr(db, getter), index_is_id=True)
34        return ans
35
36    def create_setter(self, name, setter=None):
37        if setter is None:
38            ans = lambda db:partial(db.set_custom, label=name[1:], commit=True)
39        else:
40            ans = lambda db:partial(getattr(db, setter), commit=True)
41        return ans
42
43    def create_test(self, name, vals, getter=None, setter=None):
44        T = namedtuple('Test', 'name vals getter setter')
45        return T(name, vals, self.create_getter(name, getter),
46                 self.create_setter(name, setter))
47
48    def run_tests(self, tests):
49        results = {}
50        for test in tests:
51            results[test] = []
52            for val in test.vals:
53                cl = self.cloned_library
54                cache = self.init_cache(cl)
55                cache.set_field(test.name, {1: val})
56                cached_res = cache.field_for(test.name, 1)
57                del cache
58                db = self.init_old(cl)
59                getter = test.getter(db)
60                sqlite_res = getter(1)
61                if test.name.endswith('_index'):
62                    val = float(val) if val is not None else 1.0
63                    self.assertEqual(sqlite_res, val,
64                        'Failed setting for %s with value %r, sqlite value not the same. val: %r != sqlite_val: %r'%(
65                            test.name, val, val, sqlite_res))
66                else:
67                    test.setter(db)(1, val)
68                    old_cached_res = getter(1)
69                    self.assertEqual(old_cached_res, cached_res,
70                                    'Failed setting for %s with value %r, cached value not the same. Old: %r != New: %r'%(
71                            test.name, val, old_cached_res, cached_res))
72                    db.refresh()
73                    old_sqlite_res = getter(1)
74                    self.assertEqual(old_sqlite_res, sqlite_res,
75                        'Failed setting for %s, sqlite value not the same: %r != %r'%(
76                            test.name, old_sqlite_res, sqlite_res))
77                del db
78    # }}}
79
80    def test_one_one(self):  # {{{
81        'Test setting of values in one-one fields'
82        tests = [self.create_test('#yesno', (True, False, 'true', 'false', None))]
83        for name, getter, setter in (
84            ('#series_index', None, None),
85            ('series_index', 'series_index', 'set_series_index'),
86            ('#float', None, None),
87        ):
88            vals = ['1.5', None, 0, 1.0]
89            tests.append(self.create_test(name, tuple(vals), getter, setter))
90
91        for name, getter, setter in (
92            ('pubdate', 'pubdate', 'set_pubdate'),
93            ('timestamp', 'timestamp', 'set_timestamp'),
94            ('#date', None, None),
95        ):
96            tests.append(self.create_test(
97                name, ('2011-1-12', UNDEFINED_DATE, None), getter, setter))
98
99        for name, getter, setter in (
100            ('title', 'title', 'set_title'),
101            ('uuid', 'uuid', 'set_uuid'),
102            ('author_sort', 'author_sort', 'set_author_sort'),
103            ('sort', 'title_sort', 'set_title_sort'),
104            ('#comments', None, None),
105            ('comments', 'comments', 'set_comment'),
106        ):
107            vals = ['something', None]
108            if name not in {'comments', '#comments'}:
109                # Setting text column to '' returns None in the new backend
110                # and '' in the old. I think None is more correct.
111                vals.append('')
112            if name == 'comments':
113                # Again new behavior of deleting comment rather than setting
114                # empty string is more correct.
115                vals.remove(None)
116            tests.append(self.create_test(name, tuple(vals), getter, setter))
117
118        self.run_tests(tests)
119    # }}}
120
121    def test_many_one_basic(self):  # {{{
122        'Test the different code paths for writing to a many-one field'
123        cl = self.cloned_library
124        cache = self.init_cache(cl)
125        f = cache.fields['publisher']
126        item_ids = {f.ids_for_book(1)[0], f.ids_for_book(2)[0]}
127        val = 'Changed'
128        self.assertEqual(cache.set_field('publisher', {1:val, 2:val}), {1, 2})
129        cache2 = self.init_cache(cl)
130        for book_id in (1, 2):
131            for c in (cache, cache2):
132                self.assertEqual(c.field_for('publisher', book_id), val)
133                self.assertFalse(item_ids.intersection(set(c.fields['publisher'].table.id_map)))
134        del cache2
135        self.assertFalse(cache.set_field('publisher', {1:val, 2:val}))
136        val = val.lower()
137        self.assertFalse(cache.set_field('publisher', {1:val, 2:val},
138                                         allow_case_change=False))
139        self.assertEqual(cache.set_field('publisher', {1:val, 2:val}), {1, 2})
140        cache2 = self.init_cache(cl)
141        for book_id in (1, 2):
142            for c in (cache, cache2):
143                self.assertEqual(c.field_for('publisher', book_id), val)
144        del cache2
145        self.assertEqual(cache.set_field('publisher', {1:'new', 2:'New'}), {1, 2})
146        self.assertEqual(cache.field_for('publisher', 1).lower(), 'new')
147        self.assertEqual(cache.field_for('publisher', 2).lower(), 'new')
148        self.assertEqual(cache.set_field('publisher', {1:None, 2:'NEW'}), {1, 2})
149        self.assertEqual(len(f.table.id_map), 1)
150        self.assertEqual(cache.set_field('publisher', {2:None}), {2})
151        self.assertEqual(len(f.table.id_map), 0)
152        cache2 = self.init_cache(cl)
153        self.assertEqual(len(cache2.fields['publisher'].table.id_map), 0)
154        del cache2
155        self.assertEqual(cache.set_field('publisher', {1:'one', 2:'two',
156                                                       3:'three'}), {1, 2, 3})
157        self.assertEqual(cache.set_field('publisher', {1:''}), {1})
158        self.assertEqual(cache.set_field('publisher', {1:'two'}), {1})
159        self.assertEqual(tuple(map(f.for_book, (1,2,3))), ('two', 'two', 'three'))
160        self.assertEqual(cache.set_field('publisher', {1:'Two'}), {1, 2})
161        cache2 = self.init_cache(cl)
162        self.assertEqual(tuple(map(f.for_book, (1,2,3))), ('Two', 'Two', 'three'))
163        del cache2
164
165        # Enum
166        self.assertFalse(cache.set_field('#enum', {1:'Not allowed'}))
167        self.assertEqual(cache.set_field('#enum', {1:'One', 2:'One', 3:'Three'}), {1, 3})
168        self.assertEqual(cache.set_field('#enum', {1:None}), {1})
169        cache2 = self.init_cache(cl)
170        for c in (cache, cache2):
171            for i, val in iteritems({1:None, 2:'One', 3:'Three'}):
172                self.assertEqual(c.field_for('#enum', i), val)
173        del cache2
174
175        # Rating
176        self.assertFalse(cache.set_field('rating', {1:6, 2:4}))
177        self.assertEqual(cache.set_field('rating', {1:0, 3:2}), {1, 3})
178        self.assertEqual(cache.set_field('#rating', {1:None, 2:4, 3:8}), {1, 2, 3})
179        cache2 = self.init_cache(cl)
180        for c in (cache, cache2):
181            for i, val in iteritems({1:None, 2:4, 3:2}):
182                self.assertEqual(c.field_for('rating', i), val)
183            for i, val in iteritems({1:None, 2:4, 3:8}):
184                self.assertEqual(c.field_for('#rating', i), val)
185        del cache2
186
187        # Series
188        self.assertFalse(cache.set_field('series',
189                {1:'a series one', 2:'a series one'}, allow_case_change=False))
190        self.assertEqual(cache.set_field('series', {3:'Series [3]'}), {3})
191        self.assertEqual(cache.set_field('#series', {1:'Series', 3:'Series'}),
192                                         {1, 3})
193        self.assertEqual(cache.set_field('#series', {2:'Series [0]'}), {2})
194        cache2 = self.init_cache(cl)
195        for c in (cache, cache2):
196            for i, val in iteritems({1:'A Series One', 2:'A Series One', 3:'Series'}):
197                self.assertEqual(c.field_for('series', i), val)
198            cs_indices = {1:c.field_for('#series_index', 1), 3:c.field_for('#series_index', 3)}
199            for i in (1, 2, 3):
200                self.assertEqual(c.field_for('#series', i), 'Series')
201            for i, val in iteritems({1:2, 2:1, 3:3}):
202                self.assertEqual(c.field_for('series_index', i), val)
203            for i, val in iteritems({1:cs_indices[1], 2:0, 3:cs_indices[3]}):
204                self.assertEqual(c.field_for('#series_index', i), val)
205        del cache2
206
207    # }}}
208
209    def test_many_many_basic(self):  # {{{
210        'Test the different code paths for writing to a many-many field'
211        cl = self.cloned_library
212        cache = self.init_cache(cl)
213        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
214
215        # Tags
216        ae(sf('#tags', {1:cache.field_for('tags', 1), 2:cache.field_for('tags', 2)}),
217            {1, 2})
218        for name in ('tags', '#tags'):
219            f = cache.fields[name]
220            af(sf(name, {1:('News', 'tag one')}, allow_case_change=False))
221            ae(sf(name, {1:'tag one, News'}), {1, 2})
222            ae(sf(name, {3:('tag two', 'sep,sep2')}), {2, 3})
223            ae(len(f.table.id_map), 4)
224            ae(sf(name, {1:None}), {1})
225            cache2 = self.init_cache(cl)
226            for c in (cache, cache2):
227                ae(c.field_for(name, 3), ('tag two', 'sep;sep2'))
228                ae(len(c.fields[name].table.id_map), 3)
229                ae(len(c.fields[name].table.id_map), 3)
230                ae(c.field_for(name, 1), ())
231                ae(c.field_for(name, 2), ('tag two', 'tag one'))
232            del cache2
233
234        # Authors
235        ae(sf('#authors', {k:cache.field_for('authors', k) for k in (1,2,3)}),
236           {1,2,3})
237
238        for name in ('authors', '#authors'):
239            f = cache.fields[name]
240            ae(len(f.table.id_map), 3)
241            af(cache.set_field(name, {3:'Unknown'}))
242            ae(cache.set_field(name, {3:'Kovid Goyal & Divok Layog'}), {3})
243            ae(cache.set_field(name, {1:'', 2:'An, Author'}), {1,2})
244            cache2 = self.init_cache(cl)
245            for c in (cache, cache2):
246                ae(len(c.fields[name].table.id_map), 4 if name =='authors' else 3)
247                ae(c.field_for(name, 3), ('Kovid Goyal', 'Divok Layog'))
248                ae(c.field_for(name, 2), ('An, Author',))
249                ae(c.field_for(name, 1), (_('Unknown'),) if name=='authors' else ())
250                if name == 'authors':
251                    ae(c.field_for('author_sort', 1), author_to_author_sort(_('Unknown')))
252                    ae(c.field_for('author_sort', 2), author_to_author_sort('An, Author'))
253                    ae(c.field_for('author_sort', 3), author_to_author_sort('Kovid Goyal') + ' & ' + author_to_author_sort('Divok Layog'))
254            del cache2
255        ae(cache.set_field('authors', {1:'KoviD GoyaL'}), {1, 3})
256        ae(cache.field_for('author_sort', 1), 'GoyaL, KoviD')
257        ae(cache.field_for('author_sort', 3), 'GoyaL, KoviD & Layog, Divok')
258
259        # Languages
260        f = cache.fields['languages']
261        ae(f.table.id_map, {1: 'eng', 2: 'deu'})
262        ae(sf('languages', {1:''}), {1})
263        ae(cache.field_for('languages', 1), ())
264        ae(sf('languages', {2:('und',)}), {2})
265        af(f.table.id_map)
266        ae(sf('languages', {1:'eng,fra,deu', 2:'es,Dutch', 3:'English'}), {1, 2, 3})
267        ae(cache.field_for('languages', 1), ('eng', 'fra', 'deu'))
268        ae(cache.field_for('languages', 2), ('spa', 'nld'))
269        ae(cache.field_for('languages', 3), ('eng',))
270        ae(sf('languages', {3:None}), {3})
271        ae(cache.field_for('languages', 3), ())
272        ae(sf('languages', {1:'deu,fra,eng'}), {1}, 'Changing order failed')
273        ae(sf('languages', {2:'deu,eng,eng'}), {2})
274        cache2 = self.init_cache(cl)
275        for c in (cache, cache2):
276            ae(cache.field_for('languages', 1), ('deu', 'fra', 'eng'))
277            ae(cache.field_for('languages', 2), ('deu', 'eng'))
278        del cache2
279
280        # Identifiers
281        f = cache.fields['identifiers']
282        ae(sf('identifiers', {3: 'one:1,two:2'}), {3})
283        ae(sf('identifiers', {2:None}), {2})
284        ae(sf('identifiers', {1: {'test':'1', 'two':'2'}}), {1})
285        cache2 = self.init_cache(cl)
286        for c in (cache, cache2):
287            ae(c.field_for('identifiers', 3), {'one':'1', 'two':'2'})
288            ae(c.field_for('identifiers', 2), {})
289            ae(c.field_for('identifiers', 1), {'test':'1', 'two':'2'})
290        del cache2
291
292        # Test setting of title sort
293        ae(sf('title', {1:'The Moose', 2:'Cat'}), {1, 2})
294        cache2 = self.init_cache(cl)
295        for c in (cache, cache2):
296            ae(c.field_for('sort', 1), title_sort('The Moose'))
297            ae(c.field_for('sort', 2), title_sort('Cat'))
298
299        # Test setting with the same value repeated
300        ae(sf('tags', {3: ('a', 'b', 'a')}), {3})
301        ae(sf('tags', {3: ('x', 'X')}), {3}, 'Failed when setting tag twice with different cases')
302        ae(('x',), cache.field_for('tags', 3))
303
304        # Test setting of authors with | in their names (for legacy database
305        # format compatibility | is replaced by ,)
306        ae(sf('authors', {3: ('Some| Author',)}), {3})
307        ae(('Some, Author',), cache.field_for('authors', 3))
308
309    # }}}
310
311    def test_dirtied(self):  # {{{
312        'Test the setting of the dirtied flag and the last_modified column'
313        cl = self.cloned_library
314        cache = self.init_cache(cl)
315        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
316        # First empty dirtied
317        cache.dump_metadata()
318        af(cache.dirtied_cache)
319        af(self.init_cache(cl).dirtied_cache)
320
321        prev = cache.field_for('last_modified', 3)
322        import calibre.db.cache as c
323        from datetime import timedelta
324        utime = prev+timedelta(days=1)
325        onowf = c.nowf
326        c.nowf = lambda: utime
327        try:
328            ae(sf('title', {3:'xxx'}), {3})
329            self.assertTrue(3 in cache.dirtied_cache)
330            ae(cache.field_for('last_modified', 3), utime)
331            cache.dump_metadata()
332            raw = cache.read_backup(3)
333            from calibre.ebooks.metadata.opf2 import OPF
334            opf = OPF(BytesIO(raw))
335            ae(opf.title, 'xxx')
336        finally:
337            c.nowf = onowf
338    # }}}
339
340    def test_backup(self):  # {{{
341        'Test the automatic backup of changed metadata'
342        cl = self.cloned_library
343        cache = self.init_cache(cl)
344        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
345        # First empty dirtied
346        cache.dump_metadata()
347        af(cache.dirtied_cache)
348        from calibre.db.backup import MetadataBackup
349        interval = 0.01
350        mb = MetadataBackup(cache, interval=interval, scheduling_interval=0)
351        mb.start()
352        try:
353            ae(sf('title', {1:'title1', 2:'title2', 3:'title3'}), {1,2,3})
354            ae(sf('authors', {1:'author1 & author2', 2:'author1 & author2', 3:'author1 & author2'}), {1,2,3})
355            count = 6
356            while cache.dirty_queue_length() and count > 0:
357                mb.join(2)
358                count -= 1
359            af(cache.dirty_queue_length())
360        finally:
361            mb.stop()
362        mb.join(2)
363        af(mb.is_alive())
364        from calibre.ebooks.metadata.opf2 import OPF
365        for book_id in (1, 2, 3):
366            raw = cache.read_backup(book_id)
367            opf = OPF(BytesIO(raw))
368            ae(opf.title, 'title%d'%book_id)
369            ae(opf.authors, ['author1', 'author2'])
370    # }}}
371
372    def test_set_cover(self):  # {{{
373        ' Test setting of cover '
374        cache = self.init_cache()
375        ae = self.assertEqual
376
377        # Test removing a cover
378        ae(cache.field_for('cover', 1), 1)
379        ae(cache.set_cover({1:None}), {1})
380        ae(cache.field_for('cover', 1), 0)
381        img = IMG
382
383        # Test setting a cover
384        ae(cache.set_cover({bid:img for bid in (1, 2, 3)}), {1, 2, 3})
385        old = self.init_old()
386        for book_id in (1, 2, 3):
387            ae(cache.cover(book_id), img, 'Cover was not set correctly for book %d' % book_id)
388            ae(cache.field_for('cover', book_id), 1)
389            ae(old.cover(book_id, index_is_id=True), img, 'Cover was not set correctly for book %d' % book_id)
390            self.assertTrue(old.has_cover(book_id))
391        old.close()
392        old.break_cycles()
393        del old
394    # }}}
395
396    def test_set_metadata(self):  # {{{
397        ' Test setting of metadata '
398        ae = self.assertEqual
399        cache = self.init_cache(self.cloned_library)
400
401        # Check that changing title/author updates the path
402        mi = cache.get_metadata(1)
403        old_path = cache.field_for('path', 1)
404        old_title, old_author = mi.title, mi.authors[0]
405        ae(old_path, '%s/%s (1)' % (old_author, old_title))
406        mi.title, mi.authors = 'New Title', ['New Author']
407        cache.set_metadata(1, mi)
408        ae(cache.field_for('path', 1), '%s/%s (1)' % (mi.authors[0], mi.title))
409        p = cache.format_abspath(1, 'FMT1')
410        self.assertTrue(mi.authors[0] in p and mi.title in p)
411
412        # Compare old and new set_metadata()
413        db = self.init_old(self.cloned_library)
414        mi = db.get_metadata(1, index_is_id=True, get_cover=True, cover_as_data=True)
415        mi2 = db.get_metadata(3, index_is_id=True, get_cover=True, cover_as_data=True)
416        db.set_metadata(2, mi)
417        db.set_metadata(1, mi2, force_changes=True)
418        oldmi = db.get_metadata(2, index_is_id=True, get_cover=True, cover_as_data=True)
419        oldmi2 = db.get_metadata(1, index_is_id=True, get_cover=True, cover_as_data=True)
420        db.close()
421        del db
422        cache = self.init_cache(self.cloned_library)
423        cache.set_metadata(2, mi)
424        nmi = cache.get_metadata(2, get_cover=True, cover_as_data=True)
425        ae(oldmi.cover_data, nmi.cover_data)
426        self.compare_metadata(nmi, oldmi, exclude={'last_modified', 'format_metadata', 'formats'})
427        cache.set_metadata(1, mi2, force_changes=True)
428        nmi2 = cache.get_metadata(1, get_cover=True, cover_as_data=True)
429        self.compare_metadata(nmi2, oldmi2, exclude={'last_modified', 'format_metadata', 'formats'})
430
431        cache = self.init_cache(self.cloned_library)
432        mi = cache.get_metadata(1)
433        otags = mi.tags
434        mi.tags = [x.upper() for x in mi.tags]
435        cache.set_metadata(3, mi)
436        self.assertEqual(set(otags), set(cache.field_for('tags', 3)), 'case changes should not be allowed in set_metadata')
437
438        # test that setting authors without author sort results in an
439        # auto-generated authors sort
440        mi = Metadata('empty', ['a1', 'a2'])
441        cache.set_metadata(1, mi)
442        self.assertEqual('a1 & a2', cache.field_for('author_sort', 1))
443        cache.set_sort_for_authors({cache.get_item_id('authors', 'a1'): 'xy'})
444        self.assertEqual('xy & a2', cache.field_for('author_sort', 1))
445        mi = Metadata('empty', ['a1'])
446        cache.set_metadata(1, mi)
447        self.assertEqual('xy', cache.field_for('author_sort', 1))
448
449    # }}}
450
451    def test_conversion_options(self):  # {{{
452        ' Test saving of conversion options '
453        cache = self.init_cache()
454        all_ids = cache.all_book_ids()
455        self.assertFalse(cache.has_conversion_options(all_ids))
456        self.assertIsNone(cache.conversion_options(1))
457        op1, op2 = b"{'xx':'yy'}", b"{'yy':'zz'}"
458        cache.set_conversion_options({1:op1, 2:op2})
459        self.assertTrue(cache.has_conversion_options(all_ids))
460        self.assertEqual(cache.conversion_options(1), op1)
461        self.assertEqual(cache.conversion_options(2), op2)
462        cache.set_conversion_options({1:op2})
463        self.assertEqual(cache.conversion_options(1), op2)
464        cache.delete_conversion_options(all_ids)
465        self.assertFalse(cache.has_conversion_options(all_ids))
466    # }}}
467
468    def test_remove_items(self):  # {{{
469        ' Test removal of many-(many,one) items '
470        cache = self.init_cache()
471        tmap = cache.get_id_map('tags')
472        self.assertEqual(cache.remove_items('tags', tmap), {1, 2})
473        tmap = cache.get_id_map('#tags')
474        t = {v:k for k, v in iteritems(tmap)}['My Tag Two']
475        self.assertEqual(cache.remove_items('#tags', (t,)), {1, 2})
476
477        smap = cache.get_id_map('series')
478        self.assertEqual(cache.remove_items('series', smap), {1, 2})
479        smap = cache.get_id_map('#series')
480        s = {v:k for k, v in iteritems(smap)}['My Series Two']
481        self.assertEqual(cache.remove_items('#series', (s,)), {1})
482
483        for c in (cache, self.init_cache()):
484            self.assertFalse(c.get_id_map('tags'))
485            self.assertFalse(c.all_field_names('tags'))
486            for bid in c.all_book_ids():
487                self.assertFalse(c.field_for('tags', bid))
488
489            self.assertEqual(len(c.get_id_map('#tags')), 1)
490            self.assertEqual(c.all_field_names('#tags'), {'My Tag One'})
491            for bid in c.all_book_ids():
492                self.assertIn(c.field_for('#tags', bid), ((), ('My Tag One',)))
493
494            for bid in (1, 2):
495                self.assertEqual(c.field_for('series_index', bid), 1.0)
496            self.assertFalse(c.get_id_map('series'))
497            self.assertFalse(c.all_field_names('series'))
498            for bid in c.all_book_ids():
499                self.assertFalse(c.field_for('series', bid))
500
501            self.assertEqual(c.field_for('series_index', 1), 1.0)
502            self.assertEqual(c.all_field_names('#series'), {'My Series One'})
503            for bid in c.all_book_ids():
504                self.assertIn(c.field_for('#series', bid), (None, 'My Series One'))
505
506        # Now test with restriction
507        cache = self.init_cache()
508        cache.set_field('tags', {1:'a,b,c', 2:'b,a', 3:'x,y,z'})
509        cache.set_field('series', {1:'a', 2:'a', 3:'b'})
510        cache.set_field('series_index', {1:8, 2:9, 3:3})
511        tmap, smap = cache.get_id_map('tags'), cache.get_id_map('series')
512        self.assertEqual(cache.remove_items('tags', tmap, restrict_to_book_ids=()), set())
513        self.assertEqual(cache.remove_items('tags', tmap, restrict_to_book_ids={1}), {1})
514        self.assertEqual(cache.remove_items('series', smap, restrict_to_book_ids=()), set())
515        self.assertEqual(cache.remove_items('series', smap, restrict_to_book_ids=(1,)), {1})
516        c2 = self.init_cache()
517        for c in (cache, c2):
518            self.assertEqual(c.field_for('tags', 1), ())
519            self.assertEqual(c.field_for('tags', 2), ('b', 'a'))
520            self.assertNotIn('c', set(itervalues(c.get_id_map('tags'))))
521            self.assertEqual(c.field_for('series', 1), None)
522            self.assertEqual(c.field_for('series', 2), 'a')
523            self.assertEqual(c.field_for('series_index', 1), 1.0)
524            self.assertEqual(c.field_for('series_index', 2), 9)
525
526    # }}}
527
528    def test_rename_items(self):  # {{{
529        ' Test renaming of many-(many,one) items '
530        cl = self.cloned_library
531        cache = self.init_cache(cl)
532        # Check that renaming authors updates author sort and path
533        a = {v:k for k, v in iteritems(cache.get_id_map('authors'))}['Unknown']
534        self.assertEqual(cache.rename_items('authors', {a:'New Author'})[0], {3})
535        a = {v:k for k, v in iteritems(cache.get_id_map('authors'))}['Author One']
536        self.assertEqual(cache.rename_items('authors', {a:'Author Two'})[0], {1, 2})
537        for c in (cache, self.init_cache(cl)):
538            self.assertEqual(c.all_field_names('authors'), {'New Author', 'Author Two'})
539            self.assertEqual(c.field_for('author_sort', 3), 'Author, New')
540            self.assertIn('New Author/', c.field_for('path', 3))
541            self.assertEqual(c.field_for('authors', 1), ('Author Two',))
542            self.assertEqual(c.field_for('author_sort', 1), 'Two, Author')
543
544        t = {v:k for k, v in iteritems(cache.get_id_map('tags'))}['Tag One']
545        # Test case change
546        self.assertEqual(cache.rename_items('tags', {t:'tag one'}), ({1, 2}, {t:t}))
547        for c in (cache, self.init_cache(cl)):
548            self.assertEqual(c.all_field_names('tags'), {'tag one', 'Tag Two', 'News'})
549            self.assertEqual(set(c.field_for('tags', 1)), {'tag one', 'News'})
550            self.assertEqual(set(c.field_for('tags', 2)), {'tag one', 'Tag Two'})
551        # Test new name
552        self.assertEqual(cache.rename_items('tags', {t:'t1'})[0], {1,2})
553        for c in (cache, self.init_cache(cl)):
554            self.assertEqual(c.all_field_names('tags'), {'t1', 'Tag Two', 'News'})
555            self.assertEqual(set(c.field_for('tags', 1)), {'t1', 'News'})
556            self.assertEqual(set(c.field_for('tags', 2)), {'t1', 'Tag Two'})
557        # Test rename to existing
558        self.assertEqual(cache.rename_items('tags', {t:'Tag Two'})[0], {1,2})
559        for c in (cache, self.init_cache(cl)):
560            self.assertEqual(c.all_field_names('tags'), {'Tag Two', 'News'})
561            self.assertEqual(set(c.field_for('tags', 1)), {'Tag Two', 'News'})
562            self.assertEqual(set(c.field_for('tags', 2)), {'Tag Two'})
563        # Test on a custom column
564        t = {v:k for k, v in iteritems(cache.get_id_map('#tags'))}['My Tag One']
565        self.assertEqual(cache.rename_items('#tags', {t:'My Tag Two'})[0], {2})
566        for c in (cache, self.init_cache(cl)):
567            self.assertEqual(c.all_field_names('#tags'), {'My Tag Two'})
568            self.assertEqual(set(c.field_for('#tags', 2)), {'My Tag Two'})
569
570        # Test a Many-one field
571        s = {v:k for k, v in iteritems(cache.get_id_map('series'))}['A Series One']
572        # Test case change
573        self.assertEqual(cache.rename_items('series', {s:'a series one'}), ({1, 2}, {s:s}))
574        for c in (cache, self.init_cache(cl)):
575            self.assertEqual(c.all_field_names('series'), {'a series one'})
576            self.assertEqual(c.field_for('series', 1), 'a series one')
577            self.assertEqual(c.field_for('series_index', 1), 2.0)
578
579        # Test new name
580        self.assertEqual(cache.rename_items('series', {s:'series'})[0], {1, 2})
581        for c in (cache, self.init_cache(cl)):
582            self.assertEqual(c.all_field_names('series'), {'series'})
583            self.assertEqual(c.field_for('series', 1), 'series')
584            self.assertEqual(c.field_for('series', 2), 'series')
585            self.assertEqual(c.field_for('series_index', 1), 2.0)
586
587        s = {v:k for k, v in iteritems(cache.get_id_map('#series'))}['My Series One']
588        # Test custom column with rename to existing
589        self.assertEqual(cache.rename_items('#series', {s:'My Series Two'})[0], {2})
590        for c in (cache, self.init_cache(cl)):
591            self.assertEqual(c.all_field_names('#series'), {'My Series Two'})
592            self.assertEqual(c.field_for('#series', 2), 'My Series Two')
593            self.assertEqual(c.field_for('#series_index', 1), 3.0)
594            self.assertEqual(c.field_for('#series_index', 2), 4.0)
595
596        # Test renaming many-many items to multiple items
597        cache = self.init_cache(self.cloned_library)
598        t = {v:k for k, v in iteritems(cache.get_id_map('tags'))}['Tag One']
599        affected_books, id_map = cache.rename_items('tags', {t:'Something, Else, Entirely'})
600        self.assertEqual({1, 2}, affected_books)
601        tmap = cache.get_id_map('tags')
602        self.assertEqual('Something', tmap[id_map[t]])
603        self.assertEqual(1, len(id_map))
604        f1, f2 = cache.field_for('tags', 1), cache.field_for('tags', 2)
605        for f in (f1, f2):
606            for t in 'Something,Else,Entirely'.split(','):
607                self.assertIn(t, f)
608            self.assertNotIn('Tag One', f)
609
610        # Test with restriction
611        cache = self.init_cache()
612        cache.set_field('tags', {1:'a,b,c', 2:'x,y,z', 3:'a,x,z'})
613        tmap = {v:k for k, v in iteritems(cache.get_id_map('tags'))}
614        self.assertEqual(cache.rename_items('tags', {tmap['a']:'r'}, restrict_to_book_ids=()), (set(), {}))
615        self.assertEqual(cache.rename_items('tags', {tmap['a']:'r', tmap['b']:'q'}, restrict_to_book_ids=(1,))[0], {1})
616        self.assertEqual(cache.rename_items('tags', {tmap['x']:'X'}, restrict_to_book_ids=(2,))[0], {2})
617        c2 = self.init_cache()
618        for c in (cache, c2):
619            self.assertEqual(c.field_for('tags', 1), ('r', 'q', 'c'))
620            self.assertEqual(c.field_for('tags', 2), ('X', 'y', 'z'))
621            self.assertEqual(c.field_for('tags', 3), ('a', 'X', 'z'))
622    # }}}
623
624    def test_composite_cache(self):  # {{{
625        ' Test that the composite field cache is properly invalidated on writes '
626        cache = self.init_cache()
627        cache.create_custom_column('tc', 'TC', 'composite', False, display={
628            'composite_template':'{title} {author_sort} {title_sort} {formats} {tags} {series} {series_index}'})
629        cache = self.init_cache()
630
631        def test_invalidate():
632            c = self.init_cache()
633            for bid in cache.all_book_ids():
634                self.assertEqual(cache.field_for('#tc', bid), c.field_for('#tc', bid))
635
636        cache.set_field('title', {1:'xx', 3:'yy'})
637        test_invalidate()
638        cache.set_field('series_index', {1:9, 3:11})
639        test_invalidate()
640        cache.rename_items('tags', {cache.get_item_id('tags', 'Tag One'):'xxx', cache.get_item_id('tags', 'News'):'news'})
641        test_invalidate()
642        cache.remove_items('tags', (cache.get_item_id('tags', 'news'),))
643        test_invalidate()
644        cache.set_sort_for_authors({cache.get_item_id('authors', 'Author One'):'meow'})
645        test_invalidate()
646        cache.remove_formats({1:{'FMT1'}})
647        test_invalidate()
648        cache.add_format(1, 'ADD', BytesIO(b'xxxx'))
649        test_invalidate()
650    # }}}
651
652    def test_dump_and_restore(self):  # {{{
653        ' Test roundtripping the db through SQL '
654        import warnings
655        with warnings.catch_warnings():
656            # on python 3.10 apsw raises a deprecation warning which causes this test to fail on CI
657            warnings.simplefilter('ignore', DeprecationWarning)
658            cache = self.init_cache()
659            uv = int(cache.backend.user_version)
660            all_ids = cache.all_book_ids()
661            cache.dump_and_restore()
662            self.assertEqual(cache.set_field('title', {1:'nt'}), {1}, 'database connection broken')
663            cache = self.init_cache()
664            self.assertEqual(cache.all_book_ids(), all_ids, 'dump and restore broke database')
665            self.assertEqual(int(cache.backend.user_version), uv)
666    # }}}
667
668    def test_set_author_data(self):  # {{{
669        cache = self.init_cache()
670        adata = cache.author_data()
671        ldata = {aid:str(aid) for aid in adata}
672        self.assertEqual({1,2,3}, cache.set_link_for_authors(ldata))
673        for c in (cache, self.init_cache()):
674            self.assertEqual(ldata, {aid:d['link'] for aid, d in iteritems(c.author_data())})
675        self.assertEqual({3}, cache.set_link_for_authors({aid:'xxx' if aid == max(adata) else str(aid) for aid in adata}),
676                         'Setting the author link to the same value as before, incorrectly marked some books as dirty')
677        sdata = {aid:'%s, changed' % aid for aid in adata}
678        self.assertEqual({1,2,3}, cache.set_sort_for_authors(sdata))
679        for bid in (1, 2, 3):
680            self.assertIn(', changed', cache.field_for('author_sort', bid))
681        sdata = {aid:'%s, changed' % (aid*2 if aid == max(adata) else aid) for aid in adata}
682        self.assertEqual({3}, cache.set_sort_for_authors(sdata),
683                         'Setting the author sort to the same value as before, incorrectly marked some books as dirty')
684    # }}}
685
686    def test_fix_case_duplicates(self):  # {{{
687        ' Test fixing of databases that have items in is_many fields that differ only by case '
688        ae = self.assertEqual
689        cache = self.init_cache()
690        conn = cache.backend.conn
691        conn.execute('INSERT INTO publishers (name) VALUES ("mūs")')
692        lid = conn.last_insert_rowid()
693        conn.execute('INSERT INTO publishers (name) VALUES ("MŪS")')
694        uid = conn.last_insert_rowid()
695        conn.execute('DELETE FROM books_publishers_link')
696        conn.execute('INSERT INTO books_publishers_link (book,publisher) VALUES (1, %d)' % lid)
697        conn.execute('INSERT INTO books_publishers_link (book,publisher) VALUES (2, %d)' % uid)
698        conn.execute('INSERT INTO books_publishers_link (book,publisher) VALUES (3, %d)' % uid)
699        cache.reload_from_db()
700        t = cache.fields['publisher'].table
701        for x in (lid, uid):
702            self.assertIn(x, t.id_map)
703            self.assertIn(x, t.col_book_map)
704        ae(t.book_col_map[1], lid)
705        ae(t.book_col_map[2], uid)
706        t.fix_case_duplicates(cache.backend)
707        for c in (cache, self.init_cache()):
708            t = c.fields['publisher'].table
709            self.assertNotIn(uid, t.id_map)
710            self.assertNotIn(uid, t.col_book_map)
711            for bid in (1, 2, 3):
712                ae(c.field_for('publisher', bid), "mūs")
713            c.close()
714
715        cache = self.init_cache()
716        conn = cache.backend.conn
717        conn.execute('INSERT INTO tags (name) VALUES ("mūūs")')
718        lid = conn.last_insert_rowid()
719        conn.execute('INSERT INTO tags (name) VALUES ("MŪŪS")')
720        uid = conn.last_insert_rowid()
721        conn.execute('INSERT INTO tags (name) VALUES ("mūŪS")')
722        mid = conn.last_insert_rowid()
723        conn.execute('INSERT INTO tags (name) VALUES ("t")')
724        norm = conn.last_insert_rowid()
725        conn.execute('DELETE FROM books_tags_link')
726        for book_id, vals in iteritems({1:(lid, uid), 2:(uid, mid), 3:(lid, norm)}):
727            conn.executemany('INSERT INTO books_tags_link (book,tag) VALUES (?,?)',
728                             tuple((book_id, x) for x in vals))
729        cache.reload_from_db()
730        t = cache.fields['tags'].table
731        for x in (lid, uid, mid):
732            self.assertIn(x, t.id_map)
733            self.assertIn(x, t.col_book_map)
734        t.fix_case_duplicates(cache.backend)
735        for c in (cache, self.init_cache()):
736            t = c.fields['tags'].table
737            for x in (uid, mid):
738                self.assertNotIn(x, t.id_map)
739                self.assertNotIn(x, t.col_book_map)
740            ae(c.field_for('tags', 1), (t.id_map[lid],))
741            ae(c.field_for('tags', 2), (t.id_map[lid],), 'failed for book 2')
742            ae(c.field_for('tags', 3), (t.id_map[lid], t.id_map[norm]))
743    # }}}
744
745    def test_preferences(self):  # {{{
746        ' Test getting and setting of preferences, especially with mutable objects '
747        cache = self.init_cache()
748        changes = []
749        cache.backend.conn.setupdatehook(lambda typ, dbname, tblname, rowid: changes.append(rowid))
750        prefs = cache.backend.prefs
751        prefs['test mutable'] =  [1, 2, 3]
752        self.assertEqual(len(changes), 1)
753        a = prefs['test mutable']
754        a.append(4)
755        self.assertIn(4, prefs['test mutable'])
756        prefs['test mutable'] = a
757        self.assertEqual(len(changes), 2)
758        prefs.load_from_db()
759        self.assertIn(4, prefs['test mutable'])
760        prefs['test mutable'] = {k:k for k in range(10)}
761        self.assertEqual(len(changes), 3)
762        prefs['test mutable'] = {k:k for k in reversed(range(10))}
763        self.assertEqual(len(changes), 3, 'The database was written to despite there being no change in value')
764    # }}}
765
766    def test_annotations(self):  # {{{
767        'Test handling of annotations'
768        from calibre.utils.date import utcnow, EPOCH
769        cl = self.cloned_library
770        cache = self.init_cache(cl)
771        # First empty dirtied
772        cache.dump_metadata()
773        self.assertFalse(cache.dirtied_cache)
774
775        def a(**kw):
776            ts = utcnow()
777            kw['timestamp'] = utcnow().isoformat()
778            return kw, (ts - EPOCH).total_seconds()
779
780        annot_list = [
781            a(type='bookmark', title='bookmark1 changed', seq=1),
782            a(type='highlight', highlighted_text='text1', uuid='1', seq=2),
783            a(type='highlight', highlighted_text='text2', uuid='2', seq=3, notes='notes2 some word changed again'),
784        ]
785
786        def map_as_list(amap):
787            ans = []
788            for items in amap.values():
789                ans.extend(items)
790            ans.sort(key=lambda x:x['seq'])
791            return ans
792
793        cache.set_annotations_for_book(1, 'moo', annot_list)
794        amap = cache.annotations_map_for_book(1, 'moo')
795        self.assertEqual(3, len(cache.all_annotations_for_book(1)))
796        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))
797        self.assertFalse(cache.dirtied_cache)
798        cache.check_dirtied_annotations()
799        self.assertEqual(set(cache.dirtied_cache), {1})
800        cache.dump_metadata()
801        cache.check_dirtied_annotations()
802        self.assertFalse(cache.dirtied_cache)
803
804        # Test searching
805        results = cache.search_annotations('"changed"')
806        self.assertEqual([1, 3], [x['id'] for x in results])
807        results = cache.search_annotations('"changed"', annotation_type='bookmark')
808        self.assertEqual([1], [x['id'] for x in results])
809        results = cache.search_annotations('"Changed"')  # changed and change stem differently in english and other euro languages
810        self.assertEqual([1, 3], [x['id'] for x in results])
811        results = cache.search_annotations('"SOMe"')
812        self.assertEqual([3], [x['id'] for x in results])
813        results = cache.search_annotations('"change"', use_stemming=False)
814        self.assertFalse(results)
815        results = cache.search_annotations('"bookmark1"', highlight_start='[', highlight_end=']')
816        self.assertEqual(results[0]['text'], '[bookmark1] changed')
817        results = cache.search_annotations('"word"', highlight_start='[', highlight_end=']', snippet_size=3)
818        self.assertEqual(results[0]['text'], '…some [word] changed…')
819        self.assertRaises(FTSQueryError, cache.search_annotations, 'AND OR')
820        fts_l = [a(type='bookmark', title='路坎坷走来', seq=1),]
821        cache.set_annotations_for_book(1, 'moo', fts_l)
822        results = cache.search_annotations('路', highlight_start='[', highlight_end=']')
823        self.assertEqual(results[0]['text'], '[路]坎坷走来')
824
825        annot_list[0][0]['title'] = 'changed title'
826        cache.set_annotations_for_book(1, 'moo', annot_list)
827        amap = cache.annotations_map_for_book(1, 'moo')
828        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))
829
830        del annot_list[1]
831        cache.set_annotations_for_book(1, 'moo', annot_list)
832        amap = cache.annotations_map_for_book(1, 'moo')
833        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))
834        cache.check_dirtied_annotations()
835        cache.dump_metadata()
836        from calibre.ebooks.metadata.opf2 import OPF
837        raw = cache.read_backup(1)
838        opf = OPF(BytesIO(raw))
839        cache.restore_annotations(1, list(opf.read_annotations()))
840        amap = cache.annotations_map_for_book(1, 'moo')
841        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))
842
843    # }}}
844
845    def test_changed_events(self):  # {{{
846        def ae(l, r):
847            # We need to sleep a bit to allow events to happen on its thread
848            import time
849            time.sleep(.001)
850            self.assertEqual(l, r)
851
852        cache = self.init_cache(self.cloned_library)
853        ae(cache.all_book_ids(), {1, 2, 3})
854
855        event_set = set()
856
857        def event_func(t, library_id, *args):
858            nonlocal event_set, ae
859            event_set.update(args[0][1])
860        cache.add_listener(event_func)
861
862        # Test that setting metadata to itself doesn't generate any events
863        for id_ in cache.all_book_ids():
864            cache.set_metadata(id_, cache.get_metadata(id_))
865
866        ae(event_set, set())
867
868        # test setting a single field
869        cache.set_field('tags', {1:'foo'})
870        ae(event_set, {1})
871
872        # test setting multiple books. Book 1 shouldn't get an event because it
873        # isn't being changed
874        event_set = set()
875        cache.set_field('tags', {1:'foo', 2:'bar', 3:'mumble'})
876        ae(event_set, {2, 3})
877
878        # test setting a many-many field to empty
879        event_set = set()
880        cache.set_field('tags', {1:''})
881        ae(event_set, {1,})
882        event_set = set()
883        cache.set_field('tags', {1:''})
884        ae(event_set, set())
885
886        # test setting title
887        event_set = set()
888        cache.set_field('title', {1:'Book 1'})
889        ae(event_set, {1})
890        ae(cache.field_for('title', 1), 'Book 1')
891
892        # test setting series
893        event_set = set()
894        cache.set_field('series', {1:'GreatBooks [1]'})
895        cache.set_field('series', {2:'GreatBooks [0]'})
896        ae(event_set, {1,2})
897        ae(cache.field_for('series', 1), 'GreatBooks')
898        ae(cache.field_for('series_index', 1), 1.0)
899        ae(cache.field_for('series', 2), 'GreatBooks')
900        ae(cache.field_for('series_index', 2), 0.0)
901
902        # now series_index
903        event_set = set()
904        cache.set_field('series_index', {1:2})
905        ae(event_set, {1})
906        ae(cache.field_for('series_index', 1), 2.0)
907        event_set = set()
908        cache.set_field('series_index', {1:2, 2:3.5})  # book 1 isn't changed
909        ae(event_set, {2})
910        ae(cache.field_for('series_index', 1), 2.0)
911        ae(cache.field_for('series_index', 2), 3.5)
912
913    # }}}
914