# Copyright 2009-2017 Jaap Karssenberg <jaap.karssenberg@gmail.com>

"""Tests for the notebook index and its files, pages, links and tags indexers."""

import tests
from tests import os_native_path

import os
import sys
import sqlite3
import time

from zim.notebook.layout import FilesLayout
from zim.newfs import LocalFolder, File
from zim.notebook import Path
from zim.notebook.index import Index, DB_VERSION
from zim.notebook.index.files import FilesIndexer, TestFilesDBTable, FilesIndexChecker, TYPE_FOLDER
from zim.notebook.index.pages import PagesIndexer, TestPagesDBTable
from zim.notebook.index.links import LinksIndexer
from zim.notebook.index.tags import TagsIndexer


def is_dir(path):
    """Return True when *path* ends with a forward slash or a backslash."""
    return path.endswith(('/', '\\'))


def _check_and_update_all(indexer):
    """Check the files index of *indexer* and update whatever is out of date.

    A C{FilesIndexChecker} is created on the database and folder of the
    indexer; each time it reports something out of date the indexer's
    update iterator is exhausted. The database is committed at the end.
    """
    checker = FilesIndexChecker(indexer.db, indexer.folder)
    checker.queue_check()
    for out_of_date in checker.check_iter():
        if out_of_date:
            for _ in indexer.update_iter():
                pass
    indexer.db.commit()


@tests.slowTest
class TestIndexInitialization(tests.TestCase):
    """Check that C{Index} reports C{DB_VERSION} for various initial states
    of the "index.db" file."""

    def setUp(self):
        self.folder = self.setUpFolder(mock=tests.MOCK_ALWAYS_REAL)
        self.folder.touch()  # Must exist for sane notebook
        self.layout = FilesLayout(self.folder)

    def testWithoutFileAndWithValidFile(self):
        # Two tests combined because first needed as init for the second
        db_file = self.folder.file('index.db')
        self.assertFalse(db_file.exists())
        index = Index(db_file.path, self.layout)
        self.assertTrue(db_file.exists())
        self.assertEqual(index.get_property('db_version'), DB_VERSION)

        # Close and drop the first index before opening the same file again
        index._db.close()
        del index

        index = Index(db_file.path, self.layout)
        self.assertTrue(db_file.exists())
        self.assertEqual(index.get_property('db_version'), DB_VERSION)

    def testWithValidDBFile(self):
        # E.g. old index, not conforming our table layout
        db_file = self.folder.file('index.db')
        self.assertFalse(db_file.exists())

        db = sqlite3.Connection(db_file.path)
        db.execute('CREATE TABLE zim_index (key TEXT);')
        db.close()

        self.assertTrue(db_file.exists())
        index = Index(db_file.path, self.layout)
        self.assertTrue(db_file.exists())
        self.assertEqual(index.get_property('db_version'), DB_VERSION)

    def testWithBrokenFile(self):
        # The file exists but does not contain a sqlite database
        db_file = self.folder.file('index.db')
        db_file.write('this is not a database file...\n')

        self.assertTrue(db_file.exists())
        with tests.LoggingFilter('zim.notebook.index', 'Overwriting'):
            with tests.LoggingFilter('zim.notebook.index', 'Could not access'):
                index = Index(db_file.path, self.layout)
        self.assertTrue(db_file.exists())
        self.assertEqual(index.get_property('db_version'), DB_VERSION)

    def testWithLockedFile(self):
        db_file = self.folder.file('index.db')
        db_file.write('this is not a database file...\n')
        os.chmod(db_file.path, 0o000)  # remove all permissions from the file
        # Restore permissions afterwards so the file can be accessed again
        self.addCleanup(lambda: os.chmod(db_file.path, 0o700))

        self.assertTrue(db_file.exists())
        with tests.LoggingFilter('zim.notebook.index', 'Overwriting'):
            with tests.LoggingFilter('zim.notebook.index', 'Could not access'):
                index = Index(db_file.path, self.layout)
        self.assertTrue(db_file.exists())
        self.assertEqual(index.get_property('db_version'), DB_VERSION)


