1 /*
2 * the PLyCursor class
3 *
4 * src/pl/plpython/plpy_cursorobject.c
5 */
6
7 #include "postgres.h"
8
9 #include <limits.h>
10
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "utils/memutils.h"
15
16 #include "plpython.h"
17
18 #include "plpy_cursorobject.h"
19
20 #include "plpy_elog.h"
21 #include "plpy_main.h"
22 #include "plpy_planobject.h"
23 #include "plpy_procedure.h"
24 #include "plpy_resultobject.h"
25 #include "plpy_spi.h"
26
27
/*
 * Forward declarations for the file-local functions: the query-string
 * variant of cursor creation, plus the functions installed below as the
 * PLyCursor type's dealloc/iternext slots and its fetch/close methods.
 */
static PyObject *PLy_cursor_query(const char *query);
static void PLy_cursor_dealloc(PyObject *arg);
static PyObject *PLy_cursor_iternext(PyObject *self);
static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33
/* Docstring installed as tp_doc of the PLyCursor type */
static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
37
/*
 * Methods exposed on PLyCursor objects: fetch() is called with positional
 * arguments, close() takes none.
 */
static PyMethodDef PLy_cursor_methods[] = {
	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
	{"close", PLy_cursor_close, METH_NOARGS, NULL},
	{NULL, NULL, 0, NULL}		/* end-of-table marker */
};
43
/*
 * Python type object for PLyCursor.  The initializer is positional, so the
 * entries must stay in the order of the PyTypeObject fields; the trailing
 * comments name the slot each entry fills.  Only dealloc, flags, doc, the
 * iterator slots and the method table are set.
 */
