#!/usr/local/bin/python3.8
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai


__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'

from collections import namedtuple
from functools import partial
from io import BytesIO

from calibre.ebooks.metadata import author_to_author_sort, title_sort
from calibre.ebooks.metadata.book.base import Metadata
from calibre.utils.date import UNDEFINED_DATE
from calibre.db.tests.base import BaseTest, IMG
from calibre.db.backend import FTSQueryError
from polyglot.builtins import iteritems, itervalues


class WritingTest(BaseTest):

    # Utils {{{
    def create_getter(self, name, getter=None):
        if getter is None:
            if name.endswith('_index'):
                ans = lambda db:partial(db.get_custom_extra, index_is_id=True,
                                        label=name[1:].replace('_index', ''))
            else:
                ans = lambda db:partial(db.get_custom, label=name[1:],
                                        index_is_id=True)
        else:
            ans = lambda db:partial(getattr(db, getter), index_is_id=True)
        return ans

    def create_setter(self, name, setter=None):
        if setter is None:
            ans = lambda db:partial(db.set_custom, label=name[1:], commit=True)
        else:
            ans = lambda db:partial(getattr(db, setter), commit=True)
        return ans

    def create_test(self, name, vals, getter=None, setter=None):
        T = namedtuple('Test', 'name vals getter setter')
        return T(name, vals, self.create_getter(name, getter),
                 self.create_setter(name, setter))

    def run_tests(self, tests):
        results = {}
        for test in tests:
            results[test] = []
            for val in test.vals:
                cl = self.cloned_library
                cache = self.init_cache(cl)
                cache.set_field(test.name, {1: val})
                cached_res = cache.field_for(test.name, 1)
                del cache
                db = self.init_old(cl)
                getter = test.getter(db)
                sqlite_res = getter(1)
                if test.name.endswith('_index'):
                    val = float(val) if val is not None else 1.0
                    self.assertEqual(sqlite_res, val,
                        'Failed setting for %s with value %r, sqlite value not the same. val: %r != sqlite_val: %r'%(
                            test.name, val, val, sqlite_res))
                else:
                    test.setter(db)(1, val)
                    old_cached_res = getter(1)
                    self.assertEqual(old_cached_res, cached_res,
                        'Failed setting for %s with value %r, cached value not the same. Old: %r != New: %r'%(
                            test.name, val, old_cached_res, cached_res))
                    db.refresh()
                    old_sqlite_res = getter(1)
                    self.assertEqual(old_sqlite_res, sqlite_res,
                        'Failed setting for %s, sqlite value not the same: %r != %r'%(
                            test.name, old_sqlite_res, sqlite_res))
                del db
    # }}}

    def test_one_one(self):  # {{{
        'Test setting of values in one-one fields'
        tests = [self.create_test('#yesno', (True, False, 'true', 'false', None))]
        for name, getter, setter in (
            ('#series_index', None, None),
            ('series_index', 'series_index', 'set_series_index'),
            ('#float', None, None),
        ):
            vals = ['1.5', None, 0, 1.0]
            tests.append(self.create_test(name, tuple(vals), getter, setter))

        for name, getter, setter in (
            ('pubdate', 'pubdate', 'set_pubdate'),
            ('timestamp', 'timestamp', 'set_timestamp'),
            ('#date', None, None),
        ):
            tests.append(self.create_test(
                name, ('2011-1-12', UNDEFINED_DATE, None), getter, setter))

        for name, getter, setter in (
            ('title', 'title', 'set_title'),
            ('uuid', 'uuid', 'set_uuid'),
            ('author_sort', 'author_sort', 'set_author_sort'),
            ('sort', 'title_sort', 'set_title_sort'),
            ('#comments', None, None),
            ('comments', 'comments', 'set_comment'),
        ):
            vals = ['something', None]
            if name not in {'comments', '#comments'}:
                # Setting text column to '' returns None in the new backend
                # and '' in the old. I think None is more correct.
                vals.append('')
            if name == 'comments':
                # Again new behavior of deleting comment rather than setting
                # empty string is more correct.
                vals.remove(None)
            tests.append(self.create_test(name, tuple(vals), getter, setter))

        self.run_tests(tests)
    # }}}

    def test_many_one_basic(self):  # {{{
        'Test the different code paths for writing to a many-one field'
        cl = self.cloned_library
        cache = self.init_cache(cl)
        f = cache.fields['publisher']
        item_ids = {f.ids_for_book(1)[0], f.ids_for_book(2)[0]}
        val = 'Changed'
        self.assertEqual(cache.set_field('publisher', {1:val, 2:val}), {1, 2})
        cache2 = self.init_cache(cl)
        for book_id in (1, 2):
            for c in (cache, cache2):
                self.assertEqual(c.field_for('publisher', book_id), val)
                self.assertFalse(item_ids.intersection(set(c.fields['publisher'].table.id_map)))
        del cache2
        self.assertFalse(cache.set_field('publisher', {1:val, 2:val}))
        val = val.lower()
        self.assertFalse(cache.set_field('publisher', {1:val, 2:val},
                                         allow_case_change=False))
        self.assertEqual(cache.set_field('publisher', {1:val, 2:val}), {1, 2})
        cache2 = self.init_cache(cl)
        for book_id in (1, 2):
            for c in (cache, cache2):
                self.assertEqual(c.field_for('publisher', book_id), val)
        del cache2
        self.assertEqual(cache.set_field('publisher', {1:'new', 2:'New'}), {1, 2})
        self.assertEqual(cache.field_for('publisher', 1).lower(), 'new')
        self.assertEqual(cache.field_for('publisher', 2).lower(), 'new')
        self.assertEqual(cache.set_field('publisher', {1:None, 2:'NEW'}), {1, 2})
        self.assertEqual(len(f.table.id_map), 1)
        self.assertEqual(cache.set_field('publisher', {2:None}), {2})
        self.assertEqual(len(f.table.id_map), 0)
        cache2 = self.init_cache(cl)
        self.assertEqual(len(cache2.fields['publisher'].table.id_map), 0)
        del cache2
        self.assertEqual(cache.set_field('publisher', {1:'one', 2:'two',
                                                       3:'three'}), {1, 2, 3})
        self.assertEqual(cache.set_field('publisher', {1:''}), {1})
        self.assertEqual(cache.set_field('publisher', {1:'two'}), {1})
        self.assertEqual(tuple(map(f.for_book, (1,2,3))), ('two', 'two', 'three'))
        self.assertEqual(cache.set_field('publisher', {1:'Two'}), {1, 2})
        cache2 = self.init_cache(cl)
        self.assertEqual(tuple(map(f.for_book, (1,2,3))), ('Two', 'Two', 'three'))
        del cache2

        # Enum
        self.assertFalse(cache.set_field('#enum', {1:'Not allowed'}))
        self.assertEqual(cache.set_field('#enum', {1:'One', 2:'One', 3:'Three'}), {1, 3})
        self.assertEqual(cache.set_field('#enum', {1:None}), {1})
        cache2 = self.init_cache(cl)
        for c in (cache, cache2):
            for i, val in iteritems({1:None, 2:'One', 3:'Three'}):
                self.assertEqual(c.field_for('#enum', i), val)
        del cache2

        # Rating
        self.assertFalse(cache.set_field('rating', {1:6, 2:4}))
        self.assertEqual(cache.set_field('rating', {1:0, 3:2}), {1, 3})
        self.assertEqual(cache.set_field('#rating', {1:None, 2:4, 3:8}), {1, 2, 3})
        cache2 = self.init_cache(cl)
        for c in (cache, cache2):
            for i, val in iteritems({1:None, 2:4, 3:2}):
                self.assertEqual(c.field_for('rating', i), val)
            for i, val in iteritems({1:None, 2:4, 3:8}):
                self.assertEqual(c.field_for('#rating', i), val)
        del cache2

        # Series
        self.assertFalse(cache.set_field('series',
                {1:'a series one', 2:'a series one'}, allow_case_change=False))
        self.assertEqual(cache.set_field('series', {3:'Series [3]'}), {3})
        self.assertEqual(cache.set_field('#series', {1:'Series', 3:'Series'}),
                         {1, 3})
        self.assertEqual(cache.set_field('#series', {2:'Series [0]'}), {2})
        cache2 = self.init_cache(cl)
        for c in (cache, cache2):
            for i, val in iteritems({1:'A Series One', 2:'A Series One', 3:'Series'}):
                self.assertEqual(c.field_for('series', i), val)
            cs_indices = {1:c.field_for('#series_index', 1), 3:c.field_for('#series_index', 3)}
            for i in (1, 2, 3):
                self.assertEqual(c.field_for('#series', i), 'Series')
            for i, val in iteritems({1:2, 2:1, 3:3}):
                self.assertEqual(c.field_for('series_index', i), val)
            for i, val in iteritems({1:cs_indices[1], 2:0, 3:cs_indices[3]}):
                self.assertEqual(c.field_for('#series_index', i), val)
        del cache2

    # }}}

    def test_many_many_basic(self):  # {{{
        'Test the different code paths for writing to a many-many field'
        cl = self.cloned_library
        cache = self.init_cache(cl)
        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field

        # Tags
        ae(sf('#tags', {1:cache.field_for('tags', 1), 2:cache.field_for('tags', 2)}),
           {1, 2})
        for name in ('tags', '#tags'):
            f = cache.fields[name]
            af(sf(name, {1:('News', 'tag one')}, allow_case_change=False))
            ae(sf(name, {1:'tag one, News'}), {1, 2})
            ae(sf(name, {3:('tag two', 'sep,sep2')}), {2, 3})
            ae(len(f.table.id_map), 4)
            ae(sf(name, {1:None}), {1})
            cache2 = self.init_cache(cl)
            for c in (cache, cache2):
                ae(c.field_for(name, 3), ('tag two', 'sep;sep2'))
                ae(len(c.fields[name].table.id_map), 3)
                ae(len(c.fields[name].table.id_map), 3)
                ae(c.field_for(name, 1), ())
                ae(c.field_for(name, 2), ('tag two', 'tag one'))
            del cache2

        # Authors
        ae(sf('#authors', {k:cache.field_for('authors', k) for k in (1,2,3)}),
           {1,2,3})

        for name in ('authors', '#authors'):
            f = cache.fields[name]
            ae(len(f.table.id_map), 3)
            af(cache.set_field(name, {3:'Unknown'}))
            ae(cache.set_field(name, {3:'Kovid Goyal & Divok Layog'}), {3})
            ae(cache.set_field(name, {1:'', 2:'An, Author'}), {1,2})
            cache2 = self.init_cache(cl)
            for c in (cache, cache2):
                ae(len(c.fields[name].table.id_map), 4 if name =='authors' else 3)
                ae(c.field_for(name, 3), ('Kovid Goyal', 'Divok Layog'))
                ae(c.field_for(name, 2), ('An, Author',))
                ae(c.field_for(name, 1), (_('Unknown'),) if name=='authors' else ())
                if name == 'authors':
                    ae(c.field_for('author_sort', 1), author_to_author_sort(_('Unknown')))
                    ae(c.field_for('author_sort', 2), author_to_author_sort('An, Author'))
                    ae(c.field_for('author_sort', 3), author_to_author_sort('Kovid Goyal') + ' & ' + author_to_author_sort('Divok Layog'))
            del cache2
        ae(cache.set_field('authors', {1:'KoviD GoyaL'}), {1, 3})
        ae(cache.field_for('author_sort', 1), 'GoyaL, KoviD')
        ae(cache.field_for('author_sort', 3), 'GoyaL, KoviD & Layog, Divok')

        # Languages
        f = cache.fields['languages']
        ae(f.table.id_map, {1: 'eng', 2: 'deu'})
        ae(sf('languages', {1:''}), {1})
        ae(cache.field_for('languages', 1), ())
        ae(sf('languages', {2:('und',)}), {2})
        af(f.table.id_map)
        ae(sf('languages', {1:'eng,fra,deu', 2:'es,Dutch', 3:'English'}), {1, 2, 3})
        ae(cache.field_for('languages', 1), ('eng', 'fra', 'deu'))
        ae(cache.field_for('languages', 2), ('spa', 'nld'))
        ae(cache.field_for('languages', 3), ('eng',))
        ae(sf('languages', {3:None}), {3})
        ae(cache.field_for('languages', 3), ())
        ae(sf('languages', {1:'deu,fra,eng'}), {1}, 'Changing order failed')
        ae(sf('languages', {2:'deu,eng,eng'}), {2})
        cache2 = self.init_cache(cl)
        for c in (cache, cache2):
            ae(cache.field_for('languages', 1), ('deu', 'fra', 'eng'))
            ae(cache.field_for('languages', 2), ('deu', 'eng'))
        del cache2

        # Identifiers
        f = cache.fields['identifiers']
        ae(sf('identifiers', {3: 'one:1,two:2'}), {3})
        ae(sf('identifiers', {2:None}), {2})
        ae(sf('identifiers', {1: {'test':'1', 'two':'2'}}), {1})
        cache2 = self.init_cache(cl)
        for c in (cache, cache2):
            ae(c.field_for('identifiers', 3), {'one':'1', 'two':'2'})
            ae(c.field_for('identifiers', 2), {})
            ae(c.field_for('identifiers', 1), {'test':'1', 'two':'2'})
        del cache2

        # Test setting of title sort
        ae(sf('title', {1:'The Moose', 2:'Cat'}), {1, 2})
        cache2 = self.init_cache(cl)
        for c in (cache, cache2):
            ae(c.field_for('sort', 1), title_sort('The Moose'))
            ae(c.field_for('sort', 2), title_sort('Cat'))

        # Test setting with the same value repeated
        ae(sf('tags', {3: ('a', 'b', 'a')}), {3})
        ae(sf('tags', {3: ('x', 'X')}), {3}, 'Failed when setting tag twice with different cases')
        ae(('x',), cache.field_for('tags', 3))

        # Test setting of authors with | in their names (for legacy database
        # format compatibility | is replaced by ,)
        ae(sf('authors', {3: ('Some| Author',)}), {3})
        ae(('Some, Author',), cache.field_for('authors', 3))

    # }}}

    def test_dirtied(self):  # {{{
        'Test the setting of the dirtied flag and the last_modified column'
        cl = self.cloned_library
        cache = self.init_cache(cl)
        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
        # First empty dirtied
        cache.dump_metadata()
        af(cache.dirtied_cache)
        af(self.init_cache(cl).dirtied_cache)

        prev = cache.field_for('last_modified', 3)
        import calibre.db.cache as c
        from datetime import timedelta
        utime = prev+timedelta(days=1)
        onowf = c.nowf
        c.nowf = lambda: utime
        try:
            ae(sf('title', {3:'xxx'}), {3})
            self.assertTrue(3 in cache.dirtied_cache)
            ae(cache.field_for('last_modified', 3), utime)
            cache.dump_metadata()
            raw = cache.read_backup(3)
            from calibre.ebooks.metadata.opf2 import OPF
            opf = OPF(BytesIO(raw))
            ae(opf.title, 'xxx')
        finally:
            c.nowf = onowf
    # }}}

    def test_backup(self):  # {{{
        'Test the automatic backup of changed metadata'
        cl = self.cloned_library
        cache = self.init_cache(cl)
        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
        # First empty dirtied
        cache.dump_metadata()
        af(cache.dirtied_cache)
        from calibre.db.backup import MetadataBackup
        interval = 0.01
        mb = MetadataBackup(cache, interval=interval, scheduling_interval=0)
        mb.start()
        try:
            ae(sf('title', {1:'title1', 2:'title2', 3:'title3'}), {1,2,3})
            ae(sf('authors', {1:'author1 & author2', 2:'author1 & author2', 3:'author1 & author2'}), {1,2,3})
            count = 6
            while cache.dirty_queue_length() and count > 0:
                mb.join(2)
                count -= 1
            af(cache.dirty_queue_length())
        finally:
            mb.stop()
            mb.join(2)
        af(mb.is_alive())
        from calibre.ebooks.metadata.opf2 import OPF
        for book_id in (1, 2, 3):
            raw = cache.read_backup(book_id)
            opf = OPF(BytesIO(raw))
            ae(opf.title, 'title%d'%book_id)
            ae(opf.authors, ['author1', 'author2'])
    # }}}

    def test_set_cover(self):  # {{{
        ' Test setting of cover '
        cache = self.init_cache()
        ae = self.assertEqual

        # Test removing a cover
        ae(cache.field_for('cover', 1), 1)
        ae(cache.set_cover({1:None}), {1})
        ae(cache.field_for('cover', 1), 0)
        img = IMG

        # Test setting a cover
        ae(cache.set_cover({bid:img for bid in (1, 2, 3)}), {1, 2, 3})
        old = self.init_old()
        for book_id in (1, 2, 3):
            ae(cache.cover(book_id), img, 'Cover was not set correctly for book %d' % book_id)
            ae(cache.field_for('cover', book_id), 1)
            ae(old.cover(book_id, index_is_id=True), img, 'Cover was not set correctly for book %d' % book_id)
            self.assertTrue(old.has_cover(book_id))
        old.close()
        old.break_cycles()
        del old
    # }}}

    def test_set_metadata(self):  # {{{
        ' Test setting of metadata '
        ae = self.assertEqual
        cache = self.init_cache(self.cloned_library)

        # Check that changing title/author updates the path
        mi = cache.get_metadata(1)
        old_path = cache.field_for('path', 1)
        old_title, old_author = mi.title, mi.authors[0]
        ae(old_path, '%s/%s (1)' % (old_author, old_title))
        mi.title, mi.authors = 'New Title', ['New Author']
        cache.set_metadata(1, mi)
        ae(cache.field_for('path', 1), '%s/%s (1)' % (mi.authors[0], mi.title))
        p = cache.format_abspath(1, 'FMT1')
        self.assertTrue(mi.authors[0] in p and mi.title in p)

        # Compare old and new set_metadata()
        db = self.init_old(self.cloned_library)
        mi = db.get_metadata(1, index_is_id=True, get_cover=True, cover_as_data=True)
        mi2 = db.get_metadata(3, index_is_id=True, get_cover=True, cover_as_data=True)
        db.set_metadata(2, mi)
        db.set_metadata(1, mi2, force_changes=True)
        oldmi = db.get_metadata(2, index_is_id=True, get_cover=True, cover_as_data=True)
        oldmi2 = db.get_metadata(1, index_is_id=True, get_cover=True, cover_as_data=True)
        db.close()
        del db
        cache = self.init_cache(self.cloned_library)
        cache.set_metadata(2, mi)
        nmi = cache.get_metadata(2, get_cover=True, cover_as_data=True)
        ae(oldmi.cover_data, nmi.cover_data)
        self.compare_metadata(nmi, oldmi, exclude={'last_modified', 'format_metadata', 'formats'})
        cache.set_metadata(1, mi2, force_changes=True)
        nmi2 = cache.get_metadata(1, get_cover=True, cover_as_data=True)
        self.compare_metadata(nmi2, oldmi2, exclude={'last_modified', 'format_metadata', 'formats'})

        cache = self.init_cache(self.cloned_library)
        mi = cache.get_metadata(1)
        otags = mi.tags
        mi.tags = [x.upper() for x in mi.tags]
        cache.set_metadata(3, mi)
        self.assertEqual(set(otags), set(cache.field_for('tags', 3)), 'case changes should not be allowed in set_metadata')

        # test that setting authors without author sort results in an
        # auto-generated authors sort
        mi = Metadata('empty', ['a1', 'a2'])
        cache.set_metadata(1, mi)
        self.assertEqual('a1 & a2', cache.field_for('author_sort', 1))
        cache.set_sort_for_authors({cache.get_item_id('authors', 'a1'): 'xy'})
        self.assertEqual('xy & a2', cache.field_for('author_sort', 1))
        mi = Metadata('empty', ['a1'])
        cache.set_metadata(1, mi)
        self.assertEqual('xy', cache.field_for('author_sort', 1))

    # }}}

    def test_conversion_options(self):  # {{{
        ' Test saving of conversion options '
        cache = self.init_cache()
        all_ids = cache.all_book_ids()
        self.assertFalse(cache.has_conversion_options(all_ids))
        self.assertIsNone(cache.conversion_options(1))
        op1, op2 = b"{'xx':'yy'}", b"{'yy':'zz'}"
        cache.set_conversion_options({1:op1, 2:op2})
        self.assertTrue(cache.has_conversion_options(all_ids))
        self.assertEqual(cache.conversion_options(1), op1)
        self.assertEqual(cache.conversion_options(2), op2)
        cache.set_conversion_options({1:op2})
        self.assertEqual(cache.conversion_options(1), op2)
        cache.delete_conversion_options(all_ids)
        self.assertFalse(cache.has_conversion_options(all_ids))
    # }}}

    def test_remove_items(self):  # {{{
        ' Test removal of many-(many,one) items '
        cache = self.init_cache()
        tmap = cache.get_id_map('tags')
        self.assertEqual(cache.remove_items('tags', tmap), {1, 2})
        tmap = cache.get_id_map('#tags')
        t = {v:k for k, v in iteritems(tmap)}['My Tag Two']
        self.assertEqual(cache.remove_items('#tags', (t,)), {1, 2})

        smap = cache.get_id_map('series')
        self.assertEqual(cache.remove_items('series', smap), {1, 2})
        smap = cache.get_id_map('#series')
        s = {v:k for k, v in iteritems(smap)}['My Series Two']
        self.assertEqual(cache.remove_items('#series', (s,)), {1})

        for c in (cache, self.init_cache()):
            self.assertFalse(c.get_id_map('tags'))
            self.assertFalse(c.all_field_names('tags'))
            for bid in c.all_book_ids():
                self.assertFalse(c.field_for('tags', bid))

            self.assertEqual(len(c.get_id_map('#tags')), 1)
            self.assertEqual(c.all_field_names('#tags'), {'My Tag One'})
            for bid in c.all_book_ids():
                self.assertIn(c.field_for('#tags', bid), ((), ('My Tag One',)))

            for bid in (1, 2):
                self.assertEqual(c.field_for('series_index', bid), 1.0)
            self.assertFalse(c.get_id_map('series'))
            self.assertFalse(c.all_field_names('series'))
            for bid in c.all_book_ids():
                self.assertFalse(c.field_for('series', bid))

            self.assertEqual(c.field_for('series_index', 1), 1.0)
            self.assertEqual(c.all_field_names('#series'), {'My Series One'})
            for bid in c.all_book_ids():
                self.assertIn(c.field_for('#series', bid), (None, 'My Series One'))

        # Now test with restriction
        cache = self.init_cache()
        cache.set_field('tags', {1:'a,b,c', 2:'b,a', 3:'x,y,z'})
        cache.set_field('series', {1:'a', 2:'a', 3:'b'})
        cache.set_field('series_index', {1:8, 2:9, 3:3})
        tmap, smap = cache.get_id_map('tags'), cache.get_id_map('series')
        self.assertEqual(cache.remove_items('tags', tmap, restrict_to_book_ids=()), set())
        self.assertEqual(cache.remove_items('tags', tmap, restrict_to_book_ids={1}), {1})
        self.assertEqual(cache.remove_items('series', smap, restrict_to_book_ids=()), set())
        self.assertEqual(cache.remove_items('series', smap, restrict_to_book_ids=(1,)), {1})
        c2 = self.init_cache()
        for c in (cache, c2):
            self.assertEqual(c.field_for('tags', 1), ())
            self.assertEqual(c.field_for('tags', 2), ('b', 'a'))
            self.assertNotIn('c', set(itervalues(c.get_id_map('tags'))))
            self.assertEqual(c.field_for('series', 1), None)
            self.assertEqual(c.field_for('series', 2), 'a')
            self.assertEqual(c.field_for('series_index', 1), 1.0)
            self.assertEqual(c.field_for('series_index', 2), 9)

    # }}}

    def test_rename_items(self):  # {{{
        ' Test renaming of many-(many,one) items '
        cl = self.cloned_library
        cache = self.init_cache(cl)
        # Check that renaming authors updates author sort and path
        a = {v:k for k, v in iteritems(cache.get_id_map('authors'))}['Unknown']
        self.assertEqual(cache.rename_items('authors', {a:'New Author'})[0], {3})
        a = {v:k for k, v in iteritems(cache.get_id_map('authors'))}['Author One']
        self.assertEqual(cache.rename_items('authors', {a:'Author Two'})[0], {1, 2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('authors'), {'New Author', 'Author Two'})
            self.assertEqual(c.field_for('author_sort', 3), 'Author, New')
            self.assertIn('New Author/', c.field_for('path', 3))
            self.assertEqual(c.field_for('authors', 1), ('Author Two',))
            self.assertEqual(c.field_for('author_sort', 1), 'Two, Author')

        t = {v:k for k, v in iteritems(cache.get_id_map('tags'))}['Tag One']
        # Test case change
        self.assertEqual(cache.rename_items('tags', {t:'tag one'}), ({1, 2}, {t:t}))
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('tags'), {'tag one', 'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 1)), {'tag one', 'News'})
            self.assertEqual(set(c.field_for('tags', 2)), {'tag one', 'Tag Two'})
        # Test new name
        self.assertEqual(cache.rename_items('tags', {t:'t1'})[0], {1,2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('tags'), {'t1', 'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 1)), {'t1', 'News'})
            self.assertEqual(set(c.field_for('tags', 2)), {'t1', 'Tag Two'})
        # Test rename to existing
        self.assertEqual(cache.rename_items('tags', {t:'Tag Two'})[0], {1,2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('tags'), {'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 1)), {'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 2)), {'Tag Two'})
        # Test on a custom column
        t = {v:k for k, v in iteritems(cache.get_id_map('#tags'))}['My Tag One']
        self.assertEqual(cache.rename_items('#tags', {t:'My Tag Two'})[0], {2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('#tags'), {'My Tag Two'})
            self.assertEqual(set(c.field_for('#tags', 2)), {'My Tag Two'})

        # Test a Many-one field
        s = {v:k for k, v in iteritems(cache.get_id_map('series'))}['A Series One']
        # Test case change
        self.assertEqual(cache.rename_items('series', {s:'a series one'}), ({1, 2}, {s:s}))
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('series'), {'a series one'})
            self.assertEqual(c.field_for('series', 1), 'a series one')
            self.assertEqual(c.field_for('series_index', 1), 2.0)

        # Test new name
        self.assertEqual(cache.rename_items('series', {s:'series'})[0], {1, 2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('series'), {'series'})
            self.assertEqual(c.field_for('series', 1), 'series')
            self.assertEqual(c.field_for('series', 2), 'series')
            self.assertEqual(c.field_for('series_index', 1), 2.0)

        s = {v:k for k, v in iteritems(cache.get_id_map('#series'))}['My Series One']
        # Test custom column with rename to existing
        self.assertEqual(cache.rename_items('#series', {s:'My Series Two'})[0], {2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('#series'), {'My Series Two'})
            self.assertEqual(c.field_for('#series', 2), 'My Series Two')
            self.assertEqual(c.field_for('#series_index', 1), 3.0)
            self.assertEqual(c.field_for('#series_index', 2), 4.0)

        # Test renaming many-many items to multiple items
        cache = self.init_cache(self.cloned_library)
        t = {v:k for k, v in iteritems(cache.get_id_map('tags'))}['Tag One']
        affected_books, id_map = cache.rename_items('tags', {t:'Something, Else, Entirely'})
        self.assertEqual({1, 2}, affected_books)
        tmap = cache.get_id_map('tags')
        self.assertEqual('Something', tmap[id_map[t]])
        self.assertEqual(1, len(id_map))
        f1, f2 = cache.field_for('tags', 1), cache.field_for('tags', 2)
        for f in (f1, f2):
            for t in 'Something,Else,Entirely'.split(','):
                self.assertIn(t, f)
            self.assertNotIn('Tag One', f)

        # Test with restriction
        cache = self.init_cache()
        cache.set_field('tags', {1:'a,b,c', 2:'x,y,z', 3:'a,x,z'})
        tmap = {v:k for k, v in iteritems(cache.get_id_map('tags'))}
        self.assertEqual(cache.rename_items('tags', {tmap['a']:'r'}, restrict_to_book_ids=()), (set(), {}))
        self.assertEqual(cache.rename_items('tags', {tmap['a']:'r', tmap['b']:'q'}, restrict_to_book_ids=(1,))[0], {1})
        self.assertEqual(cache.rename_items('tags', {tmap['x']:'X'}, restrict_to_book_ids=(2,))[0], {2})
        c2 = self.init_cache()
        for c in (cache, c2):
            self.assertEqual(c.field_for('tags', 1), ('r', 'q', 'c'))
            self.assertEqual(c.field_for('tags', 2), ('X', 'y', 'z'))
            self.assertEqual(c.field_for('tags', 3), ('a', 'X', 'z'))
    # }}}

    def test_composite_cache(self):  # {{{
        ' Test that the composite field cache is properly invalidated on writes '
        cache = self.init_cache()
        cache.create_custom_column('tc', 'TC', 'composite', False, display={
            'composite_template':'{title} {author_sort} {title_sort} {formats} {tags} {series} {series_index}'})
        cache = self.init_cache()

        def test_invalidate():
            c = self.init_cache()
            for bid in cache.all_book_ids():
                self.assertEqual(cache.field_for('#tc', bid), c.field_for('#tc', bid))

        cache.set_field('title', {1:'xx', 3:'yy'})
        test_invalidate()
        cache.set_field('series_index', {1:9, 3:11})
        test_invalidate()
        cache.rename_items('tags', {cache.get_item_id('tags', 'Tag One'):'xxx', cache.get_item_id('tags', 'News'):'news'})
        test_invalidate()
        cache.remove_items('tags', (cache.get_item_id('tags', 'news'),))
        test_invalidate()
        cache.set_sort_for_authors({cache.get_item_id('authors', 'Author One'):'meow'})
        test_invalidate()
        cache.remove_formats({1:{'FMT1'}})
        test_invalidate()
        cache.add_format(1, 'ADD', BytesIO(b'xxxx'))
        test_invalidate()
    # }}}

    def test_dump_and_restore(self):  # {{{
        ' Test roundtripping the db through SQL '
        import warnings
        with warnings.catch_warnings():
            # on python 3.10 apsw raises a deprecation warning which causes this test to fail on CI
            warnings.simplefilter('ignore', DeprecationWarning)
            cache = self.init_cache()
            uv = int(cache.backend.user_version)
            all_ids = cache.all_book_ids()
            cache.dump_and_restore()
            self.assertEqual(cache.set_field('title', {1:'nt'}), {1}, 'database connection broken')
            cache = self.init_cache()
            self.assertEqual(cache.all_book_ids(), all_ids, 'dump and restore broke database')
            self.assertEqual(int(cache.backend.user_version), uv)
    # }}}

    def test_set_author_data(self):  # {{{
        cache = self.init_cache()
        adata = cache.author_data()
        ldata = {aid:str(aid) for aid in adata}
        self.assertEqual({1,2,3}, cache.set_link_for_authors(ldata))
        for c in (cache, self.init_cache()):
            self.assertEqual(ldata, {aid:d['link'] for aid, d in iteritems(c.author_data())})
        self.assertEqual({3}, cache.set_link_for_authors({aid:'xxx' if aid == max(adata) else str(aid) for aid in adata}),
                         'Setting the author link to the same value as before, incorrectly marked some books as dirty')
        sdata = {aid:'%s, changed' % aid for aid in adata}
        self.assertEqual({1,2,3}, cache.set_sort_for_authors(sdata))
        for bid in (1, 2, 3):
            self.assertIn(', changed', cache.field_for('author_sort', bid))
        sdata = {aid:'%s, changed' % (aid*2 if aid == max(adata) else aid) for aid in adata}
        self.assertEqual({3}, cache.set_sort_for_authors(sdata),
                         'Setting the author sort to the same value as before, incorrectly marked some books as dirty')
    # }}}

    def test_fix_case_duplicates(self):  # {{{
        ' Test fixing of databases that have items in is_many fields that differ only by case '
        ae = self.assertEqual
        cache = self.init_cache()
        conn = cache.backend.conn
        conn.execute('INSERT INTO publishers (name) VALUES ("mūs")')
        lid = conn.last_insert_rowid()
        conn.execute('INSERT INTO publishers (name) VALUES ("MŪS")')
        uid = conn.last_insert_rowid()
        conn.execute('DELETE FROM books_publishers_link')
        conn.execute('INSERT INTO books_publishers_link (book,publisher) VALUES (1, %d)' % lid)
        conn.execute('INSERT INTO books_publishers_link (book,publisher) VALUES (2, %d)' % uid)
        conn.execute('INSERT INTO books_publishers_link (book,publisher) VALUES (3, %d)' % uid)
        cache.reload_from_db()
        t = cache.fields['publisher'].table
        for x in (lid, uid):
            self.assertIn(x, t.id_map)
            self.assertIn(x, t.col_book_map)
        ae(t.book_col_map[1], lid)
        ae(t.book_col_map[2], uid)
        t.fix_case_duplicates(cache.backend)
        for c in (cache, self.init_cache()):
            t = c.fields['publisher'].table
            self.assertNotIn(uid, t.id_map)
            self.assertNotIn(uid, t.col_book_map)
            for bid in (1, 2, 3):
                ae(c.field_for('publisher', bid), "mūs")
            c.close()

        cache = self.init_cache()
        conn = cache.backend.conn
        conn.execute('INSERT INTO tags (name) VALUES ("mūūs")')
        lid = conn.last_insert_rowid()
        conn.execute('INSERT INTO tags (name) VALUES ("MŪŪS")')
        uid = conn.last_insert_rowid()
        conn.execute('INSERT INTO tags (name) VALUES ("mūŪS")')
        mid = conn.last_insert_rowid()
        conn.execute('INSERT INTO tags (name) VALUES ("t")')
        norm = conn.last_insert_rowid()
        conn.execute('DELETE FROM books_tags_link')
        for book_id, vals in iteritems({1:(lid, uid), 2:(uid, mid), 3:(lid, norm)}):
            conn.executemany('INSERT INTO books_tags_link (book,tag) VALUES (?,?)',
                             tuple((book_id, x) for x in vals))
        cache.reload_from_db()
        t = cache.fields['tags'].table
        for x in (lid, uid, mid):
            self.assertIn(x, t.id_map)
            self.assertIn(x, t.col_book_map)
        t.fix_case_duplicates(cache.backend)
        for c in (cache, self.init_cache()):
            t = c.fields['tags'].table
            for x in (uid, mid):
                self.assertNotIn(x, t.id_map)
                self.assertNotIn(x, t.col_book_map)
            ae(c.field_for('tags', 1), (t.id_map[lid],))
            ae(c.field_for('tags', 2), (t.id_map[lid],), 'failed for book 2')
            ae(c.field_for('tags', 3), (t.id_map[lid], t.id_map[norm]))
    # }}}

    def test_preferences(self):  # {{{
        ' Test getting and setting of preferences, especially with mutable objects '
        cache = self.init_cache()
        changes = []
        cache.backend.conn.setupdatehook(lambda typ, dbname, tblname, rowid: changes.append(rowid))
        prefs = cache.backend.prefs
        prefs['test mutable'] = [1, 2, 3]
        self.assertEqual(len(changes), 1)
        a = prefs['test mutable']
        a.append(4)
        self.assertIn(4, prefs['test mutable'])
        prefs['test mutable'] = a
        self.assertEqual(len(changes), 2)
        prefs.load_from_db()
        self.assertIn(4, prefs['test mutable'])
        prefs['test mutable'] = {k:k for k in range(10)}
        self.assertEqual(len(changes), 3)
        prefs['test mutable'] = {k:k for k in reversed(range(10))}
        self.assertEqual(len(changes), 3, 'The database was written to despite there being no change in value')
    # }}}

    def test_annotations(self):  # {{{
        'Test handling of annotations'
        from calibre.utils.date import utcnow, EPOCH
        cl = self.cloned_library
        cache = self.init_cache(cl)
        # First empty dirtied
        cache.dump_metadata()
        self.assertFalse(cache.dirtied_cache)

        def a(**kw):
            ts = utcnow()
            kw['timestamp'] = utcnow().isoformat()
            return kw, (ts - EPOCH).total_seconds()

        annot_list = [
            a(type='bookmark', title='bookmark1 changed', seq=1),
            a(type='highlight', highlighted_text='text1', uuid='1', seq=2),
            a(type='highlight', highlighted_text='text2', uuid='2', seq=3, notes='notes2 some word changed again'),
        ]

        def map_as_list(amap):
            ans = []
            for items in amap.values():
                ans.extend(items)
            ans.sort(key=lambda x:x['seq'])
            return ans

        cache.set_annotations_for_book(1, 'moo', annot_list)
        amap = cache.annotations_map_for_book(1, 'moo')
        self.assertEqual(3, len(cache.all_annotations_for_book(1)))
        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))
        self.assertFalse(cache.dirtied_cache)
        cache.check_dirtied_annotations()
        self.assertEqual(set(cache.dirtied_cache), {1})
        cache.dump_metadata()
        cache.check_dirtied_annotations()
        self.assertFalse(cache.dirtied_cache)

        # Test searching
        results = cache.search_annotations('"changed"')
        self.assertEqual([1, 3], [x['id'] for x in results])
        results = cache.search_annotations('"changed"', annotation_type='bookmark')
        self.assertEqual([1], [x['id'] for x in results])
        results = cache.search_annotations('"Changed"')  # changed and change stem differently in english and other euro languages
        self.assertEqual([1, 3], [x['id'] for x in results])
        results = cache.search_annotations('"SOMe"')
        self.assertEqual([3], [x['id'] for x in results])
        results = cache.search_annotations('"change"', use_stemming=False)
        self.assertFalse(results)
        results = cache.search_annotations('"bookmark1"', highlight_start='[', highlight_end=']')
        self.assertEqual(results[0]['text'], '[bookmark1] changed')
        results = cache.search_annotations('"word"', highlight_start='[', highlight_end=']', snippet_size=3)
        self.assertEqual(results[0]['text'], '…some [word] changed…')
        self.assertRaises(FTSQueryError, cache.search_annotations, 'AND OR')
        fts_l = [a(type='bookmark', title='路坎坷走来', seq=1),]
        cache.set_annotations_for_book(1, 'moo', fts_l)
        results = cache.search_annotations('路', highlight_start='[', highlight_end=']')
        self.assertEqual(results[0]['text'], '[路]坎坷走来')

        annot_list[0][0]['title'] = 'changed title'
        cache.set_annotations_for_book(1, 'moo', annot_list)
        amap = cache.annotations_map_for_book(1, 'moo')
        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))

        del annot_list[1]
        cache.set_annotations_for_book(1, 'moo', annot_list)
        amap = cache.annotations_map_for_book(1, 'moo')
        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))
        cache.check_dirtied_annotations()
        cache.dump_metadata()
        from calibre.ebooks.metadata.opf2 import OPF
        raw = cache.read_backup(1)
        opf = OPF(BytesIO(raw))
        cache.restore_annotations(1, list(opf.read_annotations()))
        amap = cache.annotations_map_for_book(1, 'moo')
        self.assertEqual([x[0] for x in annot_list], map_as_list(amap))

    # }}}

    def test_changed_events(self):  # {{{
        def ae(l, r):
            # We need to sleep a bit to allow events to happen on its thread
            import time
            time.sleep(.001)
            self.assertEqual(l, r)

        cache = self.init_cache(self.cloned_library)
        ae(cache.all_book_ids(), {1, 2, 3})

        event_set = set()

        def event_func(t, library_id, *args):
            nonlocal event_set, ae
            event_set.update(args[0][1])
        cache.add_listener(event_func)

        # Test that setting metadata to itself doesn't generate any events
        for id_ in cache.all_book_ids():
            cache.set_metadata(id_, cache.get_metadata(id_))

        ae(event_set, set())

        # test setting a single field
        cache.set_field('tags', {1:'foo'})
        ae(event_set, {1})

        # test setting multiple books. Book 1 shouldn't get an event because it
        # isn't being changed
        event_set = set()
        cache.set_field('tags', {1:'foo', 2:'bar', 3:'mumble'})
        ae(event_set, {2, 3})

        # test setting a many-many field to empty
        event_set = set()
        cache.set_field('tags', {1:''})
        ae(event_set, {1,})
        event_set = set()
        cache.set_field('tags', {1:''})
        ae(event_set, set())

        # test setting title
        event_set = set()
        cache.set_field('title', {1:'Book 1'})
        ae(event_set, {1})
        ae(cache.field_for('title', 1), 'Book 1')

        # test setting series
        event_set = set()
        cache.set_field('series', {1:'GreatBooks [1]'})
        cache.set_field('series', {2:'GreatBooks [0]'})
        ae(event_set, {1,2})
        ae(cache.field_for('series', 1), 'GreatBooks')
        ae(cache.field_for('series_index', 1), 1.0)
        ae(cache.field_for('series', 2), 'GreatBooks')
        ae(cache.field_for('series_index', 2), 0.0)

        # now series_index
        event_set = set()
        cache.set_field('series_index', {1:2})
        ae(event_set, {1})
        ae(cache.field_for('series_index', 1), 2.0)
        event_set = set()
        cache.set_field('series_index', {1:2, 2:3.5})  # book 1 isn't changed
        ae(event_set, {2})
        ae(cache.field_for('series_index', 1), 2.0)
        ae(cache.field_for('series_index', 2), 3.5)

    # }}}