1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "utils/memutils.h"
15 
16 #include "plpython.h"
17 
18 #include "plpy_cursorobject.h"
19 
20 #include "plpy_elog.h"
21 #include "plpy_main.h"
22 #include "plpy_planobject.h"
23 #include "plpy_procedure.h"
24 #include "plpy_resultobject.h"
25 #include "plpy_spi.h"
26 
27 
28 static PyObject *PLy_cursor_query(const char *query);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33 
34 static char PLy_cursor_doc[] = {
35 	"Wrapper around a PostgreSQL cursor"
36 };
37 
38 static PyMethodDef PLy_cursor_methods[] = {
39 	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40 	{"close", PLy_cursor_close, METH_NOARGS, NULL},
41 	{NULL, NULL, 0, NULL}
42 };
43 
44 static PyTypeObject PLy_CursorType = {
45 	PyVarObject_HEAD_INIT(NULL, 0)
46 	.tp_name = "PLyCursor",
47 	.tp_basicsize = sizeof(PLyCursorObject),
48 	.tp_dealloc = PLy_cursor_dealloc,
49 	.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
50 	.tp_doc = PLy_cursor_doc,
51 	.tp_iter = PyObject_SelfIter,
52 	.tp_iternext = PLy_cursor_iternext,
53 	.tp_methods = PLy_cursor_methods,
54 };
55 
56 void
PLy_cursor_init_type(void)57 PLy_cursor_init_type(void)
58 {
59 	if (PyType_Ready(&PLy_CursorType) < 0)
60 		elog(ERROR, "could not initialize PLy_CursorType");
61 }
62 
63 PyObject *
PLy_cursor(PyObject * self,PyObject * args)64 PLy_cursor(PyObject *self, PyObject *args)
65 {
66 	char	   *query;
67 	PyObject   *plan;
68 	PyObject   *planargs = NULL;
69 
70 	if (PyArg_ParseTuple(args, "s", &query))
71 		return PLy_cursor_query(query);
72 
73 	PyErr_Clear();
74 
75 	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
76 		return PLy_cursor_plan(plan, planargs);
77 
78 	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
79 	return NULL;
80 }
81 
82 
83 static PyObject *
PLy_cursor_query(const char * query)84 PLy_cursor_query(const char *query)
85 {
86 	PLyCursorObject *cursor;
87 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
88 	volatile MemoryContext oldcontext;
89 	volatile ResourceOwner oldowner;
90 
91 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
92 		return NULL;
93 	cursor->portalname = NULL;
94 	cursor->closed = false;
95 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
96 										 "PL/Python cursor context",
97 										 ALLOCSET_DEFAULT_SIZES);
98 
99 	/* Initialize for converting result tuples to Python */
100 	PLy_input_setup_func(&cursor->result, cursor->mcxt,
101 						 RECORDOID, -1,
102 						 exec_ctx->curr_proc);
103 
104 	oldcontext = CurrentMemoryContext;
105 	oldowner = CurrentResourceOwner;
106 
107 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
108 
109 	PG_TRY();
110 	{
111 		SPIPlanPtr	plan;
112 		Portal		portal;
113 
114 		pg_verifymbstr(query, strlen(query), false);
115 
116 		plan = SPI_prepare(query, 0, NULL);
117 		if (plan == NULL)
118 			elog(ERROR, "SPI_prepare failed: %s",
119 				 SPI_result_code_string(SPI_result));
120 
121 		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
122 								 exec_ctx->curr_proc->fn_readonly);
123 		SPI_freeplan(plan);
124 
125 		if (portal == NULL)
126 			elog(ERROR, "SPI_cursor_open() failed: %s",
127 				 SPI_result_code_string(SPI_result));
128 
129 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
130 
131 		PinPortal(portal);
132 
133 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
134 	}
135 	PG_CATCH();
136 	{
137 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
138 		return NULL;
139 	}
140 	PG_END_TRY();
141 
142 	Assert(cursor->portalname != NULL);
143 	return (PyObject *) cursor;
144 }
145 
146 PyObject *
PLy_cursor_plan(PyObject * ob,PyObject * args)147 PLy_cursor_plan(PyObject *ob, PyObject *args)
148 {
149 	PLyCursorObject *cursor;
150 	volatile int nargs;
151 	int			i;
152 	PLyPlanObject *plan;
153 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
154 	volatile MemoryContext oldcontext;
155 	volatile ResourceOwner oldowner;
156 
157 	if (args)
158 	{
159 		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
160 		{
161 			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
162 			return NULL;
163 		}
164 		nargs = PySequence_Length(args);
165 	}
166 	else
167 		nargs = 0;
168 
169 	plan = (PLyPlanObject *) ob;
170 
171 	if (nargs != plan->nargs)
172 	{
173 		char	   *sv;
174 		PyObject   *so = PyObject_Str(args);
175 
176 		if (!so)
177 			PLy_elog(ERROR, "could not execute plan");
178 		sv = PyString_AsString(so);
179 		PLy_exception_set_plural(PyExc_TypeError,
180 								 "Expected sequence of %d argument, got %d: %s",
181 								 "Expected sequence of %d arguments, got %d: %s",
182 								 plan->nargs,
183 								 plan->nargs, nargs, sv);
184 		Py_DECREF(so);
185 
186 		return NULL;
187 	}
188 
189 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
190 		return NULL;
191 	cursor->portalname = NULL;
192 	cursor->closed = false;
193 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
194 										 "PL/Python cursor context",
195 										 ALLOCSET_DEFAULT_SIZES);
196 
197 	/* Initialize for converting result tuples to Python */
198 	PLy_input_setup_func(&cursor->result, cursor->mcxt,
199 						 RECORDOID, -1,
200 						 exec_ctx->curr_proc);
201 
202 	oldcontext = CurrentMemoryContext;
203 	oldowner = CurrentResourceOwner;
204 
205 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
206 
207 	PG_TRY();
208 	{
209 		Portal		portal;
210 		char	   *volatile nulls;
211 		volatile int j;
212 
213 		if (nargs > 0)
214 			nulls = palloc(nargs * sizeof(char));
215 		else
216 			nulls = NULL;
217 
218 		for (j = 0; j < nargs; j++)
219 		{
220 			PLyObToDatum *arg = &plan->args[j];
221 			PyObject   *elem;
222 
223 			elem = PySequence_GetItem(args, j);
224 			PG_TRY();
225 			{
226 				bool		isnull;
227 
228 				plan->values[j] = PLy_output_convert(arg, elem, &isnull);
229 				nulls[j] = isnull ? 'n' : ' ';
230 			}
231 			PG_CATCH();
232 			{
233 				Py_DECREF(elem);
234 				PG_RE_THROW();
235 			}
236 			PG_END_TRY();
237 			Py_DECREF(elem);
238 		}
239 
240 		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
241 								 exec_ctx->curr_proc->fn_readonly);
242 		if (portal == NULL)
243 			elog(ERROR, "SPI_cursor_open() failed: %s",
244 				 SPI_result_code_string(SPI_result));
245 
246 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
247 
248 		PinPortal(portal);
249 
250 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
251 	}
252 	PG_CATCH();
253 	{
254 		int			k;
255 
256 		/* cleanup plan->values array */
257 		for (k = 0; k < nargs; k++)
258 		{
259 			if (!plan->args[k].typbyval &&
260 				(plan->values[k] != PointerGetDatum(NULL)))
261 			{
262 				pfree(DatumGetPointer(plan->values[k]));
263 				plan->values[k] = PointerGetDatum(NULL);
264 			}
265 		}
266 
267 		Py_DECREF(cursor);
268 
269 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
270 		return NULL;
271 	}
272 	PG_END_TRY();
273 
274 	for (i = 0; i < nargs; i++)
275 	{
276 		if (!plan->args[i].typbyval &&
277 			(plan->values[i] != PointerGetDatum(NULL)))
278 		{
279 			pfree(DatumGetPointer(plan->values[i]));
280 			plan->values[i] = PointerGetDatum(NULL);
281 		}
282 	}
283 
284 	Assert(cursor->portalname != NULL);
285 	return (PyObject *) cursor;
286 }
287 
288 static void
PLy_cursor_dealloc(PyObject * arg)289 PLy_cursor_dealloc(PyObject *arg)
290 {
291 	PLyCursorObject *cursor;
292 	Portal		portal;
293 
294 	cursor = (PLyCursorObject *) arg;
295 
296 	if (!cursor->closed)
297 	{
298 		portal = GetPortalByName(cursor->portalname);
299 
300 		if (PortalIsValid(portal))
301 		{
302 			UnpinPortal(portal);
303 			SPI_cursor_close(portal);
304 		}
305 		cursor->closed = true;
306 	}
307 	if (cursor->mcxt)
308 	{
309 		MemoryContextDelete(cursor->mcxt);
310 		cursor->mcxt = NULL;
311 	}
312 	arg->ob_type->tp_free(arg);
313 }
314 
315 static PyObject *
PLy_cursor_iternext(PyObject * self)316 PLy_cursor_iternext(PyObject *self)
317 {
318 	PLyCursorObject *cursor;
319 	PyObject   *ret;
320 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
321 	volatile MemoryContext oldcontext;
322 	volatile ResourceOwner oldowner;
323 	Portal		portal;
324 
325 	cursor = (PLyCursorObject *) self;
326 
327 	if (cursor->closed)
328 	{
329 		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
330 		return NULL;
331 	}
332 
333 	portal = GetPortalByName(cursor->portalname);
334 	if (!PortalIsValid(portal))
335 	{
336 		PLy_exception_set(PyExc_ValueError,
337 						  "iterating a cursor in an aborted subtransaction");
338 		return NULL;
339 	}
340 
341 	oldcontext = CurrentMemoryContext;
342 	oldowner = CurrentResourceOwner;
343 
344 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
345 
346 	PG_TRY();
347 	{
348 		SPI_cursor_fetch(portal, true, 1);
349 		if (SPI_processed == 0)
350 		{
351 			PyErr_SetNone(PyExc_StopIteration);
352 			ret = NULL;
353 		}
354 		else
355 		{
356 			PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
357 								  exec_ctx->curr_proc);
358 
359 			ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
360 									   SPI_tuptable->tupdesc, true);
361 		}
362 
363 		SPI_freetuptable(SPI_tuptable);
364 
365 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
366 	}
367 	PG_CATCH();
368 	{
369 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
370 		return NULL;
371 	}
372 	PG_END_TRY();
373 
374 	return ret;
375 }
376 
377 static PyObject *
PLy_cursor_fetch(PyObject * self,PyObject * args)378 PLy_cursor_fetch(PyObject *self, PyObject *args)
379 {
380 	PLyCursorObject *cursor;
381 	int			count;
382 	PLyResultObject *ret;
383 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
384 	volatile MemoryContext oldcontext;
385 	volatile ResourceOwner oldowner;
386 	Portal		portal;
387 
388 	if (!PyArg_ParseTuple(args, "i:fetch", &count))
389 		return NULL;
390 
391 	cursor = (PLyCursorObject *) self;
392 
393 	if (cursor->closed)
394 	{
395 		PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
396 		return NULL;
397 	}
398 
399 	portal = GetPortalByName(cursor->portalname);
400 	if (!PortalIsValid(portal))
401 	{
402 		PLy_exception_set(PyExc_ValueError,
403 						  "iterating a cursor in an aborted subtransaction");
404 		return NULL;
405 	}
406 
407 	ret = (PLyResultObject *) PLy_result_new();
408 	if (ret == NULL)
409 		return NULL;
410 
411 	oldcontext = CurrentMemoryContext;
412 	oldowner = CurrentResourceOwner;
413 
414 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
415 
416 	PG_TRY();
417 	{
418 		SPI_cursor_fetch(portal, true, count);
419 
420 		Py_DECREF(ret->status);
421 		ret->status = PyInt_FromLong(SPI_OK_FETCH);
422 
423 		Py_DECREF(ret->nrows);
424 		ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
425 
426 		if (SPI_processed != 0)
427 		{
428 			uint64		i;
429 
430 			/*
431 			 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
432 			 * and list indices; so we cannot support a result larger than
433 			 * PY_SSIZE_T_MAX.
434 			 */
435 			if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
436 				ereport(ERROR,
437 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
438 						 errmsg("query result has too many rows to fit in a Python list")));
439 
440 			Py_DECREF(ret->rows);
441 			ret->rows = PyList_New(SPI_processed);
442 			if (!ret->rows)
443 			{
444 				Py_DECREF(ret);
445 				ret = NULL;
446 			}
447 			else
448 			{
449 				PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
450 									  exec_ctx->curr_proc);
451 
452 				for (i = 0; i < SPI_processed; i++)
453 				{
454 					PyObject   *row = PLy_input_from_tuple(&cursor->result,
455 														   SPI_tuptable->vals[i],
456 														   SPI_tuptable->tupdesc,
457 														   true);
458 
459 					PyList_SetItem(ret->rows, i, row);
460 				}
461 			}
462 		}
463 
464 		SPI_freetuptable(SPI_tuptable);
465 
466 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
467 	}
468 	PG_CATCH();
469 	{
470 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
471 		return NULL;
472 	}
473 	PG_END_TRY();
474 
475 	return (PyObject *) ret;
476 }
477 
478 static PyObject *
PLy_cursor_close(PyObject * self,PyObject * unused)479 PLy_cursor_close(PyObject *self, PyObject *unused)
480 {
481 	PLyCursorObject *cursor = (PLyCursorObject *) self;
482 
483 	if (!cursor->closed)
484 	{
485 		Portal		portal = GetPortalByName(cursor->portalname);
486 
487 		if (!PortalIsValid(portal))
488 		{
489 			PLy_exception_set(PyExc_ValueError,
490 							  "closing a cursor in an aborted subtransaction");
491 			return NULL;
492 		}
493 
494 		UnpinPortal(portal);
495 		SPI_cursor_close(portal);
496 		cursor->closed = true;
497 	}
498 
499 	Py_RETURN_NONE;
500 }
501