1# Copyright 2009-2017 Jaap Karssenberg <jaap.karssenberg@gmail.com>
2
3
4import tests
5from tests import os_native_path
6
7import os
8import sys
9import sqlite3
10import time
11
12from zim.notebook.layout import FilesLayout
13from zim.newfs import LocalFolder, File
14from zim.notebook import Path
15from zim.notebook.index import Index, DB_VERSION
16from zim.notebook.index.files import FilesIndexer, TestFilesDBTable, FilesIndexChecker, TYPE_FOLDER
17from zim.notebook.index.pages import PagesIndexer, TestPagesDBTable
18from zim.notebook.index.links import LinksIndexer
19from zim.notebook.index.tags import TagsIndexer
20
21
22def is_dir(path):
23	return path.endswith('/') or path.endswith('\\')
24
25
26@tests.slowTest
27class TestIndexInitialization(tests.TestCase):
28
29	def setUp(self):
30		self.folder = self.setUpFolder(mock=tests.MOCK_ALWAYS_REAL)
31		self.folder.touch() # Must exist for sane notebook
32		self.layout = FilesLayout(self.folder)
33
34	def testWithoutFileAndWithValidFile(self):
35		# Two tests combined because first needed as init for the second
36		file = self.folder.file('index.db')
37		self.assertFalse(file.exists())
38		index = Index(file.path, self.layout)
39		self.assertTrue(file.exists())
40		self.assertEqual(index.get_property('db_version'), DB_VERSION)
41
42		index._db.close()
43		del(index)
44
45		index = Index(file.path, self.layout)
46		self.assertTrue(file.exists())
47		self.assertEqual(index.get_property('db_version'), DB_VERSION)
48
49	def testWithValidDBFile(self):
50		# E.g. old index, not conforming our table layout
51		file = self.folder.file('index.db')
52		self.assertFalse(file.exists())
53
54		db = sqlite3.Connection(file.path)
55		db.execute('CREATE TABLE zim_index (key TEXT);')
56		db.close()
57
58		self.assertTrue(file.exists())
59		index = Index(file.path, self.layout)
60		self.assertTrue(file.exists())
61		self.assertEqual(index.get_property('db_version'), DB_VERSION)
62
63	def testWithBrokenFile(self):
64		file = self.folder.file('index.db')
65		file.write('this is not a database file...\n')
66
67		self.assertTrue(file.exists())
68		with tests.LoggingFilter('zim.notebook.index', 'Overwriting'):
69			with tests.LoggingFilter('zim.notebook.index', 'Could not access'):
70				index = Index(file.path, self.layout)
71		self.assertTrue(file.exists())
72		self.assertEqual(index.get_property('db_version'), DB_VERSION)
73
74	def testWithLockedFile(self):
75		file = self.folder.file('index.db')
76		file.write('this is not a database file...\n')
77		os.chmod(file.path, 0o000) # make read-only
78		self.addCleanup(lambda: os.chmod(file.path, 0o700))
79
80		self.assertTrue(file.exists())
81		with tests.LoggingFilter('zim.notebook.index', 'Overwriting'):
82			with tests.LoggingFilter('zim.notebook.index', 'Could not access'):
83				index = Index(file.path, self.layout)
84		self.assertTrue(file.exists())
85		self.assertEqual(index.get_property('db_version'), DB_VERSION)
86
87
88class TestFilesIndexer(tests.TestCase, TestFilesDBTable):
89
90	FILES = tuple(map(os_native_path, (
91		'foo.txt', # page with children
92		'foo/',
93		'foo/test.png',
94		'foo/sub1.txt',
95		'foo/sub2.txt',
96
97		'bar.txt', # page without children
98		'bar/', # empty folder
99
100		'foo-bar.txt', # page without children
101
102		'baz/', # page nested 2 folders deep
103		'baz/dus/',
104		'baz/dus/ja.txt',
105
106		'argh/', # not a page
107		'argh/somefile.pdf',
108	)))
109	FILES_UPDATE = tuple(map(os_native_path, (
110		'tmp.txt', # page with child
111		'tmp/',
112		'tmp/foo.txt',
113
114		'new/', # nested page
115		'new/page.txt',
116
117		'newfolder/', # nested page in subfolder
118		'newfolder/newsubfolder/',
119		'newfolder/newsubfolder/page.txt',
120	)))
121	FILES_CHANGE = (
122		'foo.txt',
123	)
124	PAGE_TEXT = 'Content-Type: text/x-zim-wiki\n\ntest 123\n'
125
126	def runTest(self):
127		# Test in 3 parts:
128		#   1. Index existing files structure
129		#   2. Check and update after new files appear
130		#   3. Check and update after files disappear
131
132		self.root = self.setUpFolder(mock=tests.MOCK_DEFAULT_REAL)
133		self.db = sqlite3.connect(':memory:')
134		self.db.row_factory = sqlite3.Row
135
136		indexer = FilesIndexer(self.db, self.root)
137
138		def cb_filter_func(name, o, a):
139			#~ print('>>', name)
140			if name in ('start-update', 'finish-update'):
141				self.assertFalse(a)
142				return ()
143			else:
144				row, = a
145				self.assertIsInstance(row, sqlite3.Row)
146				return row['path']
147
148		signals = tests.SignalLogger(indexer, cb_filter_func)
149
150		def check_and_update_all():
151			checker = FilesIndexChecker(indexer.db, indexer.folder)
152			checker.queue_check()
153			for out_of_date in checker.check_iter():
154				if out_of_date:
155					for i in indexer.update_iter():
156						pass
157			indexer.db.commit()
158
159
160		# 1. Index existing files structure
161		self.create_files(self.FILES)
162		check_and_update_all()
163
164		files = set(f for f in self.FILES if not is_dir(f))
165
166		self.assertEqual(set(signals['file-row-inserted']), files)
167		self.assertEqual(set(signals['file-row-changed']), files)
168		self.assertEqual(signals['file-row-deleted'], [])
169
170		self.assertFilesDBConsistent(self.db)
171		self.assertFilesDBEquals(self.db, self.FILES)
172
173		# 2. Check and update after new files appear
174		signals.clear()
175		self.create_files(
176			self.FILES_UPDATE + self.FILES_CHANGE
177		)
178		check_and_update_all()
179
180		files = set(f for f in self.FILES_UPDATE if not is_dir(f))
181		update = files | set(self.FILES_CHANGE)
182
183		self.assertEqual(set(signals['file-row-inserted']), files)
184		self.assertEqual(set(signals['file-row-changed']), update)
185		self.assertEqual(signals['file-row-deleted'], [])
186
187		self.assertFilesDBConsistent(self.db)
188		self.assertFilesDBEquals(self.db,
189			self.FILES + self.FILES_UPDATE
190		)
191
192		# 3. Check and update after files disappear
193		signals.clear()
194		self.remove_files(self.FILES_UPDATE)
195		check_and_update_all()
196
197		files = set(f for f in self.FILES_UPDATE if not is_dir(f))
198
199		self.assertEqual(signals['file-row-inserted'], [])
200		self.assertEqual(signals['file-row-changed'], [])
201		self.assertEqual(set(signals['file-row-deleted']), files)
202
203		self.assertFilesDBConsistent(self.db)
204		self.assertFilesDBEquals(self.db, self.FILES)
205
206	def create_files(self, files):
207		for name in files:
208			if is_dir(name):
209				self.root.folder(name).touch()
210			else:
211				self.root.file(name).write(self.PAGE_TEXT)
212
213	def remove_files(self, files):
214		for name in reversed(files):
215			if is_dir(name):
216				self.root.folder(name).remove()
217			else:
218				self.root.child(name).remove()
219
220
221class TestFilesIndexerRobustForFolderMtime(TestFilesIndexer):
222	# Like TestFilesIndexer but explicitly hack the folder mtime in the database
223	# to not detect the folder structure change by mtime. Ensure robustness when
224	# filesystem mtime is not reliable for folders.
225
226	def create_files(self, files):
227		TestFilesIndexer.create_files(self, files)
228		self.reset_folder_mtimes()
229
230	def remove_files(self, files):
231		TestFilesIndexer.remove_files(self, files)
232		self.reset_folder_mtimes()
233
234	def reset_folder_mtimes(self):
235		count = 0
236		for node_id, rel_path in self.db.execute('SELECT id, path FROM files WHERE node_type = ?', (TYPE_FOLDER,)):
237			folder = self.root.folder(rel_path)
238			if folder.exists():
239				mtime = folder.mtime()
240				self.db.execute('UPDATE files SET mtime = ? WHERE id = ?', (mtime, node_id))
241			count += 1
242		assert count > 0
243
244
245class TestFilesIndexerWithCaseInsensitiveFilesytem(tests.TestCase, TestFilesDBTable):
246
247	def runTest(self):
248		folder = self.setUpFolder(mock=tests.MOCK_ALWAYS_MOCK)
249		folder._fs.set_case_sensitive(False)
250
251		db = sqlite3.connect(':memory:')
252		db.row_factory = sqlite3.Row
253		indexer = FilesIndexer(db, folder)
254
255		def check_and_update_all():
256			checker = FilesIndexChecker(indexer.db, indexer.folder)
257			checker.queue_check()
258			for out_of_date in checker.check_iter():
259				if out_of_date:
260					for i in indexer.update_iter():
261						pass
262			indexer.db.commit()
263
264		for name in ('aaa.txt', 'bbb.txt', 'ccc.txt'):
265			folder.file(name).write('Test 123\n')
266
267		check_and_update_all()
268		self.assertFilesDBConsistent(db)
269		self.assertFilesDBEquals(db, ('aaa.txt', 'bbb.txt', 'ccc.txt'))
270
271		mtime = folder.mtime()
272		folder.file('aaa.txt').moveto(folder.file('AAA.txt'))
273		self.assertEqual(list(folder.list_names()), ['AAA.txt', 'bbb.txt', 'ccc.txt'])
274		self.assertNotEqual(folder.mtime(), mtime)
275
276		check_and_update_all()
277		self.assertFilesDBConsistent(db)
278		self.assertFilesDBEquals(db, ('AAA.txt', 'bbb.txt', 'ccc.txt'))
279
280
281class TestPagesIndexer(TestPagesDBTable, tests.TestCase):
282
283	FILES = tuple(map(os_native_path, (
284		'foo.txt', # page with children
285		'foo/test.png',
286		'foo/sub1.txt',
287		'foo/sub2.txt',
288		'bar.txt', # page without children
289		'foo-bar.txt', # page without children
290		'baz/dus/ja.txt', # page nested 2 folders deep
291		'argh/somefile.pdf', # not a page
292		'foo/not_a_page.txt', # not a page - see below for missing content line
293		'not_a_page.txt', # not a page - see below for missing content line
294	)))
295	PAGES = (
296		'foo',
297		'foo:sub1',
298		'foo:sub2',
299		'bar',
300		'foo-bar',
301		'baz',
302		'baz:dus',
303		'baz:dus:ja',
304	)
305	CONTENT = ( # These have a file
306		'foo',
307		'foo:sub1',
308		'foo:sub2',
309		'bar',
310		'foo-bar',
311		'baz:dus:ja',
312	)
313	NAMESPACES = ( # These have also a folder
314		'foo',
315		'baz',
316		'baz:dus',
317	)
318	PLACEHOLDERS = (
319		'some:none_existing:page',
320		'foo:sub1:subsub',
321		'toplevel'
322	)
323	PLACEHOLDERS_ALL = (
324		'some:none_existing:page',
325		'some:none_existing',
326		'some',
327		'foo:sub1:subsub',
328		'toplevel'
329	)
330
331	def runTest(self):
332		# Test in 4 parts:
333		#   1. insert files
334		#   2. update files
335		#   3. add some placeholders
336		#   4. delete files
337
338		self.root = self.setUpFolder()
339		layout = FilesLayout(self.root)
340		db = sqlite3.connect(':memory:')
341		db.row_factory = sqlite3.Row
342
343		file_indexer = tests.MockObject(methods=('connect',))
344
345		indexer = PagesIndexer(db, layout, file_indexer)
346
347		def cb_filter_func(name, o, a):
348			if name == 'page-changed':
349				row, content = a
350			elif name == 'page-row-changed':
351				row, oldrow = a
352			else:
353				row, = a
354
355			self.assertIsInstance(row, sqlite3.Row)
356			return row['name']
357
358		signals = tests.SignalLogger(indexer, cb_filter_func)
359
360		# 1. insert files
361		for i, path in enumerate(self.FILES):
362			file = self.root.file(path)
363			if path.endswith('.txt') and not "not_a_page" in path:
364				file.write('Content-Type: text/x-zim-wiki\n\ntest 123\n')
365			else:
366				file.write('test 123\n')
367			row = {'id': i, 'path': path}
368			indexer.on_file_row_inserted(file_indexer, row)
369			self.assertPagesDBConsistent(db)
370
371		self.assertPagesDBEquals(db, self.PAGES)
372		self.assertEqual(set(signals['page-row-inserted']), set(self.PAGES))
373		self.assertEqual(set(signals['page-row-changed']), set(self.NAMESPACES))
374		self.assertEqual(signals['page-row-deleted'], [])
375		self.assertEqual(signals['page-changed'], [])
376
377		# 2. update files
378		signals.clear()
379		for i, path in enumerate(self.FILES):
380			row = {'id': i, 'path': path}
381			indexer.on_file_row_changed(file_indexer, row)
382			self.assertPagesDBConsistent(db)
383
384		self.assertPagesDBEquals(db, self.PAGES)
385		self.assertEqual(signals['page-row-inserted'], [])
386		self.assertEqual(set(signals['page-row-changed']), set(self.CONTENT))
387		self.assertEqual(signals['page-row-deleted'], [])
388		self.assertEqual(set(signals['page-changed']), set(self.CONTENT))
389
390		# 3. add some placeholders
391		for pagename in self.PLACEHOLDERS:
392			indexer.insert_link_placeholder(Path(pagename))
393			self.assertPagesDBConsistent(db)
394
395		self.assertPagesDBEquals(db, self.PAGES
396			+ self.PLACEHOLDERS_ALL)
397
398		for pagename in self.PLACEHOLDERS:
399			indexer.delete_link_placeholder(Path(pagename))
400			self.assertPagesDBConsistent(db)
401
402		self.assertPagesDBEquals(db, self.PAGES)
403
404		# 4. delete files
405		signals.clear()
406		for i, path in enumerate(self.FILES):
407			file = self.root.file(path)
408			file.remove()
409			row = {'id': i, 'path': path}
410			indexer.on_file_row_deleted(file_indexer, row)
411			self.assertPagesDBConsistent(db)
412
413		self.assertPagesDBEquals(db, [])
414		self.assertEqual(signals['page-row-inserted'], [])
415		self.assertEqual(set(signals['page-row-changed']), {'foo'})
416						 # "foo" has source that is deleted before children
417		self.assertEqual(set(signals['page-row-deleted']), set(self.PAGES))
418		self.assertEqual(signals['page-changed'], ['foo'])
419						 # "foo" has source that is deleted before children
420
421
422class TestPageNameConflict(tests.TestCase):
423
424	def runTest(self):
425		folder = self.setUpFolder()
426		layout = FilesLayout(folder)
427		db = sqlite3.connect(':memory:')
428		db.row_factory = sqlite3.Row
429
430		file_indexer = tests.MockObject(methods=('connect',))
431
432		indexer = PagesIndexer(db, layout, file_indexer)
433
434		id1 = indexer.insert_page(Path('Test'), None)
435		with tests.LoggingFilter('zim.notebook.index', 'Error while inserting page'):
436			id2 = indexer.insert_page(Path('Test'), None)
437
438		self.assertEqual(id1, id2)
439
440
441from zim.utils import natural_sort_key
442from zim.notebook.index.pages import PagesViewInternal
443from zim.notebook.page import HRef
444from zim.formats.wiki import Parser as WikiParser
445from zim.newfs.mock import MockFile
446
447class TestLinksIndexer(tests.TestCase):
448
449	## Intended layout ##
450	#
451	# page Foo --> page Bar
452	# page Foo --> placeholder Dus
453
454	PAGES = [
455		(2, 'Bar', 'test123\n'),
456		(3, 'Foo', '[[Bar]]\n[[Dus]]\n'),
457	]
458
459	def runTest(self):
460		def basename(name):
461			if ":" in name:
462				return name.split(":")[-1]
463			else:
464				return name
465
466		db = sqlite3.connect(':memory:')
467		db.row_factory = sqlite3.Row
468		pi = PagesIndexer(db, None, tests.MockObject(methods=('connect',)))
469		for i, name, cont in self.PAGES:
470			db.execute(
471				'INSERT INTO pages(id, name, lowerbasename, sortkey, parent, source_file) VALUES (?, ?, ?, ?, 1, 1)',
472				(i, name, basename(name).lower(), natural_sort_key(name))
473			)
474
475		## Test PagesViewInternal methods
476		iview = PagesViewInternal(db)
477		i, pn = iview.resolve_pagename(Path(''), ['foo'])
478		self.assertEqual((i, pn), (3, Path('Foo')))
479
480		i, pn = iview.resolve_link(Path('Foo'), HRef.new_from_wiki_link('Bar'))
481		self.assertEqual((i, pn), (2, Path('Bar')))
482
483		## Test the actual indexer
484		pageindexer = tests.MaskedObject(pi, ('connect',))
485		indexer = LinksIndexer(db, pageindexer)
486
487		for i, name, cont in self.PAGES:
488			row = {'id': i, 'name': name, 'sortkey': natural_sort_key(name), 'is_link_placeholder': False}
489			indexer.on_page_row_inserted(pageindexer, row)
490
491		###
492		pageindexer.setObjectAccess('insert_link_placeholder')
493		for i, name, text in self.PAGES:
494			tree = WikiParser().parse(text)
495			row = {'id': i, 'name': name}
496			indexer.on_page_changed(pageindexer, row, tree)
497
498		indexer.update()
499
500		links = sorted(
501			(r['source'], r['target'])
502				for r in db.execute('SELECT * FROM links')
503		)
504		self.assertEqual(links, [(3, 2), (3, 4)])
505
506		###
507		pageindexer.setObjectAccess('remove_page')
508		for i, name, cont in self.PAGES:
509			row = {'id': i, 'name': name, 'is_link_placeholder': False}
510			indexer.on_page_row_deleted(pageindexer, row)
511
512		indexer.update()
513
514		rows = db.execute('SELECT * FROM links').fetchall()
515		self.assertEqual(rows, [])
516
517
518class TestUnicodeRepresentationAlternatives(tests.TestCase):
519
520	# Write "Glück" as either
521	#    "Gl\u00fcck" using 'LATIN SMALL LETTER U WITH DIAERESIS' (U+00FC)
522	# or "GLu\u0308ck" using 'COMBINING DIAERESIS' (U+0308)
523	#
524	# Both are valid unicode and should be recognized as the same page.
525	# Gtk input methods seem to prefer single character
526	# Specifically Mac OS X seems to prefer combination character for filesystem
527
528	def testNotebook(self):
529		self._test_notebook("Gl\u00fcck", "GLu\u0308ck")
530		self._test_notebook("GLu\u0308ck", "Gl\u00fcck")
531
532	def _test_notebook(self, file_rep, link_rep):
533		# Create a notebook with different unicode representations for
534		# the page and the link name and check they get linked correctly
535
536		notebook = self.setUpNotebook(content={
537			'page A': 'Link [[%s]]' % link_rep,
538			'page B': 'Link [[%s]]' % file_rep,
539			file_rep: 'Test 123'
540		})
541
542		# Now check both page a and page b link to "file_rep", not placeholder
543		links_a = list(notebook.links.list_links(Path('page A')))
544		links_b = list(notebook.links.list_links(Path('page B')))
545		self.assertEqual(len(links_a), 1)
546		self.assertEqual(links_a[0].target, Path(file_rep))
547		self.assertEqual(len(links_b), 1)
548		self.assertEqual(links_b[0].target, Path(file_rep))
549
550	def testPlaceHolderFirst(self):
551		self._test_placeholder_first("Gl\u00fcck", "GLu\u0308ck")
552		self._test_placeholder_first("GLu\u0308ck", "Gl\u00fcck")
553
554	def _test_placeholder_first(self, file_rep, link_rep):
555		# First create a placeholder, then test placeholder is updated correctly
556		# when page is created
557		notebook = self.setUpNotebook(content={
558			'page A': 'Link [[%s]]' % link_rep,
559		})
560		links_a = list(notebook.links.list_links(Path('page A')))
561		self.assertEqual(len(links_a), 1)
562		self.assertEqual(links_a[0].target, Path(link_rep))
563
564		page = notebook.get_page(Path(file_rep))
565		page.parse('wiki', 'test 123')
566		notebook.store_page(page)
567
568		links_a = list(notebook.links.list_links(Path('page A')))
569		self.assertEqual(len(links_a), 1)
570		self.assertEqual(links_a[0].target, Path(file_rep))
571
572
573class TestTagsIndexer(tests.TestCase):
574
575	PAGES = (
576		(2, 'foo', '@tag1 @tag2'),
577		(3, 'bar', '@tag2 @tag3')
578	)
579
580	def runTest(self):
581		db = sqlite3.connect(':memory:')
582		db.row_factory = sqlite3.Row
583
584		indexer = TagsIndexer(db, tests.MockObject(methods=('connect',)))
585		for i, name, text in self.PAGES:
586			tree = WikiParser().parse(text)
587			row = {'id': i, 'name': name}
588			indexer.on_page_changed(None, row, tree)
589		indexer.update()
590
591		self.assertTags(db,
592			[('tag1', 1), ('tag2', 2), ('tag3', 3)],
593 			[(1, 2), (2, 2), (2, 3), (3, 3)]
594		)
595
596		for i, name, content in self.PAGES:
597			row = {'id': i, 'name': name}
598			indexer.on_page_row_delete(None, row)
599		indexer.update()
600
601		self.assertTags(db, [], [])
602
603	def assertTags(self, db, wantedtags, wantedsources):
604		tags = [tuple(r) for r in db.execute(
605			'SELECT name, id FROM tags'
606		)]
607		self.assertEqual(tags, wantedtags)
608
609		tagsources = [tuple(r) for r in db.execute(
610			'SELECT tag, source FROM tagsources'
611		)]
612		self.assertEqual(tagsources, wantedsources)
613
614
615from zim.notebook.index import IndexUpdateIter
616
617
618def buildUpdateIter(folder):
619	db = sqlite3.connect(':memory:')
620	db.row_factory = sqlite3.Row
621	layout = FilesLayout(folder)
622	return IndexUpdateIter(db, layout)
623
624
625class TestFullIndexer(TestFilesIndexer):
626
627	# Just test that all indexers play nice together,
628	# no detailed assertions
629
630	PAGE_TEXT = 'Content-Type: text/x-zim-wiki\n\ntest 123\n[[foo:sub1]]\n[[sub1]]\n@tagfoo\n'
631		# link content choosen to have one link
632		# that resolves always and one link that
633		# resolves for some pages, but causes
634		# placeholder for other namespaces
635
636	def runTest(self):
637		# Test in 3 parts:
638		#   1. Index existing files structure
639		#   2. Check and update after new files appear
640		#   3. Check and update after files disappear
641
642		self.root = self.setUpFolder()
643		update_iter = buildUpdateIter(self.root)
644
645		# 1. Index existing files structure
646		self.create_files(self.FILES)
647		update_iter.check_and_update()
648
649		# 2. Check and update after new files appear
650		self.create_files(self.FILES_UPDATE)
651		update_iter.check_and_update()
652
653		# 3. Check and update after files disappear
654		self.remove_files(self.FILES_UPDATE)
655		update_iter.check_and_update()
656