1 /*
2  * Copyright (C) 2009 - 2011 Vivien Malerba <malerba@gnome-db.org>
3  * Copyright (C) 2010 David King <davidk@openismus.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA  02110-1301, USA.
19  */
20 
21 #include <stdarg.h>
22 #include <string.h>
23 #include <glib/gi18n-lib.h>
24 #include "gda-thread-wrapper.h"
25 #include "gda-thread-recordset.h"
26 #include "gda-thread-provider.h"
27 #include <libgda/providers-support/gda-data-select-priv.h>
28 #include "gda-thread-blob-op.h"
29 
30 static void gda_thread_recordset_class_init (GdaThreadRecordsetClass *klass);
31 static void gda_thread_recordset_init       (GdaThreadRecordset *recset,
32 					     GdaThreadRecordsetClass *klass);
33 static void gda_thread_recordset_dispose   (GObject *object);
34 
35 /* virtual methods */
36 static gint    gda_thread_recordset_fetch_nb_rows (GdaDataSelect *model);
37 static gboolean gda_thread_recordset_fetch_random (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
38 
39 static gboolean gda_thread_recordset_fetch_prev (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
40 static gboolean gda_thread_recordset_fetch_next (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
41 static gboolean gda_thread_recordset_fetch_at (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error);
42 
43 struct _GdaThreadRecordsetPrivate {
44 	GdaDataModel *sub_model;
45 	GdaThreadWrapper *wrapper;
46 	gint nblobs;
47 	gint *blobs_conv;
48 };
49 static GObjectClass *parent_class = NULL;
50 
51 #define DATA_SELECT_CLASS(model) (GDA_DATA_SELECT_CLASS (G_OBJECT_GET_CLASS (model)))
52 #define COPY_PUBLIC_DATA(from,to) \
53 	(((GdaDataSelect *) (to))->nb_stored_rows = ((GdaDataSelect *) (from))->nb_stored_rows, \
54 	 ((GdaDataSelect *) (to))->prep_stmt = ((GdaDataSelect *) (from))->prep_stmt, \
55 	 ((GdaDataSelect *) (to))->advertized_nrows = ((GdaDataSelect *) (from))->advertized_nrows)
56 
57 /*
58  * Object init and finalize
59  */
60 static void
gda_thread_recordset_init(GdaThreadRecordset * recset,G_GNUC_UNUSED GdaThreadRecordsetClass * klass)61 gda_thread_recordset_init (GdaThreadRecordset *recset,
62 			   G_GNUC_UNUSED GdaThreadRecordsetClass *klass)
63 {
64 	g_return_if_fail (GDA_IS_THREAD_RECORDSET (recset));
65 	recset->priv = g_new0 (GdaThreadRecordsetPrivate, 1);
66 	recset->priv->sub_model = NULL;
67 	recset->priv->wrapper = NULL;
68 	recset->priv->blobs_conv = NULL;
69 }
70 
71 static void
gda_thread_recordset_class_init(GdaThreadRecordsetClass * klass)72 gda_thread_recordset_class_init (GdaThreadRecordsetClass *klass)
73 {
74 	GObjectClass *object_class = G_OBJECT_CLASS (klass);
75 	GdaDataSelectClass *pmodel_class = GDA_DATA_SELECT_CLASS (klass);
76 
77 	parent_class = g_type_class_peek_parent (klass);
78 
79 	object_class->dispose = gda_thread_recordset_dispose;
80 	pmodel_class->fetch_nb_rows = gda_thread_recordset_fetch_nb_rows;
81 	pmodel_class->fetch_random = gda_thread_recordset_fetch_random;
82 	pmodel_class->store_all = NULL;
83 
84 	pmodel_class->fetch_next = gda_thread_recordset_fetch_next;
85 	pmodel_class->fetch_prev = gda_thread_recordset_fetch_prev;
86 	pmodel_class->fetch_at = gda_thread_recordset_fetch_at;
87 }
88 
89 static void
gda_thread_recordset_dispose(GObject * object)90 gda_thread_recordset_dispose (GObject *object)
91 {
92 	GdaThreadRecordset *recset = (GdaThreadRecordset *) object;
93 
94 	g_return_if_fail (GDA_IS_THREAD_RECORDSET (recset));
95 
96 	if (recset->priv) {
97 		if (recset->priv->sub_model) {
98 			/* unref recset->priv->sub_model in sub thread */
99 			gda_thread_wrapper_execute_void (recset->priv->wrapper,
100 							 (GdaThreadWrapperVoidFunc) g_object_unref,
101 							 recset->priv->sub_model, NULL, NULL);
102 			recset->priv->sub_model = NULL;
103 			g_object_unref (recset->priv->wrapper);
104 			recset->priv->wrapper = NULL;
105 
106 			if (recset->priv->blobs_conv) {
107 				g_free (recset->priv->blobs_conv);
108 				recset->priv->blobs_conv = NULL;
109 			}
110 		}
111 		g_free (recset->priv);
112 		recset->priv = NULL;
113 
114 		((GdaDataSelect*) recset)->prep_stmt = NULL; /* don't unref since we don't hold a reference to it */
115 	}
116 
117 	parent_class->dispose (object);
118 }
119 
120 /*
121  * Public functions
122  */
123 
124 GType
_gda_thread_recordset_get_type(void)125 _gda_thread_recordset_get_type (void)
126 {
127 	static GType type = 0;
128 
129 	if (G_UNLIKELY (type == 0)) {
130 		static GMutex registering;
131 		static const GTypeInfo info = {
132 			sizeof (GdaThreadRecordsetClass),
133 			(GBaseInitFunc) NULL,
134 			(GBaseFinalizeFunc) NULL,
135 			(GClassInitFunc) gda_thread_recordset_class_init,
136 			NULL,
137 			NULL,
138 			sizeof (GdaThreadRecordset),
139 			0,
140 			(GInstanceInitFunc) gda_thread_recordset_init,
141 			0
142 		};
143 		g_mutex_lock (&registering);
144 		if (type == 0)
145 			type = g_type_register_static (GDA_TYPE_DATA_SELECT, "GdaThreadRecordset", &info, 0);
146 		g_mutex_unlock (&registering);
147 	}
148 
149 	return type;
150 }
151 
152 
153 /*
154  * executed in the sub thread (the one which can manipulate @sub_model)
155  */
156 GdaDataModel *
_gda_thread_recordset_new(GdaConnection * cnc,GdaThreadWrapper * wrapper,GdaDataModel * sub_model)157 _gda_thread_recordset_new (GdaConnection *cnc, GdaThreadWrapper *wrapper, GdaDataModel *sub_model)
158 {
159 	GdaThreadRecordset *model;
160 
161 	model = GDA_THREAD_RECORDSET (g_object_new (GDA_TYPE_THREAD_RECORDSET,
162 						    "connection", cnc, NULL));
163 
164 	_gda_data_select_share_private_data (GDA_DATA_SELECT (sub_model),
165 					     GDA_DATA_SELECT (model));
166 	model->priv->wrapper = g_object_ref (wrapper);
167 	model->priv->sub_model = g_object_ref (sub_model);
168 
169 	gint ncols, i, nblobs;
170 	gint *blobs_conv = NULL;
171 	ncols = gda_data_model_get_n_columns (sub_model);
172 	nblobs = 0;
173 	for (i = 0; i < ncols; i++) {
174 		GdaColumn *col;
175 		col = gda_data_model_describe_column (sub_model, i);
176 		if (gda_column_get_g_type (col) == GDA_TYPE_BLOB) {
177 			if (!blobs_conv)
178 				blobs_conv = g_new0 (gint, ncols);
179 			blobs_conv [nblobs] = i;
180 			nblobs++;
181 		}
182 	}
183 	model->priv->blobs_conv = blobs_conv;
184 	model->priv->nblobs = nblobs;
185 
186 	COPY_PUBLIC_DATA (sub_model, model);
187 
188         return GDA_DATA_MODEL (model);
189 }
190 
191 /*
192  * GdaDataSelect virtual methods
193  */
194 
195 
196 /*
197  * fetch nb rows
198  */
199 static gpointer
sub_thread_fetch_nb_rows(GdaDataSelect * model,G_GNUC_UNUSED GError ** error)200 sub_thread_fetch_nb_rows (GdaDataSelect *model, G_GNUC_UNUSED GError **error)
201 {
202 	/* WARNING: function executed in sub thread! */
203 	gint retval;
204 	gint *res;
205 	retval = DATA_SELECT_CLASS (model)->fetch_nb_rows (model);
206 #ifdef GDA_DEBUG_NO
207 	g_print ("/%s() => %d\n", __FUNCTION__, retval);
208 #endif
209 	res = g_new (gint, 1);
210 	*res = retval;
211 	return res;
212 }
213 
214 static gint
gda_thread_recordset_fetch_nb_rows(GdaDataSelect * model)215 gda_thread_recordset_fetch_nb_rows (GdaDataSelect *model)
216 {
217 	GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
218 	gint nb;
219 	gint *res;
220 	guint jid;
221 	jid = gda_thread_wrapper_execute (rs->priv->wrapper,
222 					  (GdaThreadWrapperFunc) sub_thread_fetch_nb_rows,
223 					  rs->priv->sub_model, NULL, NULL);
224 
225 	res = (gint*) gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, NULL);
226 	nb = *res;
227 	g_free (res);
228 	COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
229 	return nb;
230 }
231 
232 /*
233  * fetch random
234  */
235 typedef struct {
236 	GdaDataSelect *select;
237 	gint rownum;
238 	GdaRow **row;
239 } ThreadData;
240 
241 static gpointer
sub_thread_fetch_random(ThreadData * data,GError ** error)242 sub_thread_fetch_random (ThreadData *data, GError **error)
243 {
244 	/* WARNING: function executed in sub thread! */
245 	gboolean retval;
246 	retval = DATA_SELECT_CLASS (data->select)->fetch_random (data->select, data->row, data->rownum, error);
247 #ifdef GDA_DEBUG_NO
248 	g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
249 #endif
250 	return GINT_TO_POINTER (retval);
251 }
252 
253 static void
alter_blob_values(GdaThreadRecordset * rs,GdaRow ** prow)254 alter_blob_values (GdaThreadRecordset *rs, GdaRow **prow)
255 {
256 	gint i;
257 	for (i = 0; i < rs->priv->nblobs; i++) {
258 		GValue *value = gda_row_get_value (*prow, rs->priv->blobs_conv[i]);
259 		if (G_VALUE_TYPE (value) == GDA_TYPE_BLOB) {
260 			GdaBlob *blob;
261 			blob = (GdaBlob*) gda_value_get_blob (value);
262 			if (blob->op) {
263 				GdaBlobOp *nop;
264 				nop = _gda_thread_blob_op_new (rs->priv->wrapper, blob->op);
265 				g_object_unref (blob->op);
266 				blob->op = nop;
267 			}
268 		}
269 	}
270 }
271 
272 static gboolean
gda_thread_recordset_fetch_random(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)273 gda_thread_recordset_fetch_random (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
274 {
275 	GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
276 	ThreadData wdata;
277 	gboolean retval;
278 	guint jid;
279 
280 	wdata.select = (GdaDataSelect *) rs->priv->sub_model;
281 	wdata.rownum = rownum;
282 	wdata.row = prow;
283 	jid = gda_thread_wrapper_execute (rs->priv->wrapper,
284 					  (GdaThreadWrapperFunc) sub_thread_fetch_random,
285 					  &wdata, NULL, NULL);
286 
287 	retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
288 		TRUE : FALSE;
289 	COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
290 
291 	if (*prow && rs->priv->blobs_conv)
292 		alter_blob_values (rs, prow);
293 
294 	return retval;
295 }
296 
297 /*
298  * fetch next
299  */
300 static gpointer
sub_thread_fetch_next(ThreadData * data,GError ** error)301 sub_thread_fetch_next (ThreadData *data, GError **error)
302 {
303 	/* WARNING: function executed in sub thread! */
304 	gboolean retval;
305 	retval = DATA_SELECT_CLASS (data->select)->fetch_next (data->select, data->row, data->rownum, error);
306 #ifdef GDA_DEBUG_NO
307 	g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
308 #endif
309 	return GINT_TO_POINTER (retval);
310 }
311 
312 static gboolean
gda_thread_recordset_fetch_next(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)313 gda_thread_recordset_fetch_next (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
314 {
315 	GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
316 	ThreadData wdata;
317 	gboolean retval;
318 	guint jid;
319 
320 	wdata.select = (GdaDataSelect *) rs->priv->sub_model;
321 	wdata.rownum = rownum;
322 	wdata.row = prow;
323 	jid = gda_thread_wrapper_execute (rs->priv->wrapper,
324 					  (GdaThreadWrapperFunc) sub_thread_fetch_next,
325 					  &wdata, NULL, NULL);
326 	retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
327 		TRUE : FALSE;
328 	COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
329 
330 	if (*prow && rs->priv->blobs_conv)
331 		alter_blob_values (rs, prow);
332 
333 	return retval;
334 }
335 
336 /*
337  * fetch prev
338  */
339 static gpointer
sub_thread_fetch_prev(ThreadData * data,GError ** error)340 sub_thread_fetch_prev (ThreadData *data, GError **error)
341 {
342 	/* WARNING: function executed in sub thread! */
343 	gboolean retval;
344 	retval = DATA_SELECT_CLASS (data->select)->fetch_prev (data->select, data->row, data->rownum, error);
345 #ifdef GDA_DEBUG_NO
346 	g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
347 #endif
348 	return GINT_TO_POINTER (retval);
349 }
350 
351 static gboolean
gda_thread_recordset_fetch_prev(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)352 gda_thread_recordset_fetch_prev (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
353 {
354 	GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
355 	ThreadData wdata;
356 	gboolean retval;
357 	guint jid;
358 
359 	wdata.select = (GdaDataSelect *) rs->priv->sub_model;
360 	wdata.rownum = rownum;
361 	wdata.row = prow;
362 	jid = gda_thread_wrapper_execute (rs->priv->wrapper,
363 					  (GdaThreadWrapperFunc) sub_thread_fetch_prev,
364 					  &wdata, NULL, NULL);
365 	retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
366 		TRUE : FALSE;
367 	COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
368 
369 	if (*prow && rs->priv->blobs_conv)
370 		alter_blob_values (rs, prow);
371 
372 	return retval;
373 }
374 
375 /*
376  * fetch at
377  */
378 static gpointer
sub_thread_fetch_at(ThreadData * data,GError ** error)379 sub_thread_fetch_at (ThreadData *data, GError **error)
380 {
381 	/* WARNING: function executed in sub thread! */
382 	gboolean retval;
383 	retval = DATA_SELECT_CLASS (data->select)->fetch_at (data->select, data->row, data->rownum, error);
384 #ifdef GDA_DEBUG_NO
385 	g_print ("/%s() => %s\n", __FUNCTION__, retval ? "TRUE" : "FALSE");
386 #endif
387 	return GINT_TO_POINTER (retval);
388 }
389 
390 static gboolean
gda_thread_recordset_fetch_at(GdaDataSelect * model,GdaRow ** prow,gint rownum,GError ** error)391 gda_thread_recordset_fetch_at (GdaDataSelect *model, GdaRow **prow, gint rownum, GError **error)
392 {
393 	GdaThreadRecordset *rs = (GdaThreadRecordset*) model;
394 	ThreadData wdata;
395 	gboolean retval;
396 	guint jid;
397 
398 	wdata.select = (GdaDataSelect *) rs->priv->sub_model;
399 	wdata.rownum = rownum;
400 	wdata.row = prow;
401 	jid = gda_thread_wrapper_execute (rs->priv->wrapper,
402 					  (GdaThreadWrapperFunc) sub_thread_fetch_at,
403 					  &wdata, NULL, NULL);
404 	retval = GPOINTER_TO_INT (gda_thread_wrapper_fetch_result (rs->priv->wrapper, TRUE, jid, error)) ?
405 		TRUE : FALSE;
406 	COPY_PUBLIC_DATA (rs->priv->sub_model, rs);
407 
408 	if (*prow && rs->priv->blobs_conv)
409 		alter_blob_values (rs, prow);
410 
411 	return retval;
412 }
413