class TestFilesIndexer(tests.TestCase, TestFilesDBTable):
    """Run C{FilesIndexer} over a folder tree that is created, extended and
    then reduced again, checking emitted signals and database content."""

    FILES = tuple(map(os_native_path, (
        'foo.txt',  # page with children
        'foo/',
        'foo/test.png',
        'foo/sub1.txt',
        'foo/sub2.txt',

        'bar.txt',  # page without children
        'bar/',  # empty folder

        'foo-bar.txt',  # page without children

        'baz/',  # page nested 2 folders deep
        'baz/dus/',
        'baz/dus/ja.txt',

        'argh/',  # not a page
        'argh/somefile.pdf',
    )))
    FILES_UPDATE = tuple(map(os_native_path, (
        'tmp.txt',  # page with child
        'tmp/',
        'tmp/foo.txt',

        'new/',  # nested page
        'new/page.txt',

        'newfolder/',  # nested page in subfolder
        'newfolder/newsubfolder/',
        'newfolder/newsubfolder/page.txt',
    )))
    FILES_CHANGE = (
        'foo.txt',
    )
    PAGE_TEXT = 'Content-Type: text/x-zim-wiki\n\ntest 123\n'

    def runTest(self):
        # Test in 3 parts:
        #   1. Index existing files structure
        #   2. Check and update after new files appear
        #   3. Check and update after files disappear

        self.root = self.setUpFolder(mock=tests.MOCK_DEFAULT_REAL)
        self.db = sqlite3.connect(':memory:')
        self.db.row_factory = sqlite3.Row

        indexer = FilesIndexer(self.db, self.root)

        def cb_filter_func(name, o, a):
            # Reduce each logged signal to the path of the row it carries;
            # the update start/finish signals must come without arguments.
            #~ print('>>', name)
            if name in ('start-update', 'finish-update'):
                self.assertFalse(a)
                return ()
            else:
                row, = a
                self.assertIsInstance(row, sqlite3.Row)
                return row['path']

        signals = tests.SignalLogger(indexer, cb_filter_func)

        # 1. Index existing files structure
        self.create_files(self.FILES)
        _check_and_update_all(indexer)

        files = {f for f in self.FILES if not is_dir(f)}

        self.assertEqual(set(signals['file-row-inserted']), files)
        self.assertEqual(set(signals['file-row-changed']), files)
        self.assertEqual(signals['file-row-deleted'], [])

        self.assertFilesDBConsistent(self.db)
        self.assertFilesDBEquals(self.db, self.FILES)

        # 2. Check and update after new files appear
        signals.clear()
        self.create_files(
            self.FILES_UPDATE + self.FILES_CHANGE
        )
        _check_and_update_all(indexer)

        files = {f for f in self.FILES_UPDATE if not is_dir(f)}
        update = files | set(self.FILES_CHANGE)

        self.assertEqual(set(signals['file-row-inserted']), files)
        self.assertEqual(set(signals['file-row-changed']), update)
        self.assertEqual(signals['file-row-deleted'], [])

        self.assertFilesDBConsistent(self.db)
        self.assertFilesDBEquals(self.db,
            self.FILES + self.FILES_UPDATE
        )

        # 3. Check and update after files disappear
        signals.clear()
        self.remove_files(self.FILES_UPDATE)
        _check_and_update_all(indexer)

        files = {f for f in self.FILES_UPDATE if not is_dir(f)}

        self.assertEqual(signals['file-row-inserted'], [])
        self.assertEqual(signals['file-row-changed'], [])
        self.assertEqual(set(signals['file-row-deleted']), files)

        self.assertFilesDBConsistent(self.db)
        self.assertFilesDBEquals(self.db, self.FILES)

    def create_files(self, files):
        """Create each name in *files* below C{self.root}: names for which
        C{is_dir()} is true are touched as folders, all others are written
        as files containing C{PAGE_TEXT}."""
        for name in files:
            if is_dir(name):
                self.root.folder(name).touch()
            else:
                self.root.file(name).write(self.PAGE_TEXT)

    def remove_files(self, files):
        """Remove each name in *files* below C{self.root}, iterating in
        reverse order so entries listed later are removed first."""
        for name in reversed(files):
            if is_dir(name):
                self.root.folder(name).remove()
            else:
                self.root.child(name).remove()


