/*
 * the PLyCursor class
 *
 * src/pl/plpython/plpy_cursorobject.c
 */
6
7 #include "postgres.h"
8
9 #include <limits.h>
10
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "utils/memutils.h"
15
16 #include "plpython.h"
17
18 #include "plpy_cursorobject.h"
19
20 #include "plpy_elog.h"
21 #include "plpy_main.h"
22 #include "plpy_planobject.h"
23 #include "plpy_procedure.h"
24 #include "plpy_resultobject.h"
25 #include "plpy_spi.h"
26
27
28 static PyObject *PLy_cursor_query(const char *query);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33
/* Docstring installed as tp_doc of the PLyCursor type. */
static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
37
38 static PyMethodDef PLy_cursor_methods[] = {
39 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40 {"close", PLy_cursor_close, METH_NOARGS, NULL},
41 {NULL, NULL, 0, NULL}
42 };
43
44 static PyTypeObject PLy_CursorType = {
45 PyVarObject_HEAD_INIT(NULL, 0)
46 .tp_name = "PLyCursor",
47 .tp_basicsize = sizeof(PLyCursorObject),
48 .tp_dealloc = PLy_cursor_dealloc,
49 .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
50 .tp_doc = PLy_cursor_doc,
51 .tp_iter = PyObject_SelfIter,
52 .tp_iternext = PLy_cursor_iternext,
53 .tp_methods = PLy_cursor_methods,
54 };
55
56 void
PLy_cursor_init_type(void)57 PLy_cursor_init_type(void)
58 {
59 if (PyType_Ready(&PLy_CursorType) < 0)
60 elog(ERROR, "could not initialize PLy_CursorType");
61 }
62
63 PyObject *
PLy_cursor(PyObject * self,PyObject * args)64 PLy_cursor(PyObject *self, PyObject *args)
65 {
66 char *query;
67 PyObject *plan;
68 PyObject *planargs = NULL;
69
70 if (PyArg_ParseTuple(args, "s", &query))
71 return PLy_cursor_query(query);
72
73 PyErr_Clear();
74
75 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
76 return PLy_cursor_plan(plan, planargs);
77
78 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
79 return NULL;
80 }
81
82
83 static PyObject *
PLy_cursor_query(const char * query)84 PLy_cursor_query(const char *query)
85 {
86 PLyCursorObject *cursor;
87 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
88 volatile MemoryContext oldcontext;
89 volatile ResourceOwner oldowner;
90
91 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
92 return NULL;
93 cursor->portalname = NULL;
94 cursor->closed = false;
95 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
96 "PL/Python cursor context",
97 ALLOCSET_DEFAULT_SIZES);
98
99 /* Initialize for converting result tuples to Python */
100 PLy_input_setup_func(&cursor->result, cursor->mcxt,
101 RECORDOID, -1,
102 exec_ctx->curr_proc);
103
104 oldcontext = CurrentMemoryContext;
105 oldowner = CurrentResourceOwner;
106
107 PLy_spi_subtransaction_begin(oldcontext, oldowner);
108
109 PG_TRY();
110 {
111 SPIPlanPtr plan;
112 Portal portal;
113
114 pg_verifymbstr(query, strlen(query), false);
115
116 plan = SPI_prepare(query, 0, NULL);
117 if (plan == NULL)
118 elog(ERROR, "SPI_prepare failed: %s",
119 SPI_result_code_string(SPI_result));
120
121 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
122 exec_ctx->curr_proc->fn_readonly);
123 SPI_freeplan(plan);
124
125 if (portal == NULL)
126 elog(ERROR, "SPI_cursor_open() failed: %s",
127 SPI_result_code_string(SPI_result));
128
129 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
130
131 PinPortal(portal);
132
133 PLy_spi_subtransaction_commit(oldcontext, oldowner);
134 }
135 PG_CATCH();
136 {
137 PLy_spi_subtransaction_abort(oldcontext, oldowner);
138 return NULL;
139 }
140 PG_END_TRY();
141
142 Assert(cursor->portalname != NULL);
143 return (PyObject *) cursor;
144 }
145
146 PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)147 PLy_cursor_plan(PyObject *ob, PyObject *args)
148 {
149 PLyCursorObject *cursor;
150 volatile int nargs;
151 int i;
152 PLyPlanObject *plan;
153 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
154 volatile MemoryContext oldcontext;
155 volatile ResourceOwner oldowner;
156
157 if (args)
158 {
159 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
160 {
161 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
162 return NULL;
163 }
164 nargs = PySequence_Length(args);
165 }
166 else
167 nargs = 0;
168
169 plan = (PLyPlanObject *) ob;
170
171 if (nargs != plan->nargs)
172 {
173 char *sv;
174 PyObject *so = PyObject_Str(args);
175
176 if (!so)
177 PLy_elog(ERROR, "could not execute plan");
178 sv = PyString_AsString(so);
179 PLy_exception_set_plural(PyExc_TypeError,
180 "Expected sequence of %d argument, got %d: %s",
181 "Expected sequence of %d arguments, got %d: %s",
182 plan->nargs,
183 plan->nargs, nargs, sv);
184 Py_DECREF(so);
185
186 return NULL;
187 }
188
189 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
190 return NULL;
191 cursor->portalname = NULL;
192 cursor->closed = false;
193 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
194 "PL/Python cursor context",
195 ALLOCSET_DEFAULT_SIZES);
196
197 /* Initialize for converting result tuples to Python */
198 PLy_input_setup_func(&cursor->result, cursor->mcxt,
199 RECORDOID, -1,
200 exec_ctx->curr_proc);
201
202 oldcontext = CurrentMemoryContext;
203 oldowner = CurrentResourceOwner;
204
205 PLy_spi_subtransaction_begin(oldcontext, oldowner);
206
207 PG_TRY();
208 {
209 Portal portal;
210 char *volatile nulls;
211 volatile int j;
212
213 if (nargs > 0)
214 nulls = palloc(nargs * sizeof(char));
215 else
216 nulls = NULL;
217
218 for (j = 0; j < nargs; j++)
219 {
220 PLyObToDatum *arg = &plan->args[j];
221 PyObject *elem;
222
223 elem = PySequence_GetItem(args, j);
224 PG_TRY();
225 {
226 bool isnull;
227
228 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
229 nulls[j] = isnull ? 'n' : ' ';
230 }
231 PG_CATCH();
232 {
233 Py_DECREF(elem);
234 PG_RE_THROW();
235 }
236 PG_END_TRY();
237 Py_DECREF(elem);
238 }
239
240 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
241 exec_ctx->curr_proc->fn_readonly);
242 if (portal == NULL)
243 elog(ERROR, "SPI_cursor_open() failed: %s",
244 SPI_result_code_string(SPI_result));
245
246 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
247
248 PinPortal(portal);
249
250 PLy_spi_subtransaction_commit(oldcontext, oldowner);
251 }
252 PG_CATCH();
253 {
254 int k;
255
256 /* cleanup plan->values array */
257 for (k = 0; k < nargs; k++)
258 {
259 if (!plan->args[k].typbyval &&
260 (plan->values[k] != PointerGetDatum(NULL)))
261 {
262 pfree(DatumGetPointer(plan->values[k]));
263 plan->values[k] = PointerGetDatum(NULL);
264 }
265 }
266
267 Py_DECREF(cursor);
268
269 PLy_spi_subtransaction_abort(oldcontext, oldowner);
270 return NULL;
271 }
272 PG_END_TRY();
273
274 for (i = 0; i < nargs; i++)
275 {
276 if (!plan->args[i].typbyval &&
277 (plan->values[i] != PointerGetDatum(NULL)))
278 {
279 pfree(DatumGetPointer(plan->values[i]));
280 plan->values[i] = PointerGetDatum(NULL);
281 }
282 }
283
284 Assert(cursor->portalname != NULL);
285 return (PyObject *) cursor;
286 }
287
288 static void
PLy_cursor_dealloc(PyObject * arg)289 PLy_cursor_dealloc(PyObject *arg)
290 {
291 PLyCursorObject *cursor;
292 Portal portal;
293
294 cursor = (PLyCursorObject *) arg;
295
296 if (!cursor->closed)
297 {
298 portal = GetPortalByName(cursor->portalname);
299
300 if (PortalIsValid(portal))
301 {
302 UnpinPortal(portal);
303 SPI_cursor_close(portal);
304 }
305 cursor->closed = true;
306 }
307 if (cursor->mcxt)
308 {
309 MemoryContextDelete(cursor->mcxt);
310 cursor->mcxt = NULL;
311 }
312 arg->ob_type->tp_free(arg);
313 }
314
315 static PyObject *
PLy_cursor_iternext(PyObject * self)316 PLy_cursor_iternext(PyObject *self)
317 {
318 PLyCursorObject *cursor;
319 PyObject *ret;
320 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
321 volatile MemoryContext oldcontext;
322 volatile ResourceOwner oldowner;
323 Portal portal;
324
325 cursor = (PLyCursorObject *) self;
326
327 if (cursor->closed)
328 {
329 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
330 return NULL;
331 }
332
333 portal = GetPortalByName(cursor->portalname);
334 if (!PortalIsValid(portal))
335 {
336 PLy_exception_set(PyExc_ValueError,
337 "iterating a cursor in an aborted subtransaction");
338 return NULL;
339 }
340
341 oldcontext = CurrentMemoryContext;
342 oldowner = CurrentResourceOwner;
343
344 PLy_spi_subtransaction_begin(oldcontext, oldowner);
345
346 PG_TRY();
347 {
348 SPI_cursor_fetch(portal, true, 1);
349 if (SPI_processed == 0)
350 {
351 PyErr_SetNone(PyExc_StopIteration);
352 ret = NULL;
353 }
354 else
355 {
356 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
357 exec_ctx->curr_proc);
358
359 ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
360 SPI_tuptable->tupdesc, true);
361 }
362
363 SPI_freetuptable(SPI_tuptable);
364
365 PLy_spi_subtransaction_commit(oldcontext, oldowner);
366 }
367 PG_CATCH();
368 {
369 PLy_spi_subtransaction_abort(oldcontext, oldowner);
370 return NULL;
371 }
372 PG_END_TRY();
373
374 return ret;
375 }
376
377 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)378 PLy_cursor_fetch(PyObject *self, PyObject *args)
379 {
380 PLyCursorObject *cursor;
381 int count;
382 PLyResultObject *ret;
383 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
384 volatile MemoryContext oldcontext;
385 volatile ResourceOwner oldowner;
386 Portal portal;
387
388 if (!PyArg_ParseTuple(args, "i:fetch", &count))
389 return NULL;
390
391 cursor = (PLyCursorObject *) self;
392
393 if (cursor->closed)
394 {
395 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
396 return NULL;
397 }
398
399 portal = GetPortalByName(cursor->portalname);
400 if (!PortalIsValid(portal))
401 {
402 PLy_exception_set(PyExc_ValueError,
403 "iterating a cursor in an aborted subtransaction");
404 return NULL;
405 }
406
407 ret = (PLyResultObject *) PLy_result_new();
408 if (ret == NULL)
409 return NULL;
410
411 oldcontext = CurrentMemoryContext;
412 oldowner = CurrentResourceOwner;
413
414 PLy_spi_subtransaction_begin(oldcontext, oldowner);
415
416 PG_TRY();
417 {
418 SPI_cursor_fetch(portal, true, count);
419
420 Py_DECREF(ret->status);
421 ret->status = PyInt_FromLong(SPI_OK_FETCH);
422
423 Py_DECREF(ret->nrows);
424 ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
425
426 if (SPI_processed != 0)
427 {
428 uint64 i;
429
430 /*
431 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
432 * and list indices; so we cannot support a result larger than
433 * PY_SSIZE_T_MAX.
434 */
435 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
436 ereport(ERROR,
437 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
438 errmsg("query result has too many rows to fit in a Python list")));
439
440 Py_DECREF(ret->rows);
441 ret->rows = PyList_New(SPI_processed);
442 if (!ret->rows)
443 {
444 Py_DECREF(ret);
445 ret = NULL;
446 }
447 else
448 {
449 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
450 exec_ctx->curr_proc);
451
452 for (i = 0; i < SPI_processed; i++)
453 {
454 PyObject *row = PLy_input_from_tuple(&cursor->result,
455 SPI_tuptable->vals[i],
456 SPI_tuptable->tupdesc,
457 true);
458
459 PyList_SetItem(ret->rows, i, row);
460 }
461 }
462 }
463
464 SPI_freetuptable(SPI_tuptable);
465
466 PLy_spi_subtransaction_commit(oldcontext, oldowner);
467 }
468 PG_CATCH();
469 {
470 PLy_spi_subtransaction_abort(oldcontext, oldowner);
471 return NULL;
472 }
473 PG_END_TRY();
474
475 return (PyObject *) ret;
476 }
477
478 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)479 PLy_cursor_close(PyObject *self, PyObject *unused)
480 {
481 PLyCursorObject *cursor = (PLyCursorObject *) self;
482
483 if (!cursor->closed)
484 {
485 Portal portal = GetPortalByName(cursor->portalname);
486
487 if (!PortalIsValid(portal))
488 {
489 PLy_exception_set(PyExc_ValueError,
490 "closing a cursor in an aborted subtransaction");
491 return NULL;
492 }
493
494 UnpinPortal(portal);
495 SPI_cursor_close(portal);
496 cursor->closed = true;
497 }
498
499 Py_RETURN_NONE;
500 }
501