1 /*
2 * interface to SPI functions
3 *
4 * src/pl/plpython/plpy_spi.c
5 */
6
7 #include "postgres.h"
8
9 #include <limits.h>
10
11 #include "access/htup_details.h"
12 #include "access/xact.h"
13 #include "catalog/pg_type.h"
14 #include "executor/spi.h"
15 #include "mb/pg_wchar.h"
16 #include "parser/parse_type.h"
17 #include "utils/memutils.h"
18 #include "utils/syscache.h"
19
20 #include "plpython.h"
21
22 #include "plpy_spi.h"
23
24 #include "plpy_elog.h"
25 #include "plpy_main.h"
26 #include "plpy_planobject.h"
27 #include "plpy_plpymodule.h"
28 #include "plpy_procedure.h"
29 #include "plpy_resultobject.h"
30
31
32 static PyObject *PLy_spi_execute_query(char *query, long limit);
33 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
34 static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
35 uint64 rows, int status);
36 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
37
38
39 /* prepare(query="select * from foo")
40 * prepare(query="select * from foo where bar = $1", params=["text"])
41 * prepare(query="select * from foo where bar = $1", params=["text"], limit=5)
42 */
43 PyObject *
PLy_spi_prepare(PyObject * self,PyObject * args)44 PLy_spi_prepare(PyObject *self, PyObject *args)
45 {
46 PLyPlanObject *plan;
47 PyObject *list = NULL;
48 PyObject *volatile optr = NULL;
49 char *query;
50 volatile MemoryContext oldcontext;
51 volatile ResourceOwner oldowner;
52 volatile int nargs;
53
54 if (!PyArg_ParseTuple(args, "s|O", &query, &list))
55 return NULL;
56
57 if (list && (!PySequence_Check(list)))
58 {
59 PLy_exception_set(PyExc_TypeError,
60 "second argument of plpy.prepare must be a sequence");
61 return NULL;
62 }
63
64 if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL)
65 return NULL;
66
67 plan->mcxt = AllocSetContextCreate(TopMemoryContext,
68 "PL/Python plan context",
69 ALLOCSET_DEFAULT_SIZES);
70 oldcontext = MemoryContextSwitchTo(plan->mcxt);
71
72 nargs = list ? PySequence_Length(list) : 0;
73
74 plan->nargs = nargs;
75 plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL;
76 plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL;
77 plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL;
78
79 MemoryContextSwitchTo(oldcontext);
80
81 oldcontext = CurrentMemoryContext;
82 oldowner = CurrentResourceOwner;
83
84 PLy_spi_subtransaction_begin(oldcontext, oldowner);
85
86 PG_TRY();
87 {
88 int i;
89 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
90
91 /*
92 * the other loop might throw an exception, if PLyTypeInfo member
93 * isn't properly initialized the Py_DECREF(plan) will go boom
94 */
95 for (i = 0; i < nargs; i++)
96 {
97 PLy_typeinfo_init(&plan->args[i], plan->mcxt);
98 plan->values[i] = PointerGetDatum(NULL);
99 }
100
101 for (i = 0; i < nargs; i++)
102 {
103 char *sptr;
104 HeapTuple typeTup;
105 Oid typeId;
106 int32 typmod;
107
108 optr = PySequence_GetItem(list, i);
109 if (PyString_Check(optr))
110 sptr = PyString_AsString(optr);
111 else if (PyUnicode_Check(optr))
112 sptr = PLyUnicode_AsString(optr);
113 else
114 {
115 ereport(ERROR,
116 (errmsg("plpy.prepare: type name at ordinal position %d is not a string", i)));
117 sptr = NULL; /* keep compiler quiet */
118 }
119
120 /********************************************************
121 * Resolve argument type names and then look them up by
122 * oid in the system cache, and remember the required
123 *information for input conversion.
124 ********************************************************/
125
126 parseTypeString(sptr, &typeId, &typmod, false);
127
128 typeTup = SearchSysCache1(TYPEOID,
129 ObjectIdGetDatum(typeId));
130 if (!HeapTupleIsValid(typeTup))
131 elog(ERROR, "cache lookup failed for type %u", typeId);
132
133 Py_DECREF(optr);
134
135 /*
136 * set optr to NULL, so we won't try to unref it again in case of
137 * an error
138 */
139 optr = NULL;
140
141 plan->types[i] = typeId;
142 PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes);
143 ReleaseSysCache(typeTup);
144 }
145
146 pg_verifymbstr(query, strlen(query), false);
147 plan->plan = SPI_prepare(query, plan->nargs, plan->types);
148 if (plan->plan == NULL)
149 elog(ERROR, "SPI_prepare failed: %s",
150 SPI_result_code_string(SPI_result));
151
152 /* transfer plan from procCxt to topCxt */
153 if (SPI_keepplan(plan->plan))
154 elog(ERROR, "SPI_keepplan failed");
155
156 PLy_spi_subtransaction_commit(oldcontext, oldowner);
157 }
158 PG_CATCH();
159 {
160 Py_DECREF(plan);
161 Py_XDECREF(optr);
162
163 PLy_spi_subtransaction_abort(oldcontext, oldowner);
164 return NULL;
165 }
166 PG_END_TRY();
167
168 Assert(plan->plan != NULL);
169 return (PyObject *) plan;
170 }
171
172 /* execute(query="select * from foo", limit=5)
173 * execute(plan=plan, values=(foo, bar), limit=5)
174 */
175 PyObject *
PLy_spi_execute(PyObject * self,PyObject * args)176 PLy_spi_execute(PyObject *self, PyObject *args)
177 {
178 char *query;
179 PyObject *plan;
180 PyObject *list = NULL;
181 long limit = 0;
182
183 if (PyArg_ParseTuple(args, "s|l", &query, &limit))
184 return PLy_spi_execute_query(query, limit);
185
186 PyErr_Clear();
187
188 if (PyArg_ParseTuple(args, "O|Ol", &plan, &list, &limit) &&
189 is_PLyPlanObject(plan))
190 return PLy_spi_execute_plan(plan, list, limit);
191
192 PLy_exception_set(PLy_exc_error, "plpy.execute expected a query or a plan");
193 return NULL;
194 }
195
196 static PyObject *
PLy_spi_execute_plan(PyObject * ob,PyObject * list,long limit)197 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
198 {
199 volatile int nargs;
200 int i,
201 rv;
202 PLyPlanObject *plan;
203 volatile MemoryContext oldcontext;
204 volatile ResourceOwner oldowner;
205 PyObject *ret;
206
207 if (list != NULL)
208 {
209 if (!PySequence_Check(list) || PyString_Check(list) || PyUnicode_Check(list))
210 {
211 PLy_exception_set(PyExc_TypeError, "plpy.execute takes a sequence as its second argument");
212 return NULL;
213 }
214 nargs = PySequence_Length(list);
215 }
216 else
217 nargs = 0;
218
219 plan = (PLyPlanObject *) ob;
220
221 if (nargs != plan->nargs)
222 {
223 char *sv;
224 PyObject *so = PyObject_Str(list);
225
226 if (!so)
227 PLy_elog(ERROR, "could not execute plan");
228 sv = PyString_AsString(so);
229 PLy_exception_set_plural(PyExc_TypeError,
230 "Expected sequence of %d argument, got %d: %s",
231 "Expected sequence of %d arguments, got %d: %s",
232 plan->nargs,
233 plan->nargs, nargs, sv);
234 Py_DECREF(so);
235
236 return NULL;
237 }
238
239 oldcontext = CurrentMemoryContext;
240 oldowner = CurrentResourceOwner;
241
242 PLy_spi_subtransaction_begin(oldcontext, oldowner);
243
244 PG_TRY();
245 {
246 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
247 char *volatile nulls;
248 volatile int j;
249
250 if (nargs > 0)
251 nulls = palloc(nargs * sizeof(char));
252 else
253 nulls = NULL;
254
255 for (j = 0; j < nargs; j++)
256 {
257 PyObject *elem;
258
259 elem = PySequence_GetItem(list, j);
260 if (elem != Py_None)
261 {
262 PG_TRY();
263 {
264 plan->values[j] =
265 plan->args[j].out.d.func(&(plan->args[j].out.d),
266 -1,
267 elem);
268 }
269 PG_CATCH();
270 {
271 Py_DECREF(elem);
272 PG_RE_THROW();
273 }
274 PG_END_TRY();
275
276 Py_DECREF(elem);
277 nulls[j] = ' ';
278 }
279 else
280 {
281 Py_DECREF(elem);
282 plan->values[j] =
283 InputFunctionCall(&(plan->args[j].out.d.typfunc),
284 NULL,
285 plan->args[j].out.d.typioparam,
286 -1);
287 nulls[j] = 'n';
288 }
289 }
290
291 rv = SPI_execute_plan(plan->plan, plan->values, nulls,
292 exec_ctx->curr_proc->fn_readonly, limit);
293 ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
294
295 if (nargs > 0)
296 pfree(nulls);
297
298 PLy_spi_subtransaction_commit(oldcontext, oldowner);
299 }
300 PG_CATCH();
301 {
302 int k;
303
304 /*
305 * cleanup plan->values array
306 */
307 for (k = 0; k < nargs; k++)
308 {
309 if (!plan->args[k].out.d.typbyval &&
310 (plan->values[k] != PointerGetDatum(NULL)))
311 {
312 pfree(DatumGetPointer(plan->values[k]));
313 plan->values[k] = PointerGetDatum(NULL);
314 }
315 }
316
317 PLy_spi_subtransaction_abort(oldcontext, oldowner);
318 return NULL;
319 }
320 PG_END_TRY();
321
322 for (i = 0; i < nargs; i++)
323 {
324 if (!plan->args[i].out.d.typbyval &&
325 (plan->values[i] != PointerGetDatum(NULL)))
326 {
327 pfree(DatumGetPointer(plan->values[i]));
328 plan->values[i] = PointerGetDatum(NULL);
329 }
330 }
331
332 if (rv < 0)
333 {
334 PLy_exception_set(PLy_exc_spi_error,
335 "SPI_execute_plan failed: %s",
336 SPI_result_code_string(rv));
337 return NULL;
338 }
339
340 return ret;
341 }
342
343 static PyObject *
PLy_spi_execute_query(char * query,long limit)344 PLy_spi_execute_query(char *query, long limit)
345 {
346 int rv;
347 volatile MemoryContext oldcontext;
348 volatile ResourceOwner oldowner;
349 PyObject *ret = NULL;
350
351 oldcontext = CurrentMemoryContext;
352 oldowner = CurrentResourceOwner;
353
354 PLy_spi_subtransaction_begin(oldcontext, oldowner);
355
356 PG_TRY();
357 {
358 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
359
360 pg_verifymbstr(query, strlen(query), false);
361 rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
362 ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
363
364 PLy_spi_subtransaction_commit(oldcontext, oldowner);
365 }
366 PG_CATCH();
367 {
368 PLy_spi_subtransaction_abort(oldcontext, oldowner);
369 return NULL;
370 }
371 PG_END_TRY();
372
373 if (rv < 0)
374 {
375 Py_XDECREF(ret);
376 PLy_exception_set(PLy_exc_spi_error,
377 "SPI_execute failed: %s",
378 SPI_result_code_string(rv));
379 return NULL;
380 }
381
382 return ret;
383 }
384
385 static PyObject *
PLy_spi_execute_fetch_result(SPITupleTable * tuptable,uint64 rows,int status)386 PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
387 {
388 PLyResultObject *result;
389 volatile MemoryContext oldcontext;
390
391 result = (PLyResultObject *) PLy_result_new();
392 Py_DECREF(result->status);
393 result->status = PyInt_FromLong(status);
394
395 if (status > 0 && tuptable == NULL)
396 {
397 Py_DECREF(result->nrows);
398 result->nrows = (rows > (uint64) LONG_MAX) ?
399 PyFloat_FromDouble((double) rows) :
400 PyInt_FromLong((long) rows);
401 }
402 else if (status > 0 && tuptable != NULL)
403 {
404 PLyTypeInfo args;
405 MemoryContext cxt;
406
407 Py_DECREF(result->nrows);
408 result->nrows = (rows > (uint64) LONG_MAX) ?
409 PyFloat_FromDouble((double) rows) :
410 PyInt_FromLong((long) rows);
411
412 cxt = AllocSetContextCreate(CurrentMemoryContext,
413 "PL/Python temp context",
414 ALLOCSET_DEFAULT_SIZES);
415 PLy_typeinfo_init(&args, cxt);
416
417 oldcontext = CurrentMemoryContext;
418 PG_TRY();
419 {
420 MemoryContext oldcontext2;
421
422 if (rows)
423 {
424 uint64 i;
425
426 /*
427 * PyList_New() and PyList_SetItem() use Py_ssize_t for list
428 * size and list indices; so we cannot support a result larger
429 * than PY_SSIZE_T_MAX.
430 */
431 if (rows > (uint64) PY_SSIZE_T_MAX)
432 ereport(ERROR,
433 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
434 errmsg("query result has too many rows to fit in a Python list")));
435
436 Py_DECREF(result->rows);
437 result->rows = PyList_New(rows);
438
439 PLy_input_tuple_funcs(&args, tuptable->tupdesc);
440 for (i = 0; i < rows; i++)
441 {
442 PyObject *row = PLyDict_FromTuple(&args,
443 tuptable->vals[i],
444 tuptable->tupdesc);
445
446 PyList_SetItem(result->rows, i, row);
447 }
448 }
449
450 /*
451 * Save tuple descriptor for later use by result set metadata
452 * functions. Save it in TopMemoryContext so that it survives
453 * outside of an SPI context. We trust that PLy_result_dealloc()
454 * will clean it up when the time is right. (Do this as late as
455 * possible, to minimize the number of ways the tupdesc could get
456 * leaked due to errors.)
457 */
458 oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
459 result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
460 MemoryContextSwitchTo(oldcontext2);
461 }
462 PG_CATCH();
463 {
464 MemoryContextSwitchTo(oldcontext);
465 MemoryContextDelete(cxt);
466 Py_DECREF(result);
467 PG_RE_THROW();
468 }
469 PG_END_TRY();
470
471 MemoryContextDelete(cxt);
472 SPI_freetuptable(tuptable);
473 }
474
475 return (PyObject *) result;
476 }
477
478 /*
479 * Utilities for running SPI functions in subtransactions.
480 *
481 * Usage:
482 *
483 * MemoryContext oldcontext = CurrentMemoryContext;
484 * ResourceOwner oldowner = CurrentResourceOwner;
485 *
486 * PLy_spi_subtransaction_begin(oldcontext, oldowner);
487 * PG_TRY();
488 * {
489 * <call SPI functions>
490 * PLy_spi_subtransaction_commit(oldcontext, oldowner);
491 * }
492 * PG_CATCH();
493 * {
494 * <do cleanup>
495 * PLy_spi_subtransaction_abort(oldcontext, oldowner);
496 * return NULL;
497 * }
498 * PG_END_TRY();
499 *
500 * These utilities take care of restoring connection to the SPI manager and
501 * setting a Python exception in case of an abort.
502 */
503 void
PLy_spi_subtransaction_begin(MemoryContext oldcontext,ResourceOwner oldowner)504 PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
505 {
506 BeginInternalSubTransaction(NULL);
507 /* Want to run inside function's memory context */
508 MemoryContextSwitchTo(oldcontext);
509 }
510
511 void
PLy_spi_subtransaction_commit(MemoryContext oldcontext,ResourceOwner oldowner)512 PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
513 {
514 /* Commit the inner transaction, return to outer xact context */
515 ReleaseCurrentSubTransaction();
516 MemoryContextSwitchTo(oldcontext);
517 CurrentResourceOwner = oldowner;
518
519 /*
520 * AtEOSubXact_SPI() should not have popped any SPI context, but just in
521 * case it did, make sure we remain connected.
522 */
523 SPI_restore_connection();
524 }
525
526 void
PLy_spi_subtransaction_abort(MemoryContext oldcontext,ResourceOwner oldowner)527 PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
528 {
529 ErrorData *edata;
530 PLyExceptionEntry *entry;
531 PyObject *exc;
532
533 /* Save error info */
534 MemoryContextSwitchTo(oldcontext);
535 edata = CopyErrorData();
536 FlushErrorState();
537
538 /* Abort the inner transaction */
539 RollbackAndReleaseCurrentSubTransaction();
540 MemoryContextSwitchTo(oldcontext);
541 CurrentResourceOwner = oldowner;
542
543 /*
544 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
545 * have left us in a disconnected state. We need this hack to return to
546 * connected state.
547 */
548 SPI_restore_connection();
549
550 /* Look up the correct exception */
551 entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
552 HASH_FIND, NULL);
553
554 /*
555 * This could be a custom error code, if that's the case fallback to
556 * SPIError
557 */
558 exc = entry ? entry->exc : PLy_exc_spi_error;
559 /* Make Python raise the exception */
560 PLy_spi_exception_set(exc, edata);
561 FreeErrorData(edata);
562 }
563
564 /*
565 * Raise a SPIError, passing in it more error details, like the
566 * internal query and error position.
567 */
568 static void
PLy_spi_exception_set(PyObject * excclass,ErrorData * edata)569 PLy_spi_exception_set(PyObject *excclass, ErrorData *edata)
570 {
571 PyObject *args = NULL;
572 PyObject *spierror = NULL;
573 PyObject *spidata = NULL;
574
575 args = Py_BuildValue("(s)", edata->message);
576 if (!args)
577 goto failure;
578
579 /* create a new SPI exception with the error message as the parameter */
580 spierror = PyObject_CallObject(excclass, args);
581 if (!spierror)
582 goto failure;
583
584 spidata = Py_BuildValue("(izzzizzzzz)", edata->sqlerrcode, edata->detail, edata->hint,
585 edata->internalquery, edata->internalpos,
586 edata->schema_name, edata->table_name, edata->column_name,
587 edata->datatype_name, edata->constraint_name);
588 if (!spidata)
589 goto failure;
590
591 if (PyObject_SetAttrString(spierror, "spidata", spidata) == -1)
592 goto failure;
593
594 PyErr_SetObject(excclass, spierror);
595
596 Py_DECREF(args);
597 Py_DECREF(spierror);
598 Py_DECREF(spidata);
599 return;
600
601 failure:
602 Py_XDECREF(args);
603 Py_XDECREF(spierror);
604 Py_XDECREF(spidata);
605 elog(ERROR, "could not convert SPI error to Python exception");
606 }
607