1 /*
2 * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
3 * Copyright (C) 2017, Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21 #include "config.h"
22
23 #include "tracker-direct.h"
24 #include "tracker-direct-batch.h"
25 #include "tracker-direct-statement.h"
26 #include "libtracker-sparql/tracker-private.h"
27 #include <libtracker-common/tracker-utils.h>
28 #include <libtracker-data/tracker-data.h>
29 #include <libtracker-data/tracker-sparql.h>
30 #include <libtracker-sparql/tracker-notifier-private.h>
31 #include <libtracker-sparql/tracker-private.h>
32
typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;

/* Instance-private state of a TrackerDirectConnection. */
struct _TrackerDirectConnectionPrivate
{
	/* Construct-time settings, mirrored by the GObject properties */
	TrackerSparqlConnectionFlags flags;
	GFile *store;    /* NULL selects an in-memory database at init time */
	GFile *ontology;

	TrackerNamespaceManager *namespace_manager;
	TrackerDataManager *data_manager;
	/* Locked around the query/update entry points that use data_manager */
	GMutex mutex;

	GThreadPool *update_thread; /* Contains 1 exclusive thread */
	GThreadPool *select_pool;   /* Runs the asynchronous queries */

	/* Notifiers created through create_notifier; tracked via weak refs */
	GList *notifiers;

	/* Compared against g_get_monotonic_time() by the cleanup timeout */
	gint64 timestamp;
	/* Monotonic time at which the last memory release was queued */
	gint64 cleanup_timestamp;

	/* Source id returned by g_timeout_add_seconds() for the cleanup timeout */
	guint cleanup_timeout_id;

	guint initialized : 1;
	/* When set, queued asynchronous queries fail with CONNECTION_CLOSED */
	guint closing : 1;
};
58
/* Payload of a TASK_TYPE_UPDATE_RESOURCE task: the resource to store and
 * the graph it is stored into.
 */
typedef struct {
	gchar *graph;
	TrackerResource *resource;
} UpdateResource;
63
/* GObject property identifiers; PROP_0 is the reserved zero id and
 * N_PROPS is the size of the props[] array.
 */
enum {
	PROP_0,
	PROP_FLAGS,
	PROP_STORE_LOCATION,
	PROP_ONTOLOGY_LOCATION,
	N_PROPS
};

/* Property specs, indexed by the identifiers above */
static GParamSpec *props[N_PROPS] = { NULL };
73
/* Kind of work carried by a TaskData. TASK_TYPE_QUERY is served by the
 * select pool; every other type is dispatched by update_thread_func().
 */
typedef enum {
	TASK_TYPE_QUERY,
	TASK_TYPE_UPDATE,
	TASK_TYPE_UPDATE_BLANK,
	TASK_TYPE_UPDATE_RESOURCE,
	TASK_TYPE_UPDATE_BATCH,
	TASK_TYPE_RELEASE_MEMORY,
} TaskType;
82
/* Task data attached to the GTasks pushed to the thread pools. */
typedef struct {
	TaskType type;
	gpointer data;          /* Type-specific payload, may be NULL */
	GDestroyNotify destroy; /* Frees data in task_data_free(), may be NULL */
} TaskData;
88
/* Interface initializers, defined further down but needed by the type
 * definition macro below.
 */
static void tracker_direct_connection_initable_iface_init (GInitableIface *iface);
static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface);

/* Quark under which each notifier stores its hash table of event caches */
G_DEFINE_QUARK (TrackerDirectNotifier, tracker_direct_notifier)

/* TrackerDirectConnection: a TrackerSparqlConnection subclass with private
 * data, implementing GInitable and GAsyncInitable.
 */
G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection,
                         TRACKER_TYPE_SPARQL_CONNECTION,
                         G_ADD_PRIVATE (TrackerDirectConnection)
                         G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
                                                tracker_direct_connection_initable_iface_init)
                         G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE,
                                                tracker_direct_connection_async_initable_iface_init))
