1 /* gom-adapter.c
2  *
3  * Copyright (C) 2011 Christian Hergert <chris@dronelabs.com>
4  *
5  * This file is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This file is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include <sqlite3.h>
20 
21 #include "gom-adapter.h"
22 #include "gom-command.h"
23 #include "gom-error.h"
24 
25 G_DEFINE_TYPE(GomAdapter, gom_adapter, G_TYPE_OBJECT)
26 
27 struct _GomAdapterPrivate
28 {
29    sqlite3 *db;
30    GThread *thread;
31    GAsyncQueue *queue;
32 };
33 
34 typedef enum {
35    ASYNC_CMD_TYPE_OPEN,
36    ASYNC_CMD_TYPE_READ,
37    ASYNC_CMD_TYPE_WRITE,
38    ASYNC_CMD_TYPE_CLOSE
39 } GomAsyncCmdType;
40 
41 typedef struct {
42   GomAdapter *adapter;
43   GomAsyncCmdType type;
44   GomAdapterCallback callback;
45   gpointer callback_data;
46 } GomAsyncCmd;
47 
48 static GomAsyncCmd *
_async_cmd_new(GomAdapter * adapter,GomAsyncCmdType type,GomAdapterCallback callback,gpointer callback_data)49 _async_cmd_new(GomAdapter         *adapter,
50                GomAsyncCmdType     type,
51                GomAdapterCallback  callback,
52                gpointer            callback_data)
53 {
54    GomAsyncCmd *cmd;
55    cmd = g_new0(GomAsyncCmd, 1);
56    cmd->adapter = g_object_ref(adapter);
57    cmd->type = type;
58    cmd->callback = callback;
59    cmd->callback_data = callback_data;
60    return cmd;
61 }
62 
63 GomAdapter *
gom_adapter_new(void)64 gom_adapter_new (void)
65 {
66    return g_object_new(GOM_TYPE_ADAPTER, NULL);
67 }
68 
69 /**
70  * gom_adapter_get_handle:
71  * @adapter: (in): A #GomAdapter.
72  *
73  * Fetches the sqlite3 structure used by the adapter.
74  *
75  * Returns: (transfer none): A handle to the #sqlite3 structure.
76  * Side effects: None.
77  */
78 gpointer
gom_adapter_get_handle(GomAdapter * adapter)79 gom_adapter_get_handle (GomAdapter *adapter)
80 {
81    g_return_val_if_fail(GOM_IS_ADAPTER(adapter), NULL);
82    g_return_val_if_fail(adapter->priv->thread != NULL, NULL);
83    g_assert (g_thread_self () == adapter->priv->thread);
84    return adapter->priv->db;
85 }
86 
87 static gpointer
gom_adapter_worker(gpointer data)88 gom_adapter_worker (gpointer data)
89 {
90    GomAsyncCmd *cmd;
91    GAsyncQueue *queue = data;
92 
93    /*
94     * First item is open request.
95     */
96    cmd = g_async_queue_pop(queue);
97    g_assert (cmd->type == ASYNC_CMD_TYPE_OPEN);
98    cmd->callback(cmd->adapter, cmd->callback_data);
99    g_object_unref(cmd->adapter);
100    g_free(cmd);
101 
102    /*
103     * Handle additional requests.
104     */
105    while ((cmd = g_async_queue_pop(queue))) {
106       /*
107        * XXX: Right now, we synchronize all requests. I hope to make this
108        *      more performant when necessary.
109        */
110       cmd->callback(cmd->adapter, cmd->callback_data);
111       if (cmd->type == ASYNC_CMD_TYPE_CLOSE) {
112          g_object_unref(cmd->adapter);
113          g_free(cmd);
114          break;
115       }
116 
117       g_object_unref(cmd->adapter);
118       g_free(cmd);
119    }
120 
121    return NULL;
122 }
123 
124 /**
125  * gom_adapter_queue_write:
126  * @adapter: (in): A #GomAdapter.
127  * @callback: (in) (scope async): A callback to execute write queries on SQLite.
128  * @user_data: (in): User data for @callback.
129  *
130  * Queues a callback to be executed within the SQLite thwrite. The callback can
131  * perform reads and writes.
132  */
133 void
gom_adapter_queue_write(GomAdapter * adapter,GomAdapterCallback callback,gpointer user_data)134 gom_adapter_queue_write (GomAdapter         *adapter,
135                          GomAdapterCallback  callback,
136                          gpointer            user_data)
137 {
138    GomAdapterPrivate *priv;
139    GomAsyncCmd *cmd;
140 
141    g_return_if_fail(GOM_IS_ADAPTER(adapter));
142    g_return_if_fail(callback != NULL);
143    g_return_if_fail(adapter->priv->queue != NULL);
144 
145    priv = adapter->priv;
146 
147    cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_WRITE, callback, user_data);
148 
149    g_async_queue_push(priv->queue, cmd);
150 }
151 
152 /**
153  * gom_adapter_queue_read:
154  * @adapter: (in): A #GomAdapter.
155  * @callback: (in) (scope async): A callback to execute read queries on SQLite.
156  * @user_data: (in): User data for @callback.
157  *
158  * Queues a callback to be executed within the SQLite thread. The callback is
159  * expected to perform reads only.
160  */
161 void
gom_adapter_queue_read(GomAdapter * adapter,GomAdapterCallback callback,gpointer user_data)162 gom_adapter_queue_read (GomAdapter         *adapter,
163                         GomAdapterCallback  callback,
164                         gpointer            user_data)
165 {
166    GomAdapterPrivate *priv;
167    GomAsyncCmd *cmd;
168 
169    g_return_if_fail(GOM_IS_ADAPTER(adapter));
170    g_return_if_fail(callback != NULL);
171    g_return_if_fail(adapter->priv->queue != NULL);
172 
173    priv = adapter->priv;
174 
175    cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_READ, callback, user_data);
176 
177    g_async_queue_push(priv->queue, cmd);
178 }
179 
180 static void
open_callback(GomAdapter * adapter,gpointer user_data)181 open_callback (GomAdapter *adapter,
182                gpointer    user_data)
183 {
184    GSimpleAsyncResult *simple = user_data;
185    GAsyncQueue *queue;
186    const char *uri;
187    gint flags;
188    gint ret;
189 
190    queue = g_object_get_data(G_OBJECT(simple), "queue");
191    uri = g_object_get_data(G_OBJECT(simple), "uri");
192    flags = SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | SQLITE_OPEN_URI | SQLITE_OPEN_NOMUTEX;
193    ret = sqlite3_open_v2(uri, &adapter->priv->db, flags, NULL);
194    if (ret != SQLITE_OK) {
195       g_simple_async_result_set_error(simple, GOM_ERROR,
196                                       GOM_ERROR_ADAPTER_OPEN,
197                                       "Failed to open database at %s", uri);
198    }
199    g_simple_async_result_set_op_res_gboolean(simple, ret == SQLITE_OK);
200    if (!queue)
201       g_simple_async_result_complete_in_idle(simple);
202    else
203       g_async_queue_push(queue, GINT_TO_POINTER(TRUE));
204 }
205 
206 gboolean
gom_adapter_open_sync(GomAdapter * adapter,const gchar * uri,GError ** error)207 gom_adapter_open_sync (GomAdapter           *adapter,
208                        const gchar          *uri,
209                        GError              **error)
210 {
211    GomAdapterPrivate *priv;
212    GSimpleAsyncResult *simple;
213    GomAsyncCmd *cmd;
214    GAsyncQueue *queue;
215    gboolean ret;
216 
217    g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
218    g_return_val_if_fail(uri != NULL, FALSE);
219 
220    priv = adapter->priv;
221 
222    if (priv->thread) {
223       g_warning("%s may only be called once per adapter.",
224                 G_STRFUNC);
225       return FALSE;
226    }
227 
228    priv->queue = g_async_queue_new();
229 
230 #if GLIB_CHECK_VERSION(2, 32, 0)
231    priv->thread = g_thread_new("gom-adapter-worker",
232                                gom_adapter_worker,
233                                priv->queue);
234 #else
235    priv->thread = g_thread_create(gom_adapter_worker, priv->queue,
236                                   TRUE, NULL);
237 #endif
238 
239    queue = g_async_queue_new();
240 
241    simple = g_simple_async_result_new(G_OBJECT(adapter), NULL, NULL,
242                                       gom_adapter_open_sync);
243    g_object_set_data_full(G_OBJECT(simple), "uri",
244                           g_strdup(uri), g_free);
245    g_object_set_data (G_OBJECT(simple), "queue", queue);
246 
247    cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_OPEN, open_callback, simple);
248 
249    g_async_queue_push(priv->queue, cmd);
250    g_async_queue_pop(queue);
251    g_async_queue_unref(queue);
252 
253    if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
254       g_simple_async_result_propagate_error(simple, error);
255    }
256    g_object_unref(simple);
257 
258    return ret;
259 }
260 
261 /**
262  * gom_adapter_open_async:
263  * @adapter: a #GomAdapter
264  * @uri: a URI understood by SQLite
265  * @callback: the function to call when the operation finished, or %NULL
266  * @user_data: the user data to pass to the callback function
267  *
268  * Opens the database pointed to by @uri. @uri can be in any format understood
269  * by SQLite. See <ulink type="http" url="http://www.sqlite.org/c3ref/open.html">http://www.sqlite.org/c3ref/open.html</ulink>
270  * for details.
271  */
272 void
gom_adapter_open_async(GomAdapter * adapter,const gchar * uri,GAsyncReadyCallback callback,gpointer user_data)273 gom_adapter_open_async (GomAdapter          *adapter,
274                         const gchar         *uri,
275                         GAsyncReadyCallback  callback,
276                         gpointer             user_data)
277 {
278    GomAdapterPrivate *priv;
279    GSimpleAsyncResult *simple;
280    GomAsyncCmd *cmd;
281 
282    g_return_if_fail(GOM_IS_ADAPTER(adapter));
283    g_return_if_fail(uri != NULL);
284    g_return_if_fail(callback != NULL);
285 
286    priv = adapter->priv;
287 
288    if (priv->thread) {
289       g_warning("%s may only be called once per adapter.",
290                 G_STRFUNC);
291       return;
292    }
293 
294    priv->queue = g_async_queue_new();
295 
296 #if GLIB_CHECK_VERSION(2, 32, 0)
297    priv->thread = g_thread_new("gom-adapter-worker",
298                                gom_adapter_worker,
299                                priv->queue);
300 #else
301    priv->thread = g_thread_create(gom_adapter_worker, priv->queue,
302                                   TRUE, NULL);
303 #endif
304 
305    simple = g_simple_async_result_new(G_OBJECT(adapter), callback, user_data,
306                                       gom_adapter_open_async);
307    g_object_set_data_full(G_OBJECT(simple), "uri",
308                           g_strdup(uri), g_free);
309 
310    cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_OPEN, open_callback, simple);
311 
312    g_async_queue_push(priv->queue, cmd);
313 }
314 
315 gboolean
gom_adapter_open_finish(GomAdapter * adapter,GAsyncResult * result,GError ** error)316 gom_adapter_open_finish (GomAdapter    *adapter,
317                          GAsyncResult  *result,
318                          GError       **error)
319 {
320    GSimpleAsyncResult *simple = (GSimpleAsyncResult *)result;
321    gboolean ret;
322 
323    g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
324    g_return_val_if_fail(G_IS_SIMPLE_ASYNC_RESULT(simple), FALSE);
325 
326    if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
327       g_simple_async_result_propagate_error(simple, error);
328    }
329    g_object_unref(simple);
330 
331    return ret;
332 }
333 
334 static void
close_callback(GomAdapter * adapter,gpointer user_data)335 close_callback (GomAdapter *adapter,
336                 gpointer    user_data)
337 {
338    GSimpleAsyncResult *simple = user_data;
339    GAsyncQueue *queue;
340 
341    queue = g_object_get_data(user_data, "queue");
342 
343    sqlite3_close(adapter->priv->db);
344    adapter->priv->db = NULL;
345 
346    g_simple_async_result_set_op_res_gboolean(simple, TRUE);
347    if (!queue)
348       g_simple_async_result_complete_in_idle(simple);
349    else
350       g_async_queue_push(queue, GINT_TO_POINTER(TRUE));
351 }
352 
353 gboolean
gom_adapter_close_sync(GomAdapter * adapter,GError ** error)354 gom_adapter_close_sync (GomAdapter    *adapter,
355                         GError       **error)
356 {
357    GomAsyncCmd *cmd;
358    GomAdapterPrivate *priv;
359    GSimpleAsyncResult *simple;
360    GAsyncQueue *queue;
361    gboolean ret;
362 
363    g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
364 
365    priv = adapter->priv;
366 
367    if (!priv->db) {
368       return TRUE;
369    }
370 
371    queue = g_async_queue_new();
372 
373    simple = g_simple_async_result_new(G_OBJECT(adapter), NULL, NULL,
374                                       gom_adapter_close_sync);
375    g_object_set_data(G_OBJECT(simple), "queue", queue);
376 
377    cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_CLOSE, close_callback, simple);
378 
379    g_async_queue_push(priv->queue, cmd);
380    g_async_queue_pop(queue);
381    g_async_queue_unref(queue);
382 
383    if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
384       g_simple_async_result_propagate_error(simple, error);
385    }
386    g_object_unref(simple);
387 
388    return ret;
389 }
390 
391 void
gom_adapter_close_async(GomAdapter * adapter,GAsyncReadyCallback callback,gpointer user_data)392 gom_adapter_close_async (GomAdapter          *adapter,
393                          GAsyncReadyCallback  callback,
394                          gpointer             user_data)
395 {
396    GomAdapterPrivate *priv;
397    GSimpleAsyncResult *simple;
398    GomAsyncCmd *cmd;
399 
400    g_return_if_fail(GOM_IS_ADAPTER(adapter));
401    g_return_if_fail(callback != NULL);
402 
403    priv = adapter->priv;
404 
405    simple = g_simple_async_result_new(G_OBJECT(adapter), callback, user_data,
406                                       gom_adapter_close_async);
407 
408    if (!priv->db) {
409       g_simple_async_result_set_op_res_gboolean(simple, TRUE);
410       g_simple_async_result_complete_in_idle(simple);
411       g_object_unref(simple);
412       return;
413    }
414 
415    cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_CLOSE, close_callback, simple);
416 
417    g_async_queue_push(priv->queue, cmd);
418 }
419 
420 gboolean
gom_adapter_close_finish(GomAdapter * adapter,GAsyncResult * result,GError ** error)421 gom_adapter_close_finish (GomAdapter    *adapter,
422                           GAsyncResult  *result,
423                           GError       **error)
424 {
425    GSimpleAsyncResult *simple = (GSimpleAsyncResult *)result;
426    gboolean ret;
427 
428    g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
429    g_return_val_if_fail(G_IS_SIMPLE_ASYNC_RESULT(simple), FALSE);
430 
431    if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
432       g_simple_async_result_propagate_error(simple, error);
433    }
434    g_object_unref(simple);
435 
436    return ret;
437 }
438 
439 /**
440  * gom_adapter_execute_sql:
441  * @adapter: A #GomAdapter.
442  * @sql: SQL to execute.
443  * @error: a #GError
444  *
445  * This is a helper function to make simple execution of SQL easier.
446  * It is primarily meant for things like "BEGIN;" and "COMMIT;".
447  *
448  * This MUST be called from within a write transaction using
449  * gom_adapter_queue_write().
450  *
451  * Returns: %TRUE if successful;
452  */
453 gboolean
gom_adapter_execute_sql(GomAdapter * adapter,const gchar * sql,GError ** error)454 gom_adapter_execute_sql (GomAdapter   *adapter,
455                          const gchar  *sql,
456                          GError      **error)
457 {
458    GomCommand *command;
459    gboolean ret;
460 
461    g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
462    g_return_val_if_fail(sql, FALSE);
463    g_assert (g_thread_self () == adapter->priv->thread);
464 
465    command = g_object_new(GOM_TYPE_COMMAND,
466                           "adapter", adapter,
467                           "sql", sql,
468                           NULL);
469    ret = gom_command_execute(command, NULL, error);
470    g_object_unref(command);
471 
472    return ret;
473 }
474 
475 /**
476  * gom_adapter_finalize:
477  * @object: (in): A #GomAdapter.
478  *
479  * Finalizer for a #GomAdapter instance.  Frees any resources held by
480  * the instance.
481  */
482 static void
gom_adapter_finalize(GObject * object)483 gom_adapter_finalize (GObject *object)
484 {
485    GomAdapterPrivate *priv = GOM_ADAPTER(object)->priv;
486 
487    if (priv->db)
488       g_warning("Adapter not closed, leaking!");
489 
490    g_clear_pointer(&priv->queue, g_async_queue_unref);
491    g_clear_pointer(&priv->thread, g_thread_unref);
492 
493    G_OBJECT_CLASS(gom_adapter_parent_class)->finalize(object);
494 }
495 
496 /**
497  * gom_adapter_class_init:
498  * @klass: (in): A #GomAdapterClass.
499  *
500  * Initializes the #GomAdapterClass and prepares the vtable.
501  */
502 static void
gom_adapter_class_init(GomAdapterClass * klass)503 gom_adapter_class_init (GomAdapterClass *klass)
504 {
505    GObjectClass *object_class;
506 
507    object_class = G_OBJECT_CLASS(klass);
508    object_class->finalize = gom_adapter_finalize;
509    g_type_class_add_private(object_class, sizeof(GomAdapterPrivate));
510 }
511 
512 /**
513  * gom_adapter_init:
514  * @adapter: (in): A #GomAdapter.
515  *
516  * Initializes the newly created #GomAdapter instance.
517  */
518 static void
gom_adapter_init(GomAdapter * adapter)519 gom_adapter_init (GomAdapter *adapter)
520 {
521    adapter->priv =
522       G_TYPE_INSTANCE_GET_PRIVATE(adapter,
523                                   GOM_TYPE_ADAPTER,
524                                   GomAdapterPrivate);
525 }
526