1 /*
2 * the PLyCursor class
3 *
4 * src/pl/plpython/plpy_cursorobject.c
5 */
6
7 #include "postgres.h"
8
9 #include <limits.h>
10
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "plpy_cursorobject.h"
15 #include "plpy_elog.h"
16 #include "plpy_main.h"
17 #include "plpy_planobject.h"
18 #include "plpy_procedure.h"
19 #include "plpy_resultobject.h"
20 #include "plpy_spi.h"
21 #include "plpython.h"
22 #include "utils/memutils.h"
23
/* Forward declarations for the file-local cursor constructor and methods */
static PyObject *PLy_cursor_query(const char *query);
static void PLy_cursor_dealloc(PyObject *arg);
static PyObject *PLy_cursor_iternext(PyObject *self);
static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);

/* Docstring attached to the PLyCursor type via tp_doc */
static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";

/*
 * Methods exposed on PLyCursor objects:
 *	fetch - receives its arguments as a tuple (METH_VARARGS)
 *	close - takes no arguments (METH_NOARGS)
 * The all-NULL entry terminates the table.
 */
static PyMethodDef PLy_cursor_methods[] = {
	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
	{"close", PLy_cursor_close, METH_NOARGS, NULL},
	{NULL, NULL, 0, NULL}
};

/*
 * Type object for PLyCursor.
 *
 * tp_iter is PyObject_SelfIter, so a cursor acts as its own iterator;
 * each step of the iteration is served by PLy_cursor_iternext.
 * PLy_cursor_init_type() must run PyType_Ready() on this before use.
 */
