1 /*
2 * the PLyCursor class
3 *
4 * src/pl/plpython/plpy_cursorobject.c
5 */
6
7 #include "postgres.h"
8
9 #include <limits.h>
10
11 #include "access/xact.h"
12 #include "mb/pg_wchar.h"
13 #include "utils/memutils.h"
14
15 #include "plpython.h"
16
17 #include "plpy_cursorobject.h"
18
19 #include "plpy_elog.h"
20 #include "plpy_main.h"
21 #include "plpy_planobject.h"
22 #include "plpy_procedure.h"
23 #include "plpy_resultobject.h"
24 #include "plpy_spi.h"
25
26
27 static PyObject *PLy_cursor_query(const char *query);
28 static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33
34 static char PLy_cursor_doc[] = {
35 "Wrapper around a PostgreSQL cursor"
36 };
37
38 static PyMethodDef PLy_cursor_methods[] = {
39 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40 {"close", PLy_cursor_close, METH_NOARGS, NULL},
41 {NULL, NULL, 0, NULL}
42 };
43
44 static PyTypeObject PLy_CursorType = {
45 PyVarObject_HEAD_INIT(NULL, 0)
46 "PLyCursor", /* tp_name */
47 sizeof(PLyCursorObject), /* tp_size */
48 0, /* tp_itemsize */
49
50 /*
51 * methods
52 */
53 PLy_cursor_dealloc, /* tp_dealloc */
54 0, /* tp_print */
55 0, /* tp_getattr */
56 0, /* tp_setattr */
57 0, /* tp_compare */
58 0, /* tp_repr */
59 0, /* tp_as_number */
60 0, /* tp_as_sequence */
61 0, /* tp_as_mapping */
62 0, /* tp_hash */
63 0, /* tp_call */
64 0, /* tp_str */
65 0, /* tp_getattro */
66 0, /* tp_setattro */
67 0, /* tp_as_buffer */
68 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
69 PLy_cursor_doc, /* tp_doc */
70 0, /* tp_traverse */
71 0, /* tp_clear */
72 0, /* tp_richcompare */
73 0, /* tp_weaklistoffset */
74 PyObject_SelfIter, /* tp_iter */
75 PLy_cursor_iternext, /* tp_iternext */
76 PLy_cursor_methods, /* tp_tpmethods */
77 };
78
79 void
PLy_cursor_init_type(void)80 PLy_cursor_init_type(void)
81 {
82 if (PyType_Ready(&PLy_CursorType) < 0)
83 elog(ERROR, "could not initialize PLy_CursorType");
84 }
85
86 PyObject *
PLy_cursor(PyObject * self,PyObject * args)87 PLy_cursor(PyObject *self, PyObject *args)
88 {
89 char *query;
90 PyObject *plan;
91 PyObject *planargs = NULL;
92
93 if (PyArg_ParseTuple(args, "s", &query))
94 return PLy_cursor_query(query);
95
96 PyErr_Clear();
97
98 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
99 return PLy_cursor_plan(plan, planargs);
100
101 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
102 return NULL;
103 }
104
105
106 static PyObject *
PLy_cursor_query(const char * query)107 PLy_cursor_query(const char *query)
108 {
109 PLyCursorObject *cursor;
110 volatile MemoryContext oldcontext;
111 volatile ResourceOwner oldowner;
112
113 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
114 return NULL;
115 cursor->portalname = NULL;
116 cursor->closed = false;
117 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
118 "PL/Python cursor context",
119 ALLOCSET_DEFAULT_SIZES);
120 PLy_typeinfo_init(&cursor->result, cursor->mcxt);
121
122 oldcontext = CurrentMemoryContext;
123 oldowner = CurrentResourceOwner;
124
125 PLy_spi_subtransaction_begin(oldcontext, oldowner);
126
127 PG_TRY();
128 {
129 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
130 SPIPlanPtr plan;
131 Portal portal;
132
133 pg_verifymbstr(query, strlen(query), false);
134
135 plan = SPI_prepare(query, 0, NULL);
136 if (plan == NULL)
137 elog(ERROR, "SPI_prepare failed: %s",
138 SPI_result_code_string(SPI_result));
139
140 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
141 exec_ctx->curr_proc->fn_readonly);
142 SPI_freeplan(plan);
143
144 if (portal == NULL)
145 elog(ERROR, "SPI_cursor_open() failed: %s",
146 SPI_result_code_string(SPI_result));
147
148 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
149
150 PLy_spi_subtransaction_commit(oldcontext, oldowner);
151 }
152 PG_CATCH();
153 {
154 PLy_spi_subtransaction_abort(oldcontext, oldowner);
155 return NULL;
156 }
157 PG_END_TRY();
158
159 Assert(cursor->portalname != NULL);
160 return (PyObject *) cursor;
161 }
162
163 static PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)164 PLy_cursor_plan(PyObject *ob, PyObject *args)
165 {
166 PLyCursorObject *cursor;
167 volatile int nargs;
168 int i;
169 PLyPlanObject *plan;
170 volatile MemoryContext oldcontext;
171 volatile ResourceOwner oldowner;
172
173 if (args)
174 {
175 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
176 {
177 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
178 return NULL;
179 }
180 nargs = PySequence_Length(args);
181 }
182 else
183 nargs = 0;
184
185 plan = (PLyPlanObject *) ob;
186
187 if (nargs != plan->nargs)
188 {
189 char *sv;
190 PyObject *so = PyObject_Str(args);
191
192 if (!so)
193 PLy_elog(ERROR, "could not execute plan");
194 sv = PyString_AsString(so);
195 PLy_exception_set_plural(PyExc_TypeError,
196 "Expected sequence of %d argument, got %d: %s",
197 "Expected sequence of %d arguments, got %d: %s",
198 plan->nargs,
199 plan->nargs, nargs, sv);
200 Py_DECREF(so);
201
202 return NULL;
203 }
204
205 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
206 return NULL;
207 cursor->portalname = NULL;
208 cursor->closed = false;
209 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
210 "PL/Python cursor context",
211 ALLOCSET_DEFAULT_SIZES);
212 PLy_typeinfo_init(&cursor->result, cursor->mcxt);
213
214 oldcontext = CurrentMemoryContext;
215 oldowner = CurrentResourceOwner;
216
217 PLy_spi_subtransaction_begin(oldcontext, oldowner);
218
219 PG_TRY();
220 {
221 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
222 Portal portal;
223 char *volatile nulls;
224 volatile int j;
225
226 if (nargs > 0)
227 nulls = palloc(nargs * sizeof(char));
228 else
229 nulls = NULL;
230
231 for (j = 0; j < nargs; j++)
232 {
233 PyObject *elem;
234
235 elem = PySequence_GetItem(args, j);
236 if (elem != Py_None)
237 {
238 PG_TRY();
239 {
240 plan->values[j] =
241 plan->args[j].out.d.func(&(plan->args[j].out.d),
242 -1,
243 elem);
244 }
245 PG_CATCH();
246 {
247 Py_DECREF(elem);
248 PG_RE_THROW();
249 }
250 PG_END_TRY();
251
252 Py_DECREF(elem);
253 nulls[j] = ' ';
254 }
255 else
256 {
257 Py_DECREF(elem);
258 plan->values[j] =
259 InputFunctionCall(&(plan->args[j].out.d.typfunc),
260 NULL,
261 plan->args[j].out.d.typioparam,
262 -1);
263 nulls[j] = 'n';
264 }
265 }
266
267 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
268 exec_ctx->curr_proc->fn_readonly);
269 if (portal == NULL)
270 elog(ERROR, "SPI_cursor_open() failed: %s",
271 SPI_result_code_string(SPI_result));
272
273 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
274
275 PLy_spi_subtransaction_commit(oldcontext, oldowner);
276 }
277 PG_CATCH();
278 {
279 int k;
280
281 /* cleanup plan->values array */
282 for (k = 0; k < nargs; k++)
283 {
284 if (!plan->args[k].out.d.typbyval &&
285 (plan->values[k] != PointerGetDatum(NULL)))
286 {
287 pfree(DatumGetPointer(plan->values[k]));
288 plan->values[k] = PointerGetDatum(NULL);
289 }
290 }
291
292 Py_DECREF(cursor);
293
294 PLy_spi_subtransaction_abort(oldcontext, oldowner);
295 return NULL;
296 }
297 PG_END_TRY();
298
299 for (i = 0; i < nargs; i++)
300 {
301 if (!plan->args[i].out.d.typbyval &&
302 (plan->values[i] != PointerGetDatum(NULL)))
303 {
304 pfree(DatumGetPointer(plan->values[i]));
305 plan->values[i] = PointerGetDatum(NULL);
306 }
307 }
308
309 Assert(cursor->portalname != NULL);
310 return (PyObject *) cursor;
311 }
312
313 static void
PLy_cursor_dealloc(PyObject * arg)314 PLy_cursor_dealloc(PyObject *arg)
315 {
316 PLyCursorObject *cursor;
317 Portal portal;
318
319 cursor = (PLyCursorObject *) arg;
320
321 if (!cursor->closed)
322 {
323 portal = GetPortalByName(cursor->portalname);
324
325 if (PortalIsValid(portal))
326 SPI_cursor_close(portal);
327 cursor->closed = true;
328 }
329 if (cursor->mcxt)
330 {
331 MemoryContextDelete(cursor->mcxt);
332 cursor->mcxt = NULL;
333 }
334 arg->ob_type->tp_free(arg);
335 }
336
337 static PyObject *
PLy_cursor_iternext(PyObject * self)338 PLy_cursor_iternext(PyObject *self)
339 {
340 PLyCursorObject *cursor;
341 PyObject *ret;
342 volatile MemoryContext oldcontext;
343 volatile ResourceOwner oldowner;
344 Portal portal;
345
346 cursor = (PLyCursorObject *) self;
347
348 if (cursor->closed)
349 {
350 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
351 return NULL;
352 }
353
354 portal = GetPortalByName(cursor->portalname);
355 if (!PortalIsValid(portal))
356 {
357 PLy_exception_set(PyExc_ValueError,
358 "iterating a cursor in an aborted subtransaction");
359 return NULL;
360 }
361
362 oldcontext = CurrentMemoryContext;
363 oldowner = CurrentResourceOwner;
364
365 PLy_spi_subtransaction_begin(oldcontext, oldowner);
366
367 PG_TRY();
368 {
369 SPI_cursor_fetch(portal, true, 1);
370 if (SPI_processed == 0)
371 {
372 PyErr_SetNone(PyExc_StopIteration);
373 ret = NULL;
374 }
375 else
376 {
377 if (cursor->result.is_rowtype != 1)
378 PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
379
380 ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
381 SPI_tuptable->tupdesc);
382 }
383
384 SPI_freetuptable(SPI_tuptable);
385
386 PLy_spi_subtransaction_commit(oldcontext, oldowner);
387 }
388 PG_CATCH();
389 {
390 PLy_spi_subtransaction_abort(oldcontext, oldowner);
391 return NULL;
392 }
393 PG_END_TRY();
394
395 return ret;
396 }
397
398 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)399 PLy_cursor_fetch(PyObject *self, PyObject *args)
400 {
401 PLyCursorObject *cursor;
402 int count;
403 PLyResultObject *ret;
404 volatile MemoryContext oldcontext;
405 volatile ResourceOwner oldowner;
406 Portal portal;
407
408 if (!PyArg_ParseTuple(args, "i", &count))
409 return NULL;
410
411 cursor = (PLyCursorObject *) self;
412
413 if (cursor->closed)
414 {
415 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
416 return NULL;
417 }
418
419 portal = GetPortalByName(cursor->portalname);
420 if (!PortalIsValid(portal))
421 {
422 PLy_exception_set(PyExc_ValueError,
423 "iterating a cursor in an aborted subtransaction");
424 return NULL;
425 }
426
427 ret = (PLyResultObject *) PLy_result_new();
428 if (ret == NULL)
429 return NULL;
430
431 oldcontext = CurrentMemoryContext;
432 oldowner = CurrentResourceOwner;
433
434 PLy_spi_subtransaction_begin(oldcontext, oldowner);
435
436 PG_TRY();
437 {
438 SPI_cursor_fetch(portal, true, count);
439
440 if (cursor->result.is_rowtype != 1)
441 PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
442
443 Py_DECREF(ret->status);
444 ret->status = PyInt_FromLong(SPI_OK_FETCH);
445
446 Py_DECREF(ret->nrows);
447 ret->nrows = (SPI_processed > (uint64) LONG_MAX) ?
448 PyFloat_FromDouble((double) SPI_processed) :
449 PyInt_FromLong((long) SPI_processed);
450
451 if (SPI_processed != 0)
452 {
453 uint64 i;
454
455 /*
456 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
457 * and list indices; so we cannot support a result larger than
458 * PY_SSIZE_T_MAX.
459 */
460 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
461 ereport(ERROR,
462 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
463 errmsg("query result has too many rows to fit in a Python list")));
464
465 Py_DECREF(ret->rows);
466 ret->rows = PyList_New(SPI_processed);
467
468 for (i = 0; i < SPI_processed; i++)
469 {
470 PyObject *row = PLyDict_FromTuple(&cursor->result,
471 SPI_tuptable->vals[i],
472 SPI_tuptable->tupdesc);
473
474 PyList_SetItem(ret->rows, i, row);
475 }
476 }
477
478 SPI_freetuptable(SPI_tuptable);
479
480 PLy_spi_subtransaction_commit(oldcontext, oldowner);
481 }
482 PG_CATCH();
483 {
484 PLy_spi_subtransaction_abort(oldcontext, oldowner);
485 return NULL;
486 }
487 PG_END_TRY();
488
489 return (PyObject *) ret;
490 }
491
492 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)493 PLy_cursor_close(PyObject *self, PyObject *unused)
494 {
495 PLyCursorObject *cursor = (PLyCursorObject *) self;
496
497 if (!cursor->closed)
498 {
499 Portal portal = GetPortalByName(cursor->portalname);
500
501 if (!PortalIsValid(portal))
502 {
503 PLy_exception_set(PyExc_ValueError,
504 "closing a cursor in an aborted subtransaction");
505 return NULL;
506 }
507
508 SPI_cursor_close(portal);
509 cursor->closed = true;
510 }
511
512 Py_INCREF(Py_None);
513 return Py_None;
514 }
515