1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "plpy_cursorobject.h"
15 #include "plpy_elog.h"
16 #include "plpy_main.h"
17 #include "plpy_planobject.h"
18 #include "plpy_procedure.h"
19 #include "plpy_resultobject.h"
20 #include "plpy_spi.h"
21 #include "plpython.h"
22 #include "utils/memutils.h"
23 
24 static PyObject *PLy_cursor_query(const char *query);
25 static void PLy_cursor_dealloc(PyObject *arg);
26 static PyObject *PLy_cursor_iternext(PyObject *self);
27 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
28 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
29 
30 static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
31 
32 static PyMethodDef PLy_cursor_methods[] = {
33 	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
34 	{"close", PLy_cursor_close, METH_NOARGS, NULL},
35 	{NULL, NULL, 0, NULL}
36 };
37 
38 static PyTypeObject PLy_CursorType = {
39 	PyVarObject_HEAD_INIT(NULL, 0)
40 	.tp_name = "PLyCursor",
41 	.tp_basicsize = sizeof(PLyCursorObject),
42 	.tp_dealloc = PLy_cursor_dealloc,
43 	.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
44 	.tp_doc = PLy_cursor_doc,
45 	.tp_iter = PyObject_SelfIter,
46 	.tp_iternext = PLy_cursor_iternext,
47 	.tp_methods = PLy_cursor_methods,
48 };
49 
50 void
51 PLy_cursor_init_type(void)
52 {
53 	if (PyType_Ready(&PLy_CursorType) < 0)
54 		elog(ERROR, "could not initialize PLy_CursorType");
55 }
56 
57 PyObject *
58 PLy_cursor(PyObject *self, PyObject *args)
59 {
60 	char	   *query;
61 	PyObject   *plan;
62 	PyObject   *planargs = NULL;
63 
64 	if (PyArg_ParseTuple(args, "s", &query))
65 		return PLy_cursor_query(query);
66 
67 	PyErr_Clear();
68 
69 	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
70 		return PLy_cursor_plan(plan, planargs);
71 
72 	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
73 	return NULL;
74 }
75 
76 
77 static PyObject *
78 PLy_cursor_query(const char *query)
79 {
80 	PLyCursorObject *cursor;
81 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
82 	volatile MemoryContext oldcontext;
83 	volatile ResourceOwner oldowner;
84 
85 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
86 		return NULL;
87 	cursor->portalname = NULL;
88 	cursor->closed = false;
89 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
90 										 "PL/Python cursor context",
91 										 ALLOCSET_DEFAULT_SIZES);
92 
93 	/* Initialize for converting result tuples to Python */
94 	PLy_input_setup_func(&cursor->result, cursor->mcxt,
95 						 RECORDOID, -1,
96 						 exec_ctx->curr_proc);
97 
98 	oldcontext = CurrentMemoryContext;
99 	oldowner = CurrentResourceOwner;
100 
101 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
102 
103 	PG_TRY();
104 	{
105 		SPIPlanPtr	plan;
106 		Portal		portal;
107 
108 		pg_verifymbstr(query, strlen(query), false);
109 
110 		plan = SPI_prepare(query, 0, NULL);
111 		if (plan == NULL)
112 			elog(ERROR, "SPI_prepare failed: %s",
113 				 SPI_result_code_string(SPI_result));
114 
115 		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
116 								 exec_ctx->curr_proc->fn_readonly);
117 		SPI_freeplan(plan);
118 
119 		if (portal == NULL)
120 			elog(ERROR, "SPI_cursor_open() failed: %s",
121 				 SPI_result_code_string(SPI_result));
122 
123 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
124 
125 		PinPortal(portal);
126 
127 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
128 	}
129 	PG_CATCH();
130 	{
131 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
132 		return NULL;
133 	}
134 	PG_END_TRY();
135 
136 	Assert(cursor->portalname != NULL);
137 	return (PyObject *) cursor;
138 }
139 
140 PyObject *
141 PLy_cursor_plan(PyObject *ob, PyObject *args)
142 {
143 	PLyCursorObject *cursor;
144 	volatile int nargs;
145 	int			i;
146 	PLyPlanObject *plan;
147 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
148 	volatile MemoryContext oldcontext;
149 	volatile ResourceOwner oldowner;
150 
151 	if (args)
152 	{
153 		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
154 		{
155 			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
156 			return NULL;
157 		}
158 		nargs = PySequence_Length(args);
159 	}
160 	else
161 		nargs = 0;
162 
163 	plan = (PLyPlanObject *) ob;
164 
165 	if (nargs != plan->nargs)
166 	{
167 		char	   *sv;
168 		PyObject   *so = PyObject_Str(args);
169 
170 		if (!so)
171 			PLy_elog(ERROR, "could not execute plan");
172 		sv = PyString_AsString(so);
173 		PLy_exception_set_plural(PyExc_TypeError,
174 								 "Expected sequence of %d argument, got %d: %s",
175 								 "Expected sequence of %d arguments, got %d: %s",
176 								 plan->nargs,
177 								 plan->nargs, nargs, sv);
178 		Py_DECREF(so);
179 
180 		return NULL;
181 	}
182 
183 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
184 		return NULL;
185 	cursor->portalname = NULL;
186 	cursor->closed = false;
187 	cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
188 										 "PL/Python cursor context",
189 										 ALLOCSET_DEFAULT_SIZES);
190 
191 	/* Initialize for converting result tuples to Python */
192 	PLy_input_setup_func(&cursor->result, cursor->mcxt,
193 						 RECORDOID, -1,
194 						 exec_ctx->curr_proc);
195 
196 	oldcontext = CurrentMemoryContext;
197 	oldowner = CurrentResourceOwner;
198 
199 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
200 
201 	PG_TRY();
202 	{
203 		Portal		portal;
204 		char	   *volatile nulls;
205 		volatile int j;
206 
207 		if (nargs > 0)
208 			nulls = palloc(nargs * sizeof(char));
209 		else
210 			nulls = NULL;
211 
212 		for (j = 0; j < nargs; j++)
213 		{
214 			PLyObToDatum *arg = &plan->args[j];
215 			PyObject   *elem;
216 
217 			elem = PySequence_GetItem(args, j);
218 			PG_TRY();
219 			{
220 				bool		isnull;
221 
222 				plan->values[j] = PLy_output_convert(arg, elem, &isnull);
223 				nulls[j] = isnull ? 'n' : ' ';
224 			}
225 			PG_FINALLY();
226 			{
227 				Py_DECREF(elem);
228 			}
229 			PG_END_TRY();
230 		}
231 
232 		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
233 								 exec_ctx->curr_proc->fn_readonly);
234 		if (portal == NULL)
235 			elog(ERROR, "SPI_cursor_open() failed: %s",
236 				 SPI_result_code_string(SPI_result));
237 
238 		cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
239 
240 		PinPortal(portal);
241 
242 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
243 	}
244 	PG_CATCH();
245 	{
246 		int			k;
247 
248 		/* cleanup plan->values array */
249 		for (k = 0; k < nargs; k++)
250 		{
251 			if (!plan->args[k].typbyval &&
252 				(plan->values[k] != PointerGetDatum(NULL)))
253 			{
254 				pfree(DatumGetPointer(plan->values[k]));
255 				plan->values[k] = PointerGetDatum(NULL);
256 			}
257 		}
258 
259 		Py_DECREF(cursor);
260 
261 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
262 		return NULL;
263 	}
reuse_queue(struct deflate_buff_queue * p)264 	PG_END_TRY();
265 
266 	for (i = 0; i < nargs; i++)
267 	{
268 		if (!plan->args[i].typbyval &&
new_queue(void)269 			(plan->values[i] != PointerGetDatum(NULL)))
270 		{
271 			pfree(DatumGetPointer(plan->values[i]));
272 			plan->values[i] = PointerGetDatum(NULL);
273 		}
274 	}
275 
276 	Assert(cursor->portalname != NULL);
277 	return (PyObject *) cursor;
278 }
279 
280 static void
281 PLy_cursor_dealloc(PyObject *arg)
282 {
283 	PLyCursorObject *cursor;
284 	Portal		portal;
285 
286 	cursor = (PLyCursorObject *) arg;
287 
288 	if (!cursor->closed)
289 	{
290 		portal = GetPortalByName(cursor->portalname);
291 
292 		if (PortalIsValid(portal))
293 		{
294 			UnpinPortal(portal);
295 			SPI_cursor_close(portal);
296 		}
297 		cursor->closed = true;
298 	}
299 	if (cursor->mcxt)
300 	{
301 		MemoryContextDelete(cursor->mcxt);
302 		cursor->mcxt = NULL;
303 	}
304 	arg->ob_type->tp_free(arg);
305 }
306 
307 static PyObject *
308 PLy_cursor_iternext(PyObject *self)
309 {
310 	PLyCursorObject *cursor;
311 	PyObject   *ret;
312 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
313 	volatile MemoryContext oldcontext;
314 	volatile ResourceOwner oldowner;
315 	Portal		portal;
316 
317 	cursor = (PLyCursorObject *) self;
318 
319 	if (cursor->closed)
320 	{
321 		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
322 		return NULL;
323 	}
324 
325 	portal = GetPortalByName(cursor->portalname);
326 	if (!PortalIsValid(portal))
327 	{
328 		PLy_exception_set(PyExc_ValueError,
329 						  "iterating a cursor in an aborted subtransaction");
330 		return NULL;
331 	}
332 
333 	oldcontext = CurrentMemoryContext;
334 	oldowner = CurrentResourceOwner;
335 
336 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
337 
338 	PG_TRY();
339 	{
340 		SPI_cursor_fetch(portal, true, 1);
341 		if (SPI_processed == 0)
342 		{
343 			PyErr_SetNone(PyExc_StopIteration);
344 			ret = NULL;
345 		}
346 		else
347 		{
348 			PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
349 								  exec_ctx->curr_proc);
350 
351 			ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
352 									   SPI_tuptable->tupdesc, true);
353 		}
354 
355 		SPI_freetuptable(SPI_tuptable);
356 
357 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
358 	}
359 	PG_CATCH();
360 	{
361 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
362 		return NULL;
363 	}
364 	PG_END_TRY();
365 
366 	return ret;
367 }
368 
369 static PyObject *
370 PLy_cursor_fetch(PyObject *self, PyObject *args)
371 {
372 	PLyCursorObject *cursor;
373 	int			count;
374 	PLyResultObject *ret;
375 	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
376 	volatile MemoryContext oldcontext;
377 	volatile ResourceOwner oldowner;
378 	Portal		portal;
379 
380 	if (!PyArg_ParseTuple(args, "i:fetch", &count))
381 		return NULL;
382 
383 	cursor = (PLyCursorObject *) self;
384 
385 	if (cursor->closed)
386 	{
387 		PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
388 		return NULL;
389 	}
390 
391 	portal = GetPortalByName(cursor->portalname);
392 	if (!PortalIsValid(portal))
393 	{
394 		PLy_exception_set(PyExc_ValueError,
395 						  "iterating a cursor in an aborted subtransaction");
396 		return NULL;
397 	}
398 
399 	ret = (PLyResultObject *) PLy_result_new();
400 	if (ret == NULL)
401 		return NULL;
402 
403 	oldcontext = CurrentMemoryContext;
404 	oldowner = CurrentResourceOwner;
405 
406 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
407 
408 	PG_TRY();
409 	{
410 		SPI_cursor_fetch(portal, true, count);
411 
412 		Py_DECREF(ret->status);
413 		ret->status = PyInt_FromLong(SPI_OK_FETCH);
414 
415 		Py_DECREF(ret->nrows);
416 		ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
417 
418 		if (SPI_processed != 0)
419 		{
420 			uint64		i;
421 
422 			/*
423 			 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
424 			 * and list indices; so we cannot support a result larger than
425 			 * PY_SSIZE_T_MAX.
426 			 */
427 			if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
428 				ereport(ERROR,
429 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
430 						 errmsg("query result has too many rows to fit in a Python list")));
431 
432 			Py_DECREF(ret->rows);
433 			ret->rows = PyList_New(SPI_processed);
434 			if (!ret->rows)
435 			{
error(char * m)436 				Py_DECREF(ret);
437 				ret = NULL;
438 			}
439 			else
440 			{
441 				PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
442 									  exec_ctx->curr_proc);
443 
444 				for (i = 0; i < SPI_processed; i++)
445 				{
446 					PyObject   *row = PLy_input_from_tuple(&cursor->result,
447 														   SPI_tuptable->vals[i],
448 														   SPI_tuptable->tupdesc,
449 														   true);
450 
451 					PyList_SetItem(ret->rows, i, row);
452 				}
453 			}
454 		}
455 
456 		SPI_freetuptable(SPI_tuptable);
457 
458 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
459 	}
460 	PG_CATCH();
461 	{
462 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
463 		return NULL;
464 	}
465 	PG_END_TRY();
466 
467 	return (PyObject *) ret;
468 }
469 
470 static PyObject *
471 PLy_cursor_close(PyObject *self, PyObject *unused)
472 {
473 	PLyCursorObject *cursor = (PLyCursorObject *) self;
474 
475 	if (!cursor->closed)
476 	{
477 		Portal		portal = GetPortalByName(cursor->portalname);
478 
479 		if (!PortalIsValid(portal))
480 		{
481 			PLy_exception_set(PyExc_ValueError,
482 							  "closing a cursor in an aborted subtransaction");
483 			return NULL;
484 		}
485 
486 		UnpinPortal(portal);
487 		SPI_cursor_close(portal);
488 		cursor->closed = true;
489 	}
490 
491 	Py_RETURN_NONE;
492 }
493