class TestFilesIndexerRobustForFolderMtime(TestFilesIndexer):
    # Like TestFilesIndexer but explicitly hack the folder mtime in the database
    # to not detect the folder structure change by mtime. Ensure robustness when
    # filesystem mtime is not reliable for folders.

    def create_files(self, files):
        TestFilesIndexer.create_files(self, files)
        self.reset_folder_mtimes()

    def remove_files(self, files):
        TestFilesIndexer.remove_files(self, files)
        self.reset_folder_mtimes()

    def reset_folder_mtimes(self):
        """Overwrite the stored mtime of every indexed folder that still
        exists with the current mtime of that folder."""
        count = 0
        query = 'SELECT id, path FROM files WHERE node_type = ?'
        for node_id, rel_path in self.db.execute(query, (TYPE_FOLDER,)):
            folder = self.root.folder(rel_path)
            if folder.exists():
                mtime = folder.mtime()
                self.db.execute('UPDATE files SET mtime = ? WHERE id = ?', (mtime, node_id))
                count += 1
        # Sanity check: at least one folder row must have been updated
        self.assertGreater(count, 0)


class TestFilesIndexerWithCaseInsensitiveFilesytem(tests.TestCase, TestFilesDBTable):
    """Check that renaming a file to a name that differs only in case is
    picked up by the files index on a case-insensitive mock filesystem."""

    def runTest(self):
        folder = self.setUpFolder(mock=tests.MOCK_ALWAYS_MOCK)
        folder._fs.set_case_sensitive(False)

        db = sqlite3.connect(':memory:')
        db.row_factory = sqlite3.Row
        indexer = FilesIndexer(db, folder)

        for name in ('aaa.txt', 'bbb.txt', 'ccc.txt'):
            folder.file(name).write('Test 123\n')

        _check_and_update_all(indexer)
        self.assertFilesDBConsistent(db)
        self.assertFilesDBEquals(db, ('aaa.txt', 'bbb.txt', 'ccc.txt'))

        # Rename with only a change of case; the folder mtime must change
        mtime = folder.mtime()
        folder.file('aaa.txt').moveto(folder.file('AAA.txt'))
        self.assertEqual(list(folder.list_names()), ['AAA.txt', 'bbb.txt', 'ccc.txt'])
        self.assertNotEqual(folder.mtime(), mtime)

        _check_and_update_all(indexer)
        self.assertFilesDBConsistent(db)
        self.assertFilesDBEquals(db, ('AAA.txt', 'bbb.txt', 'ccc.txt'))


