1#! /usr/bin/env python3
2# -.- coding: utf-8 -.-
3
4# remote-test.py
5#
6# Copyright © 2009-2011 Seif Lotfy <seif@lotfy.com>
7# Copyright © 2009-2011 Siegfried-Angel Gevatter Pujals <siegfried@gevatter.com>
8# Copyright © 2009-2011 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
9# Copyright © 2009-2011 Markus Korn <thekorn@gmx.de>
10# Copyright © 2011-2012 Collabora Ltd.
11#             By Siegfried-Angel Gevatter Pujals <siegfried@gevatter.com>
12#             By Seif Lotfy <seif@lotfy.com>
13#
14# This program is free software: you can redistribute it and/or modify
15# it under the terms of the GNU Lesser General Public License as published by
16# the Free Software Foundation, either version 2.1 of the License, or
17# (at your option) any later version.
18#
19# This program is distributed in the hope that it will be useful,
20# but WITHOUT ANY WARRANTY; without even the implied warranty of
21# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22# GNU Lesser General Public License for more details.
23#
24# You should have received a copy of the GNU Lesser General Public License
25# along with this program.  If not, see <http://www.gnu.org/licenses/>.
26
27import signal
28
29from zeitgeist.datamodel import (Event, Subject, Interpretation, Manifestation,
30	TimeRange, StorageState, DataSource, NULL_EVENT, ResultType)
31
32import testutils
33from dbus.exceptions import DBusException
34from testutils import parse_events, import_events
35
36class ZeitgeistRemoteAPITest(testutils.RemoteTestCase):
37
38	def testInsertAndGetEvent(self):
39		# Insert an event
40		events = parse_events("test/data/single_event.js")
41		ids = self.insertEventsAndWait(events)
42		self.assertEqual(1, len(ids))
43
44		# Now get it back and check it hasn't changed
45		retrieved_events = self.getEventsAndWait(ids)
46		self.assertEqual(1, len(retrieved_events))
47		self.assertEventsEqual(retrieved_events[0], events[0])
48
49	def testUnicodeInsert(self):
50		events = parse_events("test/data/unicode_event.js")
51		ids = self.insertEventsAndWait(events)
52		self.assertEqual(len(ids), len(events))
53		result_events = self.getEventsAndWait(ids)
54		self.assertEqual(len(ids), len(result_events))
55
56	def testGetEvents(self):
57		events = parse_events("test/data/five_events.js")
58		ids = self.insertEventsAndWait(events) + [1000, 2000]
59		result = self.getEventsAndWait(ids)
60		self.assertEqual(len([_f for _f in result if _f]), len(events))
61		self.assertEqual(len([event for event in result if event is None]), 2)
62
63	def testInsertAndDeleteEvent(self):
64		# Insert an event
65		events = parse_events("test/data/single_event.js")
66		ids = self.insertEventsAndWait(events)
67
68		# Delete it, make sure the returned time range is correct
69		time_range = self.deleteEventsAndWait(ids)
70		self.assertEqual(time_range[0], time_range[1])
71		self.assertEqual(time_range[0], int(events[0].timestamp))
72
73		# Make sure the event is gone
74		retrieved_events = self.getEventsAndWait(ids)
75		self.assertEqual(retrieved_events[0], None)
76
77	def testDeleteNonExistantEvent(self):
78		# Insert an event (populate the database so it isn't empty)
79		events = parse_events("test/data/single_event.js")
80		ids = self.insertEventsAndWait(events)
81
82		# Try deleting a non-existant event
83		events = parse_events("test/data/single_event.js")
84		time_range = self.deleteEventsAndWait([int(ids[0]) + 1000])
85		self.assertEqual(time_range[0], time_range[1])
86		self.assertEqual(time_range[0], -1)
87
88		# Make sure the inserted event is still there
89		retrieved_events = self.getEventsAndWait(ids)
90		self.assertEqual(1, len(retrieved_events))
91		self.assertEventsEqual(retrieved_events[0], events[0])
92
93	def testDeleteTwoSimilarEvents(self):
94		# Insert a couple similar events
95		event1 = parse_events("test/data/single_event.js")[0]
96		event2 = Event(event1)
97		event2.timestamp = int(event1.timestamp) + 1
98		ids = self.insertEventsAndWait([event1, event2])
99
100		# Try deleting one of them
101		self.deleteEventsAndWait([ids[0]])
102
103		# Make sure it's gone, but the second one is still there
104		retrieved_events = self.getEventsAndWait(ids)
105		self.assertEqual(retrieved_events[0], None)
106		self.assertEventsEqual(retrieved_events[1], event2)
107
108class ZeitgeistRemoteAPITestAdvanced(testutils.RemoteTestCase):
109
110	def testFindTwoOfThreeEvents(self):
111		events = parse_events("test/data/three_events.js")
112		ids = self.insertEventsAndWait(events)
113		self.assertEqual(3, len(ids))
114
115		events = self.getEventsAndWait(ids)
116		self.assertEqual(3, len(events))
117		for event in events:
118			self.assertTrue(isinstance(event, Event))
119			self.assertEqual(Manifestation.USER_ACTIVITY, event.manifestation)
120			self.assertTrue(event.actor.startswith("Boogaloo"))
121
122		# Search for everything
123		ids = self.findEventIdsAndWait([], num_events=3)
124		self.assertEqual(3, len(ids))
125
126		# Search for some specific templates
127		subj_templ1 = Subject.new_for_values(manifestation=Manifestation.FILE_DATA_OBJECT)
128		subj_templ2 = Subject.new_for_values(interpretation=Interpretation.IMAGE)
129		event_template = Event.new_for_values(
130					actor="Boogaloo*",
131					interpretation=Interpretation.ACCESS_EVENT,
132					subjects=[subj_templ1, subj_templ2])
133		ids = self.findEventIdsAndWait([event_template],
134						num_events=10)
135		self.assertEqual(1, len(ids))
136
137	def testFindOneOfThreeEvents(self):
138		events = parse_events("test/data/three_events.js")
139		ids = self.insertEventsAndWait(events)
140		self.assertEqual(3, len(ids))
141
142		events = self.getEventsAndWait(ids)
143		self.assertEqual(3, len(events))
144		for event in events:
145			self.assertTrue(isinstance(event, Event))
146			self.assertEqual(Manifestation.USER_ACTIVITY, event.manifestation)
147			self.assertTrue(event.actor.startswith("Boogaloo"))
148
149		# Search for everything
150		ids = self.findEventIdsAndWait([], num_events=3)
151		self.assertEqual(3, len(ids))
152
153		# Search for some specific templates
154		subj_templ1 = Subject.new_for_values(interpretation="!"+Interpretation.AUDIO)
155		subj_templ2 = Subject.new_for_values(interpretation="!"+Interpretation.IMAGE)
156		event_template = Event.new_for_values(
157					actor="Boogaloo*",
158					interpretation=Interpretation.ACCESS_EVENT,
159					subjects=[subj_templ1, subj_templ2])
160		ids = self.findEventIdsAndWait([event_template],
161						num_events=10)
162		self.assertEqual(1, len(ids))
163		events = self.getEventsAndWait(ids)
164		event = events[0]
165		self.assertEqual(event.subjects[0].interpretation, Interpretation.DOCUMENT)
166
167	def testFindEventsWithMultipleSubjects(self):
168		events = parse_events("test/data/three_events.js")
169		ids = self.insertEventsAndWait(events)
170
171		results = self.findEventsForTemplatesAndWait([], num_events=5)
172		self.assertEqual(3, len(results))
173
174		self.assertEqual(len(results[2].get_subjects()), 2)
175		self.assertEqual(len(results[1].get_subjects()), 1)
176		self.assertEqual(len(results[0].get_subjects()), 1)
177
178	def testFindEventsWithNoexpandOperator(self):
179		events = parse_events("test/data/three_events.js")
180		ids = self.insertEventsAndWait(events)
181
182		template = Event.new_for_values(
183			subject_interpretation=Interpretation.MEDIA)
184		results = self.findEventsForTemplatesAndWait([template],
185			num_events=5)
186		self.assertEqual(3, len(results))
187
188		template = Event.new_for_values(
189			subject_interpretation='+%s' % Interpretation.MEDIA)
190		results = self.findEventsForTemplatesAndWait([template],
191			num_events=5)
192		self.assertEqual(0, len(results))
193
194		template = Event.new_for_values(
195			subject_interpretation='+%s' % Interpretation.AUDIO)
196		results = self.findEventsForTemplatesAndWait([template],
197			num_events=5)
198		self.assertEqual(1, len(results))
199		self.assertEqual(results[0].get_subjects()[0].interpretation,
200			Interpretation.AUDIO)
201
202	def testFindEventsLimitWhenDuplicates(self):
203		events = parse_events("test/data/three_events.js")
204		ids = self.insertEventsAndWait(events)
205
206		# This test makes sure that we get the requested number of events
207		# when some of them have multiple subjects (so more than one row
208		# with the same event id).
209		results = self.findEventsForTemplatesAndWait([], num_events=3)
210		self.assertEqual(3, len(results))
211
212	def testInsertWithEmptySubjectInterpretationManifestation(self):
213		events = parse_events("test/data/incomplete_events.js")
214		ids = self.insertEventsAndWait(events[:3])
215		self.assertEqual(3, len(ids))
216
217		event = self.getEventsAndWait([ids[0]])[0]
218		self.assertEqual("Hi", event.subjects[0].manifestation)
219		self.assertEqual("", event.subjects[0].interpretation)
220		self.assertEqual("Something", event.subjects[1].manifestation)
221		self.assertEqual("", event.subjects[1].interpretation)
222
223		event = self.getEventsAndWait([ids[1]])[0]
224		self.assertEqual(Manifestation.FILE_DATA_OBJECT, event.subjects[0].manifestation)
225		self.assertEqual(Interpretation.SOURCE_CODE, event.subjects[0].interpretation)
226		self.assertEqual(Manifestation.FILE_DATA_OBJECT, event.subjects[1].manifestation)
227		self.assertEqual("a", event.subjects[1].interpretation)
228		self.assertEqual("b", event.subjects[2].manifestation)
229		self.assertEqual(Interpretation.SOURCE_CODE, event.subjects[2].interpretation)
230
231		event = self.getEventsAndWait([ids[2]])[0]
232		self.assertEqual("something else", event.subjects[0].manifestation)
233		self.assertEqual("#Audio", event.subjects[0].interpretation)
234
235	def testInsertWithEmptySubjectMimeType(self):
236		events = parse_events("test/data/incomplete_events.js")
237		ids = self.insertEventsAndWait([events[7]])
238		self.assertEqual(1, len(ids))
239
240		event = self.getEventsAndWait([ids[0]])[0]
241		self.assertEqual(1, len(event.subjects))
242
243		subject = event.subjects[0]
244		self.assertEqual("file:///unknown-mimetype-file", subject.uri)
245		self.assertEqual("", subject.mimetype)
246		self.assertEqual(Manifestation.FILE_DATA_OBJECT, subject.manifestation)  # FIXME
247		self.assertEqual("", subject.interpretation) # FIXME
248
249	def testInsertIncompleteEvent(self):
250		events = parse_events("test/data/incomplete_events.js")
251
252		# Missing interpretation
253		ids = self.insertEventsAndWait([events[3]])
254		self.assertEqual(0, len(ids))
255
256		# Missing manifestation
257		ids = self.insertEventsAndWait([events[4]])
258		self.assertEqual(0, len(ids))
259
260		# Missing actor
261		ids = self.insertEventsAndWait([events[5]])
262		self.assertEqual(0, len(ids))
263
264	def testInsertIncompleteSubject(self):
265		events = parse_events("test/data/incomplete_events.js")
266
267		# Missing one subject URI
268		ids = self.insertEventsAndWait([events[6]])
269		self.assertEqual(0, len(ids))
270
271class ZeitgeistRemoteFindEventIdsTest(testutils.RemoteTestCase):
272	"""
273	Test cases with basic tests for FindEventIds.
274
275	Since they are all using the same test events and all tests contained
276	here are read-only, I'd make sense to use something like setUpClass/
277	tearDownClass to speed up test execution.
278	"""
279
280	def setUp(self):
281		super(ZeitgeistRemoteFindEventIdsTest, self).setUp()
282
283		# Insert some events...
284		events = parse_events("test/data/five_events.js")
285		self.ids = self.insertEventsAndWait(events)
286
287	def testFindEventIds(self):
288		# Retrieve all existing event IDs, make sure they are correct
289		retrieved_ids = self.findEventIdsAndWait([])
290		self.assertEqual(set(retrieved_ids), set(self.ids))
291
292	def testFindEventIdsForId(self):
293		# Retrieve events for a particular event ID
294		template = Event([["3", "", "", "", "", ""], [], ""])
295		ids = self.findEventIdsAndWait([template])
296		self.assertEqual(ids, [3])
297
298	def testFindEventIdsForTimeRange(self):
299		# Make sure that filtering by time range we get the right ones
300		retrieved_ids = self.findEventIdsAndWait([],
301			timerange=TimeRange(133, 153))
302		self.assertEqual(retrieved_ids, [4, 2, 3]) # TS: [133, 143, 153]
303
304		retrieved_ids = self.findEventIdsAndWait([],
305			timerange=TimeRange(163, 163))
306		self.assertEqual(retrieved_ids, [5]) # Timestamps: [163]
307
308	def testFindEventIdsForInterpretation(self):
309		# Retrieve events for a particular interpretation
310		template = Event.new_for_values(interpretation='stfu:OpenEvent')
311		ids = self.findEventIdsAndWait([template])
312		self.assertEqual(ids, [5, 1])
313
314		# Retrieve events excluding a particular interpretation
315		template = Event.new_for_values(interpretation='!stfu:OpenEvent')
316		ids = self.findEventIdsAndWait([template])
317		self.assertEqual(list(map(int, ids)), [4, 2, 3])
318
319	def testFindEventIdsForManifestation(self):
320		# Retrieve events for a particular manifestation
321		template = Event.new_for_values(manifestation='stfu:BooActivity')
322		ids = self.findEventIdsAndWait([template])
323		self.assertEqual(ids, [2])
324
325		# Retrieve events excluding a particular manifestation
326		template = Event.new_for_values(manifestation='!stfu:BooActivity')
327		ids = self.findEventIdsAndWait([template])
328		self.assertEqual(list(map(int, ids)), [5, 4, 3, 1])
329
330	def testFindEventIdsForActor(self):
331		# Retrieve events for a particular actor
332		template = Event.new_for_values(actor='gedit')
333		ids = self.findEventIdsAndWait([template])
334		self.assertEqual(ids, [3])
335
336		# Retrieve events excluding a particular actor
337		template = Event.new_for_values(actor='!gedit')
338		ids = self.findEventIdsAndWait([template])
339		self.assertEqual(list(map(int, ids)), [5, 4, 2, 1])
340
341		# Retrieve events with actor matching a prefix
342		template = Event.new_for_values(actor='g*')
343		ids = self.findEventIdsAndWait([template])
344		self.assertEqual(list(map(int, ids)), [2, 3])
345
346	def testFindEventIdsForEventOrigin(self):
347		# Retrieve events for a particular actor
348		template = Event.new_for_values(origin='big bang')
349		ids = self.findEventIdsAndWait([template])
350		self.assertEqual(ids, [5, 3])
351
352		# Now let's try with wildcard and negation
353		template = Event.new_for_values(origin='!big *')
354		ids = self.findEventIdsAndWait([template])
355		self.assertEqual(list(map(int, ids)), [4, 2, 1])
356
357	def testFindEventIdsForSubjectInterpretation(self):
358		# Retrieve events for a particular subject interpretation
359		template = Event.new_for_values(subject_interpretation='stfu:Document')
360		ids = self.findEventIdsAndWait([template])
361		self.assertEqual(ids, [1])
362
363		# Retrieve events excluding a particular subject interpretation
364		template = Event.new_for_values(subject_interpretation='!stfu:Document')
365		ids = self.findEventIdsAndWait([template])
366		self.assertEqual(list(map(int, ids)), [5, 4, 2, 3])
367
368	def testFindEventIdsForSubjectManifestation(self):
369		# Retrieve events for a particular subject manifestation
370		template = Event.new_for_values(subject_manifestation='stfu:File')
371		ids = self.findEventIdsAndWait([template])
372		self.assertEqual(ids, [5, 4, 3, 1])
373
374		# Retrieve events excluding a particular subject interpretation
375		template = Event.new_for_values(subject_manifestation='!stfu:File')
376		ids = self.findEventIdsAndWait([template])
377		self.assertEqual(list(map(int, ids)), [2])
378
379	def testFindEventIdsForSubjectMimeType(self):
380		# Retrieve events for a particular mime-type
381		template = Event.new_for_values(subject_mimetype='text/plain')
382		ids = self.findEventIdsAndWait([template])
383		self.assertEqual(ids, [4, 2, 3])
384
385		# Now let's try with wildcard and negation
386		template = Event.new_for_values(subject_mimetype='!meat/*')
387		ids = self.findEventIdsAndWait([template])
388		self.assertEqual(list(map(int, ids)), [5, 4, 2, 3])
389
390	def testFindEventIdsForSubjectUri(self):
391		# Retrieve events for a particular URI
392		template = Event.new_for_values(subject_uri='file:///tmp/foo.txt')
393		ids = self.findEventIdsAndWait([template])
394		self.assertEqual(ids, [2, 3])
395
396		# Now let's try with wildcard...
397		template = Event.new_for_values(subject_uri='http://*')
398		ids = self.findEventIdsAndWait([template])
399		self.assertEqual(list(map(int, ids)), [1])
400
401		# ... and negation
402		template = Event.new_for_values(subject_uri='!file:///tmp/foo.txt')
403		ids = self.findEventIdsAndWait([template])
404		self.assertEqual(list(map(int, ids)), [5, 4, 1])
405
406	def testFindEventIdsForSubjectOrigin(self):
407		# Retrieve events for a particular origin
408		template = Event.new_for_values(subject_origin='file:///tmp')
409		ids = self.findEventIdsAndWait([template])
410		self.assertEqual(ids, [4, 2, 3])
411
412		# Now let's try with wildcard and negation
413		template = Event.new_for_values(subject_origin='!file:*')
414		ids = self.findEventIdsAndWait([template])
415		self.assertEqual(list(map(int, ids)), [5, 1])
416
417	def testFindEventIdsForSubjectText(self):
418		# Retrieve events with a particular text
419		template = Event.new_for_values(subject_text='this item *')
420		ids = self.findEventIdsAndWait([template])
421		self.assertEqual(ids, [4])
422
423	def testFindEventIdsForSubjectCurrentUri(self):
424		# Retrieve events for a particular current URI
425		template = Event.new_for_values(subject_current_uri='http://www.google.de')
426		ids = self.findEventIdsAndWait([template])
427		self.assertEqual(ids, [1])
428
429		# Now let's try with wildcard and negation
430		template = Event.new_for_values(subject_current_uri='!http:*')
431		ids = self.findEventIdsAndWait([template])
432		self.assertEqual(list(map(int, ids)), [5, 4, 2, 3])
433
434	def testFindEventIdsForSubjectCurrentOrigin(self):
435		# Retrieve events for a particular current origin
436		template = Event.new_for_values(subject_current_origin='file:///tmp')
437		ids = self.findEventIdsAndWait([template])
438		self.assertEqual(ids, [4, 2, 3])
439
440		# Now let's try with wildcard and negation
441		template = Event.new_for_values(subject_current_origin='!file:*')
442		ids = self.findEventIdsAndWait([template])
443		self.assertEqual(list(map(int, ids)), [5, 1])
444
445		# Now let's try with wildcard and negation
446		template = Event.new_for_values(subject_current_origin='!http:*')
447		ids = self.findEventIdsAndWait([template])
448		self.assertEqual(list(map(int, ids)), [4, 2, 3])
449
450	def testFindEventIdsForSubjectStorage(self):
451		# Retrieve events for a particular subject storage
452		template = Event.new_for_values(subject_storage=
453			'368c991f-8b59-4018-8130-3ce0ec944157')
454		ids = self.findEventIdsAndWait([template])
455		self.assertEqual(ids, [4, 2, 3, 1])
456
457	def testFindEventIdsWithStorageState(self):
458		"""
459		Test FindEventIds with different storage states.
460
461		Although currently there isn't much point in this test, since
462		all events have storage state set to NULL and so are always returned.
463		"""
464
465		# Retrieve events with storage state "any"
466		ids = self.findEventIdsAndWait([], storage_state=StorageState.Any)
467		self.assertEqual(ids, [5, 4, 2, 3, 1])
468
469		# Retrieve events with storage state "available"
470		ids = self.findEventIdsAndWait([], storage_state=StorageState.Available)
471		self.assertEqual(ids, [5, 4, 2, 3, 1])
472
473		# Retrieve events with storage state "not available"
474		ids = self.findEventIdsAndWait([],
475			storage_state=StorageState.NotAvailable)
476		self.assertEqual(ids, [5, 4, 2, 3, 1])
477
478	def testFindEventIdsWithUnknownStorageState(self):
479		"""
480		Events with storage state "unknown" should always be considered
481		as being available.
482		"""
483
484		event = parse_events("test/data/single_event.js")[0]
485		event.subjects[0].uri = 'file:///i-am-unknown'
486		event.subjects[0].storage = 'unknown'
487
488		self.insertEventsAndWait([event])
489
490		tmpl = Event.new_for_values(subject_uri='file:///i-am-unknown')
491		ids = self.findEventIdsAndWait([tmpl], storage_state=StorageState.Available)
492		self.assertEqual(ids, [6])
493
494class ZeitgeistRemoteInterfaceTest(testutils.RemoteTestCase):
495
496	def testQuit(self):
497		"""
498		Calling Quit() on the remote interface should shutdown the
499		engine in a clean way.
500		"""
501		try:
502			self.client._iface.Quit()
503		except DBusException as e:
504			# expect a silent remote disconnection
505			if e.get_dbus_name() != "org.freedesktop.DBus.Error.NoReply":
506				raise (e)
507		self.daemon.wait()
508		self.assertRaises(OSError, self.kill_daemon)
509		self.spawn_daemon()
510
511	def testSIGHUP(self):
512		"""
513		Sending a SIGHUP signal to a running deamon instance should result
514		in a clean shutdown.
515		"""
516		code = self.kill_daemon(signal.SIGHUP)
517		self.assertEqual(code, 0)
518		self.spawn_daemon()
519
520
521class ZeitgeistRemotePropertiesTest(testutils.RemoteTestCase):
522
523	def __init__(self, methodName):
524		super(ZeitgeistRemotePropertiesTest, self).__init__(methodName)
525
526	def testVersion(self):
527		self.assertTrue(len(self.client.get_version()) >= 2)
528
529
530if __name__ == "__main__":
531	testutils.run()
532
533# vim:noexpandtab:ts=4:sw=4
534