1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "utils/memutils.h"
15 
16 #include "plpython.h"
17 
18 #include "plpy_cursorobject.h"
19 
20 #include "plpy_elog.h"
21 #include "plpy_main.h"
22 #include "plpy_planobject.h"
23 #include "plpy_procedure.h"
24 #include "plpy_resultobject.h"
25 #include "plpy_spi.h"
26 
27 
28 static PyObject *PLy_cursor_query(const char *query);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33 
34 static char PLy_cursor_doc[] = {
35 	"Wrapper around a PostgreSQL cursor"
36 };
37 
38 static PyMethodDef PLy_cursor_methods[] = {
39 	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40 	{"close", PLy_cursor_close, METH_NOARGS, NULL},
41 	{NULL, NULL, 0, NULL}
42 };
43 
44 static PyTypeObject PLy_CursorType = {
45 	PyVarObject_HEAD_INIT(NULL, 0)
46 	"PLyCursor",				/* tp_name */
47 	sizeof(PLyCursorObject),	/* tp_size */
48 	0,							/* tp_itemsize */
49 
50 	/*
51 	 * methods
52 	 */
53 	PLy_cursor_dealloc,			/* tp_dealloc */
54 	0,							/* tp_print */
55 	0,							/* tp_getattr */
56 	0,							/* tp_setattr */
57 	0,							/* tp_compare */
58 	0,							/* tp_repr */
59 	0,							/* tp_as_number */
60 	0,							/* tp_as_sequence */
61 	0,							/* tp_as_mapping */
62 	0,							/* tp_hash */
63 	0,							/* tp_call */
64 	0,							/* tp_str */
65 	0,							/* tp_getattro */
66 	0,							/* tp_setattro */
67 	0,							/* tp_as_buffer */
68 	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,	/* tp_flags */
69 	PLy_cursor_doc,				/* tp_doc */
70 	0,							/* tp_traverse */
71 	0,							/* tp_clear */
72 	0,							/* tp_richcompare */
73 	0,							/* tp_weaklistoffset */
74 	PyObject_SelfIter,			/* tp_iter */
75 	PLy_cursor_iternext,		/* tp_iternext */
76 	PLy_cursor_methods,			/* tp_tpmethods */
77 };
78 
79 void
PLy_cursor_init_type(void)80 PLy_cursor_init_type(void)
81 {
82 	if (PyType_Ready(&PLy_CursorType) < 0)
83 		elog(ERROR, "could not initialize PLy_CursorType");
84 }
85 
86 PyObject *
PLy_cursor(PyObject * self,PyObject * args)87 PLy_cursor(PyObject *self, PyObject *args)
88 {
89 	char	   *query;
90 	PyObject   *plan;
91 	PyObject   *planargs = NULL;
92 
93 	if (PyArg_ParseTuple(args, "s", &query))
94 		return PLy_cursor_query(query);
95 
96 	PyErr_Clear();
97 
98 	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
99 		return PLy_cursor_plan(plan, planargs);
100 
101 	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
102 	return NULL;
103 }
104 
105 
106 static PyObject *
PLy_cursor_query(const char * query)107 PLy_cursor_query(const char *query)
108 {
109 	PLyCursorObject *cursor;
110 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
111 	volatile MemoryContext oldcontext;
112 	volatile ResourceOwner oldowner;
113 
114 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
115 		return NULL;
116 	cursor->portalname = NULL;
117 	cursor->closed = false;
118 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
119 										 "PL/Python cursor context",
120 										 ALLOCSET_DEFAULT_SIZES);
121 
122 	/* Initialize for converting result tuples to Python */
123 	PLy_input_setup_func(&cursor->result, cursor->mcxt,
124 						 RECORDOID, -1,
125 						 exec_ctx->curr_proc);
126 
127 	oldcontext = CurrentMemoryContext;
128 	oldowner = CurrentResourceOwner;
129 
130 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
131 
132 	PG_TRY();
133 	{
134 		SPIPlanPtr	plan;
135 		Portal		portal;
136 
137 		pg_verifymbstr(query, strlen(query), false);
138 
139 		plan = SPI_prepare(query, 0, NULL);
140 		if (plan == NULL)
141 			elog(ERROR, "SPI_prepare failed: %s",
142 				 SPI_result_code_string(SPI_result));
143 
144 		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
145 								 exec_ctx->curr_proc->fn_readonly);
146 		SPI_freeplan(plan);
147 
148 		if (portal == NULL)
149 			elog(ERROR, "SPI_cursor_open() failed: %s",
150 				 SPI_result_code_string(SPI_result));
151 
152 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
153 
154 		PinPortal(portal);
155 
156 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
157 	}
158 	PG_CATCH();
159 	{
160 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
161 		return NULL;
162 	}
163 	PG_END_TRY();
164 
165 	Assert(cursor->portalname != NULL);
166 	return (PyObject *) cursor;
167 }
168 
169 PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)170 PLy_cursor_plan(PyObject *ob, PyObject *args)
171 {
172 	PLyCursorObject *cursor;
173 	volatile int nargs;
174 	int			i;
175 	PLyPlanObject *plan;
176 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
177 	volatile MemoryContext oldcontext;
178 	volatile ResourceOwner oldowner;
179 
180 	if (args)
181 	{
182 		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
183 		{
184 			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
185 			return NULL;
186 		}
187 		nargs = PySequence_Length(args);
188 	}
189 	else
190 		nargs = 0;
191 
192 	plan = (PLyPlanObject *) ob;
193 
194 	if (nargs != plan->nargs)
195 	{
196 		char	   *sv;
197 		PyObject   *so = PyObject_Str(args);
198 
199 		if (!so)
200 			PLy_elog(ERROR, "could not execute plan");
201 		sv = PyString_AsString(so);
202 		PLy_exception_set_plural(PyExc_TypeError,
203 								 "Expected sequence of %d argument, got %d: %s",
204 								 "Expected sequence of %d arguments, got %d: %s",
205 								 plan->nargs,
206 								 plan->nargs, nargs, sv);
207 		Py_DECREF(so);
208 
209 		return NULL;
210 	}
211 
212 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
213 		return NULL;
214 	cursor->portalname = NULL;
215 	cursor->closed = false;
216 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
217 										 "PL/Python cursor context",
218 										 ALLOCSET_DEFAULT_SIZES);
219 
220 	/* Initialize for converting result tuples to Python */
221 	PLy_input_setup_func(&cursor->result, cursor->mcxt,
222 						 RECORDOID, -1,
223 						 exec_ctx->curr_proc);
224 
225 	oldcontext = CurrentMemoryContext;
226 	oldowner = CurrentResourceOwner;
227 
228 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
229 
230 	PG_TRY();
231 	{
232 		Portal		portal;
233 		char	   *volatile nulls;
234 		volatile int j;
235 
236 		if (nargs > 0)
237 			nulls = palloc(nargs * sizeof(char));
238 		else
239 			nulls = NULL;
240 
241 		for (j = 0; j < nargs; j++)
242 		{
243 			PLyObToDatum *arg = &plan->args[j];
244 			PyObject   *elem;
245 
246 			elem = PySequence_GetItem(args, j);
247 			PG_TRY();
248 			{
249 				bool		isnull;
250 
251 				plan->values[j] = PLy_output_convert(arg, elem, &isnull);
252 				nulls[j] = isnull ? 'n' : ' ';
253 			}
254 			PG_CATCH();
255 			{
256 				Py_DECREF(elem);
257 				PG_RE_THROW();
258 			}
259 			PG_END_TRY();
260 			Py_DECREF(elem);
261 		}
262 
263 		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
264 								 exec_ctx->curr_proc->fn_readonly);
265 		if (portal == NULL)
266 			elog(ERROR, "SPI_cursor_open() failed: %s",
267 				 SPI_result_code_string(SPI_result));
268 
269 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
270 
271 		PinPortal(portal);
272 
273 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
274 	}
275 	PG_CATCH();
276 	{
277 		int			k;
278 
279 		/* cleanup plan->values array */
280 		for (k = 0; k < nargs; k++)
281 		{
282 			if (!plan->args[k].typbyval &&
283 				(plan->values[k] != PointerGetDatum(NULL)))
284 			{
285 				pfree(DatumGetPointer(plan->values[k]));
286 				plan->values[k] = PointerGetDatum(NULL);
287 			}
288 		}
289 
290 		Py_DECREF(cursor);
291 
292 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
293 		return NULL;
294 	}
295 	PG_END_TRY();
296 
297 	for (i = 0; i < nargs; i++)
298 	{
299 		if (!plan->args[i].typbyval &&
300 			(plan->values[i] != PointerGetDatum(NULL)))
301 		{
302 			pfree(DatumGetPointer(plan->values[i]));
303 			plan->values[i] = PointerGetDatum(NULL);
304 		}
305 	}
306 
307 	Assert(cursor->portalname != NULL);
308 	return (PyObject *) cursor;
309 }
310 
311 static void
PLy_cursor_dealloc(PyObject * arg)312 PLy_cursor_dealloc(PyObject *arg)
313 {
314 	PLyCursorObject *cursor;
315 	Portal		portal;
316 
317 	cursor = (PLyCursorObject *) arg;
318 
319 	if (!cursor->closed)
320 	{
321 		portal = GetPortalByName(cursor->portalname);
322 
323 		if (PortalIsValid(portal))
324 		{
325 			UnpinPortal(portal);
326 			SPI_cursor_close(portal);
327 		}
328 		cursor->closed = true;
329 	}
330 	if (cursor->mcxt)
331 	{
332 		MemoryContextDelete(cursor->mcxt);
333 		cursor->mcxt = NULL;
334 	}
335 	arg->ob_type->tp_free(arg);
336 }
337 
338 static PyObject *
PLy_cursor_iternext(PyObject * self)339 PLy_cursor_iternext(PyObject *self)
340 {
341 	PLyCursorObject *cursor;
342 	PyObject   *ret;
343 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
344 	volatile MemoryContext oldcontext;
345 	volatile ResourceOwner oldowner;
346 	Portal		portal;
347 
348 	cursor = (PLyCursorObject *) self;
349 
350 	if (cursor->closed)
351 	{
352 		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
353 		return NULL;
354 	}
355 
356 	portal = GetPortalByName(cursor->portalname);
357 	if (!PortalIsValid(portal))
358 	{
359 		PLy_exception_set(PyExc_ValueError,
360 						  "iterating a cursor in an aborted subtransaction");
361 		return NULL;
362 	}
363 
364 	oldcontext = CurrentMemoryContext;
365 	oldowner = CurrentResourceOwner;
366 
367 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
368 
369 	PG_TRY();
370 	{
371 		SPI_cursor_fetch(portal, true, 1);
372 		if (SPI_processed == 0)
373 		{
374 			PyErr_SetNone(PyExc_StopIteration);
375 			ret = NULL;
376 		}
377 		else
378 		{
379 			PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
380 								  exec_ctx->curr_proc);
381 
382 			ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
383 									   SPI_tuptable->tupdesc);
384 		}
385 
386 		SPI_freetuptable(SPI_tuptable);
387 
388 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
389 	}
390 	PG_CATCH();
391 	{
392 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
393 		return NULL;
394 	}
395 	PG_END_TRY();
396 
397 	return ret;
398 }
399 
400 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)401 PLy_cursor_fetch(PyObject *self, PyObject *args)
402 {
403 	PLyCursorObject *cursor;
404 	int			count;
405 	PLyResultObject *ret;
406 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
407 	volatile MemoryContext oldcontext;
408 	volatile ResourceOwner oldowner;
409 	Portal		portal;
410 
411 	if (!PyArg_ParseTuple(args, "i:fetch", &count))
412 		return NULL;
413 
414 	cursor = (PLyCursorObject *) self;
415 
416 	if (cursor->closed)
417 	{
418 		PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
419 		return NULL;
420 	}
421 
422 	portal = GetPortalByName(cursor->portalname);
423 	if (!PortalIsValid(portal))
424 	{
425 		PLy_exception_set(PyExc_ValueError,
426 						  "iterating a cursor in an aborted subtransaction");
427 		return NULL;
428 	}
429 
430 	ret = (PLyResultObject *) PLy_result_new();
431 	if (ret == NULL)
432 		return NULL;
433 
434 	oldcontext = CurrentMemoryContext;
435 	oldowner = CurrentResourceOwner;
436 
437 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
438 
439 	PG_TRY();
440 	{
441 		SPI_cursor_fetch(portal, true, count);
442 
443 		Py_DECREF(ret->status);
444 		ret->status = PyInt_FromLong(SPI_OK_FETCH);
445 
446 		Py_DECREF(ret->nrows);
447 		ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
448 
449 		if (SPI_processed != 0)
450 		{
451 			uint64		i;
452 
453 			/*
454 			 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
455 			 * and list indices; so we cannot support a result larger than
456 			 * PY_SSIZE_T_MAX.
457 			 */
458 			if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
459 				ereport(ERROR,
460 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
461 						 errmsg("query result has too many rows to fit in a Python list")));
462 
463 			Py_DECREF(ret->rows);
464 			ret->rows = PyList_New(SPI_processed);
465 			if (!ret->rows)
466 			{
467 				Py_DECREF(ret);
468 				ret = NULL;
469 			}
470 			else
471 			{
472 				PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
473 									  exec_ctx->curr_proc);
474 
475 				for (i = 0; i < SPI_processed; i++)
476 				{
477 					PyObject   *row = PLy_input_from_tuple(&cursor->result,
478 														   SPI_tuptable->vals[i],
479 														   SPI_tuptable->tupdesc);
480 
481 					PyList_SetItem(ret->rows, i, row);
482 				}
483 			}
484 		}
485 
486 		SPI_freetuptable(SPI_tuptable);
487 
488 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
489 	}
490 	PG_CATCH();
491 	{
492 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
493 		return NULL;
494 	}
495 	PG_END_TRY();
496 
497 	return (PyObject *) ret;
498 }
499 
500 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)501 PLy_cursor_close(PyObject *self, PyObject *unused)
502 {
503 	PLyCursorObject *cursor = (PLyCursorObject *) self;
504 
505 	if (!cursor->closed)
506 	{
507 		Portal		portal = GetPortalByName(cursor->portalname);
508 
509 		if (!PortalIsValid(portal))
510 		{
511 			PLy_exception_set(PyExc_ValueError,
512 							  "closing a cursor in an aborted subtransaction");
513 			return NULL;
514 		}
515 
516 		UnpinPortal(portal);
517 		SPI_cursor_close(portal);
518 		cursor->closed = true;
519 	}
520 
521 	Py_RETURN_NONE;
522 }
523