#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8


__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'

import inspect, time, numbers
from io import BytesIO
from functools import partial
from operator import itemgetter

from calibre.library.field_metadata import fm_as_dict
from calibre.db.tests.base import BaseTest
from polyglot.builtins import iteritems
from polyglot import reprlib

# Utils {{{


class ET:
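    '''
    Equivalence-test helper: invokes the same method, with the same arguments,
    on both the old database interface and the legacy wrapper around the new
    one, and asserts that the two return values are equal. If old/legacy are
    not supplied, fresh instances are created from the test's cloned library.
    '''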

    def __init__(self, func_name, args, kwargs={}, old=None, legacy=None):
        self.func_name = func_name
        self.args, self.kwargs = args, kwargs
        self.old, self.legacy = old, legacy

    def __call__(self, test):
        old = self.old or test.init_old(test.cloned_library)
        legacy = self.legacy or test.init_legacy(test.cloned_library)
        oldres = getattr(old, self.func_name)(*self.args, **self.kwargs)
        newres = getattr(legacy, self.func_name)(*self.args, **self.kwargs)
        test.assertEqual(oldres, newres, 'Equivalence test for %s with args: %s and kwargs: %s failed' % (
            self.func_name, reprlib.repr(self.args), reprlib.repr(self.kwargs)))
        self.retval = newres
        return newres


def get_defaults(spec):
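    # Map the trailing positional parameters that have default values to those
    # defaults, e.g. for def f(a, b=1, c=2) this returns {'b': 1, 'c': 2}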
    num = len(spec.defaults or ())
    if not num:
        return {}
    return dict(zip(spec.args[-num:], spec.defaults))


def compare_argspecs(old, new, attr):
    # We don't compare the names of the non-keyword arguments as they are often
    # different and they don't affect the usage of the API.

    ok = len(old.args) == len(new.args) and get_defaults(old) == get_defaults(new)
    if not ok:
        raise AssertionError('The argspec for %s does not match. %r != %r' % (attr, old, new))


def run_funcs(self, db, ndb, funcs):
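    # Each entry in funcs is (method_or_name, *args). When the first element
    # is a string, a leading marker character selects how the two return
    # values are normalized before comparison: '!' -> dict, '@' -> frozenset,
    # '#' and '%' -> set of comma separated values, '$' -> set of tuples,
    # '-' -> discard the return value (only check that the call succeeds),
    # '+' -> the last argument is the normalization function to use.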
    for func in funcs:
        meth, args = func[0], func[1:]
        if callable(meth):
            meth(*args)
        else:
            fmt = lambda x:x
            if meth[0] in {'!', '@', '#', '+', '$', '-', '%'}:
                if meth[0] != '+':
                    fmt = {'!':dict, '@':lambda x:frozenset(x or ()), '#':lambda x:set((x or '').split(',')),
                           '$':lambda x:{tuple(y) for y in x}, '-':lambda x:None,
                           '%':lambda x: set((x or '').split(','))}[meth[0]]
                else:
                    fmt = args[-1]
                    args = args[:-1]
                meth = meth[1:]
            res1, res2 = fmt(getattr(db, meth)(*args)), fmt(getattr(ndb, meth)(*args))
            self.assertEqual(res1, res2, 'The method: %s() returned different results for argument %s' % (meth, args))
# }}}


class LegacyTest(BaseTest):

    ''' Test the emulation of the legacy interface. '''

    def test_library_wide_properties(self):  # {{{
        'Test library-wide properties'
        def to_unicode(x):
            if isinstance(x, bytes):
                return x.decode('utf-8')
            if isinstance(x, dict):
                # We ignore the key rec_index, since it is not stable for
                # custom columns (it is created by iterating over a dict)
                return {k.decode('utf-8') if isinstance(k, bytes) else k:to_unicode(v)
                        for k, v in iteritems(x) if k != 'rec_index'}
            return x

        def get_props(db):
            props = ('user_version', 'is_second_db', 'library_id',
                    'custom_column_label_map', 'custom_column_num_map', 'library_path', 'dbpath')
            fprops = ('last_modified', )
            ans = {x:getattr(db, x) for x in props}
            ans.update({x:getattr(db, x)() for x in fprops})
            ans['all_ids'] = frozenset(db.all_ids())
            ans['field_metadata'] = fm_as_dict(db.field_metadata)
            return to_unicode(ans)

        old = self.init_old()
        oldvals = get_props(old)
        old.close()
        del old
        db = self.init_legacy()
        newvals = get_props(db)
        self.assertEqual(oldvals, newvals)
        db.close()
    # }}}

    def test_get_property(self):  # {{{
        'Test the get_property interface for reading data'
        def get_values(db):
            ans = {}
            for label, loc in iteritems(db.FIELD_MAP):
                if isinstance(label, numbers.Integral):
                    label = '#'+db.custom_column_num_map[label]['label']
                label = str(label)
                ans[label] = tuple(db.get_property(i, index_is_id=True, loc=loc)
                                   for i in db.all_ids())
                if label in ('id', 'title', '#tags'):
                    with self.assertRaises(IndexError):
                        db.get_property(9999, loc=loc)
                    with self.assertRaises(IndexError):
                        db.get_property(9999, index_is_id=True, loc=loc)
                if label in {'tags', 'formats'}:
                    # Order is random in the old db for these
                    ans[label] = tuple(set(x.split(',')) if x else x for x in ans[label])
                if label == 'series_sort':
                    # The old db code did not take book language into account
                    # when generating series_sort values
                    ans[label] = None
            return ans

        db = self.init_legacy()
        new_vals = get_values(db)
        db.close()

        old = self.init_old()
        old_vals = get_values(old)
        old.close()
        old = None
        self.assertEqual(old_vals, new_vals)

    # }}}

    def test_refresh(self):  # {{{
        ' Test refreshing the view after a change to metadata.db '
        db = self.init_legacy()
        db2 = self.init_legacy()
        # Ensure that the following change will actually update the timestamp
        # on filesystems with one second resolution (OS X)
        time.sleep(1)
        self.assertEqual(db2.data.cache.set_field('title', {1:'xxx'}), {1})
        db2.close()
        del db2
        self.assertNotEqual(db.title(1, index_is_id=True), 'xxx')
        db.check_if_modified()
        self.assertEqual(db.title(1, index_is_id=True), 'xxx')
    # }}}

    def test_legacy_getters(self):  # {{{
        ' Test various functions to get individual bits of metadata '
        old = self.init_old()
        getters = ('path', 'abspath', 'title', 'title_sort', 'authors', 'series',
                   'publisher', 'author_sort', 'authors', 'comments',
                   'comment', 'publisher', 'rating', 'series_index', 'tags',
                   'timestamp', 'uuid', 'pubdate', 'ondevice',
                   'metadata_last_modified', 'languages')
        oldvals = {g:tuple(getattr(old, g)(x) for x in range(3)) + tuple(getattr(old, g)(x, True) for x in (1,2,3)) for g in getters}
        old_rows = {tuple(r)[:5] for r in old}
        old.close()
        db = self.init_legacy()
        newvals = {g:tuple(getattr(db, g)(x) for x in range(3)) + tuple(getattr(db, g)(x, True) for x in (1,2,3)) for g in getters}
        new_rows = {tuple(r)[:5] for r in db}
        for x in (oldvals, newvals):
            x['tags'] = tuple(set(y.split(',')) if y else y for y in x['tags'])
        self.assertEqual(oldvals, newvals)
        self.assertEqual(old_rows, new_rows)

    # }}}

    def test_legacy_direct(self):  # {{{
        'Test read-only methods that are directly equivalent in the old and new interface'
        from calibre.ebooks.metadata.book.base import Metadata
        from datetime import timedelta
        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old()
        newstag = ndb.new_api.get_item_id('tags', 'news')

        self.assertEqual(dict(db.prefs), dict(ndb.prefs))

        for meth, args in iteritems({
            'find_identical_books': [(Metadata('title one', ['author one']),), (Metadata('unknown'),), (Metadata('xxxx'),)],
            'get_books_for_category': [('tags', newstag), ('#formats', 'FMT1')],
            'get_next_series_num_for': [('A Series One',)],
            'get_id_from_uuid':[('ddddd',), (db.uuid(1, True),)],
            'cover':[(0,), (1,), (2,)],
            'get_author_id': [('author one',), ('unknown',), ('xxxxx',)],
            'series_id': [(0,), (1,), (2,)],
            'publisher_id': [(0,), (1,), (2,)],
            '@tags_older_than': [
                ('News', None), ('Tag One', None), ('xxxx', None), ('Tag One', None, 'News'), ('News', None, 'xxxx'),
                ('News', None, None, ['xxxxxxx']), ('News', None, 'Tag One', ['Author Two', 'Author One']),
                ('News', timedelta(0), None, None), ('News', timedelta(100000)),
            ],
            'format':[(1, 'FMT1', True), (2, 'FMT1', True), (0, 'xxxxxx')],
            'has_format':[(1, 'FMT1', True), (2, 'FMT1', True), (0, 'xxxxxx')],
            'sizeof_format':[(1, 'FMT1', True), (2, 'FMT1', True), (0, 'xxxxxx')],
            '@format_files':[(0,),(1,),(2,)],
            'formats':[(0,),(1,),(2,)],
            'max_size':[(0,),(1,),(2,)],
            'format_hash':[(1, 'FMT1'),(1, 'FMT2'), (2, 'FMT1')],
            'author_sort_from_authors': [(['Author One', 'Author Two', 'Unknown'],)],
            'has_book':[(Metadata('title one'),), (Metadata('xxxx1111'),)],
            'has_id':[(1,), (2,), (3,), (9999,)],
            'id':[(1,), (2,), (0,),],
            'index':[(1,), (2,), (3,), ],
            'row':[(1,), (2,), (3,), ],
            'is_empty':[()],
            'count':[()],
            'all_author_names':[()],
            'all_tag_names':[()],
            'all_series_names':[()],
            'all_publisher_names':[()],
            '!all_authors':[()],
            '!all_tags2':[()],
            '@all_tags':[()],
            '@get_all_identifier_types':[()],
            '!all_publishers':[()],
            '!all_titles':[()],
            '!all_series':[()],
            'standard_field_keys':[()],
            'all_field_keys':[()],
            'searchable_fields':[()],
            'search_term_to_field_key':[('author',), ('tag',)],
            'metadata_for_field':[('title',), ('tags',)],
            'sortable_field_keys':[()],
            'custom_field_keys':[(True,), (False,)],
            '!get_usage_count_by_id':[('authors',), ('tags',), ('series',), ('publisher',), ('#tags',), ('languages',)],
            'get_field':[(1, 'title'), (2, 'tags'), (0, 'rating'), (1, 'authors'), (2, 'series'), (1, '#tags')],
            'all_formats':[()],
            'get_authors_with_ids':[()],
            '!get_tags_with_ids':[()],
            '!get_series_with_ids':[()],
            '!get_publishers_with_ids':[()],
            '!get_ratings_with_ids':[()],
            '!get_languages_with_ids':[()],
            'tag_name':[(3,)],
            'author_name':[(3,)],
            'series_name':[(3,)],
            'authors_sort_strings':[(0,), (1,), (2,)],
            'author_sort_from_book':[(0,), (1,), (2,)],
            'authors_with_sort_strings':[(0,), (1,), (2,)],
            'book_on_device_string':[(1,), (2,), (3,)],
            'books_in_series_of':[(0,), (1,), (2,)],
            'books_with_same_title':[(Metadata(db.title(0)),), (Metadata(db.title(1)),), (Metadata('1234'),)],
        }):
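            # As in run_funcs(), a '!' or '@' prefix on the method name means
            # the results are compared as a dict or frozenset respectively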
            fmt = lambda x: x
            if meth[0] in {'!', '@'}:
                fmt = {'!':dict, '@':frozenset}[meth[0]]
                meth = meth[1:]
            elif meth == 'get_authors_with_ids':
                fmt = lambda val:{x[0]:tuple(x[1:]) for x in val}
            for a in args:
                self.assertEqual(fmt(getattr(db, meth)(*a)), fmt(getattr(ndb, meth)(*a)),
                                 'The method: %s() returned different results for argument %s' % (meth, a))

        def f(x, y):  # get_top_level_move_items is broken in the old db on case-insensitive file systems
            x.discard('metadata_db_prefs_backup.json')
            return x, y
        self.assertEqual(f(*db.get_top_level_move_items()), f(*ndb.get_top_level_move_items()))
        d1, d2 = BytesIO(), BytesIO()
        db.copy_cover_to(1, d1, True)
        ndb.copy_cover_to(1, d2, True)
        self.assertTrue(d1.getvalue() == d2.getvalue())
        d1, d2 = BytesIO(), BytesIO()
        db.copy_format_to(1, 'FMT1', d1, True)
        ndb.copy_format_to(1, 'FMT1', d2, True)
        self.assertTrue(d1.getvalue() == d2.getvalue())
        old = db.get_data_as_dict(prefix='test-prefix')
        new = ndb.get_data_as_dict(prefix='test-prefix')
        for o, n in zip(old, new):
            o = {str(k) if isinstance(k, bytes) else k:set(v) if isinstance(v, list) else v for k, v in iteritems(o)}
            n = {k:set(v) if isinstance(v, list) else v for k, v in iteritems(n)}
            self.assertEqual(o, n)

        ndb.search('title:Unknown')
        db.search('title:Unknown')
        self.assertEqual(db.row(3), ndb.row(3))
        self.assertRaises(ValueError, ndb.row, 2)
        self.assertRaises(ValueError, db.row, 2)
        db.close()
    # }}}

    def test_legacy_conversion_options(self):  # {{{
        'Test conversion options API'
        ndb = self.init_legacy()
        db = self.init_old()
        all_ids = ndb.new_api.all_book_ids()
        op1 = {'xx': 'yy'}

        def decode(x):
            if isinstance(x, bytes):
                x = x.decode('utf-8')
            return x

        for x in (
            ('has_conversion_options', all_ids),
            ('conversion_options', 1, 'PIPE'),
            ('set_conversion_options', 1, 'PIPE', op1),
            ('has_conversion_options', all_ids),
            ('conversion_options', 1, 'PIPE'),
            ('delete_conversion_options', 1, 'PIPE'),
            ('has_conversion_options', all_ids),
        ):
            meth, args = x[0], x[1:]
            self.assertEqual(
                decode(getattr(db, meth)(*args)), decode(getattr(ndb, meth)(*args)),
                'The method: %s() returned different results for argument %s' % (meth, args)
            )
        db.close()
    # }}}

    def test_legacy_delete_using(self):  # {{{
        'Test delete_using() API'
        ndb = self.init_legacy()
        db = self.init_old()
        cache = ndb.new_api
        tmap = cache.get_id_map('tags')
        t = next(iter(tmap))
        pmap = cache.get_id_map('publisher')
        p = next(iter(pmap))
        run_funcs(self, db, ndb, (
            ('delete_tag_using_id', t),
            ('delete_publisher_using_id', p),
            (db.refresh,),
            ('all_tag_names',), ('tags', 0), ('tags', 1), ('tags', 2),
            ('all_publisher_names',), ('publisher', 0), ('publisher', 1), ('publisher', 2),
        ))
        db.close()
    # }}}

    def test_legacy_adding_books(self):  # {{{
        'Test various adding/deleting books methods'
        import sqlite3
        con = sqlite3.connect(":memory:")
        try:
            con.execute("create virtual table recipe using fts5(name, ingredients)")
        except Exception:
            self.skipTest('python sqlite3 module does not have FTS5 support')
        con.close()
        del con
        from calibre.ebooks.metadata.book.base import Metadata
        from calibre.ptempfile import TemporaryFile
        legacy, old = self.init_legacy(self.cloned_library), self.init_old(self.cloned_library)
        mi = Metadata('Added Book0', authors=('Added Author',))
        with TemporaryFile(suffix='.aff') as name:
            with open(name, 'wb') as f:
                f.write(b'xxx')
            T = partial(ET, 'add_books', ([name], ['AFF'], [mi]), old=old, legacy=legacy)
            T()(self)
            book_id = T(kwargs={'return_ids':True})(self)[1][0]
            self.assertEqual(legacy.new_api.formats(book_id), ('AFF',))
            T(kwargs={'add_duplicates':False})(self)
            mi.title = 'Added Book1'
            mi.uuid = 'uuu'
            T = partial(ET, 'import_book', (mi,[name]), old=old, legacy=legacy)
            book_id = T()(self)
            self.assertNotEqual(legacy.uuid(book_id, index_is_id=True), old.uuid(book_id, index_is_id=True))
            book_id = T(kwargs={'preserve_uuid':True})(self)
            self.assertEqual(legacy.uuid(book_id, index_is_id=True), old.uuid(book_id, index_is_id=True))
            self.assertEqual(legacy.new_api.formats(book_id), ('AFF',))

            T = partial(ET, 'add_format', old=old, legacy=legacy)
            T((0, 'AFF', BytesIO(b'fffff')))(self)
            T((0, 'AFF', BytesIO(b'fffff')))(self)
            T((0, 'AFF', BytesIO(b'fffff')), {'replace':True})(self)
        with TemporaryFile(suffix='.opf') as name:
            with open(name, 'wb') as f:
                f.write(b'zzzz')
            T = partial(ET, 'import_book', (mi,[name]), old=old, legacy=legacy)
            book_id = T()(self)
            self.assertFalse(legacy.new_api.formats(book_id))

        mi.title = 'Added Book2'
        T = partial(ET, 'create_book_entry', (mi,), old=old, legacy=legacy)
        T()
        T({'add_duplicates':False})
        T({'force_id':1000})

        with TemporaryFile(suffix='.txt') as name:
            with open(name, 'wb') as f:
                f.write(b'tttttt')
            bid = legacy.add_catalog(name, 'My Catalog')
            self.assertEqual(old.add_catalog(name, 'My Catalog'), bid)
            cache = legacy.new_api
            self.assertEqual(cache.formats(bid), ('TXT',))
            self.assertEqual(cache.field_for('title', bid), 'My Catalog')
            self.assertEqual(cache.field_for('authors', bid), ('calibre',))
            self.assertEqual(cache.field_for('tags', bid), (_('Catalog'),))
            self.assertTrue(bid < legacy.add_catalog(name, 'Something else'))
            self.assertEqual(legacy.add_catalog(name, 'My Catalog'), bid)
            self.assertEqual(old.add_catalog(name, 'My Catalog'), bid)

            bid = legacy.add_news(name, {'title':'Events', 'add_title_tag':True, 'custom_tags':('one', 'two')})
            self.assertEqual(cache.formats(bid), ('TXT',))
            self.assertEqual(cache.field_for('authors', bid), ('calibre',))
            self.assertEqual(cache.field_for('tags', bid), (_('News'), 'Events', 'one', 'two'))

        self.assertTrue(legacy.cover(1, index_is_id=True))
        origcov = legacy.cover(1, index_is_id=True)
        self.assertTrue(legacy.has_cover(1))
        legacy.remove_cover(1)
        self.assertFalse(legacy.has_cover(1))
        self.assertFalse(legacy.cover(1, index_is_id=True))
        legacy.set_cover(3, origcov)
        self.assertEqual(legacy.cover(3, index_is_id=True), origcov)
        self.assertTrue(legacy.has_cover(3))

        self.assertTrue(legacy.format(1, 'FMT1', index_is_id=True))
        legacy.remove_format(1, 'FMT1', index_is_id=True)
        self.assertIsNone(legacy.format(1, 'FMT1', index_is_id=True))

        legacy.delete_book(1)
        old.delete_book(1)
        self.assertNotIn(1, legacy.all_ids())
        legacy.dump_metadata((2,3))
        old.close()
    # }}}

    def test_legacy_coverage(self):  # {{{
        ' Check that the emulation of the legacy interface is (almost) total '
        cl = self.cloned_library
        db = self.init_old(cl)
        ndb = self.init_legacy()

        SKIP_ATTRS = {
            'TCat_Tag', '_add_newbook_tag', '_clean_identifier', '_library_id_', '_set_authors',
            '_set_title', '_set_custom', '_update_author_in_cache',
            # Feeds are now stored in the config folder
            'get_feeds', 'get_feed', 'update_feed', 'remove_feeds', 'add_feed', 'set_feeds',
            # Obsolete/broken methods
            'author_id',  # replaced by get_author_id
            'books_for_author',  # broken
            'books_in_old_database', 'sizeof_old_database',  # unused
            'migrate_old',  # no longer supported
            'remove_unused_series',  # superseded by clean API
            'move_library_to',  # API changed, no code uses old API
            # Added compiled_rules() for calibredb add
            'find_books_in_directory', 'import_book_directory', 'import_book_directory_multiple', 'recursive_import',

            # Internal API
            'clean_user_categories', 'cleanup_tags', 'books_list_filter', 'conn', 'connect', 'construct_file_name',
            'construct_path_name', 'clear_dirtied', 'initialize_database', 'initialize_dynamic',
            'run_import_plugins', 'vacuum', 'set_path', 'row_factory', 'rows', 'rmtree', 'series_index_pat',
            'import_old_database', 'dirtied_lock', 'dirtied_cache', 'dirty_books_referencing',
            'windows_check_if_files_in_use', 'get_metadata_for_dump', 'get_a_dirtied_book', 'dirtied_sequence',
            'format_filename_cache', 'format_metadata_cache', 'filter', 'create_version1', 'normpath', 'custom_data_adapters',
            'custom_table_names', 'custom_columns_in_meta', 'custom_tables',
        }
        SKIP_ARGSPEC = {
            '__init__',
        }

        missing = []

        try:
            total = 0
            for attr in dir(db):
                if attr in SKIP_ATTRS or attr.startswith('upgrade_version'):
                    continue
                total += 1
                if not hasattr(ndb, attr):
                    missing.append(attr)
                    continue
                obj, nobj = getattr(db, attr), getattr(ndb, attr)
                if attr not in SKIP_ARGSPEC:
                    try:
                        argspec = inspect.getfullargspec(obj)
                        nargspec = inspect.getfullargspec(nobj)
                    except (TypeError, ValueError):
                        pass
                    else:
                        compare_argspecs(argspec, nargspec, attr)
        finally:
            for db in (ndb, db):
                db.close()
                db.break_cycles()

        if missing:
            pc = len(missing)/total
            raise AssertionError('{0:.1%} of API ({2} attrs) are missing: {1}'.format(pc, ', '.join(missing), len(missing)))

    # }}}

    def test_legacy_custom_data(self):  # {{{
        'Test the API for custom data storage'
        legacy, old = self.init_legacy(self.cloned_library), self.init_old(self.cloned_library)
        for name in ('name1', 'name2', 'name3'):
            T = partial(ET, 'add_custom_book_data', old=old, legacy=legacy)
            T((1, name, 'val1'))(self)
            T((2, name, 'val2'))(self)
            T((3, name, 'val3'))(self)
            T = partial(ET, 'get_ids_for_custom_book_data', old=old, legacy=legacy)
            T((name,))(self)
            T = partial(ET, 'get_custom_book_data', old=old, legacy=legacy)
            T((1, name, object()))
            T((9, name, object()))
            T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy)
            T((name, object()))
            T((name+'!', object()))
            T = partial(ET, 'delete_custom_book_data', old=old, legacy=legacy)
            T((name, 1))
            T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy)
            T((name, object()))
            T = partial(ET, 'delete_all_custom_book_data', old=old, legacy=legacy)
            T(name)
            T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy)
            T((name, object()))

        T = partial(ET, 'add_multiple_custom_book_data', old=old, legacy=legacy)
        T(('n', {1:'val1', 2:'val2'}))(self)
        T = partial(ET, 'get_all_custom_book_data', old=old, legacy=legacy)
        T(('n', object()))
        old.close()
    # }}}

    def test_legacy_setters(self):  # {{{
        'Test setter methods that are directly equivalent in the old and new interface'
        from calibre.ebooks.metadata.book.base import Metadata
        from calibre.utils.date import now
        n = now()
        ndb = self.init_legacy(self.cloned_library)
        amap = ndb.new_api.get_id_map('authors')
        sorts = [(aid, 's%d' % aid) for aid in amap]
        db = self.init_old(self.cloned_library)
        run_funcs(self, db, ndb, (
            ('+format_metadata', 1, 'FMT1', itemgetter('size')),
            ('+format_metadata', 1, 'FMT2', itemgetter('size')),
            ('+format_metadata', 2, 'FMT1', itemgetter('size')),
            ('get_tags', 0), ('get_tags', 1), ('get_tags', 2),
            ('is_tag_used', 'News'), ('is_tag_used', 'xchkjgfh'),
            ('bulk_modify_tags', (1,), ['t1'], ['News']),
            ('bulk_modify_tags', (2,), ['t1'], ['Tag One', 'Tag Two']),
            ('bulk_modify_tags', (3,), ['t1', 't2', 't3']),
            (db.clean,),
            ('@all_tags',),
            ('@tags', 0), ('@tags', 1), ('@tags', 2),

            ('unapply_tags', 1, ['t1']),
            ('unapply_tags', 2, ['xxxx']),
            ('unapply_tags', 3, ['t2', 't3']),
            (db.clean,),
            ('@all_tags',),
            ('@tags', 0), ('@tags', 1), ('@tags', 2),

            ('update_last_modified', (1,), True, n), ('update_last_modified', (3,), True, n),
            ('metadata_last_modified', 1, True), ('metadata_last_modified', 3, True),
            ('set_sort_field_for_author', sorts[0][0], sorts[0][1]),
            ('set_sort_field_for_author', sorts[1][0], sorts[1][1]),
            ('set_sort_field_for_author', sorts[2][0], sorts[2][1]),
            ('set_link_field_for_author', sorts[0][0], sorts[0][1]),
            ('set_link_field_for_author', sorts[1][0], sorts[1][1]),
            ('set_link_field_for_author', sorts[2][0], sorts[2][1]),
            (db.refresh,),
            ('author_sort', 0), ('author_sort', 1), ('author_sort', 2),
        ))
        omi = [db.get_metadata(x) for x in (0, 1, 2)]
        nmi = [ndb.get_metadata(x) for x in (0, 1, 2)]
        self.assertEqual([x.author_sort_map for x in omi], [x.author_sort_map for x in nmi])
        self.assertEqual([x.author_link_map for x in omi], [x.author_link_map for x in nmi])
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)

        run_funcs(self, db, ndb, (
            ('set_authors', 1, ('author one',),), ('set_authors', 2, ('author two',), True, True, True),
            ('set_author_sort', 3, 'new_aus'),
            ('set_comment', 1, ''), ('set_comment', 2, None), ('set_comment', 3, '<p>a comment</p>'),
            ('set_has_cover', 1, True), ('set_has_cover', 2, True), ('set_has_cover', 3, 1),
            ('set_identifiers', 2, {'test':'', 'a':'b'}), ('set_identifiers', 3, {'id':'1', 'isbn':'9783161484100'}), ('set_identifiers', 1, {}),
            ('set_languages', 1, ('en',)),
            ('set_languages', 2, ()),
            ('set_languages', 3, ('deu', 'spa', 'fra')),
            ('set_pubdate', 1, None), ('set_pubdate', 2, '2011-1-7'),
            ('set_series', 1, 'a series one'), ('set_series', 2, 'another series [7]'), ('set_series', 3, 'a third series'),
            ('set_publisher', 1, 'publisher two'), ('set_publisher', 2, None), ('set_publisher', 3, 'a third puB'),
            ('set_rating', 1, 2.3), ('set_rating', 2, 0), ('set_rating', 3, 8),
            ('set_timestamp', 1, None), ('set_timestamp', 2, '2011-1-7'),
            ('set_uuid', 1, None), ('set_uuid', 2, 'a test uuid'),
            ('set_title', 1, 'title two'), ('set_title', 2, None), ('set_title', 3, 'The Test Title'),
            ('set_tags', 1, ['a1', 'a2'], True), ('set_tags', 2, ['b1', 'tag one'], False, False, False, True), ('set_tags', 3, ['A1']),
            (db.refresh,),
            ('title', 0), ('title', 1), ('title', 2),
            ('title_sort', 0), ('title_sort', 1), ('title_sort', 2),
            ('authors', 0), ('authors', 1), ('authors', 2),
            ('author_sort', 0), ('author_sort', 1), ('author_sort', 2),
            ('has_cover', 3), ('has_cover', 1), ('has_cover', 2),
            ('get_identifiers', 0), ('get_identifiers', 1), ('get_identifiers', 2),
            ('pubdate', 0), ('pubdate', 1), ('pubdate', 2),
            ('timestamp', 0), ('timestamp', 1), ('timestamp', 2),
            ('publisher', 0), ('publisher', 1), ('publisher', 2),
            ('rating', 0), ('+rating', 1, lambda x: x or 0), ('rating', 2),
            ('series', 0), ('series', 1), ('series', 2),
            ('series_index', 0), ('series_index', 1), ('series_index', 2),
            ('uuid', 0), ('uuid', 1), ('uuid', 2),
            ('isbn', 0), ('isbn', 1), ('isbn', 2),
            ('@tags', 0), ('@tags', 1), ('@tags', 2),
            ('@all_tags',),
            ('@get_all_identifier_types',),

            ('set_title_sort', 1, 'Title Two'), ('set_title_sort', 2, None), ('set_title_sort', 3, 'The Test Title_sort'),
            ('set_series_index', 1, 2.3), ('set_series_index', 2, 0), ('set_series_index', 3, 8),
            ('set_identifier', 1, 'moose', 'val'), ('set_identifier', 2, 'test', ''), ('set_identifier', 3, '', ''),
            (db.refresh,),
            ('series_index', 0), ('series_index', 1), ('series_index', 2),
            ('title_sort', 0), ('title_sort', 1), ('title_sort', 2),
            ('get_identifiers', 0), ('get_identifiers', 1), ('get_identifiers', 2),
            ('@get_all_identifier_types',),

            ('set_metadata', 1, Metadata('title', ('a1',)), False, False, False, True, True),
            ('set_metadata', 3, Metadata('title', ('a1',))),
            (db.refresh,),
            ('title', 0), ('title', 1), ('title', 2),
            ('title_sort', 0), ('title_sort', 1), ('title_sort', 2),
            ('authors', 0), ('authors', 1), ('authors', 2),
            ('author_sort', 0), ('author_sort', 1), ('author_sort', 2),
            ('@tags', 0), ('@tags', 1), ('@tags', 2),
            ('@all_tags',),
            ('@get_all_identifier_types',),
        ))
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)

        run_funcs(self, db, ndb, (
            ('set', 0, 'title', 'newtitle'),
            ('set', 0, 'tags', 't1,t2,tag one', True),
            ('set', 0, 'authors', 'author one & Author Two', True),
            ('set', 0, 'rating', 3.2),
            ('set', 0, 'publisher', 'publisher one', False),
            (db.refresh,),
            ('title', 0),
            ('rating', 0),
            ('#tags', 0), ('#tags', 1), ('#tags', 2),
            ('authors', 0), ('authors', 1), ('authors', 2),
            ('publisher', 0), ('publisher', 1), ('publisher', 2),
            ('delete_tag', 'T1'), ('delete_tag', 'T2'), ('delete_tag', 'Tag one'), ('delete_tag', 'News'),
            (db.clean,), (db.refresh,),
            ('@all_tags',),
            ('#tags', 0), ('#tags', 1), ('#tags', 2),
        ))
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)
        run_funcs(self, db, ndb, (
            ('remove_all_tags', (1, 2, 3)),
            (db.clean,),
            ('@all_tags',),
            ('@tags', 0), ('@tags', 1), ('@tags', 2),
        ))
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)
        a = {v:k for k, v in iteritems(ndb.new_api.get_id_map('authors'))}['Author One']
        t = {v:k for k, v in iteritems(ndb.new_api.get_id_map('tags'))}['Tag One']
        s = {v:k for k, v in iteritems(ndb.new_api.get_id_map('series'))}['A Series One']
        p = {v:k for k, v in iteritems(ndb.new_api.get_id_map('publisher'))}['Publisher One']
        run_funcs(self, db, ndb, (
            ('rename_author', a, 'Author Two'),
            ('rename_tag', t, 'News'),
            ('rename_series', s, 'ss'),
            ('rename_publisher', p, 'publisher one'),
            (db.clean,),
            (db.refresh,),
            ('@all_tags',),
            ('tags', 0), ('tags', 1), ('tags', 2),
            ('series', 0), ('series', 1), ('series', 2),
            ('publisher', 0), ('publisher', 1), ('publisher', 2),
            ('series_index', 0), ('series_index', 1), ('series_index', 2),
            ('authors', 0), ('authors', 1), ('authors', 2),
            ('author_sort', 0), ('author_sort', 1), ('author_sort', 2),
        ))
        db.close()

    # }}}

    def test_legacy_custom(self):  # {{{
        'Test the legacy API for custom columns'
        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)
        # Test getting
        run_funcs(self, db, ndb, (
            ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'rating'), ('all_custom', 'authors'), ('all_custom', None, 7),
            ('get_next_cc_series_num_for', 'My Series One', 'series'), ('get_next_cc_series_num_for', 'My Series Two', 'series'),
            ('is_item_used_in_multiple', 'My Tag One', 'tags'),
            ('is_item_used_in_multiple', 'My Series One', 'series'),
            ('$get_custom_items_with_ids', 'series'), ('$get_custom_items_with_ids', 'tags'), ('$get_custom_items_with_ids', 'float'),
            ('$get_custom_items_with_ids', 'rating'), ('$get_custom_items_with_ids', 'authors'), ('$get_custom_items_with_ids', None, 7),
        ))
        for label in ('tags', 'series', 'authors', 'comments', 'rating', 'date', 'yesno', 'isbn', 'enum', 'formats', 'float', 'comp_tags'):
            for func in ('get_custom', 'get_custom_extra', 'get_custom_and_extra'):
                run_funcs(self, db, ndb, [(func, idx, label) for idx in range(3)])

        # Test renaming/deleting
        t = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#tags'))}['My Tag One']
        t2 = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#tags'))}['My Tag Two']
        a = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#authors'))}['My Author Two']
        a2 = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#authors'))}['Custom One']
        s = {v:k for k, v in iteritems(ndb.new_api.get_id_map('#series'))}['My Series One']
        run_funcs(self, db, ndb, (
            ('delete_custom_item_using_id', t, 'tags'),
            ('delete_custom_item_using_id', a, 'authors'),
            ('rename_custom_item', t2, 't2', 'tags'),
            ('rename_custom_item', a2, 'custom one', 'authors'),
            ('rename_custom_item', s, 'My Series Two', 'series'),
            ('delete_item_from_multiple', 'custom two', 'authors'),
            (db.clean,),
            (db.refresh,),
            ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'authors'),
        ))
        for label in ('tags', 'authors', 'series'):
            run_funcs(self, db, ndb, [('get_custom_and_extra', idx, label) for idx in range(3)])
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)
        # Test setting
        run_funcs(self, db, ndb, (
            ('-set_custom', 1, 't1 & t2', 'authors'),
            ('-set_custom', 1, 't3 & t4', 'authors', None, True),
            ('-set_custom', 3, 'test one & test Two', 'authors'),
            ('-set_custom', 1, 'ijfkghkjdf', 'enum'),
            ('-set_custom', 3, 'One', 'enum'),
            ('-set_custom', 3, 'xxx', 'formats'),
            ('-set_custom', 1, 'my tag two', 'tags', None, False, False, None, True, True),
            (db.clean,), (db.refresh,),
            ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'authors'),
        ))
        for label in ('tags', 'series', 'authors', 'comments', 'rating', 'date', 'yesno', 'isbn', 'enum', 'formats', 'float', 'comp_tags'):
            for func in ('get_custom', 'get_custom_extra', 'get_custom_and_extra'):
                run_funcs(self, db, ndb, [(func, idx, label) for idx in range(3)])
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)
        # Test setting bulk
        run_funcs(self, db, ndb, (
            ('set_custom_bulk', (1,2,3), 't1 & t2', 'authors'),
            ('set_custom_bulk', (1,2,3), 'a series', 'series', None, False, False, (9, 10, 11)),
            ('set_custom_bulk', (1,2,3), 't1', 'tags', None, True),
            (db.clean,), (db.refresh,),
            ('all_custom', 'series'), ('all_custom', 'tags'), ('all_custom', 'authors'),
        ))
        for label in ('tags', 'series', 'authors', 'comments', 'rating', 'date', 'yesno', 'isbn', 'enum', 'formats', 'float', 'comp_tags'):
            for func in ('get_custom', 'get_custom_extra', 'get_custom_and_extra'):
                run_funcs(self, db, ndb, [(func, idx, label) for idx in range(3)])
        db.close()

        ndb = self.init_legacy(self.cloned_library)
        db = self.init_old(self.cloned_library)
        # Test bulk multiple
        run_funcs(self, db, ndb, (
            ('set_custom_bulk_multiple', (1,2,3), ['t1'], ['My Tag One'], 'tags'),
            (db.clean,), (db.refresh,),
            ('all_custom', 'tags'),
            ('get_custom', 0, 'tags'), ('get_custom', 1, 'tags'), ('get_custom', 2, 'tags'),
        ))
        db.close()

        o = self.cloned_library
        n = self.cloned_library
        ndb, db = self.init_legacy(n), self.init_old(o)
        ndb.create_custom_column('created', 'Created', 'text', True, True, {'moose':'cat'})
        db.create_custom_column('created', 'Created', 'text', True, True, {'moose':'cat'})
        db.close()
        ndb, db = self.init_legacy(n), self.init_old(o)
        self.assertEqual(db.custom_column_label_map['created'], ndb.backend.custom_field_metadata('created'))
        num = db.custom_column_label_map['created']['num']
        ndb.set_custom_column_metadata(num, is_editable=False, name='Crikey', display={})
        db.set_custom_column_metadata(num, is_editable=False, name='Crikey', display={})
        db.close()
        ndb, db = self.init_legacy(n), self.init_old(o)
        self.assertEqual(db.custom_column_label_map['created'], ndb.backend.custom_field_metadata('created'))
        db.close()
        ndb = self.init_legacy(n)
        ndb.delete_custom_column('created')
        ndb = self.init_legacy(n)
        self.assertRaises(KeyError, ndb.custom_field_name, num=num)

        # Test setting custom series
        ndb = self.init_legacy(self.cloned_library)
        ndb.set_custom(1, 'TS [9]', label='series')
        self.assertEqual(ndb.new_api.field_for('#series', 1), 'TS')
        self.assertEqual(ndb.new_api.field_for('#series_index', 1), 9)
    # }}}

    def test_legacy_saved_search(self):  # {{{
        ' Test legacy saved search API '
        db, ndb = self.init_old(), self.init_legacy()
        run_funcs(self, db, ndb, (
            ('saved_search_set_all', {'one':'a', 'two':'b'}),
            ('saved_search_names',),
            ('saved_search_lookup', 'one'),
            ('saved_search_lookup', 'two'),
            ('saved_search_lookup', 'xxx'),
            ('saved_search_rename', 'one', '1'),
            ('saved_search_names',),
            ('saved_search_lookup', '1'),
            ('saved_search_delete', '1'),
            ('saved_search_names',),
            ('saved_search_add', 'n', 'm'),
            ('saved_search_names',),
            ('saved_search_lookup', 'n'),
        ))
        db.close()
    # }}}