101
102 static TaskData *
103 task_data_query_new (TaskType type,
104 gpointer data,
105 GDestroyNotify destroy)
106 {
107 TaskData *task;
108
109 task = g_new0 (TaskData, 1);
110 task->type = type;
111 task->data = data;
112 task->destroy = destroy;
113
114 return task;
115 }
116
117 static void
task_data_free(TaskData * task)118 task_data_free (TaskData *task)
119 {
120 if (task->destroy && task->data)
121 task->destroy (task->data);
122 g_free (task);
123 }
124
125 static gboolean
cleanup_timeout_cb(gpointer user_data)126 cleanup_timeout_cb (gpointer user_data)
127 {
128 TrackerDirectConnection *conn = user_data;
129 TrackerDirectConnectionPrivate *priv;
130 gint64 timestamp;
131 GTask *task;
132
133 priv = tracker_direct_connection_get_instance_private (conn);
134 timestamp = g_get_monotonic_time ();
135
136 /* If we already cleaned up */
137 if (priv->timestamp < priv->cleanup_timestamp)
138 return G_SOURCE_CONTINUE;
139 /* If the connection was used less than 10s ago */
140 if (timestamp - priv->timestamp < 10 * G_USEC_PER_SEC)
141 return G_SOURCE_CONTINUE;
142
143 priv->cleanup_timestamp = timestamp;
144
145 task = g_task_new (conn, NULL, NULL, NULL);
146 g_task_set_task_data (task,
147 task_data_query_new (TASK_TYPE_RELEASE_MEMORY, NULL, NULL),
148 (GDestroyNotify) task_data_free);
149
150 g_thread_pool_push (priv->update_thread, task, NULL);
151
152 return G_SOURCE_CONTINUE;
153 }
154
155 gboolean
update_resource(TrackerData * data,const gchar * graph,TrackerResource * resource,GError ** error)156 update_resource (TrackerData *data,
157 const gchar *graph,
158 TrackerResource *resource,
159 GError **error)
160 {
161 GError *inner_error = NULL;
162
163 tracker_data_begin_transaction (data, &inner_error);
164 if (inner_error)
165 goto error;
166
167 tracker_data_update_resource (data,
168 graph,
169 resource,
170 NULL,
171 &inner_error);
172
173 if (inner_error) {
174 tracker_data_rollback_transaction (data);
175 goto error;
176 }
177
178 tracker_data_commit_transaction (data, &inner_error);
179 if (inner_error)
180 goto error;
181
182 return TRUE;
183
184 error:
185 g_propagate_error (error, inner_error);
186 return FALSE;
187 }
188
189 static void
update_thread_func(gpointer data,gpointer user_data)190 update_thread_func (gpointer data,
191 gpointer user_data)
192 {
193 TrackerDirectConnectionPrivate *priv;
194 TrackerDirectConnection *conn;
195 GTask *task = data;
196 TaskData *task_data = g_task_get_task_data (task);
197 TrackerData *tracker_data;
198 GError *error = NULL;
199 gpointer retval = NULL;
200 GDestroyNotify destroy_notify = NULL;
201 gboolean update_timestamp = TRUE;
202
203 conn = user_data;
204 priv = tracker_direct_connection_get_instance_private (conn);
205
206 g_mutex_lock (&priv->mutex);
207 tracker_data = tracker_data_manager_get_data (priv->data_manager);
208
209 switch (task_data->type) {
210 case TASK_TYPE_QUERY:
211 g_warning ("Queries don't go through this thread");
212 break;
213 case TASK_TYPE_UPDATE:
214 tracker_data_update_sparql (tracker_data, task_data->data, &error);
215 break;
216 case TASK_TYPE_UPDATE_BLANK:
217 retval = tracker_data_update_sparql_blank (tracker_data, task_data->data, &error);
218 destroy_notify = (GDestroyNotify) g_variant_unref;
219 break;
220 case TASK_TYPE_UPDATE_RESOURCE: {
221 UpdateResource *data = task_data->data;
222 update_resource (tracker_data, data->graph, data->resource, &error);
223 break;
224 }
225 case TASK_TYPE_UPDATE_BATCH:
226 tracker_direct_batch_update (task_data->data, priv->data_manager, &error);
227 break;
228 case TASK_TYPE_RELEASE_MEMORY:
229 tracker_data_manager_release_memory (priv->data_manager);
230 update_timestamp = FALSE;
231 break;
232 }
233
234 if (error)
235 g_task_return_error (task, error);
236 else if (retval)
237 g_task_return_pointer (task, retval, destroy_notify);
238 else
239 g_task_return_boolean (task, TRUE);
240
241 g_object_unref (task);
242
243 if (update_timestamp)
244 tracker_direct_connection_update_timestamp (conn);
245
246 g_mutex_unlock (&priv->mutex);
247 }
248
249 static void
query_thread_pool_func(gpointer data,gpointer user_data)250 query_thread_pool_func (gpointer data,
251 gpointer user_data)
252 {
253 TrackerDirectConnection *conn = user_data;
254 TrackerDirectConnectionPrivate *priv;
255 TrackerSparqlCursor *cursor;
256 GTask *task = data;
257 TaskData *task_data = g_task_get_task_data (task);
258 GError *error = NULL;
259
260 g_assert (task_data->type == TASK_TYPE_QUERY);
261
262 priv = tracker_direct_connection_get_instance_private (conn);
263
264 if (priv->closing) {
265 g_task_return_new_error (task,
266 G_IO_ERROR,
267 G_IO_ERROR_CONNECTION_CLOSED,
268 "Connection is closed");
269 g_object_unref (task);
270 return;
271 }
272
273 cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
274 task_data->data,
275 g_task_get_cancellable (task),
276 &error);
277 if (cursor)
278 g_task_return_pointer (task, cursor, g_object_unref);
279 else
280 g_task_return_error (task, error);
281
282 g_object_unref (task);
283 }
284
285 static gint
task_compare_func(GTask * a,GTask * b,gpointer user_data)286 task_compare_func (GTask *a,
287 GTask *b,
288 gpointer user_data)
289 {
290 return g_task_get_priority (b) - g_task_get_priority (a);
291 }
292
293 static gboolean
set_up_thread_pools(TrackerDirectConnection * conn,GError ** error)294 set_up_thread_pools (TrackerDirectConnection *conn,
295 GError **error)
296 {
297 TrackerDirectConnectionPrivate *priv;
298
299 priv = tracker_direct_connection_get_instance_private (conn);
300
301 priv->select_pool = g_thread_pool_new (query_thread_pool_func,
302 conn, 16, FALSE, error);
303 if (!priv->select_pool)
304 return FALSE;
305
306 priv->update_thread = g_thread_pool_new (update_thread_func,
307 conn, 1, TRUE, error);
308 if (!priv->update_thread)
309 return FALSE;
310
311 g_thread_pool_set_sort_function (priv->select_pool,
312 (GCompareDataFunc) task_compare_func,
313 conn);
314 g_thread_pool_set_sort_function (priv->update_thread,
315 (GCompareDataFunc) task_compare_func,
316 conn);
317 return TRUE;
318 }
319
320 static TrackerDBManagerFlags
translate_flags(TrackerSparqlConnectionFlags flags)321 translate_flags (TrackerSparqlConnectionFlags flags)
322 {
323 TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES;
324
325 if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) != 0)
326 db_flags |= TRACKER_DB_MANAGER_READONLY;
327 if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_STEMMER) != 0)
328 db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_STEMMER;
329 if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_UNACCENT) != 0)
330 db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_UNACCENT;
331 if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_STOP_WORDS) != 0)
332 db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_STOP_WORDS;
333 if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_IGNORE_NUMBERS) != 0)
334 db_flags |= TRACKER_DB_MANAGER_FTS_IGNORE_NUMBERS;
335
336 return db_flags;
337 }
338
339 static gboolean
tracker_direct_connection_initable_init(GInitable * initable,GCancellable * cancellable,GError ** error)340 tracker_direct_connection_initable_init (GInitable *initable,
341 GCancellable *cancellable,
342 GError **error)
343 {
344 TrackerDirectConnectionPrivate *priv;
345 TrackerDirectConnection *conn;
346 TrackerDBManagerFlags db_flags;
347 GHashTable *namespaces;
348 GHashTableIter iter;
349 gchar *prefix, *ns;
350 GError *inner_error = NULL;
351
352 conn = TRACKER_DIRECT_CONNECTION (initable);
353 priv = tracker_direct_connection_get_instance_private (conn);
354
355 tracker_locale_sanity_check ();
356
357 if (!set_up_thread_pools (conn, error))
358 return FALSE;
359
360 db_flags = translate_flags (priv->flags);
361
362 if (!priv->store) {
363 db_flags |= TRACKER_DB_MANAGER_IN_MEMORY;
364 }
365
366 priv->data_manager = tracker_data_manager_new (db_flags, priv->store,
367 priv->ontology,
368 100, 100);
369 if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, &inner_error)) {
370 g_propagate_error (error, _translate_internal_error (inner_error));
371 g_clear_object (&priv->data_manager);
372 return FALSE;
373 }
374
375 /* Initialize namespace manager */
376 priv->namespace_manager = tracker_namespace_manager_new ();
377 namespaces = tracker_data_manager_get_namespaces (priv->data_manager);
378 g_hash_table_iter_init (&iter, namespaces);
379
380 while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) {
381 tracker_namespace_manager_add_prefix (priv->namespace_manager,
382 prefix, ns);
383 }
384
385 g_hash_table_unref (namespaces);
386
387 priv->cleanup_timeout_id =
388 g_timeout_add_seconds (30, cleanup_timeout_cb, conn);
389
390 return TRUE;
391 }
392
393 static void
tracker_direct_connection_initable_iface_init(GInitableIface * iface)394 tracker_direct_connection_initable_iface_init (GInitableIface *iface)
395 {
396 iface->init = tracker_direct_connection_initable_init;
397 }
398
399 static void
async_initable_thread_func(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)400 async_initable_thread_func (GTask *task,
401 gpointer source_object,
402 gpointer task_data,
403 GCancellable *cancellable)
404 {
405 GError *error = NULL;
406
407 if (!g_initable_init (G_INITABLE (source_object), cancellable, &error))
408 g_task_return_error (task, error);
409 else
410 g_task_return_boolean (task, TRUE);
411
412 g_object_unref (task);
413 }
414
415 static void
tracker_direct_connection_async_initable_init_async(GAsyncInitable * async_initable,gint priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)416 tracker_direct_connection_async_initable_init_async (GAsyncInitable *async_initable,
417 gint priority,
418 GCancellable *cancellable,
419 GAsyncReadyCallback callback,
420 gpointer user_data)
421 {
422 GTask *task;
423
424 task = g_task_new (async_initable, cancellable, callback, user_data);
425 g_task_set_priority (task, priority);
426 g_task_run_in_thread (task, async_initable_thread_func);
427 }
428
429 static gboolean
tracker_direct_connection_async_initable_init_finish(GAsyncInitable * async_initable,GAsyncResult * res,GError ** error)430 tracker_direct_connection_async_initable_init_finish (GAsyncInitable *async_initable,
431 GAsyncResult *res,
432 GError **error)
433 {
434 return g_task_propagate_boolean (G_TASK (res), error);
435 }
436
437 static void
tracker_direct_connection_async_initable_iface_init(GAsyncInitableIface * iface)438 tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface)
439 {
440 iface->init_async = tracker_direct_connection_async_initable_init_async;
441 iface->init_finish = tracker_direct_connection_async_initable_init_finish;
442 }
443
444 static void
tracker_direct_connection_init(TrackerDirectConnection * conn)445 tracker_direct_connection_init (TrackerDirectConnection *conn)
446 {
447 }
448
449 static GHashTable *
get_event_cache_ht(TrackerNotifier * notifier)450 get_event_cache_ht (TrackerNotifier *notifier)
451 {
452 GHashTable *events;
453
454 events = g_object_get_qdata (G_OBJECT (notifier), tracker_direct_notifier_quark ());
455 if (!events) {
456 events = g_hash_table_new_full (NULL, NULL, NULL,
457 (GDestroyNotify) _tracker_notifier_event_cache_free);
458 g_object_set_qdata_full (G_OBJECT (notifier), tracker_direct_notifier_quark (),
459 events, (GDestroyNotify) g_hash_table_unref);
460 }
461
462 return events;
463 }
464
465 static TrackerNotifierEventCache *
lookup_event_cache(TrackerNotifier * notifier,gint graph_id,const gchar * graph)466 lookup_event_cache (TrackerNotifier *notifier,
467 gint graph_id,
468 const gchar *graph)
469 {
470 TrackerNotifierEventCache *cache;
471 GHashTable *events;
472
473 events = get_event_cache_ht (notifier);
474 cache = g_hash_table_lookup (events, GINT_TO_POINTER (graph_id));
475
476 if (!cache) {
477 cache = _tracker_notifier_event_cache_new (notifier, graph);
478 g_hash_table_insert (events, GINT_TO_POINTER (graph_id), cache);
479 }
480
481 return cache;
482 }
483
484 /* These callbacks will be called from a different thread
485 * (always the same one though), handle with care.
486 */
487 static void
insert_statement_cb(gint graph_id,const gchar * graph,gint subject_id,const gchar * subject,gint predicate_id,gint object_id,const gchar * object,GPtrArray * rdf_types,gpointer user_data)488 insert_statement_cb (gint graph_id,
489 const gchar *graph,
490 gint subject_id,
491 const gchar *subject,
492 gint predicate_id,
493 gint object_id,
494 const gchar *object,
495 GPtrArray *rdf_types,
496 gpointer user_data)
497 {
498 TrackerNotifier *notifier = user_data;
499 TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
500 TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
501 TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
502 TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
503 TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
504 TrackerNotifierEventCache *cache;
505 TrackerClass *new_class = NULL;
506 gint i;
507
508 cache = lookup_event_cache (notifier, graph_id, graph);
509
510 if (predicate_id == tracker_property_get_id (rdf_type)) {
511 const gchar *uri;
512
513 uri = tracker_ontologies_get_uri_by_id (ontologies, object_id);
514 new_class = tracker_ontologies_get_class_by_uri (ontologies, uri);
515 }
516
517 for (i = 0; i < rdf_types->len; i++) {
518 TrackerClass *class = g_ptr_array_index (rdf_types, i);
519 TrackerNotifierEventType event_type;
520
521 if (!tracker_class_get_notify (class))
522 continue;
523
524 if (class == new_class)
525 event_type = TRACKER_NOTIFIER_EVENT_CREATE;
526 else
527 event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
528
529 _tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
530 }
531 }
532
533 static void
delete_statement_cb(gint graph_id,const gchar * graph,gint subject_id,const gchar * subject,gint predicate_id,gint object_id,const gchar * object,GPtrArray * rdf_types,gpointer user_data)534 delete_statement_cb (gint graph_id,
535 const gchar *graph,
536 gint subject_id,
537 const gchar *subject,
538 gint predicate_id,
539 gint object_id,
540 const gchar *object,
541 GPtrArray *rdf_types,
542 gpointer user_data)
543 {
544 TrackerNotifier *notifier = user_data;
545 TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
546 TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
547 TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
548 TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
549 TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
550 TrackerNotifierEventCache *cache;
551 TrackerClass *class_being_removed = NULL;
552 gint i;
553
554 cache = lookup_event_cache (notifier, graph_id, graph);
555
556 if (predicate_id == tracker_property_get_id (rdf_type)) {
557 class_being_removed = tracker_ontologies_get_class_by_uri (ontologies, object);
558 }
559
560 for (i = 0; i < rdf_types->len; i++) {
561 TrackerClass *class = g_ptr_array_index (rdf_types, i);
562 TrackerNotifierEventType event_type;
563
564 if (!tracker_class_get_notify (class))
565 continue;
566
567 if (class_being_removed && class == class_being_removed) {
568 event_type = TRACKER_NOTIFIER_EVENT_DELETE;
569 } else {
570 event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
571 }
572
573 _tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
574 }
575 }
576
577 static void
commit_statement_cb(gpointer user_data)578 commit_statement_cb (gpointer user_data)
579 {
580 TrackerNotifierEventCache *cache;
581 TrackerNotifier *notifier = user_data;
582 GHashTable *events;
583 GHashTableIter iter;
584
585 events = get_event_cache_ht (notifier);
586 g_hash_table_iter_init (&iter, events);
587
588 while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &cache)) {
589 g_hash_table_iter_steal (&iter);
590 _tracker_notifier_event_cache_flush_events (cache);
591 }
592 }
593
594 static void
rollback_statement_cb(gpointer user_data)595 rollback_statement_cb (gpointer user_data)
596 {
597 TrackerNotifier *notifier = user_data;
598 GHashTable *events;
599
600 events = get_event_cache_ht (notifier);
601 g_hash_table_remove_all (events);
602 }
603
604 static void
detach_notifier(TrackerDirectConnection * conn,TrackerNotifier * notifier)605 detach_notifier (TrackerDirectConnection *conn,
606 TrackerNotifier *notifier)
607 {
608 TrackerDirectConnectionPrivate *priv;
609 TrackerData *tracker_data;
610
611 priv = tracker_direct_connection_get_instance_private (conn);
612
613 priv->notifiers = g_list_remove (priv->notifiers, notifier);
614
615 tracker_data = tracker_data_manager_get_data (priv->data_manager);
616 tracker_data_remove_insert_statement_callback (tracker_data,
617 insert_statement_cb,
618 notifier);
619 tracker_data_remove_delete_statement_callback (tracker_data,
620 delete_statement_cb,
621 notifier);
622 tracker_data_remove_commit_statement_callback (tracker_data,
623 commit_statement_cb,
624 notifier);
625 tracker_data_remove_rollback_statement_callback (tracker_data,
626 rollback_statement_cb,
627 notifier);
628 }
629
630 static void
weak_ref_notify(gpointer data,GObject * prev_location)631 weak_ref_notify (gpointer data,
632 GObject *prev_location)
633 {
634 TrackerDirectConnection *conn = data;
635
636 detach_notifier (conn, (TrackerNotifier *) prev_location);
637 }
638
639 static void
tracker_direct_connection_finalize(GObject * object)640 tracker_direct_connection_finalize (GObject *object)
641 {
642 TrackerDirectConnectionPrivate *priv;
643 TrackerDirectConnection *conn;
644
645 conn = TRACKER_DIRECT_CONNECTION (object);
646 priv = tracker_direct_connection_get_instance_private (conn);
647
648 if (!priv->closing)
649 tracker_sparql_connection_close (TRACKER_SPARQL_CONNECTION (object));
650
651 g_clear_object (&priv->store);
652 g_clear_object (&priv->ontology);
653 g_clear_object (&priv->namespace_manager);
654
655 G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object);
656 }
657
658 static void
tracker_direct_connection_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)659 tracker_direct_connection_set_property (GObject *object,
660 guint prop_id,
661 const GValue *value,
662 GParamSpec *pspec)
663 {
664 TrackerDirectConnectionPrivate *priv;
665 TrackerDirectConnection *conn;
666
667 conn = TRACKER_DIRECT_CONNECTION (object);
668 priv = tracker_direct_connection_get_instance_private (conn);
669
670 switch (prop_id) {
671 case PROP_FLAGS:
672 priv->flags = g_value_get_flags (value);
673 break;
674 case PROP_STORE_LOCATION:
675 priv->store = g_value_dup_object (value);
676 break;
677 case PROP_ONTOLOGY_LOCATION:
678 priv->ontology = g_value_dup_object (value);
679 break;
680 default:
681 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
682 break;
683 }
684 }
685
686 static void
tracker_direct_connection_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)687 tracker_direct_connection_get_property (GObject *object,
688 guint prop_id,
689 GValue *value,
690 GParamSpec *pspec)
691 {
692 TrackerDirectConnectionPrivate *priv;
693 TrackerDirectConnection *conn;
694
695 conn = TRACKER_DIRECT_CONNECTION (object);
696 priv = tracker_direct_connection_get_instance_private (conn);
697
698 switch (prop_id) {
699 case PROP_FLAGS:
700 g_value_set_flags (value, priv->flags);
701 break;
702 case PROP_STORE_LOCATION:
703 g_value_set_object (value, priv->store);
704 break;
705 case PROP_ONTOLOGY_LOCATION:
706 g_value_set_object (value, priv->ontology);
707 break;
708 default:
709 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
710 break;
711 }
712 }
713
714 static TrackerSparqlCursor *
tracker_direct_connection_query(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GError ** error)715 tracker_direct_connection_query (TrackerSparqlConnection *self,
716 const gchar *sparql,
717 GCancellable *cancellable,
718 GError **error)
719 {
720 TrackerDirectConnectionPrivate *priv;
721 TrackerDirectConnection *conn;
722 TrackerSparql *query;
723 TrackerSparqlCursor *cursor;
724 GError *inner_error = NULL;
725
726 conn = TRACKER_DIRECT_CONNECTION (self);
727 priv = tracker_direct_connection_get_instance_private (conn);
728
729 g_mutex_lock (&priv->mutex);
730 query = tracker_sparql_new (priv->data_manager, sparql);
731 cursor = tracker_sparql_execute_cursor (query, NULL, &inner_error);
732 tracker_direct_connection_update_timestamp (conn);
733 g_object_unref (query);
734
735 if (cursor)
736 tracker_sparql_cursor_set_connection (cursor, self);
737 g_mutex_unlock (&priv->mutex);
738
739 if (inner_error)
740 g_propagate_error (error, _translate_internal_error (inner_error));
741
742 return cursor;
743 }
744
745 static void
tracker_direct_connection_query_async(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)746 tracker_direct_connection_query_async (TrackerSparqlConnection *self,
747 const gchar *sparql,
748 GCancellable *cancellable,
749 GAsyncReadyCallback callback,
750 gpointer user_data)
751 {
752 TrackerDirectConnectionPrivate *priv;
753 TrackerDirectConnection *conn;
754 GError *error = NULL;
755 GTask *task;
756
757 conn = TRACKER_DIRECT_CONNECTION (self);
758 priv = tracker_direct_connection_get_instance_private (conn);
759
760 task = g_task_new (self, cancellable, callback, user_data);
761 g_task_set_task_data (task,
762 task_data_query_new (TASK_TYPE_QUERY,
763 g_strdup (sparql),
764 g_free),
765 (GDestroyNotify) task_data_free);
766
767 if (!g_thread_pool_push (priv->select_pool, task, &error)) {
768 g_task_return_error (task, _translate_internal_error (error));
769 g_object_unref (task);
770 }
771 }
772
773 static TrackerSparqlCursor *
tracker_direct_connection_query_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)774 tracker_direct_connection_query_finish (TrackerSparqlConnection *self,
775 GAsyncResult *res,
776 GError **error)
777 {
778 return g_task_propagate_pointer (G_TASK (res), error);
779 }
780
781 static TrackerSparqlStatement *
tracker_direct_connection_query_statement(TrackerSparqlConnection * self,const gchar * query,GCancellable * cancellable,GError ** error)782 tracker_direct_connection_query_statement (TrackerSparqlConnection *self,
783 const gchar *query,
784 GCancellable *cancellable,
785 GError **error)
786 {
787 return TRACKER_SPARQL_STATEMENT (tracker_direct_statement_new (self, query, error));
788 }
789
790 static void
tracker_direct_connection_update(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GError ** error)791 tracker_direct_connection_update (TrackerSparqlConnection *self,
792 const gchar *sparql,
793 GCancellable *cancellable,
794 GError **error)
795 {
796 TrackerDirectConnectionPrivate *priv;
797 TrackerDirectConnection *conn;
798 TrackerData *data;
799 GError *inner_error = NULL;
800
801 conn = TRACKER_DIRECT_CONNECTION (self);
802 priv = tracker_direct_connection_get_instance_private (conn);
803
804 g_mutex_lock (&priv->mutex);
805 data = tracker_data_manager_get_data (priv->data_manager);
806 tracker_data_update_sparql (data, sparql, &inner_error);
807 tracker_direct_connection_update_timestamp (conn);
808 g_mutex_unlock (&priv->mutex);
809
810 if (inner_error)
811 g_propagate_error (error, inner_error);
812 }
813
814 static void
tracker_direct_connection_update_async(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)815 tracker_direct_connection_update_async (TrackerSparqlConnection *self,
816 const gchar *sparql,
817 GCancellable *cancellable,
818 GAsyncReadyCallback callback,
819 gpointer user_data)
820 {
821 TrackerDirectConnectionPrivate *priv;
822 TrackerDirectConnection *conn;
823 GTask *task;
824
825 conn = TRACKER_DIRECT_CONNECTION (self);
826 priv = tracker_direct_connection_get_instance_private (conn);
827
828 task = g_task_new (self, cancellable, callback, user_data);
829 g_task_set_task_data (task,
830 task_data_query_new (TASK_TYPE_UPDATE,
831 g_strdup (sparql),
832 g_free),
833 (GDestroyNotify) task_data_free);
834
835 g_thread_pool_push (priv->update_thread, task, NULL);
836 }
837
838 static void
tracker_direct_connection_update_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)839 tracker_direct_connection_update_finish (TrackerSparqlConnection *self,
840 GAsyncResult *res,
841 GError **error)
842 {
843 GError *inner_error = NULL;
844
845 g_task_propagate_boolean (G_TASK (res), &inner_error);
846 if (inner_error)
847 g_propagate_error (error, _translate_internal_error (inner_error));
848 }
849
850 static void
on_batch_finished(GObject * source,GAsyncResult * result,gpointer user_data)851 on_batch_finished (GObject *source,
852 GAsyncResult *result,
853 gpointer user_data)
854 {
855 TrackerBatch *batch = TRACKER_BATCH (source);
856 GTask *task = user_data;
857 GError *error = NULL;
858 gboolean retval;
859
860 retval = tracker_batch_execute_finish (batch, result, &error);
861
862 if (retval)
863 g_task_return_boolean (task, TRUE);
864 else
865 g_task_return_error (task, error);
866
867 g_object_unref (task);
868 }
869
870 static void
tracker_direct_connection_update_array_async(TrackerSparqlConnection * self,gchar ** updates,gint n_updates,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)871 tracker_direct_connection_update_array_async (TrackerSparqlConnection *self,
872 gchar **updates,
873 gint n_updates,
874 GCancellable *cancellable,
875 GAsyncReadyCallback callback,
876 gpointer user_data)
877 {
878 TrackerBatch *batch;
879 GTask *task;
880 gint i;
881
882 batch = tracker_sparql_connection_create_batch (self);
883
884 for (i = 0; i < n_updates; i++)
885 tracker_batch_add_sparql (batch, updates[i]);
886
887 task = g_task_new (self, cancellable, callback, user_data);
888 tracker_batch_execute_async (batch, cancellable, on_batch_finished, task);
889 g_object_unref (batch);
890 }
891
892 static gboolean
tracker_direct_connection_update_array_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)893 tracker_direct_connection_update_array_finish (TrackerSparqlConnection *self,
894 GAsyncResult *res,
895 GError **error)
896 {
897 GError *inner_error = NULL;
898 gboolean result;
899
900 result = g_task_propagate_boolean (G_TASK (res), &inner_error);
901 if (inner_error)
902 g_propagate_error (error, _translate_internal_error (inner_error));
903
904 return result;
905 }
906
907 static GVariant *
tracker_direct_connection_update_blank(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GError ** error)908 tracker_direct_connection_update_blank (TrackerSparqlConnection *self,
909 const gchar *sparql,
910 GCancellable *cancellable,
911 GError **error)
912 {
913 TrackerDirectConnectionPrivate *priv;
914 TrackerDirectConnection *conn;
915 TrackerData *data;
916 GVariant *blank_nodes;
917 GError *inner_error = NULL;
918
919 conn = TRACKER_DIRECT_CONNECTION (self);
920 priv = tracker_direct_connection_get_instance_private (conn);
921
922 g_mutex_lock (&priv->mutex);
923 data = tracker_data_manager_get_data (priv->data_manager);
924 blank_nodes = tracker_data_update_sparql_blank (data, sparql, &inner_error);
925 tracker_direct_connection_update_timestamp (conn);
926 g_mutex_unlock (&priv->mutex);
927
928 if (inner_error)
929 g_propagate_error (error, _translate_internal_error (inner_error));
930 return blank_nodes;
931 }
932
933 static void
tracker_direct_connection_update_blank_async(TrackerSparqlConnection * self,const gchar * sparql,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)934 tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self,
935 const gchar *sparql,
936 GCancellable *cancellable,
937 GAsyncReadyCallback callback,
938 gpointer user_data)
939 {
940 TrackerDirectConnectionPrivate *priv;
941 TrackerDirectConnection *conn;
942 GTask *task;
943
944 conn = TRACKER_DIRECT_CONNECTION (self);
945 priv = tracker_direct_connection_get_instance_private (conn);
946
947 task = g_task_new (self, cancellable, callback, user_data);
948 g_task_set_task_data (task,
949 task_data_query_new (TASK_TYPE_UPDATE_BLANK,
950 g_strdup (sparql),
951 g_free),
952 (GDestroyNotify) task_data_free);
953
954 g_thread_pool_push (priv->update_thread, task, NULL);
955 }
956
957 static GVariant *
tracker_direct_connection_update_blank_finish(TrackerSparqlConnection * self,GAsyncResult * res,GError ** error)958 tracker_direct_connection_update_blank_finish (TrackerSparqlConnection *self,
959 GAsyncResult *res,
960 GError **error)
961 {
962 GError *inner_error = NULL;
963 GVariant *result;
964
965 result = g_task_propagate_pointer (G_TASK (res), &inner_error);
966 if (inner_error)
967 g_propagate_error (error, _translate_internal_error (inner_error));
968
969 return result;
970 }
971
972 static TrackerNamespaceManager *
tracker_direct_connection_get_namespace_manager(TrackerSparqlConnection * self)973 tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self)
974 {
975 TrackerDirectConnectionPrivate *priv;
976
977 priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
978
979 return priv->namespace_manager;
980 }
981
982 static TrackerNotifier *
tracker_direct_connection_create_notifier(TrackerSparqlConnection * self)983 tracker_direct_connection_create_notifier (TrackerSparqlConnection *self)
984 {
985 TrackerDirectConnectionPrivate *priv;
986 TrackerNotifier *notifier;
987 TrackerData *tracker_data;
988
989 priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
990
991 notifier = g_object_new (TRACKER_TYPE_NOTIFIER,
992 "connection", self,
993 NULL);
994
995 tracker_data = tracker_data_manager_get_data (priv->data_manager);
996 tracker_data_add_insert_statement_callback (tracker_data,
997 insert_statement_cb,
998 notifier);
999 tracker_data_add_delete_statement_callback (tracker_data,
1000 delete_statement_cb,
1001 notifier);
1002 tracker_data_add_commit_statement_callback (tracker_data,
1003 commit_statement_cb,
1004 notifier);
1005 tracker_data_add_rollback_statement_callback (tracker_data,
1006 rollback_statement_cb,
1007 notifier);
1008
1009 g_object_weak_ref (G_OBJECT (notifier), weak_ref_notify, self);
1010 priv->notifiers = g_list_prepend (priv->notifiers, notifier);
1011
1012 return notifier;
1013 }
1014
1015 static void
tracker_direct_connection_close(TrackerSparqlConnection * self)1016 tracker_direct_connection_close (TrackerSparqlConnection *self)
1017 {
1018 TrackerDirectConnectionPrivate *priv;
1019 TrackerDirectConnection *conn;
1020
1021 conn = TRACKER_DIRECT_CONNECTION (self);
1022 priv = tracker_direct_connection_get_instance_private (conn);
1023 priv->closing = TRUE;
1024
1025 if (priv->cleanup_timeout_id) {
1026 g_source_remove (priv->cleanup_timeout_id);
1027 priv->cleanup_timeout_id = 0;
1028 }
1029
1030 if (priv->update_thread) {
1031 g_thread_pool_free (priv->update_thread, TRUE, TRUE);
1032 priv->update_thread = NULL;
1033 }
1034
1035 if (priv->select_pool) {
1036 g_thread_pool_free (priv->select_pool, TRUE, TRUE);
1037 priv->select_pool = NULL;
1038 }
1039
1040 while (priv->notifiers) {
1041 TrackerNotifier *notifier = priv->notifiers->data;
1042
1043 g_object_weak_unref (G_OBJECT (notifier),
1044 weak_ref_notify,
1045 conn);
1046 detach_notifier (conn, notifier);
1047 }
1048
1049 if (priv->data_manager) {
1050 tracker_data_manager_shutdown (priv->data_manager);
1051 g_clear_object (&priv->data_manager);
1052 }
1053 }
1054
1055 static void
async_close_thread_func(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1056 async_close_thread_func (GTask *task,
1057 gpointer source_object,
1058 gpointer task_data,
1059 GCancellable *cancellable)
1060 {
1061 if (g_task_return_error_if_cancelled (task))
1062 return;
1063
1064 tracker_sparql_connection_close (source_object);
1065 g_task_return_boolean (task, TRUE);
1066 }
1067
1068 static void
tracker_direct_connection_close_async(TrackerSparqlConnection * connection,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1069 tracker_direct_connection_close_async (TrackerSparqlConnection *connection,
1070 GCancellable *cancellable,
1071 GAsyncReadyCallback callback,
1072 gpointer user_data)
1073 {
1074 GTask *task;
1075
1076 task = g_task_new (connection, cancellable, callback, user_data);
1077 g_task_run_in_thread (task, async_close_thread_func);
1078 g_object_unref (task);
1079 }
1080
1081 static gboolean
tracker_direct_connection_close_finish(TrackerSparqlConnection * connection,GAsyncResult * res,GError ** error)1082 tracker_direct_connection_close_finish (TrackerSparqlConnection *connection,
1083 GAsyncResult *res,
1084 GError **error)
1085 {
1086 return g_task_propagate_boolean (G_TASK (res), error);
1087 }
1088
1089 static UpdateResource *
update_resource_data_new(const gchar * graph,TrackerResource * resource)1090 update_resource_data_new (const gchar *graph,
1091 TrackerResource *resource)
1092 {
1093 UpdateResource *data;
1094
1095 data = g_new0 (UpdateResource, 1);
1096 data->graph = g_strdup (graph);
1097 data->resource = g_object_ref (resource);
1098
1099 return data;
1100 }
1101
1102 static void
update_resource_data_free(UpdateResource * data)1103 update_resource_data_free (UpdateResource *data)
1104 {
1105 g_free (data->graph);
1106 g_object_unref (data->resource);
1107 g_free (data);
1108 }
1109
1110 static gboolean
tracker_direct_connection_update_resource(TrackerSparqlConnection * self,const gchar * graph,TrackerResource * resource,GCancellable * cancellable,GError ** error)1111 tracker_direct_connection_update_resource (TrackerSparqlConnection *self,
1112 const gchar *graph,
1113 TrackerResource *resource,
1114 GCancellable *cancellable,
1115 GError **error)
1116 {
1117 TrackerDirectConnectionPrivate *priv;
1118 TrackerDirectConnection *conn;
1119 TrackerData *data;
1120 GError *inner_error = NULL;
1121
1122 conn = TRACKER_DIRECT_CONNECTION (self);
1123 priv = tracker_direct_connection_get_instance_private (conn);
1124
1125 g_mutex_lock (&priv->mutex);
1126 data = tracker_data_manager_get_data (priv->data_manager);
1127 update_resource (data, graph, resource, &inner_error);
1128 tracker_direct_connection_update_timestamp (conn);
1129 g_mutex_unlock (&priv->mutex);
1130
1131 if (inner_error) {
1132 g_propagate_error (error, inner_error);
1133 return FALSE;
1134 }
1135
1136 return TRUE;
1137 }
1138
1139 static void
tracker_direct_connection_update_resource_async(TrackerSparqlConnection * self,const gchar * graph,TrackerResource * resource,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1140 tracker_direct_connection_update_resource_async (TrackerSparqlConnection *self,
1141 const gchar *graph,
1142 TrackerResource *resource,
1143 GCancellable *cancellable,
1144 GAsyncReadyCallback callback,
1145 gpointer user_data)
1146 {
1147 TrackerDirectConnectionPrivate *priv;
1148 TrackerDirectConnection *conn;
1149 TaskData *task_data;
1150 GTask *task;
1151
1152 conn = TRACKER_DIRECT_CONNECTION (self);
1153 priv = tracker_direct_connection_get_instance_private (conn);
1154
1155 task_data = task_data_query_new (TASK_TYPE_UPDATE_RESOURCE,
1156 update_resource_data_new (graph, resource),
1157 (GDestroyNotify) update_resource_data_free);
1158
1159 task = g_task_new (self, cancellable, callback, user_data);
1160 g_task_set_task_data (task, task_data,
1161 (GDestroyNotify) task_data_free);
1162
1163 g_thread_pool_push (priv->update_thread, task, NULL);
1164 }
1165
1166 static gboolean
tracker_direct_connection_update_resource_finish(TrackerSparqlConnection * connection,GAsyncResult * res,GError ** error)1167 tracker_direct_connection_update_resource_finish (TrackerSparqlConnection *connection,
1168 GAsyncResult *res,
1169 GError **error)
1170 {
1171 return g_task_propagate_boolean (G_TASK (res), error);
1172 }
1173
1174 static TrackerBatch *
tracker_direct_connection_create_batch(TrackerSparqlConnection * connection)1175 tracker_direct_connection_create_batch (TrackerSparqlConnection *connection)
1176 {
1177 TrackerDirectConnectionPrivate *priv;
1178 TrackerDirectConnection *conn;
1179
1180 conn = TRACKER_DIRECT_CONNECTION (connection);
1181 priv = tracker_direct_connection_get_instance_private (conn);
1182
1183 if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY)
1184 return NULL;
1185
1186 return tracker_direct_batch_new (connection);
1187 }
1188
1189 static gboolean
tracker_direct_connection_lookup_dbus_service(TrackerSparqlConnection * connection,const gchar * dbus_name,const gchar * dbus_path,gchar ** name,gchar ** path)1190 tracker_direct_connection_lookup_dbus_service (TrackerSparqlConnection *connection,
1191 const gchar *dbus_name,
1192 const gchar *dbus_path,
1193 gchar **name,
1194 gchar **path)
1195 {
1196 TrackerDirectConnectionPrivate *priv;
1197 TrackerDirectConnection *conn;
1198 TrackerSparqlConnection *remote;
1199 GError *error = NULL;
1200 gchar *uri;
1201
1202 conn = TRACKER_DIRECT_CONNECTION (connection);
1203 priv = tracker_direct_connection_get_instance_private (conn);
1204
1205 uri = tracker_util_build_dbus_uri (G_BUS_TYPE_SESSION,
1206 dbus_name, dbus_path);
1207 remote = tracker_data_manager_get_remote_connection (priv->data_manager,
1208 uri, &error);
1209 if (error) {
1210 g_warning ("Error getting remote connection '%s': %s", uri, error->message);
1211 g_error_free (error);
1212 }
1213
1214 g_free (uri);
1215
1216 if (!remote)
1217 return FALSE;
1218 if (!g_object_class_find_property (G_OBJECT_GET_CLASS (remote), "bus-name"))
1219 return FALSE;
1220
1221 g_object_get (remote,
1222 "bus-name", name,
1223 "bus-object-path", path,
1224 NULL);
1225
1226 return TRUE;
1227 }
1228
1229 static void
tracker_direct_connection_class_init(TrackerDirectConnectionClass * klass)1230 tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
1231 {
1232 TrackerSparqlConnectionClass *sparql_connection_class;
1233 GObjectClass *object_class;
1234
1235 object_class = G_OBJECT_CLASS (klass);
1236 sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass);
1237
1238 object_class->finalize = tracker_direct_connection_finalize;
1239 object_class->set_property = tracker_direct_connection_set_property;
1240 object_class->get_property = tracker_direct_connection_get_property;
1241
1242 sparql_connection_class->query = tracker_direct_connection_query;
1243 sparql_connection_class->query_async = tracker_direct_connection_query_async;
1244 sparql_connection_class->query_finish = tracker_direct_connection_query_finish;
1245 sparql_connection_class->query_statement = tracker_direct_connection_query_statement;
1246 sparql_connection_class->update = tracker_direct_connection_update;
1247 sparql_connection_class->update_async = tracker_direct_connection_update_async;
1248 sparql_connection_class->update_finish = tracker_direct_connection_update_finish;
1249 sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async;
1250 sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish;
1251 sparql_connection_class->update_blank = tracker_direct_connection_update_blank;
1252 sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async;
1253 sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish;
1254 sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager;
1255 sparql_connection_class->create_notifier = tracker_direct_connection_create_notifier;
1256 sparql_connection_class->close = tracker_direct_connection_close;
1257 sparql_connection_class->close_async = tracker_direct_connection_close_async;
1258 sparql_connection_class->close_finish = tracker_direct_connection_close_finish;
1259 sparql_connection_class->update_resource = tracker_direct_connection_update_resource;
1260 sparql_connection_class->update_resource_async = tracker_direct_connection_update_resource_async;
1261 sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish;
1262 sparql_connection_class->create_batch = tracker_direct_connection_create_batch;
1263 sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
1264
1265 props[PROP_FLAGS] =
1266 g_param_spec_flags ("flags",
1267 "Flags",
1268 "Flags",
1269 TRACKER_TYPE_SPARQL_CONNECTION_FLAGS,
1270 TRACKER_SPARQL_CONNECTION_FLAGS_NONE,
1271 G_PARAM_READWRITE |
1272 G_PARAM_CONSTRUCT_ONLY);
1273 props[PROP_STORE_LOCATION] =
1274 g_param_spec_object ("store-location",
1275 "Store location",
1276 "Store location",
1277 G_TYPE_FILE,
1278 G_PARAM_READWRITE |
1279 G_PARAM_CONSTRUCT_ONLY);
1280 props[PROP_ONTOLOGY_LOCATION] =
1281 g_param_spec_object ("ontology-location",
1282 "Ontology location",
1283 "Ontology location",
1284 G_TYPE_FILE,
1285 G_PARAM_READWRITE |
1286 G_PARAM_CONSTRUCT_ONLY);
1287
1288 g_object_class_install_properties (object_class, N_PROPS, props);
1289 }
1290
1291 TrackerDirectConnection *
tracker_direct_connection_new(TrackerSparqlConnectionFlags flags,GFile * store,GFile * ontology,GError ** error)1292 tracker_direct_connection_new (TrackerSparqlConnectionFlags flags,
1293 GFile *store,
1294 GFile *ontology,
1295 GError **error)
1296 {
1297 g_return_val_if_fail (!store || G_IS_FILE (store), NULL);
1298 g_return_val_if_fail (!ontology || G_IS_FILE (ontology), NULL);
1299 g_return_val_if_fail (!error || !*error, NULL);
1300
1301 return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION,
1302 "flags", flags,
1303 "store-location", store,
1304 "ontology-location", ontology,
1305 NULL);
1306 }
1307
1308 TrackerDataManager *
tracker_direct_connection_get_data_manager(TrackerDirectConnection * conn)1309 tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn)
1310 {
1311 TrackerDirectConnectionPrivate *priv;
1312
1313 priv = tracker_direct_connection_get_instance_private (conn);
1314 return priv->data_manager;
1315 }
1316
1317 void
tracker_direct_connection_update_timestamp(TrackerDirectConnection * conn)1318 tracker_direct_connection_update_timestamp (TrackerDirectConnection *conn)
1319 {
1320 TrackerDirectConnectionPrivate *priv;
1321
1322 priv = tracker_direct_connection_get_instance_private (conn);
1323 priv->timestamp = g_get_monotonic_time ();
1324 }
1325
1326 gboolean
tracker_direct_connection_update_batch(TrackerDirectConnection * conn,TrackerBatch * batch,GError ** error)1327 tracker_direct_connection_update_batch (TrackerDirectConnection *conn,
1328 TrackerBatch *batch,
1329 GError **error)
1330 {
1331 TrackerDirectConnectionPrivate *priv;
1332 GError *inner_error = NULL;
1333
1334 priv = tracker_direct_connection_get_instance_private (conn);
1335
1336 g_mutex_lock (&priv->mutex);
1337 tracker_direct_batch_update (TRACKER_DIRECT_BATCH (batch),
1338 priv->data_manager, &inner_error);
1339 tracker_direct_connection_update_timestamp (conn);
1340 g_mutex_unlock (&priv->mutex);
1341
1342 if (inner_error) {
1343 g_propagate_error (error, inner_error);
1344 return FALSE;
1345 }
1346
1347 return TRUE;
1348 }
1349
1350 void
tracker_direct_connection_update_batch_async(TrackerDirectConnection * conn,TrackerBatch * batch,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1351 tracker_direct_connection_update_batch_async (TrackerDirectConnection *conn,
1352 TrackerBatch *batch,
1353 GCancellable *cancellable,
1354 GAsyncReadyCallback callback,
1355 gpointer user_data)
1356 {
1357 TrackerDirectConnectionPrivate *priv;
1358 GTask *task;
1359
1360 priv = tracker_direct_connection_get_instance_private (conn);
1361
1362 task = g_task_new (batch, cancellable, callback, user_data);
1363 g_task_set_task_data (task,
1364 task_data_query_new (TASK_TYPE_UPDATE_BATCH,
1365 g_object_ref (batch),
1366 g_object_unref),
1367 (GDestroyNotify) task_data_free);
1368
1369 g_thread_pool_push (priv->update_thread, task, NULL);
1370 }
1371
1372 gboolean
tracker_direct_connection_update_batch_finish(TrackerDirectConnection * conn,GAsyncResult * res,GError ** error)1373 tracker_direct_connection_update_batch_finish (TrackerDirectConnection *conn,
1374 GAsyncResult *res,
1375 GError **error)
1376 {
1377 GError *inner_error = NULL;
1378
1379 g_task_propagate_boolean (G_TASK (res), &inner_error);
1380 if (inner_error) {
1381 g_propagate_error (error, _translate_internal_error (inner_error));
1382 return FALSE;
1383 }
1384
1385 return TRUE;
1386 }
1387