1 /*
2 * Copyright (C) 2009 - 2011 Vivien Malerba <malerba@gnome-db.org>
3 * Copyright (C) 2010 David King <davidk@openismus.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21 #include <stdarg.h>
22 #include <string.h>
23 #include <glib/gi18n-lib.h>
24 #include "gda-thread-wrapper.h"
25 #include "gda-thread-recordset.h"
26 #include "gda-thread-provider.h"
27 #include <libgda/providers-support/gda-data-select-priv.h>
28 #include "gda-thread-blob-op.h"
29
30 static void gda_thread_recordset_class_init (GdaThreadRecordsetClass *klass);
31 static void gda_thread_recordset_init (GdaThreadRecordset *recset,
32 GdaThreadRecordsetClass *klass);
33 static void gda_thread_recordset_dispose (GObject *object);
34
35 /* virtual methods */
36 static gint gda_thread_recordset_fetch_nb_rows (GdaDataSelect *model);
37 static gboolean gda_thread_recordset_fetch_random (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
38
39 static gboolean gda_thread_recordset_fetch_prev (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
40 static gboolean gda_thread_recordset_fetch_next (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
41 static gboolean gda_thread_recordset_fetch_at (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
42
43 struct _GdaThreadRecordsetPrivate {
44 GdaDataModel *sub_model;
45 GdaThreadWrapper *wrapper;
46 gint nblobs;
47 gint *blobs_conv;
48 };
49 static GObjectClass *parent_class = NULL;
50
/* GdaDataSelectClass of the class @model is an instance of */
#define DATA_SELECT_CLASS(model) \
	(GDA_DATA_SELECT_CLASS (G_OBJECT_GET_CLASS (model)))

/*
 * Copies the nb_stored_rows, prep_stmt and advertized_nrows members of the
 * GdaDataSelect @from into the GdaDataSelect @to (a single comma expression,
 * evaluated in that order).
 */
#define COPY_PUBLIC_DATA(from,to)							\
	(((GdaDataSelect *) (to))->nb_stored_rows =					\
		((GdaDataSelect *) (from))->nb_stored_rows,				\
	 ((GdaDataSelect *) (to))->prep_stmt =						\
		((GdaDataSelect *) (from))->prep_stmt,					\
	 ((GdaDataSelect *) (to))->advertized_nrows =					\
		((GdaDataSelect *) (from))->advertized_nrows)
56
/*
 * Object instance init, class init and dispose
 */
60 static void
gda_thread_recordset_init(GdaThreadRecordset * recset,G_GNUC_UNUSED GdaThreadRecordsetClass * klass)61 gda_thread_recordset_init (GdaThreadRecordset *recset,
62 G_GNUC_UNUSED GdaThreadRecordsetClass *klass)
63 {
64 g_return_if_fail (GDA_IS_THREAD_RECORDSET (recset));
65 recset->priv = g_new0 (GdaThreadRecordsetPrivate, 1);
66 recset->priv->sub_model = NULL;
67 recset->priv->wrapper = NULL;
68 recset->priv->blobs_conv = NULL;
69 }
70
71 static void
gda_thread_recordset_class_init(GdaThreadRecordsetClass * klass)72 gda_thread_recordset_class_init (GdaThreadRecordsetClass *klass)
73 {
74 GObjectClass *object_class = G_OBJECT_CLASS (klass);
75 GdaDataSelectClass *pmodel_class = GDA_DATA_SELECT_CLASS (klass);
76
77 parent_class = g_type_class_peek_parent (klass);
78
79 object_class->dispose = gda_thread_recordset_dispose;
80 pmodel_class->fetch_nb_rows = gda_thread_recordset_fetch_nb_rows;
81 pmodel_class->fetch_random = gda_thread_recordset_fetch_random;
82 pmodel_class->store_all = NULL;
83
84 pmodel_class->fetch_next = gda_thread_recordset_fetch_next;
85 pmodel_class->fetch_prev = gda_thread_recordset_fetch_prev;
86 pmodel_class->fetch_at = gda_thread_recordset_fetch_at;
87 }
88
89 static void
gda_thread_recordset_dispose(GObject * object)90 gda_thread_recordset_dispose (GObject *object)
91 {
92 GdaThreadRecordset *recset = (GdaThreadRecordset *) object;
93
94 g_return_if_fail (GDA_IS_THREAD_RECORDSET (recset));
95
96 if (recset->priv) {
97 if (recset->priv->sub_model) {
98 /* unref recset->priv->sub_model in sub thread */
99 gda_thread_wrapper_execute_void (recset->priv->wrapper,
100 (GdaThreadWrapperVoidFunc) g_object_unref,
101 recset->priv->sub_model, NULL, NULL);
102 recset->priv->sub_model = NULL;
103 g_object_unref (recset->priv->wrapper);
104 recset->priv->wrapper = NULL;
105
106 if (recset->priv->blobs_conv) {
107 g_free (recset->priv->blobs_conv);
108 recset->priv->blobs_conv = NULL;
109 }
110 }
111 g_free (recset->priv);
112 recset->priv = NULL;
113
114 ((GdaDataSelect*) recset)->prep_stmt = NULL; /* don't unref since we don't hold a reference to it */
115 }
116
117 parent_class->dispose (object);
118 }
119
120 /*
121 * Public functions
122 */
123
124 GType
_gda_thread_recordset_get_type(void)125 _gda_thread_recordset_get_type (void)
126 {
127 static GType type = 0;
128
129 if (G_UNLIKELY (type == 0)) {
130 static GMutex registering;
131 static const GTypeInfo info = {
132 sizeof (GdaThreadRecordsetClass),
133 (GBaseInitFunc) NULL,
134 (GBaseFinalizeFunc) NULL,
135 (GClassInitFunc) gda_thread_recordset_class_init,
136 NULL,
137 NULL,
138 sizeof (GdaThreadRecordset),
139 0,
140 (GInstanceInitFunc) gda_thread_recordset_init,
141 0
142 };
143 g_mutex_lock (®istering);
144 if (type == 0)
145 type = g_type_register_static (GDA_TYPE_DATA_SELECT, "GdaThreadRecordset", &info, 0);
146 g_mutex_unlock (®istering);
147 }
148
149 return type;
150 }
151
152
153 /*
154 * executed in the sub thread (the one which can manipulate @sub_model)
155 */
156 GdaDataModel *
_gda_thread_recordset_new(GdaConnection * cnc,GdaThreadWrapper * wrapper,GdaDataModel * sub_model)157 _gda_thread_recordset_new (GdaConnection *cnc, GdaThreadWrapper *wrapper, GdaDataModel *sub_model)
158 {
159 GdaThreadRecordset *model;
160
161 model = GDA_THREAD_RECORDSET (g_object_new (GDA_TYPE_THREAD_RECORDSET,
162 "connection", cnc, NULL));
163
164 _gda_data_select_share_private_data (GDA_DATA_SELECT (sub_model),
165 GDA_DATA_SELECT (model));
166 model->priv->wrapper = g_object_ref (wrapper);
167 model->priv->sub_model = g_object_ref (sub_model);
168
169 gint ncols, i, nblobs;
170 gint *blobs_conv = NULL;
171 ncols = gda_data_model_get_n_columns (sub_model);
172 nblobs = 0;
173 for (i = 0; i < ncols; i++) {
174 GdaColumn *col;
175 col = gda_data_model_describe_column (sub_model, i);
176 if (gda_column_get_g_type (col) == GDA_TYPE_BLOB) {
177 if (!blobs_conv)
178 blobs_conv = g_new0 (gint, ncols);
179 blobs_conv [nblobs] = i;
180 nblobs++;
181 }
182 }
183 model->priv->blobs_conv = blobs_conv;
184 model->priv->nblobs = nblobs;
185
186 COPY_PUBLIC_DATA (sub_model, model);
187
188 return GDA_DATA_MODEL (model);
189 }
190
191 /*
192 * GdaDataSelect virtual methods
193 */
194
195
196 /*
197 * fetch nb rows
198 */
199 static gpointer
sub_thread_fetch_nb_rows(GdaDataSelect * model,G_GNUC_UNUSED GError ** error)200 sub_thread_fetch_nb_rows (GdaDataSelect *model, G_GNUC_UNUSED GError **error)
201 {
202 /* WARNING: function executed in sub thread! */
203 gint retval;
204 gint *res;
205 retval = DATA_SELECT_CLASS (model)->fetch_nb_rows (model);
206 #ifdef GDA_DEBUG_NO
207 g_print ("/%s() => %d\n", __FUNCTION__, retval);
208 #endif
209 res = g_new (gint, 1);
210 *res = retval;
211 return res;
212 }
213
214 static gint
gda_thread_recordset_fetch_nb_rows(GdaDataSelect * model)215 gda_thread_recordset_fetch_nb_rows (GdaDataSelect *model)
216 {
217 GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
218 gint nb;
219 gint *res;
220 guint jid;
221 jid = gda_thread_wrapper_execute (rs->priv->wrapper,
222 (GdaThreadWrapperFunc) sub_thread_fetch_nb_rows,
223 rs->priv->sub_model, NULL, NULL);
224
225 res = (gint*) gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, NULL);
226 nb = *res;
227 g_free (res);
228 COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
229 return nb;
230 }
231
232 /*
233 * fetch random
234 */
235 typedef struct {
236 GdaDataSelect *select;
237 gint rownum;
238 GdaRow **row;
239 } ThreadData;
240
241 static gpointer
sub_thread_fetch_random(ThreadData * data,GError ** error)242 sub_thread_fetch_random (ThreadData *data, GError **error)
243 {
244 /* WARNING: function executed in sub thread! */
245 gboolean retval;
246 retval = DATA_SELECT_CLASS (data->select)->fetch_random (data->select, data->row, data->rownum, error);
247 #ifdef GDA_DEBUG_NO
248 g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
249 #endif
250 return GINT_TO_POINTER (retval);
251 }
252
253 static void
alter_blob_values(GdaThreadRecordset * rs,GdaRow ** prow)254 alter_blob_values (GdaThreadRecordset *rs, GdaRow **prow)
255 {
256 gint i;
257 for (i = 0; i < rs->priv->nblobs; i++) {
258 GValue *value = gda_row_get_value (*prow, rs->priv->blobs_conv[i]);
259 if (G_VALUE_TYPE (value) == GDA_TYPE_BLOB) {
260 GdaBlob *blob;
261 blob = (GdaBlob*) gda_value_get_blob (value);
262 if (blob->op) {
263 GdaBlobOp *nop;
264 nop = _gda_thread_blob_op_new (rs->priv->wrapper, blob->op);
265 g_object_unref (blob->op);
266 blob->op = nop;
267 }
268 }
269 }
270 }
271
272 static gboolean
gda_thread_recordset_fetch_random(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)273 gda_thread_recordset_fetch_random (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
274 {
275 GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
276 ThreadData wdata;
277 gboolean retval;
278 guint jid;
279
280 wdata.select = (GdaDataSelect *) rs->priv->sub_model;
281 wdata.rownum = rownum;
282 wdata.row = prow;
283 jid = gda_thread_wrapper_execute (rs->priv->wrapper,
284 (GdaThreadWrapperFunc) sub_thread_fetch_random,
285 &wdata, NULL, NULL);
286
287 retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
288 TRUE : FALSE;
289 COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
290
291 if (*prow && rs->priv->blobs_conv)
292 alter_blob_values (rs, prow);
293
294 return retval;
295 }
296
297 /*
298 * fetch next
299 */
300 static gpointer
sub_thread_fetch_next(ThreadData * data,GError ** error)301 sub_thread_fetch_next (ThreadData *data, GError **error)
302 {
303 /* WARNING: function executed in sub thread! */
304 gboolean retval;
305 retval = DATA_SELECT_CLASS (data->select)->fetch_next (data->select, data->row, data->rownum, error);
306 #ifdef GDA_DEBUG_NO
307 g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
308 #endif
309 return GINT_TO_POINTER (retval);
310 }
311
312 static gboolean
gda_thread_recordset_fetch_next(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)313 gda_thread_recordset_fetch_next (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
314 {
315 GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
316 ThreadData wdata;
317 gboolean retval;
318 guint jid;
319
320 wdata.select = (GdaDataSelect *) rs->priv->sub_model;
321 wdata.rownum = rownum;
322 wdata.row = prow;
323 jid = gda_thread_wrapper_execute (rs->priv->wrapper,
324 (GdaThreadWrapperFunc) sub_thread_fetch_next,
325 &wdata, NULL, NULL);
326 retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
327 TRUE : FALSE;
328 COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
329
330 if (*prow && rs->priv->blobs_conv)
331 alter_blob_values (rs, prow);
332
333 return retval;
334 }
335
336 /*
337 * fetch prev
338 */
339 static gpointer
sub_thread_fetch_prev(ThreadData * data,GError ** error)340 sub_thread_fetch_prev (ThreadData *data, GError **error)
341 {
342 /* WARNING: function executed in sub thread! */
343 gboolean retval;
344 retval = DATA_SELECT_CLASS (data->select)->fetch_prev (data->select, data->row, data->rownum, error);
345 #ifdef GDA_DEBUG_NO
346 g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
347 #endif
348 return GINT_TO_POINTER (retval);
349 }
350
351 static gboolean
gda_thread_recordset_fetch_prev(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)352 gda_thread_recordset_fetch_prev (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
353 {
354 GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
355 ThreadData wdata;
356 gboolean retval;
357 guint jid;
358
359 wdata.select = (GdaDataSelect *) rs->priv->sub_model;
360 wdata.rownum = rownum;
361 wdata.row = prow;
362 jid = gda_thread_wrapper_execute (rs->priv->wrapper,
363 (GdaThreadWrapperFunc) sub_thread_fetch_prev,
364 &wdata, NULL, NULL);
365 retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
366 TRUE : FALSE;
367 COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
368
369 if (*prow && rs->priv->blobs_conv)
370 alter_blob_values (rs, prow);
371
372 return retval;
373 }
374
375 /*
376 * fetch at
377 */
378 static gpointer
sub_thread_fetch_at(ThreadData * data,GError ** error)379 sub_thread_fetch_at (ThreadData *data, GError **error)
380 {
381 /* WARNING: function executed in sub thread! */
382 gboolean retval;
383 retval = DATA_SELECT_CLASS (data->select)->fetch_at (data->select, data->row, data->rownum, error);
384 #ifdef GDA_DEBUG_NO
385 g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
386 #endif
387 return GINT_TO_POINTER (retval);
388 }
389
390 static gboolean
gda_thread_recordset_fetch_at(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)391 gda_thread_recordset_fetch_at (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
392 {
393 GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
394 ThreadData wdata;
395 gboolean retval;
396 guint jid;
397
398 wdata.select = (GdaDataSelect *) rs->priv->sub_model;
399 wdata.rownum = rownum;
400 wdata.row = prow;
401 jid = gda_thread_wrapper_execute (rs->priv->wrapper,
402 (GdaThreadWrapperFunc) sub_thread_fetch_at,
403 &wdata, NULL, NULL);
404 retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
405 TRUE : FALSE;
406 COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
407
408 if (*prow && rs->priv->blobs_conv)
409 alter_blob_values (rs, prow);
410
411 return retval;
412 }
413