1 /*
2  * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
3  * Copyright (C) 2017, Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA  02110-1301, USA.
19  */
20 
21 #include "config.h"
22 
23 #include "tracker-direct.h"
24 #include "tracker-direct-batch.h"
25 #include "tracker-direct-statement.h"
26 #include "libtracker-sparql/tracker-private.h"
27 #include <libtracker-common/tracker-utils.h>
28 #include <libtracker-data/tracker-data.h>
29 #include <libtracker-data/tracker-sparql.h>
30 #include <libtracker-sparql/tracker-notifier-private.h>
31 #include <libtracker-sparql/tracker-private.h>
32 
33 typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;
34 
35 struct _TrackerDirectConnectionPrivate
36 {
37 	TrackerSparqlConnectionFlags flags;
38 	GFile *store;
39 	GFile *ontology;
40 
41 	TrackerNamespaceManager *namespace_manager;
42 	TrackerDataManager *data_manager;
43 	GMutex mutex;
44 
45 	GThreadPool *update_thread; /* Contains 1 exclusive thread */
46 	GThreadPool *select_pool;
47 
48 	GList *notifiers;
49 
50 	gint64 timestamp;
51 	gint64 cleanup_timestamp;
52 
53 	guint cleanup_timeout_id;
54 
55 	guint initialized : 1;
56 	guint closing     : 1;
57 };
58 
59 typedef struct {
60 	gchar *graph;
61 	TrackerResource *resource;
62 } UpdateResource;
63 
64 enum {
65 	PROP_0,
66 	PROP_FLAGS,
67 	PROP_STORE_LOCATION,
68 	PROP_ONTOLOGY_LOCATION,
69 	N_PROPS
70 };
71 
72 static GParamSpec *props[N_PROPS] = { NULL };
73 
74 typedef enum {
75 	TASK_TYPE_QUERY,
76 	TASK_TYPE_UPDATE,
77 	TASK_TYPE_UPDATE_BLANK,
78 	TASK_TYPE_UPDATE_RESOURCE,
79 	TASK_TYPE_UPDATE_BATCH,
80 	TASK_TYPE_RELEASE_MEMORY,
81 } TaskType;
82 
83 typedef struct {
84 	TaskType type;
85 	gpointer data;
86 	GDestroyNotify destroy;
87 } TaskData;
88 
89 static void tracker_direct_connection_initable_iface_init (GInitableIface *iface);
90 static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface);
91 
G_DEFINE_QUARK(TrackerDirectNotifier,tracker_direct_notifier)92 G_DEFINE_QUARK (TrackerDirectNotifier, tracker_direct_notifier)
93 
94 G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection,
95                          TRACKER_TYPE_SPARQL_CONNECTION,
96                          G_ADD_PRIVATE (TrackerDirectConnection)
97                          G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
98                                                 tracker_direct_connection_initable_iface_init)
99                          G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE,
100                                                 tracker_direct_connection_async_initable_iface_init))
101 
102 static TaskData *
103 task_data_query_new (TaskType       type,
104                      gpointer       data,
105                      GDestroyNotify destroy)
106 {
107 	TaskData *task;
108 
109 	task = g_new0 (TaskData, 1);
110 	task->type = type;
111 	task->data = data;
112 	task->destroy = destroy;
113 
114 	return task;
115 }
116 
117 static void
task_data_free(TaskData * task)118 task_data_free (TaskData *task)
119 {
120 	if (task->destroy && task->data)
121 		task->destroy (task->data);
122 	g_free (task);
123 }
124 
125 static gboolean
cleanup_timeout_cb(gpointer user_data)126 cleanup_timeout_cb (gpointer user_data)
127 {
128 	TrackerDirectConnection *conn = user_data;
129 	TrackerDirectConnectionPrivate *priv;
130 	gint64 timestamp;
131 	GTask *task;
132 
133 	priv = tracker_direct_connection_get_instance_private (conn);
134 	timestamp = g_get_monotonic_time ();
135 
136 	/* If we already cleaned up */
137 	if (priv->timestamp < priv->cleanup_timestamp)
138 		return G_SOURCE_CONTINUE;
139 	/* If the connection was used less than 10s ago */
140 	if (timestamp - priv->timestamp < 10 * G_USEC_PER_SEC)
141 		return G_SOURCE_CONTINUE;
142 
143 	priv->cleanup_timestamp = timestamp;
144 
145 	task = g_task_new (conn, NULL, NULL, NULL);
146 	g_task_set_task_data (task,
147 	                      task_data_query_new (TASK_TYPE_RELEASE_MEMORY, NULL, NULL),
148 	                      (GDestroyNotify) task_data_free);
149 
150 	g_thread_pool_push (priv->update_thread, task, NULL);
151 
152 	return G_SOURCE_CONTINUE;
153 }
154 
155 gboolean
update_resource(TrackerData * data,const gchar * graph,TrackerResource * resource,GError ** error)156 update_resource (TrackerData      *data,
157                  const gchar      *graph,
158                  TrackerResource  *resource,
159                  GError          **error)
160 {
161 	GError *inner_error = NULL;
162 
163 	tracker_data_begin_transaction (data, &inner_error);
164 	if (inner_error)
165 		goto error;
166 
167 	tracker_data_update_resource (data,
168 	                              graph,
169 	                              resource,
170 	                              NULL,
171 	                              &inner_error);
172 
173 	if (inner_error) {
174 		tracker_data_rollback_transaction (data);
175 		goto error;
176 	}
177 
178 	tracker_data_commit_transaction (data, &inner_error);
179 	if (inner_error)
180 		goto error;
181 
182 	return TRUE;
183 
184 error:
185 	g_propagate_error (error, inner_error);
186 	return FALSE;
187 }
188 
189 static void
update_thread_func(gpointer data,gpointer user_data)190 update_thread_func (gpointer data,
191                     gpointer user_data)
192 {
193 	TrackerDirectConnectionPrivate *priv;
194 	TrackerDirectConnection *conn;
195 	GTask *task = data;
196 	TaskData *task_data = g_task_get_task_data (task);
197 	TrackerData *tracker_data;
198 	GError *error = NULL;
199 	gpointer retval = NULL;
200 	GDestroyNotify destroy_notify = NULL;
201 	gboolean update_timestamp = TRUE;
202 
203 	conn = user_data;
204 	priv = tracker_direct_connection_get_instance_private (conn);
205 
206 	g_mutex_lock (&priv->mutex);
207 	tracker_data = tracker_data_manager_get_data (priv->data_manager);
208 
209 	switch (task_data->type) {
210 	case TASK_TYPE_QUERY:
211 		g_warning ("Queries don't go through this thread");
212 		break;
213 	case TASK_TYPE_UPDATE:
214 		tracker_data_update_sparql (tracker_data, task_data->data, &error);
215 		break;
216 	case TASK_TYPE_UPDATE_BLANK:
217 		retval = tracker_data_update_sparql_blank (tracker_data, task_data->data, &error);
218 		destroy_notify = (GDestroyNotify) g_variant_unref;
219 		break;
220 	case TASK_TYPE_UPDATE_RESOURCE: {
221 		UpdateResource *data = task_data->data;
222 		update_resource (tracker_data, data->graph, data->resource, &error);
223 		break;
224 	}
225 	case TASK_TYPE_UPDATE_BATCH:
226 		tracker_direct_batch_update (task_data->data, priv->data_manager, &error);
227 		break;
228 	case TASK_TYPE_RELEASE_MEMORY:
229 		tracker_data_manager_release_memory (priv->data_manager);
230 		update_timestamp = FALSE;
231 		break;
232 	}
233 
234 	if (error)
235 		g_task_return_error (task, error);
236 	else if (retval)
237 		g_task_return_pointer (task, retval, destroy_notify);
238 	else
239 		g_task_return_boolean (task, TRUE);
240 
241 	g_object_unref (task);
242 
243 	if (update_timestamp)
244 		tracker_direct_connection_update_timestamp (conn);
245 
246 	g_mutex_unlock (&priv->mutex);
247 }
248 
249 static void
query_thread_pool_func(gpointer data,gpointer user_data)250 query_thread_pool_func (gpointer data,
251                         gpointer user_data)
252 {
253 	TrackerDirectConnection *conn = user_data;
254 	TrackerDirectConnectionPrivate *priv;
255 	TrackerSparqlCursor *cursor;
256 	GTask *task = data;
257 	TaskData *task_data = g_task_get_task_data (task);
258 	GError *error = NULL;
259 
260 	g_assert (task_data->type == TASK_TYPE_QUERY);
261 
262 	priv = tracker_direct_connection_get_instance_private (conn);
263 
264 	if (priv->closing) {
265 		g_task_return_new_error (task,
266 		                         G_IO_ERROR,
267 		                         G_IO_ERROR_CONNECTION_CLOSED,
268 		                         "Connection is closed");
269 		g_object_unref (task);
270 		return;
271 	}
272 
273 	cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
274 	                                          task_data->data,
275 	                                          g_task_get_cancellable (task),
276 	                                          &error);
277 	if (cursor)
278 		g_task_return_pointer (task, cursor, g_object_unref);
279 	else
280 		g_task_return_error (task, error);
281 
282 	g_object_unref (task);
283 }
284 
285 static gint
task_compare_func(GTask * a,GTask * b,gpointer user_data)286 task_compare_func (GTask    *a,
287                    GTask    *b,
288                    gpointer  user_data)
289 {
290 	return g_task_get_priority (b) - g_task_get_priority (a);
291 }
292 
293 static gboolean
set_up_thread_pools(TrackerDirectConnection * conn,GError ** error)294 set_up_thread_pools (TrackerDirectConnection  *conn,
295 		     GError                  **error)
296 {
297 	TrackerDirectConnectionPrivate *priv;
298 
299 	priv = tracker_direct_connection_get_instance_private (conn);
300 
301 	priv->select_pool = g_thread_pool_new (query_thread_pool_func,
302 	                                       conn, 16, FALSE, error);
303 	if (!priv->select_pool)
304 		return FALSE;
305 
306 	priv->update_thread = g_thread_pool_new (update_thread_func,
307 	                                         conn, 1, TRUE, error);
308 	if (!priv->update_thread)
309 		return FALSE;
310 
311 	g_thread_pool_set_sort_function (priv->select_pool,
312 	                                 (GCompareDataFunc) task_compare_func,
313 	                                 conn);
314 	g_thread_pool_set_sort_function (priv->update_thread,
315 	                                 (GCompareDataFunc) task_compare_func,
316 	                                 conn);
317 	return TRUE;
318 }
319 
320 static TrackerDBManagerFlags
translate_flags(TrackerSparqlConnectionFlags flags)321 translate_flags (TrackerSparqlConnectionFlags flags)
322 {
323 	TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES;
324 
325 	if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) != 0)
326 		db_flags |= TRACKER_DB_MANAGER_READONLY;
327 	if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_STEMMER) != 0)
328 		db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_STEMMER;
329 	if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_UNACCENT) != 0)
330 		db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_UNACCENT;
331 	if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_STOP_WORDS) != 0)
332 		db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_STOP_WORDS;
333 	if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_IGNORE_NUMBERS) != 0)
334 		db_flags |= TRACKER_DB_MANAGER_FTS_IGNORE_NUMBERS;
335 
336 	return db_flags;
337 }
338 
339 static gboolean
tracker_direct_connection_initable_init(GInitable * initable,GCancellable * cancellable,GError ** error)340 tracker_direct_connection_initable_init (GInitable     *initable,
341                                          GCancellable  *cancellable,
342                                          GError       **error)
343 {
344 	TrackerDirectConnectionPrivate *priv;
345 	TrackerDirectConnection *conn;
346 	TrackerDBManagerFlags db_flags;
347 	GHashTable *namespaces;
348 	GHashTableIter iter;
349 	gchar *prefix, *ns;
350 	GError *inner_error = NULL;
351 
352 	conn = TRACKER_DIRECT_CONNECTION (initable);
353 	priv = tracker_direct_connection_get_instance_private (conn);
354 
355 	tracker_locale_sanity_check ();
356 
357 	if (!set_up_thread_pools (conn, error))
358 		return FALSE;
359 
360 	db_flags = translate_flags (priv->flags);
361 
362 	if (!priv->store) {
363 		db_flags |= TRACKER_DB_MANAGER_IN_MEMORY;
364 	}
365 
366 	priv->data_manager = tracker_data_manager_new (db_flags, priv->store,
367 	                                               priv->ontology,
368 	                                               100, 100);
369 	if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, &inner_error)) {
370 		g_propagate_error (error, _translate_internal_error (inner_error));
371 		g_clear_object (&priv->data_manager);
372 		return FALSE;
373 	}
374 
375 	/* Initialize namespace manager */
376 	priv->namespace_manager = tracker_namespace_manager_new ();
377 	namespaces = tracker_data_manager_get_namespaces (priv->data_manager);
378 	g_hash_table_iter_init (&iter, namespaces);
379 
380 	while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) {
381 		tracker_namespace_manager_add_prefix (priv->namespace_manager,
382 		                                      prefix, ns);
383 	}
384 
385 	g_hash_table_unref (namespaces);
386 
387 	priv->cleanup_timeout_id =
388 		g_timeout_add_seconds (30, cleanup_timeout_cb, conn);
389 
390 	return TRUE;
391 }
392 
393 static void
tracker_direct_connection_initable_iface_init(GInitableIface * iface)394 tracker_direct_connection_initable_iface_init (GInitableIface *iface)
395 {
396 	iface->init = tracker_direct_connection_initable_init;
397 }
398 
399 static void
async_initable_thread_func(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)400 async_initable_thread_func (GTask        *task,
401                             gpointer      source_object,
402                             gpointer      task_data,
403                             GCancellable *cancellable)
404 {
405 	GError *error = NULL;
406 
407 	if (!g_initable_init (G_INITABLE (source_object), cancellable, &error))
408 		g_task_return_error (task, error);
409 	else
410 		g_task_return_boolean (task, TRUE);
411 
412 	g_object_unref (task);
413 }
414 
415 static void
tracker_direct_connection_async_initable_init_async(GAsyncInitable * async_initable,gint priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)416 tracker_direct_connection_async_initable_init_async (GAsyncInitable      *async_initable,
417                                                      gint                 priority,
418                                                      GCancellable        *cancellable,
419                                                      GAsyncReadyCallback  callback,
420                                                      gpointer             user_data)
421 {
422 	GTask *task;
423 
424 	task = g_task_new (async_initable, cancellable, callback, user_data);
425 	g_task_set_priority (task, priority);
426 	g_task_run_in_thread (task, async_initable_thread_func);
427 }
428 
429 static gboolean
tracker_direct_connection_async_initable_init_finish(GAsyncInitable * async_initable,GAsyncResult * res,GError ** error)430 tracker_direct_connection_async_initable_init_finish (GAsyncInitable  *async_initable,
431                                                       GAsyncResult    *res,
432                                                       GError         **error)
433 {
434 	return g_task_propagate_boolean (G_TASK (res), error);
435 }
436 
437 static void
tracker_direct_connection_async_initable_iface_init(GAsyncInitableIface * iface)438 tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface)
439 {
440 	iface->init_async = tracker_direct_connection_async_initable_init_async;
441 	iface->init_finish = tracker_direct_connection_async_initable_init_finish;
442 }
443 
444 static void
tracker_direct_connection_init(TrackerDirectConnection * conn)445 tracker_direct_connection_init (TrackerDirectConnection *conn)
446 {
447 }
448 
449 static GHashTable *
get_event_cache_ht(TrackerNotifier * notifier)450 get_event_cache_ht (TrackerNotifier *notifier)
451 {
452 	GHashTable *events;
453 
454 	events = g_object_get_qdata (G_OBJECT (notifier), tracker_direct_notifier_quark ());
455 	if (!events) {
456 		events = g_hash_table_new_full (NULL, NULL, NULL,
457 		                                (GDestroyNotify) _tracker_notifier_event_cache_free);
458 		g_object_set_qdata_full (G_OBJECT (notifier), tracker_direct_notifier_quark (),
459 		                         events, (GDestroyNotify) g_hash_table_unref);
460 	}
461 
462 	return events;
463 }
464 
465 static TrackerNotifierEventCache *
lookup_event_cache(TrackerNotifier * notifier,gint graph_id,const gchar * graph)466 lookup_event_cache (TrackerNotifier *notifier,
467                     gint             graph_id,
468                     const gchar     *graph)
469 {
470 	TrackerNotifierEventCache *cache;
471 	GHashTable *events;
472 
473 	events = get_event_cache_ht (notifier);
474 	cache = g_hash_table_lookup (events, GINT_TO_POINTER (graph_id));
475 
476 	if (!cache) {
477 		cache = _tracker_notifier_event_cache_new (notifier, graph);
478 		g_hash_table_insert (events, GINT_TO_POINTER (graph_id), cache);
479 	}
480 
481 	return cache;
482 }
483 
484 /* These callbacks will be called from a different thread
485  * (always the same one though), handle with care.
486  */
487 static void
insert_statement_cb(gint graph_id,const gchar * graph,gint subject_id,const gchar * subject,gint predicate_id,gint object_id,const gchar * object,GPtrArray * rdf_types,gpointer user_data)488 insert_statement_cb (gint         graph_id,
489                      const gchar *graph,
490                      gint         subject_id,
491                      const gchar *subject,
492                      gint         predicate_id,
493                      gint         object_id,
494                      const gchar *object,
495                      GPtrArray   *rdf_types,
496                      gpointer     user_data)
497 {
498 	TrackerNotifier *notifier = user_data;
499 	TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
500 	TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
501 	TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
502 	TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
503 	TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
504 	TrackerNotifierEventCache *cache;
505 	TrackerClass *new_class = NULL;
506 	gint i;
507 
508 	cache = lookup_event_cache (notifier, graph_id, graph);
509 
510 	if (predicate_id == tracker_property_get_id (rdf_type)) {
511 		const gchar *uri;
512 
513 		uri = tracker_ontologies_get_uri_by_id (ontologies, object_id);
514 		new_class = tracker_ontologies_get_class_by_uri (ontologies, uri);
515 	}
516 
517 	for (i = 0; i < rdf_types->len; i++) {
518 		TrackerClass *class = g_ptr_array_index (rdf_types, i);
519 		TrackerNotifierEventType event_type;
520 
521 		if (!tracker_class_get_notify (class))
522 			continue;
523 
524 		if (class == new_class)
525 			event_type = TRACKER_NOTIFIER_EVENT_CREATE;
526 		else
527 			event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
528 
529 		_tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
530 	}
531 }
532 
533 static void
delete_statement_cb(gint graph_id,const gchar * graph,gint subject_id,const gchar * subject,gint predicate_id,gint object_id,const gchar * object,GPtrArray * rdf_types,gpointer user_data)534 delete_statement_cb (gint         graph_id,
535                      const gchar *graph,
536                      gint         subject_id,
537                      const gchar *subject,
538                      gint         predicate_id,
539                      gint         object_id,
540                      const gchar *object,
541                      GPtrArray   *rdf_types,
542                      gpointer     user_data)
543 {
544 	TrackerNotifier *notifier = user_data;
545 	TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
546 	TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
547 	TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
548 	TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
549 	TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
550 	TrackerNotifierEventCache *cache;
551 	TrackerClass *class_being_removed = NULL;
552 	gint i;
553 
554 	cache = lookup_event_cache (notifier, graph_id, graph);
555 
556 	if (predicate_id == tracker_property_get_id (rdf_type)) {
557 		class_being_removed = tracker_ontologies_get_class_by_uri (ontologies, object);
558 	}
559 
560 	for (i = 0; i < rdf_types->len; i++) {
561 		TrackerClass *class = g_ptr_array_index (rdf_types, i);
562 		TrackerNotifierEventType event_type;
563 
564 		if (!tracker_class_get_notify (class))
565 			continue;
566 
567 		if (class_being_removed && class == class_being_removed) {
568 			event_type = TRACKER_NOTIFIER_EVENT_DELETE;
569 		} else {
570 			event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
571 		}
572 
573 		_tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
574 	}
575 }
576 
577 static void
commit_statement_cb(gpointer user_data)578 commit_statement_cb (gpointer user_data)
579 {
580 	TrackerNotifierEventCache *cache;
581 	TrackerNotifier *notifier = user_data;
582 	GHashTable *events;
583 	GHashTableIter iter;
584 
585 	events = get_event_cache_ht (notifier);
586 	g_hash_table_iter_init (&iter, events);
587 
588 	while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &cache)) {
589 		g_hash_table_iter_steal (&iter);
590 		_tracker_notifier_event_cache_flush_events (cache);
591 	}
592 }
593 
594 static void
rollback_statement_cb(gpointer user_data)595 rollback_statement_cb (gpointer user_data)
596 {
597 	TrackerNotifier *notifier = user_data;
598 	GHashTable *events;
599 
600 	events = get_event_cache_ht (notifier);
601 	g_hash_table_remove_all (events);
602 }
603 
604 static void
detach_notifier(TrackerDirectConnection * conn,TrackerNotifier * notifier)605 detach_notifier (TrackerDirectConnection *conn,
606                  TrackerNotifier         *notifier)
607 {
608 	TrackerDirectConnectionPrivate *priv;
609 	TrackerData *tracker_data;
610 
611 	priv = tracker_direct_connection_get_instance_private (conn);
612 
613 	priv->notifiers = g_list_remove (priv->notifiers, notifier);
614 
615 	tracker_data = tracker_data_manager_get_data (priv->data_manager);
616 	tracker_data_remove_insert_statement_callback (tracker_data,
617 	                                               insert_statement_cb,
618 	                                               notifier);
619 	tracker_data_remove_delete_statement_callback (tracker_data,
620 	                                               delete_statement_cb,
621 	                                               notifier);
622 	tracker_data_remove_commit_statement_callback (tracker_data,
623 	                                               commit_statement_cb,
624 	                                               notifier);
625 	tracker_data_remove_rollback_statement_callback (tracker_data,
626 	                                                 rollback_statement_cb,
627 	                                                 notifier);
628 }
629 
630 static void
weak_ref_notify(gpointer data,GObject * prev_location)631 weak_ref_notify (gpointer  data,
632                  GObject  *prev_location)
633 {
634 	TrackerDirectConnection *conn = data;
635 
636 	detach_notifier (conn, (TrackerNotifier *) prev_location);
637 }
638 
639 static void
tracker_direct_connection_finalize(GObject * object)640 tracker_direct_connection_finalize (GObject *object)
641 {
642 	TrackerDirectConnectionPrivate *priv;
643 	TrackerDirectConnection *conn;
644 
645 	conn = TRACKER_DIRECT_CONNECTION (object);
646 	priv = tracker_direct_connection_get_instance_private (conn);
647 
648 	if (!priv->closing)
649 		tracker_sparql_connection_close (TRACKER_SPARQL_CONNECTION (object));
650 
651 	g_clear_object (&priv->store);
652 	g_clear_object (&priv->ontology);
653 	g_clear_object (&priv->namespace_manager);
654 
655 	G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object);
656 }
657 
658 static void
tracker_direct_connection_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)659 tracker_direct_connection_set_property (GObject      *object,
660                                         guint         prop_id,
661                                         const GValue *value,
662                                         GParamSpec   *pspec)
663 {
664 	TrackerDirectConnectionPrivate *priv;
665 	TrackerDirectConnection *conn;
666 
667 	conn = TRACKER_DIRECT_CONNECTION (object);
668 	priv = tracker_direct_connection_get_instance_private (conn);
669 
670 	switch (prop_id) {
671 	case PROP_FLAGS:
672 		priv->flags = g_value_get_flags (value);
673 		break;
674 	case PROP_STORE_LOCATION:
675 		priv->store = g_value_dup_object (value);
676 		break;
677 	case PROP_ONTOLOGY_LOCATION:
678 		priv->ontology = g_value_dup_object (value);
679 		break;
680 	default:
681 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
682 		break;
683 	}
684 }
685 
686 static void
tracker_direct_connection_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)687 tracker_direct_connection_get_property (GObject    *object,
688                                         guint       prop_id,
689                                         GValue     *value,
690                                         GParamSpec *pspec)
691 {
692 	TrackerDirectConnectionPrivate *priv;
693 	TrackerDirectConnection *conn;
694 
695 	conn = TRACKER_DIRECT_CONNECTION (object);
696 	priv = tracker_direct_connection_get_instance_private (conn);
697 
698 	switch (prop_id) {
699 	case PROP_FLAGS:
700 		g_value_set_flags (value, priv->flags);
701 		break;
702 	case PROP_STORE_LOCATION:
703 		g_value_set_object (value, priv->store);
704 		break;
705 	case PROP_ONTOLOGY_LOCATION:
706 		g_value_set_object (value, priv->ontology);
707 		break;
708 	default:
709 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
710 		break;
711 	}
712 }
713 
714 static TrackerSparqlCursor *
tracker_direct_connection_query(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GError ** error)715 tracker_direct_connection_query (TrackerSparqlConnection  *self,
716                                  const gchar              *sparql,
717                                  GCancellable             *cancellable,
718                                  GError                  **error)
719 {
720 	TrackerDirectConnectionPrivate *priv;
721 	TrackerDirectConnection *conn;
722 	TrackerSparql *query;
723 	TrackerSparqlCursor *cursor;
724 	GError *inner_error = NULL;
725 
726 	conn = TRACKER_DIRECT_CONNECTION (self);
727 	priv = tracker_direct_connection_get_instance_private (conn);
728 
729 	g_mutex_lock (&priv->mutex);
730 	query = tracker_sparql_new (priv->data_manager, sparql);
731 	cursor = tracker_sparql_execute_cursor (query, NULL, &inner_error);
732 	tracker_direct_connection_update_timestamp (conn);
733 	g_object_unref (query);
734 
735 	if (cursor)
736 		tracker_sparql_cursor_set_connection (cursor, self);
737 	g_mutex_unlock (&priv->mutex);
738 
739 	if (inner_error)
740 		g_propagate_error (error, _translate_internal_error (inner_error));
741 
742 	return cursor;
743 }
744 
745 static void
tracker_direct_connection_query_async(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)746 tracker_direct_connection_query_async (TrackerSparqlConnection *self,
747                                        const gchar             *sparql,
748                                        GCancellable            *cancellable,
749                                        GAsyncReadyCallback      callback,
750                                        gpointer                 user_data)
751 {
752 	TrackerDirectConnectionPrivate *priv;
753 	TrackerDirectConnection *conn;
754 	GError *error = NULL;
755 	GTask *task;
756 
757 	conn = TRACKER_DIRECT_CONNECTION (self);
758 	priv = tracker_direct_connection_get_instance_private (conn);
759 
760 	task = g_task_new (self, cancellable, callback, user_data);
761 	g_task_set_task_data (task,
762 	                      task_data_query_new (TASK_TYPE_QUERY,
763 	                                           g_strdup (sparql),
764 	                                           g_free),
765 	                      (GDestroyNotify) task_data_free);
766 
767 	if (!g_thread_pool_push (priv->select_pool, task, &error)) {
768 		g_task_return_error (task, _translate_internal_error (error));
769 		g_object_unref (task);
770 	}
771 }
772 
773 static TrackerSparqlCursor *
tracker_direct_connection_query_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)774 tracker_direct_connection_query_finish (TrackerSparqlConnection  *self,
775                                         GAsyncResult             *res,
776                                         GError                  **error)
777 {
778 	return g_task_propagate_pointer (G_TASK (res), error);
779 }
780 
781 static TrackerSparqlStatement *
tracker_direct_connection_query_statement(TrackerSparqlConnection * self,const gchar * query,GCancellable * cancellable,GError ** error)782 tracker_direct_connection_query_statement (TrackerSparqlConnection  *self,
783                                            const gchar              *query,
784                                            GCancellable             *cancellable,
785                                            GError                  **error)
786 {
787 	return TRACKER_SPARQL_STATEMENT (tracker_direct_statement_new (self, query, error));
788 }
789 
790 static void
tracker_direct_connection_update(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GError ** error)791 tracker_direct_connection_update (TrackerSparqlConnection  *self,
792                                   const gchar              *sparql,
793                                   GCancellable             *cancellable,
794                                   GError                  **error)
795 {
796 	TrackerDirectConnectionPrivate *priv;
797 	TrackerDirectConnection *conn;
798 	TrackerData *data;
799 	GError *inner_error = NULL;
800 
801 	conn = TRACKER_DIRECT_CONNECTION (self);
802 	priv = tracker_direct_connection_get_instance_private (conn);
803 
804 	g_mutex_lock (&priv->mutex);
805 	data = tracker_data_manager_get_data (priv->data_manager);
806 	tracker_data_update_sparql (data, sparql, &inner_error);
807 	tracker_direct_connection_update_timestamp (conn);
808 	g_mutex_unlock (&priv->mutex);
809 
810 	if (inner_error)
811 		g_propagate_error (error, inner_error);
812 }
813 
814 static void
tracker_direct_connection_update_async(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)815 tracker_direct_connection_update_async (TrackerSparqlConnection *self,
816                                         const gchar             *sparql,
817                                         GCancellable            *cancellable,
818                                         GAsyncReadyCallback      callback,
819                                         gpointer                 user_data)
820 {
821 	TrackerDirectConnectionPrivate *priv;
822 	TrackerDirectConnection *conn;
823 	GTask *task;
824 
825 	conn = TRACKER_DIRECT_CONNECTION (self);
826 	priv = tracker_direct_connection_get_instance_private (conn);
827 
828 	task = g_task_new (self, cancellable, callback, user_data);
829 	g_task_set_task_data (task,
830 	                      task_data_query_new (TASK_TYPE_UPDATE,
831 	                                           g_strdup (sparql),
832 	                                           g_free),
833 	                      (GDestroyNotify) task_data_free);
834 
835 	g_thread_pool_push (priv->update_thread, task, NULL);
836 }
837 
838 static void
tracker_direct_connection_update_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)839 tracker_direct_connection_update_finish (TrackerSparqlConnection  *self,
840                                          GAsyncResult             *res,
841                                          GError                  **error)
842 {
843 	GError *inner_error = NULL;
844 
845 	g_task_propagate_boolean (G_TASK (res), &inner_error);
846 	if (inner_error)
847 		g_propagate_error (error, _translate_internal_error (inner_error));
848 }
849 
850 static void
on_batch_finished(GObject * source,GAsyncResult * result,gpointer user_data)851 on_batch_finished (GObject      *source,
852                    GAsyncResult *result,
853                    gpointer      user_data)
854 {
855 	TrackerBatch *batch = TRACKER_BATCH (source);
856 	GTask *task = user_data;
857 	GError *error = NULL;
858 	gboolean retval;
859 
860 	retval = tracker_batch_execute_finish (batch, result, &error);
861 
862 	if (retval)
863 		g_task_return_boolean (task, TRUE);
864 	else
865 		g_task_return_error (task, error);
866 
867 	g_object_unref (task);
868 }
869 
870 static void
tracker_direct_connection_update_array_async(TrackerSparqlConnection * self,gchar ** updates,gint n_updates,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)871 tracker_direct_connection_update_array_async (TrackerSparqlConnection  *self,
872                                               gchar                   **updates,
873                                               gint                      n_updates,
874                                               GCancellable             *cancellable,
875                                               GAsyncReadyCallback       callback,
876                                               gpointer                  user_data)
877 {
878 	TrackerBatch *batch;
879 	GTask *task;
880 	gint i;
881 
882 	batch = tracker_sparql_connection_create_batch (self);
883 
884 	for (i = 0; i < n_updates; i++)
885 		tracker_batch_add_sparql (batch, updates[i]);
886 
887 	task = g_task_new (self, cancellable, callback, user_data);
888 	tracker_batch_execute_async (batch, cancellable, on_batch_finished, task);
889 	g_object_unref (batch);
890 }
891 
892 static gboolean
tracker_direct_connection_update_array_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)893 tracker_direct_connection_update_array_finish (TrackerSparqlConnection  *self,
894                                                GAsyncResult             *res,
895                                                GError                  **error)
896 {
897 	GError *inner_error = NULL;
898 	gboolean result;
899 
900 	result = g_task_propagate_boolean (G_TASK (res), &inner_error);
901 	if (inner_error)
902 		g_propagate_error (error, _translate_internal_error (inner_error));
903 
904 	return result;
905 }
906 
907 static GVariant *
tracker_direct_connection_update_blank(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GError ** error)908 tracker_direct_connection_update_blank (TrackerSparqlConnection  *self,
909                                         const gchar              *sparql,
910                                         GCancellable             *cancellable,
911                                         GError                  **error)
912 {
913 	TrackerDirectConnectionPrivate *priv;
914 	TrackerDirectConnection *conn;
915 	TrackerData *data;
916 	GVariant *blank_nodes;
917 	GError *inner_error = NULL;
918 
919 	conn = TRACKER_DIRECT_CONNECTION (self);
920 	priv = tracker_direct_connection_get_instance_private (conn);
921 
922 	g_mutex_lock (&priv->mutex);
923 	data = tracker_data_manager_get_data (priv->data_manager);
924 	blank_nodes = tracker_data_update_sparql_blank (data, sparql, &inner_error);
925 	tracker_direct_connection_update_timestamp (conn);
926 	g_mutex_unlock (&priv->mutex);
927 
928 	if (inner_error)
929 		g_propagate_error (error, _translate_internal_error (inner_error));
930 	return blank_nodes;
931 }
932 
933 static void
tracker_direct_connection_update_blank_async(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)934 tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self,
935                                               const gchar             *sparql,
936                                               GCancellable            *cancellable,
937                                               GAsyncReadyCallback      callback,
938                                               gpointer                 user_data)
939 {
940 	TrackerDirectConnectionPrivate *priv;
941 	TrackerDirectConnection *conn;
942 	GTask *task;
943 
944 	conn = TRACKER_DIRECT_CONNECTION (self);
945 	priv = tracker_direct_connection_get_instance_private (conn);
946 
947 	task = g_task_new (self, cancellable, callback, user_data);
948 	g_task_set_task_data (task,
949 	                      task_data_query_new (TASK_TYPE_UPDATE_BLANK,
950 	                                           g_strdup (sparql),
951 	                                           g_free),
952 	                      (GDestroyNotify) task_data_free);
953 
954 	g_thread_pool_push (priv->update_thread, task, NULL);
955 }
956 
957 static GVariant *
tracker_direct_connection_update_blank_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)958 tracker_direct_connection_update_blank_finish (TrackerSparqlConnection  *self,
959                                                GAsyncResult             *res,
960                                                GError                  **error)
961 {
962 	GError *inner_error = NULL;
963 	GVariant *result;
964 
965 	result = g_task_propagate_pointer (G_TASK (res), &inner_error);
966 	if (inner_error)
967 		g_propagate_error (error, _translate_internal_error (inner_error));
968 
969 	return result;
970 }
971 
972 static TrackerNamespaceManager *
tracker_direct_connection_get_namespace_manager(TrackerSparqlConnection * self)973 tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self)
974 {
975 	TrackerDirectConnectionPrivate *priv;
976 
977 	priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
978 
979 	return priv->namespace_manager;
980 }
981 
982 static TrackerNotifier *
tracker_direct_connection_create_notifier(TrackerSparqlConnection * self)983 tracker_direct_connection_create_notifier (TrackerSparqlConnection *self)
984 {
985 	TrackerDirectConnectionPrivate *priv;
986 	TrackerNotifier *notifier;
987 	TrackerData *tracker_data;
988 
989 	priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
990 
991 	notifier = g_object_new (TRACKER_TYPE_NOTIFIER,
992 	                         "connection", self,
993 				 NULL);
994 
995 	tracker_data = tracker_data_manager_get_data (priv->data_manager);
996 	tracker_data_add_insert_statement_callback (tracker_data,
997 						    insert_statement_cb,
998 						    notifier);
999 	tracker_data_add_delete_statement_callback (tracker_data,
1000 						    delete_statement_cb,
1001 						    notifier);
1002 	tracker_data_add_commit_statement_callback (tracker_data,
1003 						    commit_statement_cb,
1004 						    notifier);
1005 	tracker_data_add_rollback_statement_callback (tracker_data,
1006 						      rollback_statement_cb,
1007 						      notifier);
1008 
1009 	g_object_weak_ref (G_OBJECT (notifier), weak_ref_notify, self);
1010 	priv->notifiers = g_list_prepend (priv->notifiers, notifier);
1011 
1012 	return notifier;
1013 }
1014 
1015 static void
tracker_direct_connection_close(TrackerSparqlConnection * self)1016 tracker_direct_connection_close (TrackerSparqlConnection *self)
1017 {
1018 	TrackerDirectConnectionPrivate *priv;
1019 	TrackerDirectConnection *conn;
1020 
1021 	conn = TRACKER_DIRECT_CONNECTION (self);
1022 	priv = tracker_direct_connection_get_instance_private (conn);
1023 	priv->closing = TRUE;
1024 
1025 	if (priv->cleanup_timeout_id) {
1026 		g_source_remove (priv->cleanup_timeout_id);
1027 		priv->cleanup_timeout_id = 0;
1028 	}
1029 
1030 	if (priv->update_thread) {
1031 		g_thread_pool_free (priv->update_thread, TRUE, TRUE);
1032 		priv->update_thread = NULL;
1033 	}
1034 
1035 	if (priv->select_pool) {
1036 		g_thread_pool_free (priv->select_pool, TRUE, TRUE);
1037 		priv->select_pool = NULL;
1038 	}
1039 
1040 	while (priv->notifiers) {
1041 		TrackerNotifier *notifier = priv->notifiers->data;
1042 
1043 		g_object_weak_unref (G_OBJECT (notifier),
1044 		                     weak_ref_notify,
1045 		                     conn);
1046 		detach_notifier (conn, notifier);
1047 	}
1048 
1049 	if (priv->data_manager) {
1050 		tracker_data_manager_shutdown (priv->data_manager);
1051 		g_clear_object (&priv->data_manager);
1052 	}
1053 }
1054 
1055 static void
async_close_thread_func(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1056 async_close_thread_func (GTask        *task,
1057                          gpointer      source_object,
1058                          gpointer      task_data,
1059                          GCancellable *cancellable)
1060 {
1061 	if (g_task_return_error_if_cancelled (task))
1062 		return;
1063 
1064 	tracker_sparql_connection_close (source_object);
1065 	g_task_return_boolean (task, TRUE);
1066 }
1067 
1068 static void
tracker_direct_connection_close_async(TrackerSparqlConnection * connection,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1069 tracker_direct_connection_close_async (TrackerSparqlConnection *connection,
1070                                        GCancellable            *cancellable,
1071                                        GAsyncReadyCallback      callback,
1072                                        gpointer                 user_data)
1073 {
1074 	GTask *task;
1075 
1076 	task = g_task_new (connection, cancellable, callback, user_data);
1077 	g_task_run_in_thread (task, async_close_thread_func);
1078 	g_object_unref (task);
1079 }
1080 
1081 static gboolean
tracker_direct_connection_close_finish(TrackerSparqlConnection * connection,GAsyncResult * res,GError ** error)1082 tracker_direct_connection_close_finish (TrackerSparqlConnection  *connection,
1083                                         GAsyncResult             *res,
1084                                         GError                  **error)
1085 {
1086 	return g_task_propagate_boolean (G_TASK (res), error);
1087 }
1088 
1089 static UpdateResource *
update_resource_data_new(const gchar * graph,TrackerResource * resource)1090 update_resource_data_new (const gchar     *graph,
1091                           TrackerResource *resource)
1092 {
1093 	UpdateResource *data;
1094 
1095 	data = g_new0 (UpdateResource, 1);
1096 	data->graph = g_strdup (graph);
1097 	data->resource = g_object_ref (resource);
1098 
1099 	return data;
1100 }
1101 
1102 static void
update_resource_data_free(UpdateResource * data)1103 update_resource_data_free (UpdateResource *data)
1104 {
1105 	g_free (data->graph);
1106 	g_object_unref (data->resource);
1107 	g_free (data);
1108 }
1109 
1110 static gboolean
tracker_direct_connection_update_resource(TrackerSparqlConnection * self,const gchar * graph,TrackerResource * resource,GCancellable * cancellable,GError ** error)1111 tracker_direct_connection_update_resource (TrackerSparqlConnection  *self,
1112                                            const gchar              *graph,
1113                                            TrackerResource          *resource,
1114                                            GCancellable             *cancellable,
1115                                            GError                  **error)
1116 {
1117 	TrackerDirectConnectionPrivate *priv;
1118 	TrackerDirectConnection *conn;
1119 	TrackerData *data;
1120 	GError *inner_error = NULL;
1121 
1122 	conn = TRACKER_DIRECT_CONNECTION (self);
1123 	priv = tracker_direct_connection_get_instance_private (conn);
1124 
1125 	g_mutex_lock (&priv->mutex);
1126 	data = tracker_data_manager_get_data (priv->data_manager);
1127 	update_resource (data, graph, resource, &inner_error);
1128 	tracker_direct_connection_update_timestamp (conn);
1129 	g_mutex_unlock (&priv->mutex);
1130 
1131 	if (inner_error) {
1132 		g_propagate_error (error, inner_error);
1133 		return FALSE;
1134 	}
1135 
1136 	return TRUE;
1137 }
1138 
1139 static void
tracker_direct_connection_update_resource_async(TrackerSparqlConnection * self,const gchar * graph,TrackerResource * resource,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1140 tracker_direct_connection_update_resource_async (TrackerSparqlConnection *self,
1141                                                  const gchar             *graph,
1142                                                  TrackerResource         *resource,
1143                                                  GCancellable            *cancellable,
1144                                                  GAsyncReadyCallback      callback,
1145                                                  gpointer                 user_data)
1146 {
1147 	TrackerDirectConnectionPrivate *priv;
1148 	TrackerDirectConnection *conn;
1149 	TaskData *task_data;
1150 	GTask *task;
1151 
1152 	conn = TRACKER_DIRECT_CONNECTION (self);
1153 	priv = tracker_direct_connection_get_instance_private (conn);
1154 
1155 	task_data = task_data_query_new (TASK_TYPE_UPDATE_RESOURCE,
1156 	                                 update_resource_data_new (graph, resource),
1157 	                                 (GDestroyNotify) update_resource_data_free);
1158 
1159 	task = g_task_new (self, cancellable, callback, user_data);
1160 	g_task_set_task_data (task, task_data,
1161 	                      (GDestroyNotify) task_data_free);
1162 
1163 	g_thread_pool_push (priv->update_thread, task, NULL);
1164 }
1165 
1166 static gboolean
tracker_direct_connection_update_resource_finish(TrackerSparqlConnection * connection,GAsyncResult * res,GError ** error)1167 tracker_direct_connection_update_resource_finish (TrackerSparqlConnection  *connection,
1168                                                   GAsyncResult             *res,
1169                                                   GError                  **error)
1170 {
1171 	return g_task_propagate_boolean (G_TASK (res), error);
1172 }
1173 
1174 static TrackerBatch *
tracker_direct_connection_create_batch(TrackerSparqlConnection * connection)1175 tracker_direct_connection_create_batch (TrackerSparqlConnection *connection)
1176 {
1177 	TrackerDirectConnectionPrivate *priv;
1178 	TrackerDirectConnection *conn;
1179 
1180 	conn = TRACKER_DIRECT_CONNECTION (connection);
1181 	priv = tracker_direct_connection_get_instance_private (conn);
1182 
1183 	if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY)
1184 		return NULL;
1185 
1186 	return tracker_direct_batch_new (connection);
1187 }
1188 
1189 static gboolean
tracker_direct_connection_lookup_dbus_service(TrackerSparqlConnection * connection,const gchar * dbus_name,const gchar * dbus_path,gchar ** name,gchar ** path)1190 tracker_direct_connection_lookup_dbus_service (TrackerSparqlConnection  *connection,
1191                                                const gchar              *dbus_name,
1192                                                const gchar              *dbus_path,
1193                                                gchar                   **name,
1194                                                gchar                   **path)
1195 {
1196 	TrackerDirectConnectionPrivate *priv;
1197 	TrackerDirectConnection *conn;
1198 	TrackerSparqlConnection *remote;
1199 	GError *error = NULL;
1200 	gchar *uri;
1201 
1202 	conn = TRACKER_DIRECT_CONNECTION (connection);
1203 	priv = tracker_direct_connection_get_instance_private (conn);
1204 
1205 	uri = tracker_util_build_dbus_uri (G_BUS_TYPE_SESSION,
1206 	                                   dbus_name, dbus_path);
1207 	remote = tracker_data_manager_get_remote_connection (priv->data_manager,
1208 	                                                     uri, &error);
1209 	if (error) {
1210 		g_warning ("Error getting remote connection '%s': %s", uri, error->message);
1211 		g_error_free (error);
1212 	}
1213 
1214 	g_free (uri);
1215 
1216 	if (!remote)
1217 		return FALSE;
1218 	if (!g_object_class_find_property (G_OBJECT_GET_CLASS (remote), "bus-name"))
1219 		return FALSE;
1220 
1221 	g_object_get (remote,
1222 	              "bus-name", name,
1223 	              "bus-object-path", path,
1224 	              NULL);
1225 
1226 	return TRUE;
1227 }
1228 
1229 static void
tracker_direct_connection_class_init(TrackerDirectConnectionClass * klass)1230 tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
1231 {
1232 	TrackerSparqlConnectionClass *sparql_connection_class;
1233 	GObjectClass *object_class;
1234 
1235 	object_class = G_OBJECT_CLASS (klass);
1236 	sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass);
1237 
1238 	object_class->finalize = tracker_direct_connection_finalize;
1239 	object_class->set_property = tracker_direct_connection_set_property;
1240 	object_class->get_property = tracker_direct_connection_get_property;
1241 
1242 	sparql_connection_class->query = tracker_direct_connection_query;
1243 	sparql_connection_class->query_async = tracker_direct_connection_query_async;
1244 	sparql_connection_class->query_finish = tracker_direct_connection_query_finish;
1245 	sparql_connection_class->query_statement = tracker_direct_connection_query_statement;
1246 	sparql_connection_class->update = tracker_direct_connection_update;
1247 	sparql_connection_class->update_async = tracker_direct_connection_update_async;
1248 	sparql_connection_class->update_finish = tracker_direct_connection_update_finish;
1249 	sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async;
1250 	sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish;
1251 	sparql_connection_class->update_blank = tracker_direct_connection_update_blank;
1252 	sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async;
1253 	sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish;
1254 	sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager;
1255 	sparql_connection_class->create_notifier = tracker_direct_connection_create_notifier;
1256 	sparql_connection_class->close = tracker_direct_connection_close;
1257 	sparql_connection_class->close_async = tracker_direct_connection_close_async;
1258 	sparql_connection_class->close_finish = tracker_direct_connection_close_finish;
1259 	sparql_connection_class->update_resource = tracker_direct_connection_update_resource;
1260 	sparql_connection_class->update_resource_async = tracker_direct_connection_update_resource_async;
1261 	sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish;
1262 	sparql_connection_class->create_batch = tracker_direct_connection_create_batch;
1263 	sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
1264 
1265 	props[PROP_FLAGS] =
1266 		g_param_spec_flags ("flags",
1267 		                    "Flags",
1268 		                    "Flags",
1269 		                    TRACKER_TYPE_SPARQL_CONNECTION_FLAGS,
1270 		                    TRACKER_SPARQL_CONNECTION_FLAGS_NONE,
1271 		                    G_PARAM_READWRITE |
1272 		                    G_PARAM_CONSTRUCT_ONLY);
1273 	props[PROP_STORE_LOCATION] =
1274 		g_param_spec_object ("store-location",
1275 		                     "Store location",
1276 		                     "Store location",
1277 		                     G_TYPE_FILE,
1278 		                     G_PARAM_READWRITE |
1279 		                     G_PARAM_CONSTRUCT_ONLY);
1280 	props[PROP_ONTOLOGY_LOCATION] =
1281 		g_param_spec_object ("ontology-location",
1282 		                     "Ontology location",
1283 		                     "Ontology location",
1284 		                     G_TYPE_FILE,
1285 		                     G_PARAM_READWRITE |
1286 		                     G_PARAM_CONSTRUCT_ONLY);
1287 
1288 	g_object_class_install_properties (object_class, N_PROPS, props);
1289 }
1290 
1291 TrackerDirectConnection *
tracker_direct_connection_new(TrackerSparqlConnectionFlags flags,GFile * store,GFile * ontology,GError ** error)1292 tracker_direct_connection_new (TrackerSparqlConnectionFlags   flags,
1293                                GFile                         *store,
1294                                GFile                         *ontology,
1295                                GError                       **error)
1296 {
1297 	g_return_val_if_fail (!store || G_IS_FILE (store), NULL);
1298 	g_return_val_if_fail (!ontology || G_IS_FILE (ontology), NULL);
1299 	g_return_val_if_fail (!error || !*error, NULL);
1300 
1301 	return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION,
1302 	                     "flags", flags,
1303 	                     "store-location", store,
1304 	                     "ontology-location", ontology,
1305 	                     NULL);
1306 }
1307 
1308 TrackerDataManager *
tracker_direct_connection_get_data_manager(TrackerDirectConnection * conn)1309 tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn)
1310 {
1311 	TrackerDirectConnectionPrivate *priv;
1312 
1313 	priv = tracker_direct_connection_get_instance_private (conn);
1314 	return priv->data_manager;
1315 }
1316 
1317 void
tracker_direct_connection_update_timestamp(TrackerDirectConnection * conn)1318 tracker_direct_connection_update_timestamp (TrackerDirectConnection *conn)
1319 {
1320 	TrackerDirectConnectionPrivate *priv;
1321 
1322 	priv = tracker_direct_connection_get_instance_private (conn);
1323 	priv->timestamp = g_get_monotonic_time ();
1324 }
1325 
1326 gboolean
tracker_direct_connection_update_batch(TrackerDirectConnection * conn,TrackerBatch * batch,GError ** error)1327 tracker_direct_connection_update_batch (TrackerDirectConnection  *conn,
1328                                         TrackerBatch             *batch,
1329                                         GError                  **error)
1330 {
1331 	TrackerDirectConnectionPrivate *priv;
1332 	GError *inner_error = NULL;
1333 
1334 	priv = tracker_direct_connection_get_instance_private (conn);
1335 
1336 	g_mutex_lock (&priv->mutex);
1337 	tracker_direct_batch_update (TRACKER_DIRECT_BATCH (batch),
1338 	                             priv->data_manager, &inner_error);
1339 	tracker_direct_connection_update_timestamp (conn);
1340 	g_mutex_unlock (&priv->mutex);
1341 
1342 	if (inner_error) {
1343 		g_propagate_error (error, inner_error);
1344 		return FALSE;
1345 	}
1346 
1347 	return TRUE;
1348 }
1349 
1350 void
tracker_direct_connection_update_batch_async(TrackerDirectConnection * conn,TrackerBatch * batch,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1351 tracker_direct_connection_update_batch_async (TrackerDirectConnection  *conn,
1352                                               TrackerBatch             *batch,
1353                                               GCancellable             *cancellable,
1354                                               GAsyncReadyCallback       callback,
1355                                               gpointer                  user_data)
1356 {
1357 	TrackerDirectConnectionPrivate *priv;
1358 	GTask *task;
1359 
1360 	priv = tracker_direct_connection_get_instance_private (conn);
1361 
1362 	task = g_task_new (batch, cancellable, callback, user_data);
1363 	g_task_set_task_data (task,
1364 	                      task_data_query_new (TASK_TYPE_UPDATE_BATCH,
1365 	                                           g_object_ref (batch),
1366 	                                           g_object_unref),
1367 	                      (GDestroyNotify) task_data_free);
1368 
1369 	g_thread_pool_push (priv->update_thread, task, NULL);
1370 }
1371 
1372 gboolean
tracker_direct_connection_update_batch_finish(TrackerDirectConnection * conn,GAsyncResult * res,GError ** error)1373 tracker_direct_connection_update_batch_finish (TrackerDirectConnection  *conn,
1374                                                GAsyncResult             *res,
1375                                                GError                  **error)
1376 {
1377 	GError *inner_error = NULL;
1378 
1379 	g_task_propagate_boolean (G_TASK (res), &inner_error);
1380 	if (inner_error) {
1381 		g_propagate_error (error, _translate_internal_error (inner_error));
1382 		return FALSE;
1383 	}
1384 
1385 	return TRUE;
1386 }
1387