static PyTypeObject PLy_CursorType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"PLyCursor",				/* tp_name */
	sizeof(PLyCursorObject),	/* tp_basicsize */
	0,							/* tp_itemsize */

	/*
	 * methods
	 */
	PLy_cursor_dealloc,			/* tp_dealloc */
	0,							/* tp_print */
	0,							/* tp_getattr */
	0,							/* tp_setattr */
	0,							/* tp_compare */
	0,							/* tp_repr */
	0,							/* tp_as_number */
	0,							/* tp_as_sequence */
	0,							/* tp_as_mapping */
	0,							/* tp_hash */
	0,							/* tp_call */
	0,							/* tp_str */
	0,							/* tp_getattro */
	0,							/* tp_setattro */
	0,							/* tp_as_buffer */
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,	/* tp_flags */
	PLy_cursor_doc,				/* tp_doc */
	0,							/* tp_traverse */
	0,							/* tp_clear */
	0,							/* tp_richcompare */
	0,							/* tp_weaklistoffset */
	PyObject_SelfIter,			/* tp_iter: a cursor is its own iterator */
	PLy_cursor_iternext,		/* tp_iternext */
	PLy_cursor_methods,			/* tp_methods */
};
78
79 void
PLy_cursor_init_type(void)80 PLy_cursor_init_type(void)
81 {
82 if (PyType_Ready(&PLy_CursorType) < 0)
83 elog(ERROR, "could not initialize PLy_CursorType");
84 }
85
86 PyObject *
PLy_cursor(PyObject * self,PyObject * args)87 PLy_cursor(PyObject *self, PyObject *args)
88 {
89 char *query;
90 PyObject *plan;
91 PyObject *planargs = NULL;
92
93 if (PyArg_ParseTuple(args, "s", &query))
94 return PLy_cursor_query(query);
95
96 PyErr_Clear();
97
98 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
99 return PLy_cursor_plan(plan, planargs);
100
101 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
102 return NULL;
103 }
104
105
106 static PyObject *
PLy_cursor_query(const char * query)107 PLy_cursor_query(const char *query)
108 {
109 PLyCursorObject *cursor;
110 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
111 volatile MemoryContext oldcontext;
112 volatile ResourceOwner oldowner;
113
114 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
115 return NULL;
116 cursor->portalname = NULL;
117 cursor->closed = false;
118 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
119 "PL/Python cursor context",
120 ALLOCSET_DEFAULT_SIZES);
121
122 /* Initialize for converting result tuples to Python */
123 PLy_input_setup_func(&cursor->result, cursor->mcxt,
124 RECORDOID, -1,
125 exec_ctx->curr_proc);
126
127 oldcontext = CurrentMemoryContext;
128 oldowner = CurrentResourceOwner;
129
130 PLy_spi_subtransaction_begin(oldcontext, oldowner);
131
132 PG_TRY();
133 {
134 SPIPlanPtr plan;
135 Portal portal;
136
137 pg_verifymbstr(query, strlen(query), false);
138
139 plan = SPI_prepare(query, 0, NULL);
140 if (plan == NULL)
141 elog(ERROR, "SPI_prepare failed: %s",
142 SPI_result_code_string(SPI_result));
143
144 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
145 exec_ctx->curr_proc->fn_readonly);
146 SPI_freeplan(plan);
147
148 if (portal == NULL)
149 elog(ERROR, "SPI_cursor_open() failed: %s",
150 SPI_result_code_string(SPI_result));
151
152 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
153
154 PinPortal(portal);
155
156 PLy_spi_subtransaction_commit(oldcontext, oldowner);
157 }
158 PG_CATCH();
159 {
160 PLy_spi_subtransaction_abort(oldcontext, oldowner);
161 return NULL;
162 }
163 PG_END_TRY();
164
165 Assert(cursor->portalname != NULL);
166 return (PyObject *) cursor;
167 }
168
169 PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)170 PLy_cursor_plan(PyObject *ob, PyObject *args)
171 {
172 PLyCursorObject *cursor;
173 volatile int nargs;
174 int i;
175 PLyPlanObject *plan;
176 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
177 volatile MemoryContext oldcontext;
178 volatile ResourceOwner oldowner;
179
180 if (args)
181 {
182 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
183 {
184 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
185 return NULL;
186 }
187 nargs = PySequence_Length(args);
188 }
189 else
190 nargs = 0;
191
192 plan = (PLyPlanObject *) ob;
193
194 if (nargs != plan->nargs)
195 {
196 char *sv;
197 PyObject *so = PyObject_Str(args);
198
199 if (!so)
200 PLy_elog(ERROR, "could not execute plan");
201 sv = PyString_AsString(so);
202 PLy_exception_set_plural(PyExc_TypeError,
203 "Expected sequence of %d argument, got %d: %s",
204 "Expected sequence of %d arguments, got %d: %s",
205 plan->nargs,
206 plan->nargs, nargs, sv);
207 Py_DECREF(so);
208
209 return NULL;
210 }
211
212 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
213 return NULL;
214 cursor->portalname = NULL;
215 cursor->closed = false;
216 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
217 "PL/Python cursor context",
218 ALLOCSET_DEFAULT_SIZES);
219
220 /* Initialize for converting result tuples to Python */
221 PLy_input_setup_func(&cursor->result, cursor->mcxt,
222 RECORDOID, -1,
223 exec_ctx->curr_proc);
224
225 oldcontext = CurrentMemoryContext;
226 oldowner = CurrentResourceOwner;
227
228 PLy_spi_subtransaction_begin(oldcontext, oldowner);
229
230 PG_TRY();
231 {
232 Portal portal;
233 char *volatile nulls;
234 volatile int j;
235
236 if (nargs > 0)
237 nulls = palloc(nargs * sizeof(char));
238 else
239 nulls = NULL;
240
241 for (j = 0; j < nargs; j++)
242 {
243 PLyObToDatum *arg = &plan->args[j];
244 PyObject *elem;
245
246 elem = PySequence_GetItem(args, j);
247 PG_TRY();
248 {
249 bool isnull;
250
251 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
252 nulls[j] = isnull ? 'n' : ' ';
253 }
254 PG_CATCH();
255 {
256 Py_DECREF(elem);
257 PG_RE_THROW();
258 }
259 PG_END_TRY();
260 Py_DECREF(elem);
261 }
262
263 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
264 exec_ctx->curr_proc->fn_readonly);
265 if (portal == NULL)
266 elog(ERROR, "SPI_cursor_open() failed: %s",
267 SPI_result_code_string(SPI_result));
268
269 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
270
271 PinPortal(portal);
272
273 PLy_spi_subtransaction_commit(oldcontext, oldowner);
274 }
275 PG_CATCH();
276 {
277 int k;
278
279 /* cleanup plan->values array */
280 for (k = 0; k < nargs; k++)
281 {
282 if (!plan->args[k].typbyval &&
283 (plan->values[k] != PointerGetDatum(NULL)))
284 {
285 pfree(DatumGetPointer(plan->values[k]));
286 plan->values[k] = PointerGetDatum(NULL);
287 }
288 }
289
290 Py_DECREF(cursor);
291
292 PLy_spi_subtransaction_abort(oldcontext, oldowner);
293 return NULL;
294 }
295 PG_END_TRY();
296
297 for (i = 0; i < nargs; i++)
298 {
299 if (!plan->args[i].typbyval &&
300 (plan->values[i] != PointerGetDatum(NULL)))
301 {
302 pfree(DatumGetPointer(plan->values[i]));
303 plan->values[i] = PointerGetDatum(NULL);
304 }
305 }
306
307 Assert(cursor->portalname != NULL);
308 return (PyObject *) cursor;
309 }
310
311 static void
PLy_cursor_dealloc(PyObject * arg)312 PLy_cursor_dealloc(PyObject *arg)
313 {
314 PLyCursorObject *cursor;
315 Portal portal;
316
317 cursor = (PLyCursorObject *) arg;
318
319 if (!cursor->closed)
320 {
321 portal = GetPortalByName(cursor->portalname);
322
323 if (PortalIsValid(portal))
324 {
325 UnpinPortal(portal);
326 SPI_cursor_close(portal);
327 }
328 cursor->closed = true;
329 }
330 if (cursor->mcxt)
331 {
332 MemoryContextDelete(cursor->mcxt);
333 cursor->mcxt = NULL;
334 }
335 arg->ob_type->tp_free(arg);
336 }
337
338 static PyObject *
PLy_cursor_iternext(PyObject * self)339 PLy_cursor_iternext(PyObject *self)
340 {
341 PLyCursorObject *cursor;
342 PyObject *ret;
343 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
344 volatile MemoryContext oldcontext;
345 volatile ResourceOwner oldowner;
346 Portal portal;
347
348 cursor = (PLyCursorObject *) self;
349
350 if (cursor->closed)
351 {
352 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
353 return NULL;
354 }
355
356 portal = GetPortalByName(cursor->portalname);
357 if (!PortalIsValid(portal))
358 {
359 PLy_exception_set(PyExc_ValueError,
360 "iterating a cursor in an aborted subtransaction");
361 return NULL;
362 }
363
364 oldcontext = CurrentMemoryContext;
365 oldowner = CurrentResourceOwner;
366
367 PLy_spi_subtransaction_begin(oldcontext, oldowner);
368
369 PG_TRY();
370 {
371 SPI_cursor_fetch(portal, true, 1);
372 if (SPI_processed == 0)
373 {
374 PyErr_SetNone(PyExc_StopIteration);
375 ret = NULL;
376 }
377 else
378 {
379 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
380 exec_ctx->curr_proc);
381
382 ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
383 SPI_tuptable->tupdesc);
384 }
385
386 SPI_freetuptable(SPI_tuptable);
387
388 PLy_spi_subtransaction_commit(oldcontext, oldowner);
389 }
390 PG_CATCH();
391 {
392 PLy_spi_subtransaction_abort(oldcontext, oldowner);
393 return NULL;
394 }
395 PG_END_TRY();
396
397 return ret;
398 }
399
400 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)401 PLy_cursor_fetch(PyObject *self, PyObject *args)
402 {
403 PLyCursorObject *cursor;
404 int count;
405 PLyResultObject *ret;
406 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
407 volatile MemoryContext oldcontext;
408 volatile ResourceOwner oldowner;
409 Portal portal;
410
411 if (!PyArg_ParseTuple(args, "i:fetch", &count))
412 return NULL;
413
414 cursor = (PLyCursorObject *) self;
415
416 if (cursor->closed)
417 {
418 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
419 return NULL;
420 }
421
422 portal = GetPortalByName(cursor->portalname);
423 if (!PortalIsValid(portal))
424 {
425 PLy_exception_set(PyExc_ValueError,
426 "iterating a cursor in an aborted subtransaction");
427 return NULL;
428 }
429
430 ret = (PLyResultObject *) PLy_result_new();
431 if (ret == NULL)
432 return NULL;
433
434 oldcontext = CurrentMemoryContext;
435 oldowner = CurrentResourceOwner;
436
437 PLy_spi_subtransaction_begin(oldcontext, oldowner);
438
439 PG_TRY();
440 {
441 SPI_cursor_fetch(portal, true, count);
442
443 Py_DECREF(ret->status);
444 ret->status = PyInt_FromLong(SPI_OK_FETCH);
445
446 Py_DECREF(ret->nrows);
447 ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
448
449 if (SPI_processed != 0)
450 {
451 uint64 i;
452
453 /*
454 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
455 * and list indices; so we cannot support a result larger than
456 * PY_SSIZE_T_MAX.
457 */
458 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
459 ereport(ERROR,
460 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
461 errmsg("query result has too many rows to fit in a Python list")));
462
463 Py_DECREF(ret->rows);
464 ret->rows = PyList_New(SPI_processed);
465 if (!ret->rows)
466 {
467 Py_DECREF(ret);
468 ret = NULL;
469 }
470 else
471 {
472 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
473 exec_ctx->curr_proc);
474
475 for (i = 0; i < SPI_processed; i++)
476 {
477 PyObject *row = PLy_input_from_tuple(&cursor->result,
478 SPI_tuptable->vals[i],
479 SPI_tuptable->tupdesc);
480
481 PyList_SetItem(ret->rows, i, row);
482 }
483 }
484 }
485
486 SPI_freetuptable(SPI_tuptable);
487
488 PLy_spi_subtransaction_commit(oldcontext, oldowner);
489 }
490 PG_CATCH();
491 {
492 PLy_spi_subtransaction_abort(oldcontext, oldowner);
493 return NULL;
494 }
495 PG_END_TRY();
496
497 return (PyObject *) ret;
498 }
499
500 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)501 PLy_cursor_close(PyObject *self, PyObject *unused)
502 {
503 PLyCursorObject *cursor = (PLyCursorObject *) self;
504
505 if (!cursor->closed)
506 {
507 Portal portal = GetPortalByName(cursor->portalname);
508
509 if (!PortalIsValid(portal))
510 {
511 PLy_exception_set(PyExc_ValueError,
512 "closing a cursor in an aborted subtransaction");
513 return NULL;
514 }
515
516 UnpinPortal(portal);
517 SPI_cursor_close(portal);
518 cursor->closed = true;
519 }
520
521 Py_RETURN_NONE;
522 }
523