static PyTypeObject PLy_CursorType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	.tp_name = "PLyCursor",
	.tp_basicsize = sizeof(PLyCursorObject),
	.tp_dealloc = PLy_cursor_dealloc,
	.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
	.tp_doc = PLy_cursor_doc,
	.tp_iter = PyObject_SelfIter,
	.tp_iternext = PLy_cursor_iternext,
	.tp_methods = PLy_cursor_methods,
};
49
50 void
PLy_cursor_init_type(void)51 PLy_cursor_init_type(void)
52 {
53 if (PyType_Ready(&PLy_CursorType) < 0)
54 elog(ERROR, "could not initialize PLy_CursorType");
55 }
56
57 PyObject *
PLy_cursor(PyObject * self,PyObject * args)58 PLy_cursor(PyObject *self, PyObject *args)
59 {
60 char *query;
61 PyObject *plan;
62 PyObject *planargs = NULL;
63
64 if (PyArg_ParseTuple(args, "s", &query))
65 return PLy_cursor_query(query);
66
67 PyErr_Clear();
68
69 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
70 return PLy_cursor_plan(plan, planargs);
71
72 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
73 return NULL;
74 }
75
76
77 static PyObject *
PLy_cursor_query(const char * query)78 PLy_cursor_query(const char *query)
79 {
80 PLyCursorObject *cursor;
81 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
82 volatile MemoryContext oldcontext;
83 volatile ResourceOwner oldowner;
84
85 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
86 return NULL;
87 cursor->portalname = NULL;
88 cursor->closed = false;
89 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
90 "PL/Python cursor context",
91 ALLOCSET_DEFAULT_SIZES);
92
93 /* Initialize for converting result tuples to Python */
94 PLy_input_setup_func(&cursor->result, cursor->mcxt,
95 RECORDOID, -1,
96 exec_ctx->curr_proc);
97
98 oldcontext = CurrentMemoryContext;
99 oldowner = CurrentResourceOwner;
100
101 PLy_spi_subtransaction_begin(oldcontext, oldowner);
102
103 PG_TRY();
104 {
105 SPIPlanPtr plan;
106 Portal portal;
107
108 pg_verifymbstr(query, strlen(query), false);
109
110 plan = SPI_prepare(query, 0, NULL);
111 if (plan == NULL)
112 elog(ERROR, "SPI_prepare failed: %s",
113 SPI_result_code_string(SPI_result));
114
115 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
116 exec_ctx->curr_proc->fn_readonly);
117 SPI_freeplan(plan);
118
119 if (portal == NULL)
120 elog(ERROR, "SPI_cursor_open() failed: %s",
121 SPI_result_code_string(SPI_result));
122
123 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
124
125 PinPortal(portal);
126
127 PLy_spi_subtransaction_commit(oldcontext, oldowner);
128 }
129 PG_CATCH();
130 {
131 PLy_spi_subtransaction_abort(oldcontext, oldowner);
132 return NULL;
133 }
134 PG_END_TRY();
135
136 Assert(cursor->portalname != NULL);
137 return (PyObject *) cursor;
138 }
139
140 PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)141 PLy_cursor_plan(PyObject *ob, PyObject *args)
142 {
143 PLyCursorObject *cursor;
144 volatile int nargs;
145 int i;
146 PLyPlanObject *plan;
147 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
148 volatile MemoryContext oldcontext;
149 volatile ResourceOwner oldowner;
150
151 if (args)
152 {
153 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
154 {
155 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
156 return NULL;
157 }
158 nargs = PySequence_Length(args);
159 }
160 else
161 nargs = 0;
162
163 plan = (PLyPlanObject *) ob;
164
165 if (nargs != plan->nargs)
166 {
167 char *sv;
168 PyObject *so = PyObject_Str(args);
169
170 if (!so)
171 PLy_elog(ERROR, "could not execute plan");
172 sv = PyString_AsString(so);
173 PLy_exception_set_plural(PyExc_TypeError,
174 "Expected sequence of %d argument, got %d: %s",
175 "Expected sequence of %d arguments, got %d: %s",
176 plan->nargs,
177 plan->nargs, nargs, sv);
178 Py_DECREF(so);
179
180 return NULL;
181 }
182
183 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
184 return NULL;
185 cursor->portalname = NULL;
186 cursor->closed = false;
187 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
188 "PL/Python cursor context",
189 ALLOCSET_DEFAULT_SIZES);
190
191 /* Initialize for converting result tuples to Python */
192 PLy_input_setup_func(&cursor->result, cursor->mcxt,
193 RECORDOID, -1,
194 exec_ctx->curr_proc);
195
196 oldcontext = CurrentMemoryContext;
197 oldowner = CurrentResourceOwner;
198
199 PLy_spi_subtransaction_begin(oldcontext, oldowner);
200
201 PG_TRY();
202 {
203 Portal portal;
204 char *volatile nulls;
205 volatile int j;
206
207 if (nargs > 0)
208 nulls = palloc(nargs * sizeof(char));
209 else
210 nulls = NULL;
211
212 for (j = 0; j < nargs; j++)
213 {
214 PLyObToDatum *arg = &plan->args[j];
215 PyObject *elem;
216
217 elem = PySequence_GetItem(args, j);
218 PG_TRY();
219 {
220 bool isnull;
221
222 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
223 nulls[j] = isnull ? 'n' : ' ';
224 }
225 PG_FINALLY();
226 {
227 Py_DECREF(elem);
228 }
229 PG_END_TRY();
230 }
231
232 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
233 exec_ctx->curr_proc->fn_readonly);
234 if (portal == NULL)
235 elog(ERROR, "SPI_cursor_open() failed: %s",
236 SPI_result_code_string(SPI_result));
237
238 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
239
240 PinPortal(portal);
241
242 PLy_spi_subtransaction_commit(oldcontext, oldowner);
243 }
244 PG_CATCH();
245 {
246 int k;
247
248 /* cleanup plan->values array */
249 for (k = 0; k < nargs; k++)
250 {
251 if (!plan->args[k].typbyval &&
252 (plan->values[k] != PointerGetDatum(NULL)))
253 {
254 pfree(DatumGetPointer(plan->values[k]));
255 plan->values[k] = PointerGetDatum(NULL);
256 }
257 }
258
259 Py_DECREF(cursor);
260
261 PLy_spi_subtransaction_abort(oldcontext, oldowner);
262 return NULL;
263 }
264 PG_END_TRY();
265
266 for (i = 0; i < nargs; i++)
267 {
268 if (!plan->args[i].typbyval &&
269 (plan->values[i] != PointerGetDatum(NULL)))
270 {
271 pfree(DatumGetPointer(plan->values[i]));
272 plan->values[i] = PointerGetDatum(NULL);
273 }
274 }
275
276 Assert(cursor->portalname != NULL);
277 return (PyObject *) cursor;
278 }
279
280 static void
PLy_cursor_dealloc(PyObject * arg)281 PLy_cursor_dealloc(PyObject *arg)
282 {
283 PLyCursorObject *cursor;
284 Portal portal;
285
286 cursor = (PLyCursorObject *) arg;
287
288 if (!cursor->closed)
289 {
290 portal = GetPortalByName(cursor->portalname);
291
292 if (PortalIsValid(portal))
293 {
294 UnpinPortal(portal);
295 SPI_cursor_close(portal);
296 }
297 cursor->closed = true;
298 }
299 if (cursor->mcxt)
300 {
301 MemoryContextDelete(cursor->mcxt);
302 cursor->mcxt = NULL;
303 }
304 arg->ob_type->tp_free(arg);
305 }
306
307 static PyObject *
PLy_cursor_iternext(PyObject * self)308 PLy_cursor_iternext(PyObject *self)
309 {
310 PLyCursorObject *cursor;
311 PyObject *ret;
312 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
313 volatile MemoryContext oldcontext;
314 volatile ResourceOwner oldowner;
315 Portal portal;
316
317 cursor = (PLyCursorObject *) self;
318
319 if (cursor->closed)
320 {
321 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
322 return NULL;
323 }
324
325 portal = GetPortalByName(cursor->portalname);
326 if (!PortalIsValid(portal))
327 {
328 PLy_exception_set(PyExc_ValueError,
329 "iterating a cursor in an aborted subtransaction");
330 return NULL;
331 }
332
333 oldcontext = CurrentMemoryContext;
334 oldowner = CurrentResourceOwner;
335
336 PLy_spi_subtransaction_begin(oldcontext, oldowner);
337
338 PG_TRY();
339 {
340 SPI_cursor_fetch(portal, true, 1);
341 if (SPI_processed == 0)
342 {
343 PyErr_SetNone(PyExc_StopIteration);
344 ret = NULL;
345 }
346 else
347 {
348 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
349 exec_ctx->curr_proc);
350
351 ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
352 SPI_tuptable->tupdesc, true);
353 }
354
355 SPI_freetuptable(SPI_tuptable);
356
357 PLy_spi_subtransaction_commit(oldcontext, oldowner);
358 }
359 PG_CATCH();
360 {
361 PLy_spi_subtransaction_abort(oldcontext, oldowner);
362 return NULL;
363 }
364 PG_END_TRY();
365
366 return ret;
367 }
368
369 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)370 PLy_cursor_fetch(PyObject *self, PyObject *args)
371 {
372 PLyCursorObject *cursor;
373 int count;
374 PLyResultObject *ret;
375 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
376 volatile MemoryContext oldcontext;
377 volatile ResourceOwner oldowner;
378 Portal portal;
379
380 if (!PyArg_ParseTuple(args, "i:fetch", &count))
381 return NULL;
382
383 cursor = (PLyCursorObject *) self;
384
385 if (cursor->closed)
386 {
387 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
388 return NULL;
389 }
390
391 portal = GetPortalByName(cursor->portalname);
392 if (!PortalIsValid(portal))
393 {
394 PLy_exception_set(PyExc_ValueError,
395 "iterating a cursor in an aborted subtransaction");
396 return NULL;
397 }
398
399 ret = (PLyResultObject *) PLy_result_new();
400 if (ret == NULL)
401 return NULL;
402
403 oldcontext = CurrentMemoryContext;
404 oldowner = CurrentResourceOwner;
405
406 PLy_spi_subtransaction_begin(oldcontext, oldowner);
407
408 PG_TRY();
409 {
410 SPI_cursor_fetch(portal, true, count);
411
412 Py_DECREF(ret->status);
413 ret->status = PyInt_FromLong(SPI_OK_FETCH);
414
415 Py_DECREF(ret->nrows);
416 ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
417
418 if (SPI_processed != 0)
419 {
420 uint64 i;
421
422 /*
423 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
424 * and list indices; so we cannot support a result larger than
425 * PY_SSIZE_T_MAX.
426 */
427 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
428 ereport(ERROR,
429 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
430 errmsg("query result has too many rows to fit in a Python list")));
431
432 Py_DECREF(ret->rows);
433 ret->rows = PyList_New(SPI_processed);
434 if (!ret->rows)
435 {
436 Py_DECREF(ret);
437 ret = NULL;
438 }
439 else
440 {
441 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
442 exec_ctx->curr_proc);
443
444 for (i = 0; i < SPI_processed; i++)
445 {
446 PyObject *row = PLy_input_from_tuple(&cursor->result,
447 SPI_tuptable->vals[i],
448 SPI_tuptable->tupdesc,
449 true);
450
451 PyList_SetItem(ret->rows, i, row);
452 }
453 }
454 }
455
456 SPI_freetuptable(SPI_tuptable);
457
458 PLy_spi_subtransaction_commit(oldcontext, oldowner);
459 }
460 PG_CATCH();
461 {
462 PLy_spi_subtransaction_abort(oldcontext, oldowner);
463 return NULL;
464 }
465 PG_END_TRY();
466
467 return (PyObject *) ret;
468 }
469
470 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)471 PLy_cursor_close(PyObject *self, PyObject *unused)
472 {
473 PLyCursorObject *cursor = (PLyCursorObject *) self;
474
475 if (!cursor->closed)
476 {
477 Portal portal = GetPortalByName(cursor->portalname);
478
479 if (!PortalIsValid(portal))
480 {
481 PLy_exception_set(PyExc_ValueError,
482 "closing a cursor in an aborted subtransaction");
483 return NULL;
484 }
485
486 UnpinPortal(portal);
487 SPI_cursor_close(portal);
488 cursor->closed = true;
489 }
490
491 Py_RETURN_NONE;
492 }
493