1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "mb/pg_wchar.h"
13 #include "utils/memutils.h"
14 
15 #include "plpython.h"
16 
17 #include "plpy_cursorobject.h"
18 
19 #include "plpy_elog.h"
20 #include "plpy_main.h"
21 #include "plpy_planobject.h"
22 #include "plpy_procedure.h"
23 #include "plpy_resultobject.h"
24 #include "plpy_spi.h"
25 
26 
/*
 * Forward declarations for the file-local helpers: the two cursor
 * constructors called from PLy_cursor(), and the callbacks referenced by
 * PLy_cursor_methods and PLy_CursorType below.
 */
static PyObject *PLy_cursor_query(const char *query);
static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
static void PLy_cursor_dealloc(PyObject *arg);
static PyObject *PLy_cursor_iternext(PyObject *self);
static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33 
/* Docstring installed as tp_doc of the PLyCursor type. */
static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
37 
/*
 * Method table of the PLyCursor type (installed through PLy_CursorType).
 * Neither method carries a docstring.
 */
static PyMethodDef PLy_cursor_methods[] = {
	/* fetch(count): handled by PLy_cursor_fetch, which parses one integer */
	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
	/* close(): handled by PLy_cursor_close, takes no arguments */
	{"close", PLy_cursor_close, METH_NOARGS, NULL},
	/* sentinel terminating the table */
	{NULL, NULL, 0, NULL}
};
43 
/*
 * Python type object for PLyCursor.
 *
 * The initializer is positional, so the order of the entries must follow the
 * PyTypeObject layout.  The slot names in the comments are the Python 2
 * names; under Python 3 some of these positions carry different names.  Only
 * the deallocator, the flags, the docstring, the iterator hooks and the
 * method table are filled in; every other slot is left zero.  The type is
 * finalized by PLy_cursor_init_type().
 */
