1 /* gom-adapter.c
2 *
3 * Copyright (C) 2011 Christian Hergert <chris@dronelabs.com>
4 *
5 * This file is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This file is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include <sqlite3.h>
20
21 #include "gom-adapter.h"
22 #include "gom-command.h"
23 #include "gom-error.h"
24
25 G_DEFINE_TYPE(GomAdapter, gom_adapter, G_TYPE_OBJECT)
26
27 struct _GomAdapterPrivate
28 {
29 sqlite3 *db;
30 GThread *thread;
31 GAsyncQueue *queue;
32 };
33
34 typedef enum {
35 ASYNC_CMD_TYPE_OPEN,
36 ASYNC_CMD_TYPE_READ,
37 ASYNC_CMD_TYPE_WRITE,
38 ASYNC_CMD_TYPE_CLOSE
39 } GomAsyncCmdType;
40
41 typedef struct {
42 GomAdapter *adapter;
43 GomAsyncCmdType type;
44 GomAdapterCallback callback;
45 gpointer callback_data;
46 } GomAsyncCmd;
47
48 static GomAsyncCmd *
_async_cmd_new(GomAdapter * adapter,GomAsyncCmdType type,GomAdapterCallback callback,gpointer callback_data)49 _async_cmd_new(GomAdapter *adapter,
50 GomAsyncCmdType type,
51 GomAdapterCallback callback,
52 gpointer callback_data)
53 {
54 GomAsyncCmd *cmd;
55 cmd = g_new0(GomAsyncCmd, 1);
56 cmd->adapter = g_object_ref(adapter);
57 cmd->type = type;
58 cmd->callback = callback;
59 cmd->callback_data = callback_data;
60 return cmd;
61 }
62
63 GomAdapter *
gom_adapter_new(void)64 gom_adapter_new (void)
65 {
66 return g_object_new(GOM_TYPE_ADAPTER, NULL);
67 }
68
69 /**
70 * gom_adapter_get_handle:
71 * @adapter: (in): A #GomAdapter.
72 *
73 * Fetches the sqlite3 structure used by the adapter.
74 *
75 * Returns: (transfer none): A handle to the #sqlite3 structure.
76 * Side effects: None.
77 */
78 gpointer
gom_adapter_get_handle(GomAdapter * adapter)79 gom_adapter_get_handle (GomAdapter *adapter)
80 {
81 g_return_val_if_fail(GOM_IS_ADAPTER(adapter), NULL);
82 g_return_val_if_fail(adapter->priv->thread != NULL, NULL);
83 g_assert (g_thread_self () == adapter->priv->thread);
84 return adapter->priv->db;
85 }
86
87 static gpointer
gom_adapter_worker(gpointer data)88 gom_adapter_worker (gpointer data)
89 {
90 GomAsyncCmd *cmd;
91 GAsyncQueue *queue = data;
92
93 /*
94 * First item is open request.
95 */
96 cmd = g_async_queue_pop(queue);
97 g_assert (cmd->type == ASYNC_CMD_TYPE_OPEN);
98 cmd->callback(cmd->adapter, cmd->callback_data);
99 g_object_unref(cmd->adapter);
100 g_free(cmd);
101
102 /*
103 * Handle additional requests.
104 */
105 while ((cmd = g_async_queue_pop(queue))) {
106 /*
107 * XXX: Right now, we synchronize all requests. I hope to make this
108 * more performant when necessary.
109 */
110 cmd->callback(cmd->adapter, cmd->callback_data);
111 if (cmd->type == ASYNC_CMD_TYPE_CLOSE) {
112 g_object_unref(cmd->adapter);
113 g_free(cmd);
114 break;
115 }
116
117 g_object_unref(cmd->adapter);
118 g_free(cmd);
119 }
120
121 return NULL;
122 }
123
124 /**
125 * gom_adapter_queue_write:
126 * @adapter: (in): A #GomAdapter.
127 * @callback: (in) (scope async): A callback to execute write queries on SQLite.
128 * @user_data: (in): User data for @callback.
129 *
130 * Queues a callback to be executed within the SQLite thwrite. The callback can
131 * perform reads and writes.
132 */
133 void
gom_adapter_queue_write(GomAdapter * adapter,GomAdapterCallback callback,gpointer user_data)134 gom_adapter_queue_write (GomAdapter *adapter,
135 GomAdapterCallback callback,
136 gpointer user_data)
137 {
138 GomAdapterPrivate *priv;
139 GomAsyncCmd *cmd;
140
141 g_return_if_fail(GOM_IS_ADAPTER(adapter));
142 g_return_if_fail(callback != NULL);
143 g_return_if_fail(adapter->priv->queue != NULL);
144
145 priv = adapter->priv;
146
147 cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_WRITE, callback, user_data);
148
149 g_async_queue_push(priv->queue, cmd);
150 }
151
152 /**
153 * gom_adapter_queue_read:
154 * @adapter: (in): A #GomAdapter.
155 * @callback: (in) (scope async): A callback to execute read queries on SQLite.
156 * @user_data: (in): User data for @callback.
157 *
158 * Queues a callback to be executed within the SQLite thread. The callback is
159 * expected to perform reads only.
160 */
161 void
gom_adapter_queue_read(GomAdapter * adapter,GomAdapterCallback callback,gpointer user_data)162 gom_adapter_queue_read (GomAdapter *adapter,
163 GomAdapterCallback callback,
164 gpointer user_data)
165 {
166 GomAdapterPrivate *priv;
167 GomAsyncCmd *cmd;
168
169 g_return_if_fail(GOM_IS_ADAPTER(adapter));
170 g_return_if_fail(callback != NULL);
171 g_return_if_fail(adapter->priv->queue != NULL);
172
173 priv = adapter->priv;
174
175 cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_READ, callback, user_data);
176
177 g_async_queue_push(priv->queue, cmd);
178 }
179
180 static void
open_callback(GomAdapter * adapter,gpointer user_data)181 open_callback (GomAdapter *adapter,
182 gpointer user_data)
183 {
184 GSimpleAsyncResult *simple = user_data;
185 GAsyncQueue *queue;
186 const char *uri;
187 gint flags;
188 gint ret;
189
190 queue = g_object_get_data(G_OBJECT(simple), "queue");
191 uri = g_object_get_data(G_OBJECT(simple), "uri");
192 flags = SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | SQLITE_OPEN_URI | SQLITE_OPEN_NOMUTEX;
193 ret = sqlite3_open_v2(uri, &adapter->priv->db, flags, NULL);
194 if (ret != SQLITE_OK) {
195 g_simple_async_result_set_error(simple, GOM_ERROR,
196 GOM_ERROR_ADAPTER_OPEN,
197 "Failed to open database at %s", uri);
198 }
199 g_simple_async_result_set_op_res_gboolean(simple, ret == SQLITE_OK);
200 if (!queue)
201 g_simple_async_result_complete_in_idle(simple);
202 else
203 g_async_queue_push(queue, GINT_TO_POINTER(TRUE));
204 }
205
206 gboolean
gom_adapter_open_sync(GomAdapter * adapter,const gchar * uri,GError ** error)207 gom_adapter_open_sync (GomAdapter *adapter,
208 const gchar *uri,
209 GError **error)
210 {
211 GomAdapterPrivate *priv;
212 GSimpleAsyncResult *simple;
213 GomAsyncCmd *cmd;
214 GAsyncQueue *queue;
215 gboolean ret;
216
217 g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
218 g_return_val_if_fail(uri != NULL, FALSE);
219
220 priv = adapter->priv;
221
222 if (priv->thread) {
223 g_warning("%s may only be called once per adapter.",
224 G_STRFUNC);
225 return FALSE;
226 }
227
228 priv->queue = g_async_queue_new();
229
230 #if GLIB_CHECK_VERSION(2, 32, 0)
231 priv->thread = g_thread_new("gom-adapter-worker",
232 gom_adapter_worker,
233 priv->queue);
234 #else
235 priv->thread = g_thread_create(gom_adapter_worker, priv->queue,
236 TRUE, NULL);
237 #endif
238
239 queue = g_async_queue_new();
240
241 simple = g_simple_async_result_new(G_OBJECT(adapter), NULL, NULL,
242 gom_adapter_open_sync);
243 g_object_set_data_full(G_OBJECT(simple), "uri",
244 g_strdup(uri), g_free);
245 g_object_set_data (G_OBJECT(simple), "queue", queue);
246
247 cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_OPEN, open_callback, simple);
248
249 g_async_queue_push(priv->queue, cmd);
250 g_async_queue_pop(queue);
251 g_async_queue_unref(queue);
252
253 if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
254 g_simple_async_result_propagate_error(simple, error);
255 }
256 g_object_unref(simple);
257
258 return ret;
259 }
260
261 /**
262 * gom_adapter_open_async:
263 * @adapter: a #GomAdapter
264 * @uri: a URI understood by SQLite
265 * @callback: the function to call when the operation finished, or %NULL
266 * @user_data: the user data to pass to the callback function
267 *
268 * Opens the database pointed to by @uri. @uri can be in any format understood
269 * by SQLite. See <ulink type="http" url="http://www.sqlite.org/c3ref/open.html">http://www.sqlite.org/c3ref/open.html</ulink>
270 * for details.
271 */
272 void
gom_adapter_open_async(GomAdapter * adapter,const gchar * uri,GAsyncReadyCallback callback,gpointer user_data)273 gom_adapter_open_async (GomAdapter *adapter,
274 const gchar *uri,
275 GAsyncReadyCallback callback,
276 gpointer user_data)
277 {
278 GomAdapterPrivate *priv;
279 GSimpleAsyncResult *simple;
280 GomAsyncCmd *cmd;
281
282 g_return_if_fail(GOM_IS_ADAPTER(adapter));
283 g_return_if_fail(uri != NULL);
284 g_return_if_fail(callback != NULL);
285
286 priv = adapter->priv;
287
288 if (priv->thread) {
289 g_warning("%s may only be called once per adapter.",
290 G_STRFUNC);
291 return;
292 }
293
294 priv->queue = g_async_queue_new();
295
296 #if GLIB_CHECK_VERSION(2, 32, 0)
297 priv->thread = g_thread_new("gom-adapter-worker",
298 gom_adapter_worker,
299 priv->queue);
300 #else
301 priv->thread = g_thread_create(gom_adapter_worker, priv->queue,
302 TRUE, NULL);
303 #endif
304
305 simple = g_simple_async_result_new(G_OBJECT(adapter), callback, user_data,
306 gom_adapter_open_async);
307 g_object_set_data_full(G_OBJECT(simple), "uri",
308 g_strdup(uri), g_free);
309
310 cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_OPEN, open_callback, simple);
311
312 g_async_queue_push(priv->queue, cmd);
313 }
314
315 gboolean
gom_adapter_open_finish(GomAdapter * adapter,GAsyncResult * result,GError ** error)316 gom_adapter_open_finish (GomAdapter *adapter,
317 GAsyncResult *result,
318 GError **error)
319 {
320 GSimpleAsyncResult *simple = (GSimpleAsyncResult *)result;
321 gboolean ret;
322
323 g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
324 g_return_val_if_fail(G_IS_SIMPLE_ASYNC_RESULT(simple), FALSE);
325
326 if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
327 g_simple_async_result_propagate_error(simple, error);
328 }
329 g_object_unref(simple);
330
331 return ret;
332 }
333
334 static void
close_callback(GomAdapter * adapter,gpointer user_data)335 close_callback (GomAdapter *adapter,
336 gpointer user_data)
337 {
338 GSimpleAsyncResult *simple = user_data;
339 GAsyncQueue *queue;
340
341 queue = g_object_get_data(user_data, "queue");
342
343 sqlite3_close(adapter->priv->db);
344 adapter->priv->db = NULL;
345
346 g_simple_async_result_set_op_res_gboolean(simple, TRUE);
347 if (!queue)
348 g_simple_async_result_complete_in_idle(simple);
349 else
350 g_async_queue_push(queue, GINT_TO_POINTER(TRUE));
351 }
352
353 gboolean
gom_adapter_close_sync(GomAdapter * adapter,GError ** error)354 gom_adapter_close_sync (GomAdapter *adapter,
355 GError **error)
356 {
357 GomAsyncCmd *cmd;
358 GomAdapterPrivate *priv;
359 GSimpleAsyncResult *simple;
360 GAsyncQueue *queue;
361 gboolean ret;
362
363 g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
364
365 priv = adapter->priv;
366
367 if (!priv->db) {
368 return TRUE;
369 }
370
371 queue = g_async_queue_new();
372
373 simple = g_simple_async_result_new(G_OBJECT(adapter), NULL, NULL,
374 gom_adapter_close_sync);
375 g_object_set_data(G_OBJECT(simple), "queue", queue);
376
377 cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_CLOSE, close_callback, simple);
378
379 g_async_queue_push(priv->queue, cmd);
380 g_async_queue_pop(queue);
381 g_async_queue_unref(queue);
382
383 if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
384 g_simple_async_result_propagate_error(simple, error);
385 }
386 g_object_unref(simple);
387
388 return ret;
389 }
390
391 void
gom_adapter_close_async(GomAdapter * adapter,GAsyncReadyCallback callback,gpointer user_data)392 gom_adapter_close_async (GomAdapter *adapter,
393 GAsyncReadyCallback callback,
394 gpointer user_data)
395 {
396 GomAdapterPrivate *priv;
397 GSimpleAsyncResult *simple;
398 GomAsyncCmd *cmd;
399
400 g_return_if_fail(GOM_IS_ADAPTER(adapter));
401 g_return_if_fail(callback != NULL);
402
403 priv = adapter->priv;
404
405 simple = g_simple_async_result_new(G_OBJECT(adapter), callback, user_data,
406 gom_adapter_close_async);
407
408 if (!priv->db) {
409 g_simple_async_result_set_op_res_gboolean(simple, TRUE);
410 g_simple_async_result_complete_in_idle(simple);
411 g_object_unref(simple);
412 return;
413 }
414
415 cmd = _async_cmd_new(adapter, ASYNC_CMD_TYPE_CLOSE, close_callback, simple);
416
417 g_async_queue_push(priv->queue, cmd);
418 }
419
420 gboolean
gom_adapter_close_finish(GomAdapter * adapter,GAsyncResult * result,GError ** error)421 gom_adapter_close_finish (GomAdapter *adapter,
422 GAsyncResult *result,
423 GError **error)
424 {
425 GSimpleAsyncResult *simple = (GSimpleAsyncResult *)result;
426 gboolean ret;
427
428 g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
429 g_return_val_if_fail(G_IS_SIMPLE_ASYNC_RESULT(simple), FALSE);
430
431 if (!(ret = g_simple_async_result_get_op_res_gboolean(simple))) {
432 g_simple_async_result_propagate_error(simple, error);
433 }
434 g_object_unref(simple);
435
436 return ret;
437 }
438
439 /**
440 * gom_adapter_execute_sql:
441 * @adapter: A #GomAdapter.
442 * @sql: SQL to execute.
443 * @error: a #GError
444 *
445 * This is a helper function to make simple execution of SQL easier.
446 * It is primarily meant for things like "BEGIN;" and "COMMIT;".
447 *
448 * This MUST be called from within a write transaction using
449 * gom_adapter_queue_write().
450 *
451 * Returns: %TRUE if successful;
452 */
453 gboolean
gom_adapter_execute_sql(GomAdapter * adapter,const gchar * sql,GError ** error)454 gom_adapter_execute_sql (GomAdapter *adapter,
455 const gchar *sql,
456 GError **error)
457 {
458 GomCommand *command;
459 gboolean ret;
460
461 g_return_val_if_fail(GOM_IS_ADAPTER(adapter), FALSE);
462 g_return_val_if_fail(sql, FALSE);
463 g_assert (g_thread_self () == adapter->priv->thread);
464
465 command = g_object_new(GOM_TYPE_COMMAND,
466 "adapter", adapter,
467 "sql", sql,
468 NULL);
469 ret = gom_command_execute(command, NULL, error);
470 g_object_unref(command);
471
472 return ret;
473 }
474
475 /**
476 * gom_adapter_finalize:
477 * @object: (in): A #GomAdapter.
478 *
479 * Finalizer for a #GomAdapter instance. Frees any resources held by
480 * the instance.
481 */
482 static void
gom_adapter_finalize(GObject * object)483 gom_adapter_finalize (GObject *object)
484 {
485 GomAdapterPrivate *priv = GOM_ADAPTER(object)->priv;
486
487 if (priv->db)
488 g_warning("Adapter not closed, leaking!");
489
490 g_clear_pointer(&priv->queue, g_async_queue_unref);
491 g_clear_pointer(&priv->thread, g_thread_unref);
492
493 G_OBJECT_CLASS(gom_adapter_parent_class)->finalize(object);
494 }
495
496 /**
497 * gom_adapter_class_init:
498 * @klass: (in): A #GomAdapterClass.
499 *
500 * Initializes the #GomAdapterClass and prepares the vtable.
501 */
502 static void
gom_adapter_class_init(GomAdapterClass * klass)503 gom_adapter_class_init (GomAdapterClass *klass)
504 {
505 GObjectClass *object_class;
506
507 object_class = G_OBJECT_CLASS(klass);
508 object_class->finalize = gom_adapter_finalize;
509 g_type_class_add_private(object_class, sizeof(GomAdapterPrivate));
510 }
511
512 /**
513 * gom_adapter_init:
514 * @adapter: (in): A #GomAdapter.
515 *
516 * Initializes the newly created #GomAdapter instance.
517 */
518 static void
gom_adapter_init(GomAdapter * adapter)519 gom_adapter_init (GomAdapter *adapter)
520 {
521 adapter->priv =
522 G_TYPE_INSTANCE_GET_PRIVATE(adapter,
523 GOM_TYPE_ADAPTER,
524 GomAdapterPrivate);
525 }
526