class TestPagesIndexer(TestPagesDBTable, tests.TestCase):
    """Drive C{PagesIndexer} directly through its file row handlers and
    check the resulting pages table and emitted signals."""

    FILES = tuple(map(os_native_path, (
        'foo.txt',  # page with children
        'foo/test.png',
        'foo/sub1.txt',
        'foo/sub2.txt',
        'bar.txt',  # page without children
        'foo-bar.txt',  # page without children
        'baz/dus/ja.txt',  # page nested 2 folders deep
        'argh/somefile.pdf',  # not a page
        'foo/not_a_page.txt',  # not a page - see below for missing content line
        'not_a_page.txt',  # not a page - see below for missing content line
    )))
    PAGES = (
        'foo',
        'foo:sub1',
        'foo:sub2',
        'bar',
        'foo-bar',
        'baz',
        'baz:dus',
        'baz:dus:ja',
    )
    CONTENT = (  # These have a file
        'foo',
        'foo:sub1',
        'foo:sub2',
        'bar',
        'foo-bar',
        'baz:dus:ja',
    )
    NAMESPACES = (  # These have also a folder
        'foo',
        'baz',
        'baz:dus',
    )
    PLACEHOLDERS = (
        'some:none_existing:page',
        'foo:sub1:subsub',
        'toplevel'
    )
    PLACEHOLDERS_ALL = (
        'some:none_existing:page',
        'some:none_existing',
        'some',
        'foo:sub1:subsub',
        'toplevel'
    )

    def runTest(self):
        # Test in 4 parts:
        #   1. insert files
        #   2. update files
        #   3. add some placeholders
        #   4. delete files

        self.root = self.setUpFolder()
        layout = FilesLayout(self.root)
        db = sqlite3.connect(':memory:')
        db.row_factory = sqlite3.Row

        file_indexer = tests.MockObject(methods=('connect',))

        indexer = PagesIndexer(db, layout, file_indexer)

        def cb_filter_func(name, o, a):
            # Reduce each logged signal to the name of the row it carries;
            # the number of arguments depends on the signal.
            if name == 'page-changed':
                row, content = a
            elif name == 'page-row-changed':
                row, oldrow = a
            else:
                row, = a

            self.assertIsInstance(row, sqlite3.Row)
            return row['name']

        signals = tests.SignalLogger(indexer, cb_filter_func)

        # 1. insert files
        for i, path in enumerate(self.FILES):
            source_file = self.root.file(path)
            if path.endswith('.txt') and "not_a_page" not in path:
                source_file.write('Content-Type: text/x-zim-wiki\n\ntest 123\n')
            else:
                # No content type header is written for these files
                source_file.write('test 123\n')
            row = {'id': i, 'path': path}
            indexer.on_file_row_inserted(file_indexer, row)
            self.assertPagesDBConsistent(db)

        self.assertPagesDBEquals(db, self.PAGES)
        self.assertEqual(set(signals['page-row-inserted']), set(self.PAGES))
        self.assertEqual(set(signals['page-row-changed']), set(self.NAMESPACES))
        self.assertEqual(signals['page-row-deleted'], [])
        self.assertEqual(signals['page-changed'], [])

        # 2. update files
        signals.clear()
        for i, path in enumerate(self.FILES):
            row = {'id': i, 'path': path}
            indexer.on_file_row_changed(file_indexer, row)
            self.assertPagesDBConsistent(db)

        self.assertPagesDBEquals(db, self.PAGES)
        self.assertEqual(signals['page-row-inserted'], [])
        self.assertEqual(set(signals['page-row-changed']), set(self.CONTENT))
        self.assertEqual(signals['page-row-deleted'], [])
        self.assertEqual(set(signals['page-changed']), set(self.CONTENT))

        # 3. add some placeholders
        for pagename in self.PLACEHOLDERS:
            indexer.insert_link_placeholder(Path(pagename))
            self.assertPagesDBConsistent(db)

        self.assertPagesDBEquals(db, self.PAGES
                                        + self.PLACEHOLDERS_ALL)

        for pagename in self.PLACEHOLDERS:
            indexer.delete_link_placeholder(Path(pagename))
            self.assertPagesDBConsistent(db)

        self.assertPagesDBEquals(db, self.PAGES)

        # 4. delete files
        signals.clear()
        for i, path in enumerate(self.FILES):
            source_file = self.root.file(path)
            source_file.remove()
            row = {'id': i, 'path': path}
            indexer.on_file_row_deleted(file_indexer, row)
            self.assertPagesDBConsistent(db)

        self.assertPagesDBEquals(db, [])
        self.assertEqual(signals['page-row-inserted'], [])
        self.assertEqual(set(signals['page-row-changed']), {'foo'})
            # "foo" has source that is deleted before children
        self.assertEqual(set(signals['page-row-deleted']), set(self.PAGES))
        self.assertEqual(signals['page-changed'], ['foo'])
            # "foo" has source that is deleted before children


class TestPageNameConflict(tests.TestCase):
    """Check that inserting the same page name twice yields the same id."""

    def runTest(self):
        folder = self.setUpFolder()
        layout = FilesLayout(folder)
        db = sqlite3.connect(':memory:')
        db.row_factory = sqlite3.Row

        file_indexer = tests.MockObject(methods=('connect',))

        indexer = PagesIndexer(db, layout, file_indexer)

        id1 = indexer.insert_page(Path('Test'), None)
        with tests.LoggingFilter('zim.notebook.index', 'Error while inserting page'):
            id2 = indexer.insert_page(Path('Test'), None)

        self.assertEqual(id1, id2)


from zim.utils import natural_sort_key
from zim.notebook.index.pages import PagesViewInternal
from zim.notebook.page import HRef
from zim.formats.wiki import Parser as WikiParser
from zim.newfs.mock import MockFile


