1#!/usr/local/bin/python3.8 2# vim:fileencoding=utf-8 3 4 5__license__ = 'GPL v3' 6__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>' 7 8import inspect, time, numbers 9from io import BytesIO 10from functools import partial 11from operator import itemgetter 12 13from calibre.library.field_metadata import fm_as_dict 14from calibre.db.tests.base import BaseTest 15from polyglot.builtins import iteritems 16from polyglot import reprlib 17 18# Utils {{{ 19 20 21class ET: 22 23 def __init__(self, func_name, args, kwargs={}, old=None, legacy=None): 24 self.func_name = func_name 25 self.args, self.kwargs = args, kwargs 26 self.old, self.legacy = old, legacy 27 28 def __call__(self, test): 29 old = self.old or test.init_old(test.cloned_library) 30 legacy = self.legacy or test.init_legacy(test.cloned_library) 31 oldres = getattr(old, self.func_name)(*self.args, **self.kwargs) 32 newres = getattr(legacy, self.func_name)(*self.args, **self.kwargs) 33 test.assertEqual(oldres, newres, 'Equivalence test for %s with args: %s and kwargs: %s failed' % ( 34 self.func_name, reprlib.repr(self.args), reprlib.repr(self.kwargs))) 35 self.retval = newres 36 return newres 37 38 39def get_defaults(spec): 40 num = len(spec.defaults or ()) 41 if not num: 42 return {} 43 return dict(zip(spec.args[-num:], spec.defaults)) 44 45 46def compare_argspecs(old, new, attr): 47 # We dont compare the names of the non-keyword arguments as they are often 48 # different and they dont affect the usage of the API. 49 50 ok = len(old.args) == len(new.args) and get_defaults(old) == get_defaults(new) 51 if not ok: 52 raise AssertionError('The argspec for %s does not match. %r != %r' % (attr, old, new)) 53 54 55def run_funcs(self, db, ndb, funcs): 56 for func in funcs: 57 meth, args = func[0], func[1:] 58 if callable(meth): 59 meth(*args) 60 else: 61 fmt = lambda x:x 62 if meth[0] in {'!', '@', '#', '+', '$', '-', '%'}: 63 if meth[0] != '+': 64 fmt = {'!':dict, '@':lambda x:frozenset(x or ()), '#':lambda x:set((x or '').split(',')), 65 '$':lambda x:{tuple(y) for y in x}, '-':lambda x:None, 66 '%':lambda x: set((x or '').split(','))}[meth[0]] 67 else: 68 fmt = args[-1] 69 args = args[:-1] 70 meth = meth[1:] 71 res1, res2 = fmt(getattr(db, meth)(*args)), fmt(getattr(ndb, meth)(*args)) 72 self.assertEqual(res1, res2, 'The method: %s() returned different results for argument %s' % (meth, args)) 73# }}} 74 75 76class LegacyTest(BaseTest): 77 78 ''' Test the emulation of the legacy interface. ''' 79 80 def test_library_wide_properties(self): # {{{ 81 'Test library wide properties' 82 def to_unicode(x): 83 if isinstance(x, bytes): 84 return x.decode('utf-8') 85 if isinstance(x, dict): 86 # We ignore the key rec_index, since it is not stable for 87 # custom columns (it is created by iterating over a dict) 88 return {k.decode('utf-8') if isinstance(k, bytes) else k:to_unicode(v) 89 for k, v in iteritems(x) if k != 'rec_index'} 90 return x 91 92 def get_props(db): 93 props = ('user_version', 'is_second_db', 'library_id', 94 'custom_column_label_map', 'custom_column_num_map', 'library_path', 'dbpath') 95 fprops = ('last_modified', ) 96 ans = {x:getattr(db, x) for x in props} 97 ans.update({x:getattr(db, x)() for x in fprops}) 98 ans['all_ids'] = frozenset(db.all_ids()) 99 ans['field_metadata'] = fm_as_dict(db.field_metadata) 100 return to_unicode(ans) 101 102 old = self.init_old() 103 oldvals = get_props(old) 104 old.close() 105 del old 106 db = self.init_legacy() 107 newvals = get_props(db) 108 self.assertEqual(oldvals, newvals) 109 db.close() 110 # }}} 111 112 def test_get_property(self): # {{{ 113 'Test the get_property interface for reading data' 114 def get_values(db): 115 ans = {} 116 for label, loc in iteritems(db.FIELD_MAP): 117 if isinstance(label, numbers.Integral): 118 label = '#'+db.custom_column_num_map[label]['label'] 119 label = str(label) 120 ans[label] = tuple(db.get_property(i, index_is_id=True, loc=loc) 121 for i in db.all_ids()) 122 if label in ('id', 'title', '#tags'): 123 with self.assertRaises(IndexError): 124 db.get_property(9999, loc=loc) 125 with self.assertRaises(IndexError): 126 db.get_property(9999, index_is_id=True, loc=loc) 127 if label in {'tags', 'formats'}: 128 # Order is random in the old db for these 129 ans[label] = tuple(set(x.split(',')) if x else x for x in ans[label]) 130 if label == 'series_sort': 131 # The old db code did not take book language into account 132 # when generating series_sort values 133 ans[label] = None 134 return ans 135 136 db = self.init_legacy() 137 new_vals = get_values(db) 138 db.close() 139 140 old = self.init_old() 141 old_vals = get_values(old) 142 old.close() 143 old = None 144 self.assertEqual(old_vals, new_vals) 145 146 # }}} 147 148 def test_refresh(self): # {{{ 149 ' Test refreshing the view after a change to metadata.db ' 150 db = self.init_legacy() 151 db2 = self.init_legacy() 152 # Ensure that the following change will actually update the timestamp 153 # on filesystems with one second resolution (OS X) 154 time.sleep(1) 155 self.assertEqual(db2.data.cache.set_field('title', {1:'xxx'}), {1}) 156 db2.close() 157 del db2 158 self.assertNotEqual(db.title(1, index_is_id=True), 'xxx') 159 db.check_if_modified() 160 self.assertEqual(db.title(1, index_is_id=True), 'xxx') 161 # }}} 162 163 def test_legacy_getters(self): # {{{ 164 ' Test various functions to get individual bits of metadata ' 165 old = self.init_old() 166 getters = ('path', 'abspath', 'title', 'title_sort', 'authors', 'series', 167 'publisher', 'author_sort', 'authors', 'comments', 168 'comment', 'publisher', 'rating', 'series_index', 'tags', 169 'timestamp', 'uuid', 'pubdate', 'ondevice', 170 'metadata_last_modified', 'languages') 171 oldvals = {g:tuple(getattr(old, g)(x) for x in range(3)) + tuple(getattr(old, g)(x, True) for x in (1,2,3)) for g in getters} 172 old_rows = {tuple(r)[:5] for r in old} 173 old.close() 174 db = self.init_legacy() 175 newvals = {g:tuple(getattr(db, g)(x) for x in range(3)) + tuple(getattr(db, g)(x, True) for x in (1,2,3)) for g in getters} 176 new_rows = {tuple(r)[:5] for r in db} 177 for x in (oldvals, newvals): 178 x['tags'] = tuple(set(y.split(',')) if y else y for y in x['tags']) 179 self.assertEqual(oldvals, newvals) 180 self.assertEqual(old_rows, new_rows) 181 182 # }}} 183 184 def test_legacy_direct(self): # {{{ 185 'Test read-only methods that are directly equivalent in the old and new interface' 186 from calibre.ebooks.metadata.book.base import Metadata 187 from datetime import timedelta 188 ndb = self.init_legacy(self.cloned_library) 189 db = self.init_old() 190 newstag = ndb.new_api.get_item_id('tags', 'news') 191 192 self.assertEqual(dict(db.prefs), dict(ndb.prefs)) 193 194 for meth, args in iteritems({ 195 'find_identical_books': [(Metadata('title one', ['author one']),), (Metadata('unknown'),), (Metadata('xxxx'),)], 196 'get_books_for_category': [('tags', newstag), ('#formats', 'FMT1')], 197 'get_next_series_num_for': [('A Series One',)], 198 'get_id_from_uuid':[('ddddd',), (db.uuid(1, True),)], 199 'cover':[(0,), (1,), (2,)], 200 'get_author_id': [('author one',), ('unknown',), ('xxxxx',)], 201 'series_id': [(0,), (1,), (2,)], 202 'publisher_id': [(0,), (1,), (2,)], 203 '@tags_older_than': [ 204 ('News', None), ('Tag One', None), ('xxxx', None), ('Tag One', None, 'News'), ('News', None, 'xxxx'), 205 ('News', None, None, ['xxxxxxx']), ('News', None, 'Tag One', ['Author Two', 'Author One']), 206 ('News', timedelta(0), None, None), ('News', timedelta(100000)), 207 ], 208 'format':[(1, 'FMT1', True), (2, 'FMT1', True), (0, 'xxxxxx')], 209 'has_format':[(1, 'FMT1', True), (2, 'FMT1', True), (0, 'xxxxxx')], 210 'sizeof_format':[(1, 'FMT1', True), (2, 'FMT1', True), (0, 'xxxxxx')], 211 '@format_files':[(0,),(1,),(2,)], 212 'formats':[(0,),(1,),(2,)], 213 'max_size':[(0,),(1,),(2,)], 214 'format_hash':[(1, 'FMT1'),(1, 'FMT2'), (2, 'FMT1')], 215 'author_sort_from_authors': [(['Author One', 'Author Two', 'Unknown'],)], 216 'has_book':[(Metadata('title one'),), (Metadata('xxxx1111'),)], 217 'has_id':[(1,), (2,), (3,), (9999,)], 218 'id':[(1,), (2,), (0,),], 219 'index':[(1,), (2,), (3,), ], 220 'row':[(1,), (2,), (3,), ], 221 'is_empty':[()], 222 'count':[()], 223 'all_author_names':[()], 224 'all_tag_names':[()], 225 'all_series_names':[()], 226 'all_publisher_names':[()], 227 '!all_authors':[()], 228 '!all_tags2':[()], 229 '@all_tags':[()], 230 '@get_all_identifier_types':[()], 231 '!all_publishers':[()], 232 '!all_titles':[()], 233 '!all_series':[()], 234 'standard_field_keys':[()], 235 'all_field_keys':[()], 236 'searchable_fields':[()], 237 'search_term_to_field_key':[('author',), ('tag',)], 238 'metadata_for_field':[('title',), ('tags',)], 239 'sortable_field_keys':[()], 240 'custom_field_keys':[(True,), (False,)], 241 '!get_usage_count_by_id':[('authors',), ('tags',), ('series',), ('publisher',), ('#tags',), ('languages',)], 242 'get_field':[(1, 'title'), (2, 'tags'), (0, 'rating'), (1, 'authors'), (2, 'series'), (1, '#tags')], 243 'all_formats':[()], 244 'get_authors_with_ids':[()], 245 '!get_tags_with_ids':[()], 246 '!get_series_with_ids':[()], 247 '!get_publishers_with_ids':[()], 248 '!get_ratings_with_ids':[()], 249 '!get_languages_with_ids':[()], 250 'tag_name':[(3,)], 251 'author_name':[(3,)], 252 'series_name':[(3,)], 253 'authors_sort_strings':[(0,), (1,), (2,)], 254 'author_sort_from_book':[(0,), (1,), (2,)], 255 'authors_with_sort_strings':[(0,), (1,), (2,)], 256 'book_on_device_string':[(1,), (2,), (3,)], 257 'books_in_series_of':[(0,), (1,), (2,)], 258 'books_with_same_title':[(Metadata(db.title(0)),), (Metadata(db.title(1)),), (Metadata('1234'),)], 259 }): 260 fmt = lambda x: x 261 if meth[0] in {'!', '@'}: 262 fmt = {'!':dict, '@':frozenset}[meth[0]] 263 meth = meth[1:] 264 elif meth == 'get_authors_with_ids': 265 fmt = lambda val:{x[0]:tuple(x[1:]) for x in val} 266 for a in args: 267 self.assertEqual(fmt(getattr(db, meth)(*a)), fmt(getattr(ndb, meth)(*a)), 268 'The method: %s() returned different results for argument %s' % (meth, a)) 269 270 def f(x, y): # get_top_level_move_items is broken in the old db on case-insensitive file systems 271 x.discard('metadata_db_prefs_backup.json') 272 return x, y 273 self.assertEqual(f(*db.get_top_level_move_items()), f(*ndb.get_top_level_move_items())) 274 d1, d2 = BytesIO(), BytesIO() 275 db.copy_cover_to(1, d1, True) 276 ndb.copy_cover_to(1, d2, True) 277 self.assertTrue(d1.getvalue() == d2.getvalue()) 278 d1, d2 = BytesIO(), BytesIO() 279 db.copy_format_to(1, 'FMT1', d1, True) 280 ndb.copy_format_to(1, 'FMT1', d2, True) 281 self.assertTrue(d1.getvalue() == d2.getvalue()) 282 old = db.get_data_as_dict(prefix='test-prefix') 283 new = ndb.get_data_as_dict(prefix='test-prefix') 284 for o, n in zip(old, new): 285 o = {str(k) if isinstance(k, bytes) else k:set(v) if isinstance(v, list) else v for k, v in iteritems(o)} 286 n = {k:set(v) if isinstance(v, list) else v for k, v in iteritems(n)} 287 self.assertEqual(o, n) 288 289 ndb.search('title:Unknown') 290 db.search('title:Unknown') 291 self.assertEqual(db.row(3), ndb.row(3)) 292 self.assertRaises(ValueError, ndb.row, 2) 293 self.assertRaises(ValueError, db.row, 2) 294 db.close() 295 # }}} 296 297 def test_legacy_conversion_options(self): # {{{ 298 'Test conversion options API' 299 ndb = self.init_legacy() 300 db = self.init_old() 301 all_ids = ndb.new_api.all_book_ids() 302 op1 = {'xx': 'yy'} 303 304 def decode(x): 305 if isinstance(x, bytes): 306 x = x.decode('utf-8') 307 return x 308 309 for x in ( 310 ('has_conversion_options', all_ids), 311 ('conversion_options', 1, 'PIPE'), 312 ('set_conversion_options', 1, 'PIPE', op1), 313 ('has_conversion_options', all_ids), 314 ('conversion_options', 1, 'PIPE'), 315 ('delete_conversion_options', 1, 'PIPE'), 316 ('has_conversion_options', all_ids), 317 ): 318 meth, args = x[0], x[1:] 319 self.assertEqual( 320 decode(getattr(db, meth)(*args)), decode(getattr(ndb, meth)(*args)), 321 'The method: %s() returned different results for argument %s' % (meth, args) 322 ) 323 db.close() 324 # }}} 325 326 def test_legacy_delete_using(self): # {{{ 327 'Test delete_using() API' 328 ndb = self.init_legacy() 329 db = self.init_old() 330 cache = ndb.new_api 331 tmap = cache.get_id_map('tags') 332 t = next(iter(tmap)) 333 pmap = cache.get_id_map('publisher') 334 p = next(iter(pmap)) 335 run_funcs(self, db, ndb, ( 336 ('delete_tag_using_id', t), 337 ('delete_publisher_using_id', p), 338 (db.refresh,), 339 ('all_tag_names',), ('tags', 0), ('tags', 1), ('tags', 2), 340 ('all_publisher_names',), ('publisher', 0), ('publisher', 1), ('publisher', 2), 341 )) 342 db.close() 343 # }}} 344 345 def test_legacy_adding_books(self): # {{{ 346 'Test various adding/deleting books methods' 347 import sqlite3 348 con = sqlite3.connect(":memory:") 349 try: 350 con.execute("create virtual table recipe using fts5(name, ingredients)") 351 except Exception: 352 self.skipTest('python sqlite3 module does not have FTS5 support') 353 con.close() 354 del con 355 from calibre.ebooks.metadata.book.base import Metadata 356 from calibre.ptempfile import TemporaryFile 357 legacy, old = self.init_legacy(self.cloned_library), self.init_old(self.cloned_library) 358 mi = Metadata('Added Book0', authors=('Added Author',)) 359 with TemporaryFile(suffix='.aff') as name: 360 with open(name, 'wb') as f: 361 f.write(b'xxx') 362 T = partial(ET, 'add_books', ([name], ['AFF'], [mi]), old=old, legacy=legacy) 363 T()(self) 364 book_id = T(kwargs={'return_ids':True})(self)[1][0] 365 self.assertEqual(legacy.new_api.formats(book_id), ('AFF',)) 366 T(kwargs={'add_duplicates':False})(self) 367 mi.title = 'Added Book1' 368 mi.uuid = 'uuu' 369 T = partial(ET, 'import_book', (mi,[name]), old=old, legacy=legacy) 370 book_id = T()(self) 371 self.assertNotEqual(legacy.uuid(book_id, index_is_id=True), old.uuid(book_id, index_is_id=True)) 372 book_id = T(kwargs={'preserve_uuid':True})(self) 373 self.assertEqual(legacy.uuid(book_id, index_is_id=True), old.uuid(book_id, index_is_id=True)) 374 self.assertEqual(legacy.new_api.formats(book_id), ('AFF',)) 375 376 T = partial(ET, 'add_format', old=old, legacy=legacy) 377 T((0, 'AFF', BytesIO(b'fffff')))(self) 378 T((0, 'AFF', BytesIO(b'fffff')))(self) 379 T((0, 'AFF', BytesIO(b'fffff')), {'replace':True})(self) 380 with TemporaryFile(suffix='.opf') as name: 381 with open(name, 'wb') as f: 382 f.write(b'zzzz') 383 T = partial(ET, 'import_book', (mi,[name]), old=old, legacy=legacy) 384 book_id = T()(self) 385 self.assertFalse(legacy.new_api.formats(book_id)) 386 387 mi.title = 'Added Book2' 388 T = partial(ET, 'create_book_entry', (mi,), old=old, legacy=legacy) 389 T() 390 T({'add_duplicates':False}) 391 T({'force_id':1000}) 392 393 with TemporaryFile(suffix='.txt') as name: 394 with open(name, 'wb') as f: 395 f.write(b'tttttt') 396 bid = legacy.add_catalog(name, 'My Catalog') 397 self.assertEqual(old.add_catalog(name, 'My Catalog'), bid) 398 cache = legacy.new_api 399 self.assertEqual(cache.formats(bid), ('TXT',)) 400 self.assertEqual(cache.field_for('title', bid), 'My Catalog') 401 self.assertEqual(cache.field_for('authors', bid), ('calibre',)) 402 self.assertEqual(cache.field_for('tags', bid), (_('Catalog'),)) 403 self.assertTrue(bid < legacy.add_catalog(name, 'Something else')) 404 self.assertEqual(legacy.add_catalog(name, 'My Catalog'), bid) 405 self.assertEqual(old.add_catalog(name, 'My Catalog'), bid) 406 407 bid = legacy.add_news(name, {'title':'Events', 'add_title_tag':True, 'custom_tags':('one', 'two')}) 408 self.assertEqual(cache.formats(bid), ('TXT',)) 409 self.assertEqual(cache.field_for('authors', bid), ('calibre',)) 410 self.assertEqual(cache.field_for('tags', bid), (_('News'), 'Events', 'one', 'two')) 411 412 self.assertTrue(legacy.cover(1, index_is_id=True)) 413 origcov = legacy.cover(1, index_is_id=True) 414 self.assertTrue(legacy.has_cover(1)) 415 legacy.remove_cover(1) 416 self.assertFalse(legacy.has_cover(1)) 417 self.assertFalse(legacy.cover(1, index_is_id=True)) 418 legacy.set_cover(3, origcov) 419 self.assertEqual(legacy.cover(3, index_is_id=True), origcov) 420 self.assertTrue(legacy.has_cover(3)) 421 422 self.assertTrue(legacy.format(1, 'FMT1', index_is_id=True)) 423 legacy.remove_format(1, 'FMT1', index_is_id=True) 424 self.assertIsNone(legacy.format(1, 'FMT1', index_is_id=True)) 425 426 legacy.delete_book(1) 427 old.delete_book(1) 428 self.assertNotIn(1, legacy.all_ids()) 429 legacy.dump_metadata((2,3)) 430 old.close() 431 # }}} 432 433 def test_legacy_coverage(self): # {{{ 434 ' Check that the emulation of the legacy interface is (almost) total ' 435 cl = self.cloned_library 436 db = self.init_old(cl) 437 ndb = self.init_legacy() 438 439 SKIP_ATTRS = { 440 'TCat_Tag', '_add_newbook_tag', '_clean_identifier', '_library_id_', '_set_authors', 441 '_set_title', '_set_custom', '_update_author_in_cache', 442 # Feeds are now stored in the config folder 443 'get_feeds', 'get_feed', 'update_feed', 'remove_feeds', 'add_feed', 'set_feeds', 444 # Obsolete/broken methods 445 'author_id', # replaced by get_author_id 446 'books_for_author', # broken 447 'books_in_old_database', 'sizeof_old_database', # unused 448 'migrate_old', # no longer supported 449 'remove_unused_series', # superseded by clean API 450 'move_library_to', # API changed, no code uses old API 451 # Added compiled_rules() for calibredb add 452 'find_books_in_directory', 'import_book_directory', 'import_book_directory_multiple', 'recursive_import', 453 454 # Internal API 455 'clean_user_categories', 'cleanup_tags', 'books_list_filter', 'conn', 'connect', 'construct_file_name', 456 'construct_path_name', 'clear_dirtied', 'initialize_database', 'initialize_dynamic', 457 'run_import_plugins', 'vacuum', 'set_path', 'row_factory', 'rows', 'rmtree', 'series_index_pat', 458 'import_old_database', 'dirtied_lock', 'dirtied_cache', 'dirty_books_referencing', 459 'windows_check_if_files_in_use', 'get_metadata_for_dump', 'get_a_dirtied_book', 'dirtied_sequence', 460 'format_filename_cache', 'format_metadata_cache', 'filter', 'create_version1', 'normpath', 'custom_data_adapters', 461 'custom_table_names', 'custom_columns_in_meta', 'custom_tables', 462 } 463 SKIP_ARGSPEC = { 464 '__init__', 465 } 466 467 missing = [] 468 469 try: 470 total = 0 471 for attr in dir(db): 472 if attr in SKIP_ATTRS or attr.startswith('upgrade_version'): 473 continue 474 total += 1 475 if not hasattr(ndb, attr): 476 missing.append(attr) 477 continue 478 obj, nobj = getattr(db, attr), getattr(ndb, attr) 479 if attr not in SKIP_ARGSPEC: 480 try: 481 argspec = inspect.getfullargspec(obj) 482 nargspec = inspect.getfullargspec(nobj) 483 except (TypeError, ValueError): 484 pass 485 else: 486 compare_argspecs(argspec, nargspec, attr) 487 finally: 488 for db in (ndb, db): 489 db.close() 490 db.break_cycles() 491 492 if missing: 493 pc = len(missing)/total 494 raise AssertionError('{0:.1%} of API ({2} attrs) are missing: {1}'.format(pc, ', '.join(missing), len(missing))) 495 496 # }}} 497 498 def test_legacy_custom_data(self): # {{{ 499 'Test the API for custom data storage' 500 legacy, old = self.init_legacy(self.cloned_library), self.init_old(self.cloned_library) 501 for name in ('name1', 'name2', 'name3'): 502 T = partial(ET, 'add_custom_book_data', old=old, legacy=legacy) 503 T((1, name, 'val1'))(self) 504 T((2, name, 'val2'))(self) 505 T((3, name, 'val3'))(self) 506 T = partial(ET, 'get_ids_for_custom_book_data', old=old, legacy=legacy) 507 T((name,))(self) 508 T = partial(ET, 'get_custom_book_data', old=old, legacy=legacy) 509 T((1, name, object())) 510 T((9, name, object())) 511 T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy) 512 T((name, object())) 513 T((name+'!', object())) 514 T = partial(ET, 'delete_custom_book_data', old=old, legacy=legacy) 515 T((name, 1)) 516 T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy) 517 T((name, object())) 518 T = partial(ET, 'delete_all_custom_book_data', old=old, legacy=legacy) 519 T(name) 520 T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy) 521 T((name, object())) 522 523 T = partial(ET, 'add_multiple_custom_book_data', old=old, legacy=legacy) 524 T(('n', {1:'val1', 2:'val2'}))(self) 525 T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy) 526 T(('n', object())) 527 old.close() 528 # }}} 529 530 def test_legacy_setters(self): # {{{ 531 'Test methods that are directly equivalent in the old and new interface' 532 from calibre.ebooks.metadata.book.base import Metadata 533 from calibre.utils.date import now 534 n = now() 535 ndb = self.init_legacy(self.cloned_library) 536 amap = ndb.new_api.get_id_map('authors') 537 sorts = [(aid, 's%d' % aid) for aid in amap] 538 db = self.init_old(self.cloned_library) 539 run_funcs(self, db, ndb, ( 540 ('+format_metadata', 1, 'FMT1', itemgetter('size')), 541 ('+format_metadata', 1, 'FMT2', itemgetter('size')), 542 ('+format_metadata', 2, 'FMT1', itemgetter('size')), 543 ('get_tags', 0), ('get_tags', 1), ('get_tags', 2), 544 ('is_tag_used', 'News'), ('is_tag_used', 'xchkjgfh'), 545 ('bulk_modify_tags', (1,), ['t1'], ['News']), 546 ('bulk_modify_tags', (2,), ['t1'], ['Tag One', 'Tag Two']), 547 ('bulk_modify_tags', (3,), ['t1', 't2', 't3']), 548 (db.clean,), 549 ('@all_tags',), 550 ('@tags', 0), ('@tags', 1), ('@tags', 2), 551 552 ('unapply_tags', 1, ['t1']), 553 ('unapply_tags', 2, ['xxxx']), 554 ('unapply_tags', 3, ['t2', 't3']), 555 (db.clean,), 556 ('@all_tags',), 557 ('@tags', 0), ('@tags', 1), ('@tags', 2), 558 559 ('update_last_modified', (1,), True, n), ('update_last_modified', (3,), True, n), 560 ('metadata_last_modified', 1, True), ('metadata_last_modified', 3, True), 561 ('set_sort_field_for_author', sorts[0][0], sorts[0][1]), 562 ('set_sort_field_for_author', sorts[1][0], sorts[1][1]), 563 ('set_sort_field_for_author', sorts[2][0], sorts[2][1]), 564 ('set_link_field_for_author', sorts[0][0], sorts[0][1]), 565 ('set_link_field_for_author', sorts[1][0], sorts[1][1]), 566 ('set_link_field_for_author', sorts[2][0], sorts[2][1]), 567 (db.refresh,), 568 ('author_sort', 0), ('author_sort', 1), ('author_sort', 2), 569 )) 570 omi = [db.get_metadata(x) for x in (0, 1, 2)] 571 nmi = [ndb.get_metadata(x) for x in (0, 1, 2)] 572 self.assertEqual([x.author_sort_map for x in omi], [x.author_sort_map for x in nmi]) 573 self.assertEqual([x.author_link_map for x in omi], [x.author_link_map for x in nmi]) 574 db.close() 575 576 ndb = self.init_legacy(self.cloned_library) 577 db = self.init_old(self.cloned_library) 578 579 run_funcs(self, db, ndb, ( 580 ('set_authors', 1, ('author one',),), ('set_authors', 2, ('author two',), True, True, True), 581 ('set_author_sort', 3, 'new_aus'), 582 ('set_comment', 1, ''), ('set_comment', 2, None), ('set_comment', 3, '<p>a comment</p>'), 583 ('set_has_cover', 1, True), ('set_has_cover', 2, True), ('set_has_cover', 3, 1), 584 ('set_identifiers', 2, {'test':'', 'a':'b'}), ('set_identifiers', 3, {'id':'1', 'isbn':'9783161484100'}), ('set_identifiers', 1, {}), 585 ('set_languages', 1, ('en',)), 586 ('set_languages', 2, ()), 587 ('set_languages', 3, ('deu', 'spa', 'fra')), 588 ('set_pubdate', 1, None), ('set_pubdate', 2, '2011-1-7'), 589 ('set_series', 1, 'a series one'), ('set_series', 2, 'another series [7]'), ('set_series', 3, 'a third series'), 590 ('set_publisher', 1, 'publisher two'), ('set_publisher', 2, None), ('set_publisher', 3, 'a third puB'), 591 ('set_rating', 1, 2.3), ('set_rating', 2, 0), ('set_rating', 3, 8), 592 ('set_timestamp', 1, None), ('set_timestamp', 2, '2011-1-7'), 593 ('set_uuid', 1, None), ('set_uuid', 2, 'a test uuid'), 594 ('set_title', 1, 'title two'), ('set_title', 2, None), ('set_title', 3, 'The Test Title'), 595 ('set_tags', 1, ['a1', 'a2'], True), ('set_tags', 2, ['b1', 'tag one'], False, False, False, True), ('set_tags', 3, ['A1']), 596 (db.refresh,), 597 ('title', 0), ('title', 1), ('title', 2), 598 ('title_sort', 0), ('title_sort', 1), ('title_sort', 2), 599 ('authors', 0), ('authors', 1), ('authors', 2), 600 ('author_sort', 0), ('author_sort', 1), ('author_sort', 2), 601 ('has_cover', 3), ('has_cover', 1), ('has_cover', 2), 602 ('get_identifiers', 0), ('get_identifiers', 1), ('get_identifiers', 2), 603 ('pubdate', 0), ('pubdate', 1), ('pubdate', 2), 604 ('timestamp', 0), ('timestamp', 1), ('timestamp', 2), 605 ('publisher', 0), ('publisher', 1), ('publisher', 2), 606 ('rating', 0), ('+rating', 1, lambda x: x or 0), ('rating', 2), 607 ('series', 0), ('series', 1), ('series', 2), 608 ('series_index', 0), ('series_index', 1), ('series_index', 2), 609 ('uuid', 0), ('uuid', 1), ('uuid', 2), 610 ('isbn', 0), ('isbn', 1), ('isbn', 2), 611 ('@tags', 0), ('@tags', 1), ('@tags', 2), 612 ('@all_tags',), 613 ('@get_all_identifier_types',), 614 615 ('set_title_sort', 1, 'Title Two'), ('set_title_sort', 2, None), ('set_title_sort', 3, 'The Test Title_sort'), 616 ('set_series_index', 1, 2.3), ('set_series_index', 2, 0), ('set_series_index', 3, 8), 617 ('set_identifier', 1, 'moose', 'val'), ('set_identifier', 2, 'test', ''), ('set_identifier', 3, '', ''), 618 (db.refresh,), 619 ('series_index', 0), ('series_index', 1), ('series_index', 2), 620 ('title_sort', 0), ('title_sort', 1), ('title_sort', 2), 621 ('get_identifiers', 0), ('get_identifiers', 1), ('get_identifiers', 2), 622 ('@get_all_identifier_types',), 623 624 ('set_metadata', 1, Metadata('title', ('a1',)), False, False, False, True, True), 625 ('set_metadata', 3, Metadata('title', ('a1',))), 626 (db.refresh,), 627 ('title', 0), ('title', 1), ('title', 2), 628 ('title_sort', 0), ('title_sort', 1), ('title_sort', 2), 629 ('authors', 0), ('authors', 1), ('authors', 2), 630 ('author_sort', 0), ('author_sort', 1), ('author_sort', 2), 631 ('@tags', 0), ('@tags', 1), ('@tags', 2), 632 ('@all_tags',), 633 ('@get_all_identifier_types',), 634 )) 635 db.close() 636 637 ndb = self.init_legacy(self.cloned_library) 638 db = self.init_old(self.cloned_library) 639 640 run_funcs(self, db, ndb, ( 641 ('set', 0, 'title', 'newtitle'), 642 ('set', 0, 'tags', 't1,t2,tag one', True), 643 ('set', 0, 'authors', 'author one & Author Two', True), 644 ('set', 0, 'rating', 3.2), 645 ('set', 0, 'publisher', 'publisher one', False), 646 (db.refresh,), 647 ('title', 0), 648 ('rating', 0), 649 ('#tags', 0), ('#tags', 1), ('#tags', 2), 650 ('authors', 0), ('authors', 1), ('authors', 2), 651 ('publisher', 0), ('publisher', 1), ('publisher', 2), 652 ('delete_tag', 'T1'), ('delete_tag', 'T2'), ('delete_tag', 'Tag one'), ('delete_tag', 'News'), 653 (db.clean,), (db.refresh,), 654 ('@all_tags',), 655 ('#tags', 0), ('#tags', 1), ('#tags', 2), 656 )) 657 db.close() 658 659 ndb = self.init_legacy(self.cloned_library) 660 db = self.init_old(self.cloned_library) 661 run_funcs(self, db, ndb, ( 662 ('remove_all_tags', (1, 2, 3)), 663 (db.clean,), 664 ('@all_tags',), 665 ('@tags', 0), ('@tags', 1), ('@tags', 2), 666 )) 667 db.close() 668 669 ndb = self.init_legacy(self.cloned_library) 670 db = self.init_old(self.cloned_library) 671 a = {v:k for k, v in iteritems(ndb.new_api.get_id_map('authors'))}['Author One'] 672 t = {v:k for k, v in iteritems(ndb.new_api.get_id_map('tags'))}['Tag One'] 673 s = {v:k for k, v in iteritems(ndb.new_api.get_id_map('series'))}['A Series One'] 674 p = {v:k for k, v in iteritems(ndb.new_api.get_id_map('publisher'))}['Publisher One'] 675 run_funcs(self, db, ndb, ( 676 ('rename_author', a, 'Author Two'), 677 ('rename_tag', t, 'News'), 678 ('rename_series', s, 'ss'), 679 ('rename_publisher', p, 'publisher one'), 680 (db.clean,), 681 (db.refresh,), 682 ('@all_tags',), 683 ('tags', 0), ('tags', 1), ('tags', 2), 684 ('series', 0), ('series', 1), ('series', 2), 685 ('publisher', 0), ('publisher', 1), ('publisher', 2), 686 ('series_index', 0), ('series_index', 1), ('series_index', 2), 687 ('authors', 0), ('authors', 1), ('authors', 2), 688 ('author_sort', 0), ('author_sort', 1), ('author_sort', 2), 689 )) 690 db.close() 691 692 # }}} 693 694 def test_legacy_custom(self): # {{{ 695 'Test the legacy API for custom columns' 696 ndb = self.init_legacy(self.cloned_library) 697 db = self.init_old(self.cloned_library) 698 # Test getting 699 run_funcs(self, db, ndb, ( 700 ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'rating'), ('all_custom', 'authors'), ('all_custom', None, 7), 701 ('get_next_cc_series_num_for', 'My Series One', 'series'), ('get_next_cc_series_num_for', 'My Series Two', 'series'), 702 ('is_item_used_in_multiple', 'My Tag One', 'tags'), 703 ('is_item_used_in_multiple', 'My Series One', 'series'), 704 ('$get_custom_items_with_ids', 'series'), ('$get_custom_items_with_ids', 'tags'), ('$get_custom_items_with_ids', 'float'), 705 ('$get_custom_items_with_ids', 'rating'), ('$get_custom_items_with_ids', 'authors'), ('$get_custom_items_with_ids', None, 7), 706 )) 707 for label in ('tags', 'series', 'authors', 'comments', 'rating', 'date', 'yesno', 'isbn', 'enum', 'formats', 'float', 'comp_tags'): 708 for func in ('get_custom', 'get_custom_extra', 'get_custom_and_extra'): 709 run_funcs(self, db, ndb, [(func, idx, label) for idx in range(3)]) 710 711 # Test renaming/deleting 712 t = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#tags'))}['My Tag One'] 713 t2 = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#tags'))}['My Tag Two'] 714 a = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#authors'))}['My Author Two'] 715 a2 = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#authors'))}['Custom One'] 716 s = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#series'))}['My Series One'] 717 run_funcs(self, db, ndb, ( 718 ('delete_custom_item_using_id', t, 'tags'), 719 ('delete_custom_item_using_id', a, 'authors'), 720 ('rename_custom_item', t2, 't2', 'tags'), 721 ('rename_custom_item', a2, 'custom one', 'authors'), 722 ('rename_custom_item', s, 'My Series Two', 'series'), 723 ('delete_item_from_multiple', 'custom two', 'authors'), 724 (db.clean,), 725 (db.refresh,), 726 ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'authors'), 727 )) 728 for label in ('tags', 'authors', 'series'): 729 run_funcs(self, db, ndb, [('get_custom_and_extra', idx, label) for idx in range(3)]) 730 db.close() 731 732 ndb = self.init_legacy(self.cloned_library) 733 db = self.init_old(self.cloned_library) 734 # Test setting 735 run_funcs(self, db, ndb, ( 736 ('-set_custom', 1, 't1 & t2', 'authors'), 737 ('-set_custom', 1, 't3 & t4', 'authors', None, True), 738 ('-set_custom', 3, 'test one & test Two', 'authors'), 739 ('-set_custom', 1, 'ijfkghkjdf', 'enum'), 740 ('-set_custom', 3, 'One', 'enum'), 741 ('-set_custom', 3, 'xxx', 'formats'), 742 ('-set_custom', 1, 'my tag two', 'tags', None, False, False, None, True, True), 743 (db.clean,), (db.refresh,), 744 ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'authors'), 745 )) 746 for label in ('tags', 'series', 'authors', 'comments', 'rating', 'date', 'yesno', 'isbn', 'enum', 'formats', 'float', 'comp_tags'): 747 for func in ('get_custom', 'get_custom_extra', 'get_custom_and_extra'): 748 run_funcs(self, db, ndb, [(func, idx, label) for idx in range(3)]) 749 db.close() 750 751 ndb = self.init_legacy(self.cloned_library) 752 db = self.init_old(self.cloned_library) 753 # Test setting bulk 754 run_funcs(self, db, ndb, ( 755 ('set_custom_bulk', (1,2,3), 't1 & t2', 'authors'), 756 ('set_custom_bulk', (1,2,3), 'a series', 'series', None, False, False, (9, 10, 11)), 757 ('set_custom_bulk', (1,2,3), 't1', 'tags', None, True), 758 (db.clean,), (db.refresh,), 759 ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'authors'), 760 )) 761 for label in ('tags', 'series', 'authors', 'comments', 'rating', 'date', 'yesno', 'isbn', 'enum', 'formats', 'float', 'comp_tags'): 762 for func in ('get_custom', 'get_custom_extra', 'get_custom_and_extra'): 763 run_funcs(self, db, ndb, [(func, idx, label) for idx in range(3)]) 764 db.close() 765 766 ndb = self.init_legacy(self.cloned_library) 767 db = self.init_old(self.cloned_library) 768 # Test bulk multiple 769 run_funcs(self, db, ndb, ( 770 ('set_custom_bulk_multiple', (1,2,3), ['t1'], ['My Tag One'], 'tags'), 771 (db.clean,), (db.refresh,), 772 ('all_custom', 'tags'), 773 ('get_custom', 0, 'tags'), ('get_custom', 1, 'tags'), ('get_custom', 2, 'tags'), 774 )) 775 db.close() 776 777 o = self.cloned_library 778 n = self.cloned_library 779 ndb, db = self.init_legacy(n), self.init_old(o) 780 ndb.create_custom_column('created', 'Created', 'text', True, True, {'moose':'cat'}) 781 db.create_custom_column('created', 'Created', 'text', True, True, {'moose':'cat'}) 782 db.close() 783 ndb, db = self.init_legacy(n), self.init_old(o) 784 self.assertEqual(db.custom_column_label_map['created'], ndb.backend.custom_field_metadata('created')) 785 num = db.custom_column_label_map['created']['num'] 786 ndb.set_custom_column_metadata(num, is_editable=False, name='Crikey', display={}) 787 db.set_custom_column_metadata(num, is_editable=False, name='Crikey', display={}) 788 db.close() 789 ndb, db = self.init_legacy(n), self.init_old(o) 790 self.assertEqual(db.custom_column_label_map['created'], ndb.backend.custom_field_metadata('created')) 791 db.close() 792 ndb = self.init_legacy(n) 793 ndb.delete_custom_column('created') 794 ndb = self.init_legacy(n) 795 self.assertRaises(KeyError, ndb.custom_field_name, num=num) 796 797 # Test setting custom series 798 ndb = self.init_legacy(self.cloned_library) 799 ndb.set_custom(1, 'TS [9]', label='series') 800 self.assertEqual(ndb.new_api.field_for('#series', 1), 'TS') 801 self.assertEqual(ndb.new_api.field_for('#series_index', 1), 9) 802 # }}} 803 804 def test_legacy_saved_search(self): # {{{ 805 ' Test legacy saved search API ' 806 db, ndb = self.init_old(), self.init_legacy() 807 run_funcs(self, db, ndb, ( 808 ('saved_search_set_all', {'one':'a', 'two':'b'}), 809 ('saved_search_names',), 810 ('saved_search_lookup', 'one'), 811 ('saved_search_lookup', 'two'), 812 ('saved_search_lookup', 'xxx'), 813 ('saved_search_rename', 'one', '1'), 814 ('saved_search_names',), 815 ('saved_search_lookup', '1'), 816 ('saved_search_delete', '1'), 817 ('saved_search_names',), 818 ('saved_search_add', 'n', 'm'), 819 ('saved_search_names',), 820 ('saved_search_lookup', 'n'), 821 )) 822 db.close() 823 # }}} 824