1 /* 2 * the PLyCursor class 3 * 4 * src/pl/plpython/plpy_cursorobject.c 5 */ 6 7 #include "postgres.h" 8 9 #include <limits.h> 10 11 #include "access/xact.h" 12 #include "catalog/pg_type.h" 13 #include "mb/pg_wchar.h" 14 #include "plpy_cursorobject.h" 15 #include "plpy_elog.h" 16 #include "plpy_main.h" 17 #include "plpy_planobject.h" 18 #include "plpy_procedure.h" 19 #include "plpy_resultobject.h" 20 #include "plpy_spi.h" 21 #include "plpython.h" 22 #include "utils/memutils.h" 23 24 static PyObject *PLy_cursor_query(const char *query); 25 static void PLy_cursor_dealloc(PyObject *arg); 26 static PyObject *PLy_cursor_iternext(PyObject *self); 27 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args); 28 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused); 29 30 static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor"; 31 32 static PyMethodDef PLy_cursor_methods[] = { 33 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL}, 34 {"close", PLy_cursor_close, METH_NOARGS, NULL}, 35 {NULL, NULL, 0, NULL} 36 }; 37 38 static PyTypeObject PLy_CursorType = { 39 PyVarObject_HEAD_INIT(NULL, 0) 40 .tp_name = "PLyCursor", 41 .tp_basicsize = sizeof(PLyCursorObject), 42 .tp_dealloc = PLy_cursor_dealloc, 43 .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, 44 .tp_doc = PLy_cursor_doc, 45 .tp_iter = PyObject_SelfIter, 46 .tp_iternext = PLy_cursor_iternext, 47 .tp_methods = PLy_cursor_methods, 48 }; 49 50 void 51 PLy_cursor_init_type(void) 52 { 53 if (PyType_Ready(&PLy_CursorType) < 0) 54 elog(ERROR, "could not initialize PLy_CursorType"); 55 } 56 57 PyObject * 58 PLy_cursor(PyObject *self, PyObject *args) 59 { 60 char *query; 61 PyObject *plan; 62 PyObject *planargs = NULL; 63 64 if (PyArg_ParseTuple(args, "s", &query)) 65 return PLy_cursor_query(query); 66 67 PyErr_Clear(); 68 69 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs)) 70 return PLy_cursor_plan(plan, planargs); 71 72 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan"); 73 return NULL; 74 } 75 76 77 static PyObject * 78 PLy_cursor_query(const char *query) 79 { 80 PLyCursorObject *cursor; 81 PLyExecutionContext *exec_ctx = PLy_current_execution_context(); 82 volatile MemoryContext oldcontext; 83 volatile ResourceOwner oldowner; 84 85 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL) 86 return NULL; 87 cursor->portalname = NULL; 88 cursor->closed = false; 89 cursor->mcxt = AllocSetContextCreate(TopMemoryContext, 90 "PL/Python cursor context", 91 ALLOCSET_DEFAULT_SIZES); 92 93 /* Initialize for converting result tuples to Python */ 94 PLy_input_setup_func(&cursor->result, cursor->mcxt, 95 RECORDOID, -1, 96 exec_ctx->curr_proc); 97 98 oldcontext = CurrentMemoryContext; 99 oldowner = CurrentResourceOwner; 100 101 PLy_spi_subtransaction_begin(oldcontext, oldowner); 102 103 PG_TRY(); 104 { 105 SPIPlanPtr plan; 106 Portal portal; 107 108 pg_verifymbstr(query, strlen(query), false); 109 110 plan = SPI_prepare(query, 0, NULL); 111 if (plan == NULL) 112 elog(ERROR, "SPI_prepare failed: %s", 113 SPI_result_code_string(SPI_result)); 114 115 portal = SPI_cursor_open(NULL, plan, NULL, NULL, 116 exec_ctx->curr_proc->fn_readonly); 117 SPI_freeplan(plan); 118 119 if (portal == NULL) 120 elog(ERROR, "SPI_cursor_open() failed: %s", 121 SPI_result_code_string(SPI_result)); 122 123 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name); 124 125 PinPortal(portal); 126 127 PLy_spi_subtransaction_commit(oldcontext, oldowner); 128 } 129 PG_CATCH(); 130 { 131 PLy_spi_subtransaction_abort(oldcontext, oldowner); 132 return NULL; 133 } 134 PG_END_TRY(); 135 136 Assert(cursor->portalname != NULL); 137 return (PyObject *) cursor; 138 } 139 140 PyObject * 141 PLy_cursor_plan(PyObject *ob, PyObject *args) 142 { 143 PLyCursorObject *cursor; 144 volatile int nargs; 145 int i; 146 PLyPlanObject *plan; 147 PLyExecutionContext *exec_ctx = PLy_current_execution_context(); 148 volatile MemoryContext oldcontext; 149 volatile ResourceOwner oldowner; 150 151 if (args) 152 { 153 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args)) 154 { 155 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument"); 156 return NULL; 157 } 158 nargs = PySequence_Length(args); 159 } 160 else 161 nargs = 0; 162 163 plan = (PLyPlanObject *) ob; 164 165 if (nargs != plan->nargs) 166 { 167 char *sv; 168 PyObject *so = PyObject_Str(args); 169 170 if (!so) 171 PLy_elog(ERROR, "could not execute plan"); 172 sv = PyString_AsString(so); 173 PLy_exception_set_plural(PyExc_TypeError, 174 "Expected sequence of %d argument, got %d: %s", 175 "Expected sequence of %d arguments, got %d: %s", 176 plan->nargs, 177 plan->nargs, nargs, sv); 178 Py_DECREF(so); 179 180 return NULL; 181 } 182 183 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL) 184 return NULL; 185 cursor->portalname = NULL; 186 cursor->closed = false; 187 cursor->mcxt = AllocSetContextCreate(TopMemoryContext, 188 "PL/Python cursor context", 189 ALLOCSET_DEFAULT_SIZES); 190 191 /* Initialize for converting result tuples to Python */ 192 PLy_input_setup_func(&cursor->result, cursor->mcxt, 193 RECORDOID, -1, 194 exec_ctx->curr_proc); 195 196 oldcontext = CurrentMemoryContext; 197 oldowner = CurrentResourceOwner; 198 199 PLy_spi_subtransaction_begin(oldcontext, oldowner); 200 201 PG_TRY(); 202 { 203 Portal portal; 204 char *volatile nulls; 205 volatile int j; 206 207 if (nargs > 0) 208 nulls = palloc(nargs * sizeof(char)); 209 else 210 nulls = NULL; 211 212 for (j = 0; j < nargs; j++) 213 { 214 PLyObToDatum *arg = &plan->args[j]; 215 PyObject *elem; 216 217 elem = PySequence_GetItem(args, j); 218 PG_TRY(); 219 { 220 bool isnull; 221 222 plan->values[j] = PLy_output_convert(arg, elem, &isnull); 223 nulls[j] = isnull ? 'n' : ' '; 224 } 225 PG_FINALLY(); 226 { 227 Py_DECREF(elem); 228 } 229 PG_END_TRY(); 230 } 231 232 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls, 233 exec_ctx->curr_proc->fn_readonly); 234 if (portal == NULL) 235 elog(ERROR, "SPI_cursor_open() failed: %s", 236 SPI_result_code_string(SPI_result)); 237 238 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name); 239 240 PinPortal(portal); 241 242 PLy_spi_subtransaction_commit(oldcontext, oldowner); 243 } 244 PG_CATCH(); 245 { 246 int k; 247 248 /* cleanup plan->values array */ 249 for (k = 0; k < nargs; k++) 250 { 251 if (!plan->args[k].typbyval && 252 (plan->values[k] != PointerGetDatum(NULL))) 253 { 254 pfree(DatumGetPointer(plan->values[k])); 255 plan->values[k] = PointerGetDatum(NULL); 256 } 257 } 258 259 Py_DECREF(cursor); 260 261 PLy_spi_subtransaction_abort(oldcontext, oldowner); 262 return NULL; 263 } reuse_queue(struct deflate_buff_queue * p)264 PG_END_TRY(); 265 266 for (i = 0; i < nargs; i++) 267 { 268 if (!plan->args[i].typbyval && new_queue(void)269 (plan->values[i] != PointerGetDatum(NULL))) 270 { 271 pfree(DatumGetPointer(plan->values[i])); 272 plan->values[i] = PointerGetDatum(NULL); 273 } 274 } 275 276 Assert(cursor->portalname != NULL); 277 return (PyObject *) cursor; 278 } 279 280 static void 281 PLy_cursor_dealloc(PyObject *arg) 282 { 283 PLyCursorObject *cursor; 284 Portal portal; 285 286 cursor = (PLyCursorObject *) arg; 287 288 if (!cursor->closed) 289 { 290 portal = GetPortalByName(cursor->portalname); 291 292 if (PortalIsValid(portal)) 293 { 294 UnpinPortal(portal); 295 SPI_cursor_close(portal); 296 } 297 cursor->closed = true; 298 } 299 if (cursor->mcxt) 300 { 301 MemoryContextDelete(cursor->mcxt); 302 cursor->mcxt = NULL; 303 } 304 arg->ob_type->tp_free(arg); 305 } 306 307 static PyObject * 308 PLy_cursor_iternext(PyObject *self) 309 { 310 PLyCursorObject *cursor; 311 PyObject *ret; 312 PLyExecutionContext *exec_ctx = PLy_current_execution_context(); 313 volatile MemoryContext oldcontext; 314 volatile ResourceOwner oldowner; 315 Portal portal; 316 317 cursor = (PLyCursorObject *) self; 318 319 if (cursor->closed) 320 { 321 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor"); 322 return NULL; 323 } 324 325 portal = GetPortalByName(cursor->portalname); 326 if (!PortalIsValid(portal)) 327 { 328 PLy_exception_set(PyExc_ValueError, 329 "iterating a cursor in an aborted subtransaction"); 330 return NULL; 331 } 332 333 oldcontext = CurrentMemoryContext; 334 oldowner = CurrentResourceOwner; 335 336 PLy_spi_subtransaction_begin(oldcontext, oldowner); 337 338 PG_TRY(); 339 { 340 SPI_cursor_fetch(portal, true, 1); 341 if (SPI_processed == 0) 342 { 343 PyErr_SetNone(PyExc_StopIteration); 344 ret = NULL; 345 } 346 else 347 { 348 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc, 349 exec_ctx->curr_proc); 350 351 ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0], 352 SPI_tuptable->tupdesc, true); 353 } 354 355 SPI_freetuptable(SPI_tuptable); 356 357 PLy_spi_subtransaction_commit(oldcontext, oldowner); 358 } 359 PG_CATCH(); 360 { 361 PLy_spi_subtransaction_abort(oldcontext, oldowner); 362 return NULL; 363 } 364 PG_END_TRY(); 365 366 return ret; 367 } 368 369 static PyObject * 370 PLy_cursor_fetch(PyObject *self, PyObject *args) 371 { 372 PLyCursorObject *cursor; 373 int count; 374 PLyResultObject *ret; 375 PLyExecutionContext *exec_ctx = PLy_current_execution_context(); 376 volatile MemoryContext oldcontext; 377 volatile ResourceOwner oldowner; 378 Portal portal; 379 380 if (!PyArg_ParseTuple(args, "i:fetch", &count)) 381 return NULL; 382 383 cursor = (PLyCursorObject *) self; 384 385 if (cursor->closed) 386 { 387 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor"); 388 return NULL; 389 } 390 391 portal = GetPortalByName(cursor->portalname); 392 if (!PortalIsValid(portal)) 393 { 394 PLy_exception_set(PyExc_ValueError, 395 "iterating a cursor in an aborted subtransaction"); 396 return NULL; 397 } 398 399 ret = (PLyResultObject *) PLy_result_new(); 400 if (ret == NULL) 401 return NULL; 402 403 oldcontext = CurrentMemoryContext; 404 oldowner = CurrentResourceOwner; 405 406 PLy_spi_subtransaction_begin(oldcontext, oldowner); 407 408 PG_TRY(); 409 { 410 SPI_cursor_fetch(portal, true, count); 411 412 Py_DECREF(ret->status); 413 ret->status = PyInt_FromLong(SPI_OK_FETCH); 414 415 Py_DECREF(ret->nrows); 416 ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed); 417 418 if (SPI_processed != 0) 419 { 420 uint64 i; 421 422 /* 423 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size 424 * and list indices; so we cannot support a result larger than 425 * PY_SSIZE_T_MAX. 426 */ 427 if (SPI_processed > (uint64) PY_SSIZE_T_MAX) 428 ereport(ERROR, 429 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), 430 errmsg("query result has too many rows to fit in a Python list"))); 431 432 Py_DECREF(ret->rows); 433 ret->rows = PyList_New(SPI_processed); 434 if (!ret->rows) 435 { error(char * m)436 Py_DECREF(ret); 437 ret = NULL; 438 } 439 else 440 { 441 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc, 442 exec_ctx->curr_proc); 443 444 for (i = 0; i < SPI_processed; i++) 445 { 446 PyObject *row = PLy_input_from_tuple(&cursor->result, 447 SPI_tuptable->vals[i], 448 SPI_tuptable->tupdesc, 449 true); 450 451 PyList_SetItem(ret->rows, i, row); 452 } 453 } 454 } 455 456 SPI_freetuptable(SPI_tuptable); 457 458 PLy_spi_subtransaction_commit(oldcontext, oldowner); 459 } 460 PG_CATCH(); 461 { 462 PLy_spi_subtransaction_abort(oldcontext, oldowner); 463 return NULL; 464 } 465 PG_END_TRY(); 466 467 return (PyObject *) ret; 468 } 469 470 static PyObject * 471 PLy_cursor_close(PyObject *self, PyObject *unused) 472 { 473 PLyCursorObject *cursor = (PLyCursorObject *) self; 474 475 if (!cursor->closed) 476 { 477 Portal portal = GetPortalByName(cursor->portalname); 478 479 if (!PortalIsValid(portal)) 480 { 481 PLy_exception_set(PyExc_ValueError, 482 "closing a cursor in an aborted subtransaction"); 483 return NULL; 484 } 485 486 UnpinPortal(portal); 487 SPI_cursor_close(portal); 488 cursor->closed = true; 489 } 490 491 Py_RETURN_NONE; 492 } 493