class TestLinksIndexer(tests.TestCase):
    """Check C{PagesViewInternal} name resolution and the rows that
    C{LinksIndexer} writes to and removes from the links table."""

    ## Intended layout ##
    #
    # page Foo --> page Bar
    # page Foo --> placeholder Dus

    PAGES = [
        (2, 'Bar', 'test123\n'),
        (3, 'Foo', '[[Bar]]\n[[Dus]]\n'),
    ]

    def runTest(self):
        def basename(name):
            # Last part of a ":" separated name, or the name itself
            return name.rpartition(":")[2]

        db = sqlite3.connect(':memory:')
        db.row_factory = sqlite3.Row
        pi = PagesIndexer(db, None, tests.MockObject(methods=('connect',)))
        for page_id, name, _ in self.PAGES:
            db.execute(
                'INSERT INTO pages(id, name, lowerbasename, sortkey, parent, source_file) VALUES (?, ?, ?, ?, 1, 1)',
                (page_id, name, basename(name).lower(), natural_sort_key(name))
            )

        ## Test PagesViewInternal methods
        iview = PagesViewInternal(db)
        page_id, pn = iview.resolve_pagename(Path(''), ['foo'])
        self.assertEqual((page_id, pn), (3, Path('Foo')))

        page_id, pn = iview.resolve_link(Path('Foo'), HRef.new_from_wiki_link('Bar'))
        self.assertEqual((page_id, pn), (2, Path('Bar')))

        ## Test the actual indexer
        pageindexer = tests.MaskedObject(pi, ('connect',))
        indexer = LinksIndexer(db, pageindexer)

        for page_id, name, _ in self.PAGES:
            row = {'id': page_id, 'name': name, 'sortkey': natural_sort_key(name), 'is_link_placeholder': False}
            indexer.on_page_row_inserted(pageindexer, row)

        ###
        pageindexer.setObjectAccess('insert_link_placeholder')
        for page_id, name, text in self.PAGES:
            tree = WikiParser().parse(text)
            row = {'id': page_id, 'name': name}
            indexer.on_page_changed(pageindexer, row, tree)

        indexer.update()

        links = sorted(
            (r['source'], r['target'])
                for r in db.execute('SELECT * FROM links')
        )
        self.assertEqual(links, [(3, 2), (3, 4)])

        ###
        pageindexer.setObjectAccess('remove_page')
        for page_id, name, _ in self.PAGES:
            row = {'id': page_id, 'name': name, 'is_link_placeholder': False}
            indexer.on_page_row_deleted(pageindexer, row)

        indexer.update()

        rows = db.execute('SELECT * FROM links').fetchall()
        self.assertEqual(rows, [])


class TestUnicodeRepresentationAlternatives(tests.TestCase):

    # Write "Glück" as either
    #   "Gl\u00fcck" using 'LATIN SMALL LETTER U WITH DIAERESIS' (U+00FC)
    #   or "GLu\u0308ck" using 'COMBINING DIAERESIS' (U+0308)
    #
    # Both are valid unicode and should be recognized as the same page.
    # Gtk input methods seem to prefer single character
    # Specifically Mac OS X seems to prefer combination character for filesystem

    def testNotebook(self):
        self._test_notebook("Gl\u00fcck", "GLu\u0308ck")
        self._test_notebook("GLu\u0308ck", "Gl\u00fcck")

    def _test_notebook(self, file_rep, link_rep):
        # Create a notebook with different unicode representations for
        # the page and the link name and check they get linked correctly

        notebook = self.setUpNotebook(content={
            'page A': 'Link [[%s]]' % link_rep,
            'page B': 'Link [[%s]]' % file_rep,
            file_rep: 'Test 123'
        })

        # Now check both page a and page b link to "file_rep", not placeholder
        links_a = list(notebook.links.list_links(Path('page A')))
        links_b = list(notebook.links.list_links(Path('page B')))
        self.assertEqual(len(links_a), 1)
        self.assertEqual(links_a[0].target, Path(file_rep))
        self.assertEqual(len(links_b), 1)
        self.assertEqual(links_b[0].target, Path(file_rep))

    def testPlaceHolderFirst(self):
        self._test_placeholder_first("Gl\u00fcck", "GLu\u0308ck")
        self._test_placeholder_first("GLu\u0308ck", "Gl\u00fcck")

    def _test_placeholder_first(self, file_rep, link_rep):
        # First create a placeholder, then test placeholder is updated correctly
        # when page is created
        notebook = self.setUpNotebook(content={
            'page A': 'Link [[%s]]' % link_rep,
        })
        links_a = list(notebook.links.list_links(Path('page A')))
        self.assertEqual(len(links_a), 1)
        self.assertEqual(links_a[0].target, Path(link_rep))

        page = notebook.get_page(Path(file_rep))
        page.parse('wiki', 'test 123')
        notebook.store_page(page)

        links_a = list(notebook.links.list_links(Path('page A')))
        self.assertEqual(len(links_a), 1)
        self.assertEqual(links_a[0].target, Path(file_rep))