static PyTypeObject PLy_CursorType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"PLyCursor",				/* tp_name */
	sizeof(PLyCursorObject),	/* tp_basicsize */
	0,							/* tp_itemsize */

	/*
	 * methods
	 */
	PLy_cursor_dealloc,			/* tp_dealloc */
	0,							/* tp_print */
	0,							/* tp_getattr */
	0,							/* tp_setattr */
	0,							/* tp_compare */
	0,							/* tp_repr */
	0,							/* tp_as_number */
	0,							/* tp_as_sequence */
	0,							/* tp_as_mapping */
	0,							/* tp_hash */
	0,							/* tp_call */
	0,							/* tp_str */
	0,							/* tp_getattro */
	0,							/* tp_setattro */
	0,							/* tp_as_buffer */
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,	/* tp_flags */
	PLy_cursor_doc,				/* tp_doc */
	0,							/* tp_traverse */
	0,							/* tp_clear */
	0,							/* tp_richcompare */
	0,							/* tp_weaklistoffset */
	PyObject_SelfIter,			/* tp_iter: a cursor is its own iterator */
	PLy_cursor_iternext,		/* tp_iternext */
	PLy_cursor_methods,			/* tp_methods */
};
78 
79 void
PLy_cursor_init_type(void)80 PLy_cursor_init_type(void)
81 {
82 	if (PyType_Ready(&PLy_CursorType) < 0)
83 		elog(ERROR, "could not initialize PLy_CursorType");
84 }
85 
86 PyObject *
PLy_cursor(PyObject * self,PyObject * args)87 PLy_cursor(PyObject *self, PyObject *args)
88 {
89 	char	   *query;
90 	PyObject   *plan;
91 	PyObject   *planargs = NULL;
92 
93 	if (PyArg_ParseTuple(args, "s", &query))
94 		return PLy_cursor_query(query);
95 
96 	PyErr_Clear();
97 
98 	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
99 		return PLy_cursor_plan(plan, planargs);
100 
101 	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
102 	return NULL;
103 }
104 
105 
106 static PyObject *
PLy_cursor_query(const char * query)107 PLy_cursor_query(const char *query)
108 {
109 	PLyCursorObject *cursor;
110 	volatile MemoryContext oldcontext;
111 	volatile ResourceOwner oldowner;
112 
113 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
114 		return NULL;
115 	cursor->portalname = NULL;
116 	cursor->closed = false;
117 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
118 										 "PL/Python cursor context",
119 										 ALLOCSET_DEFAULT_SIZES);
120 	PLy_typeinfo_init(&cursor->result, cursor->mcxt);
121 
122 	oldcontext = CurrentMemoryContext;
123 	oldowner = CurrentResourceOwner;
124 
125 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
126 
127 	PG_TRY();
128 	{
129 		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
130 		SPIPlanPtr	plan;
131 		Portal		portal;
132 
133 		pg_verifymbstr(query, strlen(query), false);
134 
135 		plan = SPI_prepare(query, 0, NULL);
136 		if (plan == NULL)
137 			elog(ERROR, "SPI_prepare failed: %s",
138 				 SPI_result_code_string(SPI_result));
139 
140 		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
141 								 exec_ctx->curr_proc->fn_readonly);
142 		SPI_freeplan(plan);
143 
144 		if (portal == NULL)
145 			elog(ERROR, "SPI_cursor_open() failed: %s",
146 				 SPI_result_code_string(SPI_result));
147 
148 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
149 
150 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
151 	}
152 	PG_CATCH();
153 	{
154 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
155 		return NULL;
156 	}
157 	PG_END_TRY();
158 
159 	Assert(cursor->portalname != NULL);
160 	return (PyObject *) cursor;
161 }
162 
163 static PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)164 PLy_cursor_plan(PyObject *ob, PyObject *args)
165 {
166 	PLyCursorObject *cursor;
167 	volatile int nargs;
168 	int			i;
169 	PLyPlanObject *plan;
170 	volatile MemoryContext oldcontext;
171 	volatile ResourceOwner oldowner;
172 
173 	if (args)
174 	{
175 		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
176 		{
177 			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
178 			return NULL;
179 		}
180 		nargs = PySequence_Length(args);
181 	}
182 	else
183 		nargs = 0;
184 
185 	plan = (PLyPlanObject *) ob;
186 
187 	if (nargs != plan->nargs)
188 	{
189 		char	   *sv;
190 		PyObject   *so = PyObject_Str(args);
191 
192 		if (!so)
193 			PLy_elog(ERROR, "could not execute plan");
194 		sv = PyString_AsString(so);
195 		PLy_exception_set_plural(PyExc_TypeError,
196 							  "Expected sequence of %d argument, got %d: %s",
197 							 "Expected sequence of %d arguments, got %d: %s",
198 								 plan->nargs,
199 								 plan->nargs, nargs, sv);
200 		Py_DECREF(so);
201 
202 		return NULL;
203 	}
204 
205 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
206 		return NULL;
207 	cursor->portalname = NULL;
208 	cursor->closed = false;
209 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
210 										 "PL/Python cursor context",
211 										 ALLOCSET_DEFAULT_SIZES);
212 	PLy_typeinfo_init(&cursor->result, cursor->mcxt);
213 
214 	oldcontext = CurrentMemoryContext;
215 	oldowner = CurrentResourceOwner;
216 
217 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
218 
219 	PG_TRY();
220 	{
221 		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
222 		Portal		portal;
223 		char	   *volatile nulls;
224 		volatile int j;
225 
226 		if (nargs > 0)
227 			nulls = palloc(nargs * sizeof(char));
228 		else
229 			nulls = NULL;
230 
231 		for (j = 0; j < nargs; j++)
232 		{
233 			PyObject   *elem;
234 
235 			elem = PySequence_GetItem(args, j);
236 			if (elem != Py_None)
237 			{
238 				PG_TRY();
239 				{
240 					plan->values[j] =
241 						plan->args[j].out.d.func(&(plan->args[j].out.d),
242 												 -1,
243 												 elem);
244 				}
245 				PG_CATCH();
246 				{
247 					Py_DECREF(elem);
248 					PG_RE_THROW();
249 				}
250 				PG_END_TRY();
251 
252 				Py_DECREF(elem);
253 				nulls[j] = ' ';
254 			}
255 			else
256 			{
257 				Py_DECREF(elem);
258 				plan->values[j] =
259 					InputFunctionCall(&(plan->args[j].out.d.typfunc),
260 									  NULL,
261 									  plan->args[j].out.d.typioparam,
262 									  -1);
263 				nulls[j] = 'n';
264 			}
265 		}
266 
267 		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
268 								 exec_ctx->curr_proc->fn_readonly);
269 		if (portal == NULL)
270 			elog(ERROR, "SPI_cursor_open() failed: %s",
271 				 SPI_result_code_string(SPI_result));
272 
273 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
274 
275 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
276 	}
277 	PG_CATCH();
278 	{
279 		int			k;
280 
281 		/* cleanup plan->values array */
282 		for (k = 0; k < nargs; k++)
283 		{
284 			if (!plan->args[k].out.d.typbyval &&
285 				(plan->values[k] != PointerGetDatum(NULL)))
286 			{
287 				pfree(DatumGetPointer(plan->values[k]));
288 				plan->values[k] = PointerGetDatum(NULL);
289 			}
290 		}
291 
292 		Py_DECREF(cursor);
293 
294 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
295 		return NULL;
296 	}
297 	PG_END_TRY();
298 
299 	for (i = 0; i < nargs; i++)
300 	{
301 		if (!plan->args[i].out.d.typbyval &&
302 			(plan->values[i] != PointerGetDatum(NULL)))
303 		{
304 			pfree(DatumGetPointer(plan->values[i]));
305 			plan->values[i] = PointerGetDatum(NULL);
306 		}
307 	}
308 
309 	Assert(cursor->portalname != NULL);
310 	return (PyObject *) cursor;
311 }
312 
313 static void
PLy_cursor_dealloc(PyObject * arg)314 PLy_cursor_dealloc(PyObject *arg)
315 {
316 	PLyCursorObject *cursor;
317 	Portal		portal;
318 
319 	cursor = (PLyCursorObject *) arg;
320 
321 	if (!cursor->closed)
322 	{
323 		portal = GetPortalByName(cursor->portalname);
324 
325 		if (PortalIsValid(portal))
326 			SPI_cursor_close(portal);
327 		cursor->closed = true;
328 	}
329 	if (cursor->mcxt)
330 	{
331 		MemoryContextDelete(cursor->mcxt);
332 		cursor->mcxt = NULL;
333 	}
334 	arg->ob_type->tp_free(arg);
335 }
336 
337 static PyObject *
PLy_cursor_iternext(PyObject * self)338 PLy_cursor_iternext(PyObject *self)
339 {
340 	PLyCursorObject *cursor;
341 	PyObject   *ret;
342 	volatile MemoryContext oldcontext;
343 	volatile ResourceOwner oldowner;
344 	Portal		portal;
345 
346 	cursor = (PLyCursorObject *) self;
347 
348 	if (cursor->closed)
349 	{
350 		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
351 		return NULL;
352 	}
353 
354 	portal = GetPortalByName(cursor->portalname);
355 	if (!PortalIsValid(portal))
356 	{
357 		PLy_exception_set(PyExc_ValueError,
358 						  "iterating a cursor in an aborted subtransaction");
359 		return NULL;
360 	}
361 
362 	oldcontext = CurrentMemoryContext;
363 	oldowner = CurrentResourceOwner;
364 
365 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
366 
367 	PG_TRY();
368 	{
369 		SPI_cursor_fetch(portal, true, 1);
370 		if (SPI_processed == 0)
371 		{
372 			PyErr_SetNone(PyExc_StopIteration);
373 			ret = NULL;
374 		}
375 		else
376 		{
377 			if (cursor->result.is_rowtype != 1)
378 				PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
379 
380 			ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
381 									SPI_tuptable->tupdesc);
382 		}
383 
384 		SPI_freetuptable(SPI_tuptable);
385 
386 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
387 	}
388 	PG_CATCH();
389 	{
390 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
391 		return NULL;
392 	}
393 	PG_END_TRY();
394 
395 	return ret;
396 }
397 
398 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)399 PLy_cursor_fetch(PyObject *self, PyObject *args)
400 {
401 	PLyCursorObject *cursor;
402 	int			count;
403 	PLyResultObject *ret;
404 	volatile MemoryContext oldcontext;
405 	volatile ResourceOwner oldowner;
406 	Portal		portal;
407 
408 	if (!PyArg_ParseTuple(args, "i", &count))
409 		return NULL;
410 
411 	cursor = (PLyCursorObject *) self;
412 
413 	if (cursor->closed)
414 	{
415 		PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
416 		return NULL;
417 	}
418 
419 	portal = GetPortalByName(cursor->portalname);
420 	if (!PortalIsValid(portal))
421 	{
422 		PLy_exception_set(PyExc_ValueError,
423 						  "iterating a cursor in an aborted subtransaction");
424 		return NULL;
425 	}
426 
427 	ret = (PLyResultObject *) PLy_result_new();
428 	if (ret == NULL)
429 		return NULL;
430 
431 	oldcontext = CurrentMemoryContext;
432 	oldowner = CurrentResourceOwner;
433 
434 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
435 
436 	PG_TRY();
437 	{
438 		SPI_cursor_fetch(portal, true, count);
439 
440 		if (cursor->result.is_rowtype != 1)
441 			PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
442 
443 		Py_DECREF(ret->status);
444 		ret->status = PyInt_FromLong(SPI_OK_FETCH);
445 
446 		Py_DECREF(ret->nrows);
447 		ret->nrows = (SPI_processed > (uint64) LONG_MAX) ?
448 			PyFloat_FromDouble((double) SPI_processed) :
449 			PyInt_FromLong((long) SPI_processed);
450 
451 		if (SPI_processed != 0)
452 		{
453 			uint64		i;
454 
455 			/*
456 			 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
457 			 * and list indices; so we cannot support a result larger than
458 			 * PY_SSIZE_T_MAX.
459 			 */
460 			if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
461 				ereport(ERROR,
462 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
463 						 errmsg("query result has too many rows to fit in a Python list")));
464 
465 			Py_DECREF(ret->rows);
466 			ret->rows = PyList_New(SPI_processed);
467 
468 			for (i = 0; i < SPI_processed; i++)
469 			{
470 				PyObject   *row = PLyDict_FromTuple(&cursor->result,
471 													SPI_tuptable->vals[i],
472 													SPI_tuptable->tupdesc);
473 
474 				PyList_SetItem(ret->rows, i, row);
475 			}
476 		}
477 
478 		SPI_freetuptable(SPI_tuptable);
479 
480 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
481 	}
482 	PG_CATCH();
483 	{
484 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
485 		return NULL;
486 	}
487 	PG_END_TRY();
488 
489 	return (PyObject *) ret;
490 }
491 
492 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)493 PLy_cursor_close(PyObject *self, PyObject *unused)
494 {
495 	PLyCursorObject *cursor = (PLyCursorObject *) self;
496 
497 	if (!cursor->closed)
498 	{
499 		Portal		portal = GetPortalByName(cursor->portalname);
500 
501 		if (!PortalIsValid(portal))
502 		{
503 			PLy_exception_set(PyExc_ValueError,
504 							"closing a cursor in an aborted subtransaction");
505 			return NULL;
506 		}
507 
508 		SPI_cursor_close(portal);
509 		cursor->closed = true;
510 	}
511 
512 	Py_INCREF(Py_None);
513 	return Py_None;
514 }
515