class TestTagsIndexer(tests.TestCase):
    """Check the content of the "tags" and "tagsources" tables after pages
    with tags are changed and after their rows are deleted."""

    PAGES = (
        (2, 'foo', '@tag1 @tag2'),
        (3, 'bar', '@tag2 @tag3')
    )

    def runTest(self):
        db = sqlite3.connect(':memory:')
        db.row_factory = sqlite3.Row

        indexer = TagsIndexer(db, tests.MockObject(methods=('connect',)))
        for page_id, name, text in self.PAGES:
            tree = WikiParser().parse(text)
            row = {'id': page_id, 'name': name}
            indexer.on_page_changed(None, row, tree)
        indexer.update()

        self.assertTags(db,
            [('tag1', 1), ('tag2', 2), ('tag3', 3)],
            [(1, 2), (2, 2), (2, 3), (3, 3)]
        )

        for page_id, name, _ in self.PAGES:
            row = {'id': page_id, 'name': name}
            # Handler name follows the "page-row-deleted" signal, same as
            # the handler called on the links indexer in this module
            indexer.on_page_row_deleted(None, row)
        indexer.update()

        self.assertTags(db, [], [])

    def assertTags(self, db, wantedtags, wantedsources):
        """Assert the rows of the "tags" and "tagsources" tables.

        @param db: database connection to query
        @param wantedtags: expected list of C{(name, id)} tuples
        @param wantedsources: expected list of C{(tag, source)} tuples
        """
        tags = [tuple(r) for r in db.execute(
            'SELECT name, id FROM tags'
        )]
        self.assertEqual(tags, wantedtags)

        tagsources = [tuple(r) for r in db.execute(
            'SELECT tag, source FROM tagsources'
        )]
        self.assertEqual(tagsources, wantedsources)


from zim.notebook.index import IndexUpdateIter


def buildUpdateIter(folder):
    """Return an C{IndexUpdateIter} on a fresh in-memory sqlite database,
    using a C{FilesLayout} for *folder*."""
    db = sqlite3.connect(':memory:')
    db.row_factory = sqlite3.Row
    layout = FilesLayout(folder)
    return IndexUpdateIter(db, layout)


class TestFullIndexer(TestFilesIndexer):

    # Just test that all indexers play nice together,
    # no detailed assertions

    PAGE_TEXT = 'Content-Type: text/x-zim-wiki\n\ntest 123\n[[foo:sub1]]\n[[sub1]]\n@tagfoo\n'
        # link content chosen to have one link
        # that resolves always and one link that
        # resolves for some pages, but causes
        # placeholder for other namespaces

    def runTest(self):
        # Test in 3 parts:
        #   1. Index existing files structure
        #   2. Check and update after new files appear
        #   3. Check and update after files disappear

        self.root = self.setUpFolder()
        update_iter = buildUpdateIter(self.root)

        # 1. Index existing files structure
        self.create_files(self.FILES)
        update_iter.check_and_update()

        # 2. Check and update after new files appear
        self.create_files(self.FILES_UPDATE)
        update_iter.check_and_update()

        # 3. Check and update after files disappear
        self.remove_files(self.FILES_UPDATE)
        update_iter.check_and_update()