1 /*-------------------------------------------------------------------------
2  *
3  * spi.c
4  *				Server Programming Interface
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/spi.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/printtup.h"
19 #include "access/sysattr.h"
20 #include "access/xact.h"
21 #include "catalog/heap.h"
22 #include "catalog/pg_type.h"
23 #include "commands/trigger.h"
24 #include "executor/executor.h"
25 #include "executor/spi_priv.h"
26 #include "miscadmin.h"
27 #include "tcop/pquery.h"
28 #include "tcop/utility.h"
29 #include "utils/builtins.h"
30 #include "utils/datum.h"
31 #include "utils/lsyscache.h"
32 #include "utils/memutils.h"
33 #include "utils/rel.h"
34 #include "utils/snapmgr.h"
35 #include "utils/syscache.h"
36 #include "utils/typcache.h"
37 
38 
/*
 * These global variables are part of the API for various SPI functions
 * (a horrible API choice, but it's too late now).  To reduce the risk of
 * interference between different SPI callers, we save and restore them
 * when entering/exiting a SPI nesting level.
 *
 * SPI_connect_ext() stashes the outer values in the new stack entry and
 * zeroes the globals; SPI_finish() and AtEOSubXact_SPI() put the outer
 * values back; SPICleanup() simply resets them.
 */
uint64		SPI_processed = 0;
SPITupleTable *SPI_tuptable = NULL;
int			SPI_result = 0;

/*
 * Private bookkeeping for the stack of SPI nesting levels.  The array is
 * allocated on first use by SPI_connect_ext() (in TopMemoryContext) and is
 * doubled there whenever it becomes full.  SPICleanup() resets _SPI_current
 * and _SPI_connected but leaves the array itself allocated.
 */
/* array of nesting-level entries, or NULL if never allocated */
static _SPI_connection *_SPI_stack = NULL;
/* entry of the innermost open level, or NULL if no level is open */
static _SPI_connection *_SPI_current = NULL;
static int	_SPI_stack_depth = 0;	/* allocated size of _SPI_stack */
static int	_SPI_connected = -1;	/* current stack index */
53 
/*
 * Pairs a query string with the raw-parse mode it is associated with.
 *
 * NOTE(review): presumably this is what gets passed as the "arg" of
 * _SPI_error_callback(); no use of the struct is visible in this part of
 * the file, so confirm against the callers.
 */
typedef struct SPICallbackArg
{
	const char *query;			/* query text */
	RawParseMode mode;			/* raw-parse mode for that text */
} SPICallbackArg;
59 
60 static Portal SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
61 									   ParamListInfo paramLI, bool read_only);
62 
63 static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan);
64 
65 static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
66 
67 static int	_SPI_execute_plan(SPIPlanPtr plan, const SPIExecuteOptions *options,
68 							  Snapshot snapshot, Snapshot crosscheck_snapshot,
69 							  bool fire_triggers);
70 
71 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
72 										 Datum *Values, const char *Nulls);
73 
74 static int	_SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount);
75 
76 static void _SPI_error_callback(void *arg);
77 
78 static void _SPI_cursor_operation(Portal portal,
79 								  FetchDirection direction, long count,
80 								  DestReceiver *dest);
81 
82 static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan);
83 static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
84 
85 static int	_SPI_begin_call(bool use_exec);
86 static int	_SPI_end_call(bool use_exec);
87 static MemoryContext _SPI_execmem(void);
88 static MemoryContext _SPI_procmem(void);
89 static bool _SPI_checktuples(void);
90 
91 
92 /* =================== interface functions =================== */
93 
/*
 * SPI_connect
 *		Open a new SPI nesting level with no option bits set.
 *
 * This is SPI_connect_ext() with an empty options mask; its result is
 * handed back unchanged.
 */
int
SPI_connect(void)
{
	int			rc = SPI_connect_ext(0);

	return rc;
}
99 
100 int
SPI_connect_ext(int options)101 SPI_connect_ext(int options)
102 {
103 	int			newdepth;
104 
105 	/* Enlarge stack if necessary */
106 	if (_SPI_stack == NULL)
107 	{
108 		if (_SPI_connected != -1 || _SPI_stack_depth != 0)
109 			elog(ERROR, "SPI stack corrupted");
110 		newdepth = 16;
111 		_SPI_stack = (_SPI_connection *)
112 			MemoryContextAlloc(TopMemoryContext,
113 							   newdepth * sizeof(_SPI_connection));
114 		_SPI_stack_depth = newdepth;
115 	}
116 	else
117 	{
118 		if (_SPI_stack_depth <= 0 || _SPI_stack_depth <= _SPI_connected)
119 			elog(ERROR, "SPI stack corrupted");
120 		if (_SPI_stack_depth == _SPI_connected + 1)
121 		{
122 			newdepth = _SPI_stack_depth * 2;
123 			_SPI_stack = (_SPI_connection *)
124 				repalloc(_SPI_stack,
125 						 newdepth * sizeof(_SPI_connection));
126 			_SPI_stack_depth = newdepth;
127 		}
128 	}
129 
130 	/* Enter new stack level */
131 	_SPI_connected++;
132 	Assert(_SPI_connected >= 0 && _SPI_connected < _SPI_stack_depth);
133 
134 	_SPI_current = &(_SPI_stack[_SPI_connected]);
135 	_SPI_current->processed = 0;
136 	_SPI_current->tuptable = NULL;
137 	_SPI_current->execSubid = InvalidSubTransactionId;
138 	slist_init(&_SPI_current->tuptables);
139 	_SPI_current->procCxt = NULL;	/* in case we fail to create 'em */
140 	_SPI_current->execCxt = NULL;
141 	_SPI_current->connectSubid = GetCurrentSubTransactionId();
142 	_SPI_current->queryEnv = NULL;
143 	_SPI_current->atomic = (options & SPI_OPT_NONATOMIC ? false : true);
144 	_SPI_current->internal_xact = false;
145 	_SPI_current->outer_processed = SPI_processed;
146 	_SPI_current->outer_tuptable = SPI_tuptable;
147 	_SPI_current->outer_result = SPI_result;
148 
149 	/*
150 	 * Create memory contexts for this procedure
151 	 *
152 	 * In atomic contexts (the normal case), we use TopTransactionContext,
153 	 * otherwise PortalContext, so that it lives across transaction
154 	 * boundaries.
155 	 *
156 	 * XXX It could be better to use PortalContext as the parent context in
157 	 * all cases, but we may not be inside a portal (consider deferred-trigger
158 	 * execution).  Perhaps CurTransactionContext could be an option?  For now
159 	 * it doesn't matter because we clean up explicitly in AtEOSubXact_SPI().
160 	 */
161 	_SPI_current->procCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : PortalContext,
162 												  "SPI Proc",
163 												  ALLOCSET_DEFAULT_SIZES);
164 	_SPI_current->execCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : _SPI_current->procCxt,
165 												  "SPI Exec",
166 												  ALLOCSET_DEFAULT_SIZES);
167 	/* ... and switch to procedure's context */
168 	_SPI_current->savedcxt = MemoryContextSwitchTo(_SPI_current->procCxt);
169 
170 	/*
171 	 * Reset API global variables so that current caller cannot accidentally
172 	 * depend on state of an outer caller.
173 	 */
174 	SPI_processed = 0;
175 	SPI_tuptable = NULL;
176 	SPI_result = 0;
177 
178 	return SPI_OK_CONNECT;
179 }
180 
181 int
SPI_finish(void)182 SPI_finish(void)
183 {
184 	int			res;
185 
186 	res = _SPI_begin_call(false);	/* just check we're connected */
187 	if (res < 0)
188 		return res;
189 
190 	/* Restore memory context as it was before procedure call */
191 	MemoryContextSwitchTo(_SPI_current->savedcxt);
192 
193 	/* Release memory used in procedure call (including tuptables) */
194 	MemoryContextDelete(_SPI_current->execCxt);
195 	_SPI_current->execCxt = NULL;
196 	MemoryContextDelete(_SPI_current->procCxt);
197 	_SPI_current->procCxt = NULL;
198 
199 	/*
200 	 * Restore outer API variables, especially SPI_tuptable which is probably
201 	 * pointing at a just-deleted tuptable
202 	 */
203 	SPI_processed = _SPI_current->outer_processed;
204 	SPI_tuptable = _SPI_current->outer_tuptable;
205 	SPI_result = _SPI_current->outer_result;
206 
207 	/* Exit stack level */
208 	_SPI_connected--;
209 	if (_SPI_connected < 0)
210 		_SPI_current = NULL;
211 	else
212 		_SPI_current = &(_SPI_stack[_SPI_connected]);
213 
214 	return SPI_OK_FINISH;
215 }
216 
217 void
SPI_start_transaction(void)218 SPI_start_transaction(void)
219 {
220 	MemoryContext oldcontext = CurrentMemoryContext;
221 
222 	StartTransactionCommand();
223 	MemoryContextSwitchTo(oldcontext);
224 }
225 
226 static void
_SPI_commit(bool chain)227 _SPI_commit(bool chain)
228 {
229 	MemoryContext oldcontext = CurrentMemoryContext;
230 
231 	if (_SPI_current->atomic)
232 		ereport(ERROR,
233 				(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
234 				 errmsg("invalid transaction termination")));
235 
236 	/*
237 	 * This restriction is required by PLs implemented on top of SPI.  They
238 	 * use subtransactions to establish exception blocks that are supposed to
239 	 * be rolled back together if there is an error.  Terminating the
240 	 * top-level transaction in such a block violates that idea.  A future PL
241 	 * implementation might have different ideas about this, in which case
242 	 * this restriction would have to be refined or the check possibly be
243 	 * moved out of SPI into the PLs.
244 	 */
245 	if (IsSubTransaction())
246 		ereport(ERROR,
247 				(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
248 				 errmsg("cannot commit while a subtransaction is active")));
249 
250 	/*
251 	 * Hold any pinned portals that any PLs might be using.  We have to do
252 	 * this before changing transaction state, since this will run
253 	 * user-defined code that might throw an error.
254 	 */
255 	HoldPinnedPortals();
256 
257 	/* Start the actual commit */
258 	_SPI_current->internal_xact = true;
259 
260 	/* Release snapshots associated with portals */
261 	ForgetPortalSnapshots();
262 
263 	if (chain)
264 		SaveTransactionCharacteristics();
265 
266 	CommitTransactionCommand();
267 
268 	if (chain)
269 	{
270 		StartTransactionCommand();
271 		RestoreTransactionCharacteristics();
272 	}
273 
274 	MemoryContextSwitchTo(oldcontext);
275 
276 	_SPI_current->internal_xact = false;
277 }
278 
/*
 * SPI_commit
 *		Commit the current transaction without chaining a new one.
 */
void
SPI_commit(void)
{
	const bool	chain = false;

	_SPI_commit(chain);
}
284 
/*
 * SPI_commit_and_chain
 *		Commit the current transaction and chain a new one (see _SPI_commit).
 */
void
SPI_commit_and_chain(void)
{
	const bool	chain = true;

	_SPI_commit(chain);
}
290 
291 static void
_SPI_rollback(bool chain)292 _SPI_rollback(bool chain)
293 {
294 	MemoryContext oldcontext = CurrentMemoryContext;
295 
296 	if (_SPI_current->atomic)
297 		ereport(ERROR,
298 				(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
299 				 errmsg("invalid transaction termination")));
300 
301 	/* see under SPI_commit() */
302 	if (IsSubTransaction())
303 		ereport(ERROR,
304 				(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
305 				 errmsg("cannot roll back while a subtransaction is active")));
306 
307 	/*
308 	 * Hold any pinned portals that any PLs might be using.  We have to do
309 	 * this before changing transaction state, since this will run
310 	 * user-defined code that might throw an error, and in any case couldn't
311 	 * be run in an already-aborted transaction.
312 	 */
313 	HoldPinnedPortals();
314 
315 	/* Start the actual rollback */
316 	_SPI_current->internal_xact = true;
317 
318 	/* Release snapshots associated with portals */
319 	ForgetPortalSnapshots();
320 
321 	if (chain)
322 		SaveTransactionCharacteristics();
323 
324 	AbortCurrentTransaction();
325 
326 	if (chain)
327 	{
328 		StartTransactionCommand();
329 		RestoreTransactionCharacteristics();
330 	}
331 
332 	MemoryContextSwitchTo(oldcontext);
333 
334 	_SPI_current->internal_xact = false;
335 }
336 
/*
 * SPI_rollback
 *		Roll back the current transaction without chaining a new one.
 */
void
SPI_rollback(void)
{
	const bool	chain = false;

	_SPI_rollback(chain);
}
342 
/*
 * SPI_rollback_and_chain
 *		Roll back the current transaction and chain a new one (see
 *		_SPI_rollback).
 */
void
SPI_rollback_and_chain(void)
{
	const bool	chain = true;

	_SPI_rollback(chain);
}
348 
349 /*
350  * Clean up SPI state.  Called on transaction end (of non-SPI-internal
351  * transactions) and when returning to the main loop on error.
352  */
353 void
SPICleanup(void)354 SPICleanup(void)
355 {
356 	_SPI_current = NULL;
357 	_SPI_connected = -1;
358 	/* Reset API global variables, too */
359 	SPI_processed = 0;
360 	SPI_tuptable = NULL;
361 	SPI_result = 0;
362 }
363 
364 /*
365  * Clean up SPI state at transaction commit or abort.
366  */
367 void
AtEOXact_SPI(bool isCommit)368 AtEOXact_SPI(bool isCommit)
369 {
370 	/* Do nothing if the transaction end was initiated by SPI. */
371 	if (_SPI_current && _SPI_current->internal_xact)
372 		return;
373 
374 	if (isCommit && _SPI_connected != -1)
375 		ereport(WARNING,
376 				(errcode(ERRCODE_WARNING),
377 				 errmsg("transaction left non-empty SPI stack"),
378 				 errhint("Check for missing \"SPI_finish\" calls.")));
379 
380 	SPICleanup();
381 }
382 
383 /*
384  * Clean up SPI state at subtransaction commit or abort.
385  *
386  * During commit, there shouldn't be any unclosed entries remaining from
387  * the current subtransaction; we emit a warning if any are found.
388  */
389 void
AtEOSubXact_SPI(bool isCommit,SubTransactionId mySubid)390 AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
391 {
392 	bool		found = false;
393 
394 	while (_SPI_connected >= 0)
395 	{
396 		_SPI_connection *connection = &(_SPI_stack[_SPI_connected]);
397 
398 		if (connection->connectSubid != mySubid)
399 			break;				/* couldn't be any underneath it either */
400 
401 		if (connection->internal_xact)
402 			break;
403 
404 		found = true;
405 
406 		/*
407 		 * Release procedure memory explicitly (see note in SPI_connect)
408 		 */
409 		if (connection->execCxt)
410 		{
411 			MemoryContextDelete(connection->execCxt);
412 			connection->execCxt = NULL;
413 		}
414 		if (connection->procCxt)
415 		{
416 			MemoryContextDelete(connection->procCxt);
417 			connection->procCxt = NULL;
418 		}
419 
420 		/*
421 		 * Restore outer global variables and pop the stack entry.  Unlike
422 		 * SPI_finish(), we don't risk switching to memory contexts that might
423 		 * be already gone.
424 		 */
425 		SPI_processed = connection->outer_processed;
426 		SPI_tuptable = connection->outer_tuptable;
427 		SPI_result = connection->outer_result;
428 
429 		_SPI_connected--;
430 		if (_SPI_connected < 0)
431 			_SPI_current = NULL;
432 		else
433 			_SPI_current = &(_SPI_stack[_SPI_connected]);
434 	}
435 
436 	if (found && isCommit)
437 		ereport(WARNING,
438 				(errcode(ERRCODE_WARNING),
439 				 errmsg("subtransaction left non-empty SPI stack"),
440 				 errhint("Check for missing \"SPI_finish\" calls.")));
441 
442 	/*
443 	 * If we are aborting a subtransaction and there is an open SPI context
444 	 * surrounding the subxact, clean up to prevent memory leakage.
445 	 */
446 	if (_SPI_current && !isCommit)
447 	{
448 		slist_mutable_iter siter;
449 
450 		/*
451 		 * Throw away executor state if current executor operation was started
452 		 * within current subxact (essentially, force a _SPI_end_call(true)).
453 		 */
454 		if (_SPI_current->execSubid >= mySubid)
455 		{
456 			_SPI_current->execSubid = InvalidSubTransactionId;
457 			MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
458 		}
459 
460 		/* throw away any tuple tables created within current subxact */
461 		slist_foreach_modify(siter, &_SPI_current->tuptables)
462 		{
463 			SPITupleTable *tuptable;
464 
465 			tuptable = slist_container(SPITupleTable, next, siter.cur);
466 			if (tuptable->subid >= mySubid)
467 			{
468 				/*
469 				 * If we used SPI_freetuptable() here, its internal search of
470 				 * the tuptables list would make this operation O(N^2).
471 				 * Instead, just free the tuptable manually.  This should
472 				 * match what SPI_freetuptable() does.
473 				 */
474 				slist_delete_current(&siter);
475 				if (tuptable == _SPI_current->tuptable)
476 					_SPI_current->tuptable = NULL;
477 				if (tuptable == SPI_tuptable)
478 					SPI_tuptable = NULL;
479 				MemoryContextDelete(tuptable->tuptabcxt);
480 			}
481 		}
482 	}
483 }
484 
485 /*
486  * Are we executing inside a procedure (that is, a nonatomic SPI context)?
487  */
488 bool
SPI_inside_nonatomic_context(void)489 SPI_inside_nonatomic_context(void)
490 {
491 	if (_SPI_current == NULL)
492 		return false;			/* not in any SPI context at all */
493 	if (_SPI_current->atomic)
494 		return false;			/* it's atomic (ie function not procedure) */
495 	return true;
496 }
497 
498 
499 /* Parse, plan, and execute a query string */
500 int
SPI_execute(const char * src,bool read_only,long tcount)501 SPI_execute(const char *src, bool read_only, long tcount)
502 {
503 	_SPI_plan	plan;
504 	SPIExecuteOptions options;
505 	int			res;
506 
507 	if (src == NULL || tcount < 0)
508 		return SPI_ERROR_ARGUMENT;
509 
510 	res = _SPI_begin_call(true);
511 	if (res < 0)
512 		return res;
513 
514 	memset(&plan, 0, sizeof(_SPI_plan));
515 	plan.magic = _SPI_PLAN_MAGIC;
516 	plan.parse_mode = RAW_PARSE_DEFAULT;
517 	plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
518 
519 	_SPI_prepare_oneshot_plan(src, &plan);
520 
521 	memset(&options, 0, sizeof(options));
522 	options.read_only = read_only;
523 	options.tcount = tcount;
524 
525 	res = _SPI_execute_plan(&plan, &options,
526 							InvalidSnapshot, InvalidSnapshot,
527 							true);
528 
529 	_SPI_end_call(true);
530 	return res;
531 }
532 
/*
 * SPI_exec
 *		Obsolete spelling of SPI_execute(); always runs with read_only = false.
 */
int
SPI_exec(const char *src, long tcount)
{
	const bool	read_only = false;

	return SPI_execute(src, read_only, tcount);
}
539 
540 /* Parse, plan, and execute a query string, with extensible options */
541 int
SPI_execute_extended(const char * src,const SPIExecuteOptions * options)542 SPI_execute_extended(const char *src,
543 					 const SPIExecuteOptions *options)
544 {
545 	int			res;
546 	_SPI_plan	plan;
547 
548 	if (src == NULL || options == NULL)
549 		return SPI_ERROR_ARGUMENT;
550 
551 	res = _SPI_begin_call(true);
552 	if (res < 0)
553 		return res;
554 
555 	memset(&plan, 0, sizeof(_SPI_plan));
556 	plan.magic = _SPI_PLAN_MAGIC;
557 	plan.parse_mode = RAW_PARSE_DEFAULT;
558 	plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
559 	if (options->params)
560 	{
561 		plan.parserSetup = options->params->parserSetup;
562 		plan.parserSetupArg = options->params->parserSetupArg;
563 	}
564 
565 	_SPI_prepare_oneshot_plan(src, &plan);
566 
567 	res = _SPI_execute_plan(&plan, options,
568 							InvalidSnapshot, InvalidSnapshot,
569 							true);
570 
571 	_SPI_end_call(true);
572 	return res;
573 }
574 
575 /* Execute a previously prepared plan */
576 int
SPI_execute_plan(SPIPlanPtr plan,Datum * Values,const char * Nulls,bool read_only,long tcount)577 SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
578 				 bool read_only, long tcount)
579 {
580 	SPIExecuteOptions options;
581 	int			res;
582 
583 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
584 		return SPI_ERROR_ARGUMENT;
585 
586 	if (plan->nargs > 0 && Values == NULL)
587 		return SPI_ERROR_PARAM;
588 
589 	res = _SPI_begin_call(true);
590 	if (res < 0)
591 		return res;
592 
593 	memset(&options, 0, sizeof(options));
594 	options.params = _SPI_convert_params(plan->nargs, plan->argtypes,
595 										 Values, Nulls);
596 	options.read_only = read_only;
597 	options.tcount = tcount;
598 
599 	res = _SPI_execute_plan(plan, &options,
600 							InvalidSnapshot, InvalidSnapshot,
601 							true);
602 
603 	_SPI_end_call(true);
604 	return res;
605 }
606 
607 /* Obsolete version of SPI_execute_plan */
608 int
SPI_execp(SPIPlanPtr plan,Datum * Values,const char * Nulls,long tcount)609 SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount)
610 {
611 	return SPI_execute_plan(plan, Values, Nulls, false, tcount);
612 }
613 
614 /* Execute a previously prepared plan */
615 int
SPI_execute_plan_extended(SPIPlanPtr plan,const SPIExecuteOptions * options)616 SPI_execute_plan_extended(SPIPlanPtr plan,
617 						  const SPIExecuteOptions *options)
618 {
619 	int			res;
620 
621 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || options == NULL)
622 		return SPI_ERROR_ARGUMENT;
623 
624 	res = _SPI_begin_call(true);
625 	if (res < 0)
626 		return res;
627 
628 	res = _SPI_execute_plan(plan, options,
629 							InvalidSnapshot, InvalidSnapshot,
630 							true);
631 
632 	_SPI_end_call(true);
633 	return res;
634 }
635 
636 /* Execute a previously prepared plan */
637 int
SPI_execute_plan_with_paramlist(SPIPlanPtr plan,ParamListInfo params,bool read_only,long tcount)638 SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
639 								bool read_only, long tcount)
640 {
641 	SPIExecuteOptions options;
642 	int			res;
643 
644 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
645 		return SPI_ERROR_ARGUMENT;
646 
647 	res = _SPI_begin_call(true);
648 	if (res < 0)
649 		return res;
650 
651 	memset(&options, 0, sizeof(options));
652 	options.params = params;
653 	options.read_only = read_only;
654 	options.tcount = tcount;
655 
656 	res = _SPI_execute_plan(plan, &options,
657 							InvalidSnapshot, InvalidSnapshot,
658 							true);
659 
660 	_SPI_end_call(true);
661 	return res;
662 }
663 
664 /*
665  * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
666  * the caller to specify exactly which snapshots to use, which will be
667  * registered here.  Also, the caller may specify that AFTER triggers should be
668  * queued as part of the outer query rather than being fired immediately at the
669  * end of the command.
670  *
671  * This is currently not documented in spi.sgml because it is only intended
672  * for use by RI triggers.
673  *
674  * Passing snapshot == InvalidSnapshot will select the normal behavior of
675  * fetching a new snapshot for each query.
676  */
677 int
SPI_execute_snapshot(SPIPlanPtr plan,Datum * Values,const char * Nulls,Snapshot snapshot,Snapshot crosscheck_snapshot,bool read_only,bool fire_triggers,long tcount)678 SPI_execute_snapshot(SPIPlanPtr plan,
679 					 Datum *Values, const char *Nulls,
680 					 Snapshot snapshot, Snapshot crosscheck_snapshot,
681 					 bool read_only, bool fire_triggers, long tcount)
682 {
683 	SPIExecuteOptions options;
684 	int			res;
685 
686 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
687 		return SPI_ERROR_ARGUMENT;
688 
689 	if (plan->nargs > 0 && Values == NULL)
690 		return SPI_ERROR_PARAM;
691 
692 	res = _SPI_begin_call(true);
693 	if (res < 0)
694 		return res;
695 
696 	memset(&options, 0, sizeof(options));
697 	options.params = _SPI_convert_params(plan->nargs, plan->argtypes,
698 										 Values, Nulls);
699 	options.read_only = read_only;
700 	options.tcount = tcount;
701 
702 	res = _SPI_execute_plan(plan, &options,
703 							snapshot, crosscheck_snapshot,
704 							fire_triggers);
705 
706 	_SPI_end_call(true);
707 	return res;
708 }
709 
710 /*
711  * SPI_execute_with_args -- plan and execute a query with supplied arguments
712  *
713  * This is functionally equivalent to SPI_prepare followed by
714  * SPI_execute_plan.
715  */
716 int
SPI_execute_with_args(const char * src,int nargs,Oid * argtypes,Datum * Values,const char * Nulls,bool read_only,long tcount)717 SPI_execute_with_args(const char *src,
718 					  int nargs, Oid *argtypes,
719 					  Datum *Values, const char *Nulls,
720 					  bool read_only, long tcount)
721 {
722 	int			res;
723 	_SPI_plan	plan;
724 	ParamListInfo paramLI;
725 	SPIExecuteOptions options;
726 
727 	if (src == NULL || nargs < 0 || tcount < 0)
728 		return SPI_ERROR_ARGUMENT;
729 
730 	if (nargs > 0 && (argtypes == NULL || Values == NULL))
731 		return SPI_ERROR_PARAM;
732 
733 	res = _SPI_begin_call(true);
734 	if (res < 0)
735 		return res;
736 
737 	memset(&plan, 0, sizeof(_SPI_plan));
738 	plan.magic = _SPI_PLAN_MAGIC;
739 	plan.parse_mode = RAW_PARSE_DEFAULT;
740 	plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
741 	plan.nargs = nargs;
742 	plan.argtypes = argtypes;
743 	plan.parserSetup = NULL;
744 	plan.parserSetupArg = NULL;
745 
746 	paramLI = _SPI_convert_params(nargs, argtypes,
747 								  Values, Nulls);
748 
749 	_SPI_prepare_oneshot_plan(src, &plan);
750 
751 	memset(&options, 0, sizeof(options));
752 	options.params = paramLI;
753 	options.read_only = read_only;
754 	options.tcount = tcount;
755 
756 	res = _SPI_execute_plan(&plan, &options,
757 							InvalidSnapshot, InvalidSnapshot,
758 							true);
759 
760 	_SPI_end_call(true);
761 	return res;
762 }
763 
764 SPIPlanPtr
SPI_prepare(const char * src,int nargs,Oid * argtypes)765 SPI_prepare(const char *src, int nargs, Oid *argtypes)
766 {
767 	return SPI_prepare_cursor(src, nargs, argtypes, 0);
768 }
769 
770 SPIPlanPtr
SPI_prepare_cursor(const char * src,int nargs,Oid * argtypes,int cursorOptions)771 SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
772 				   int cursorOptions)
773 {
774 	_SPI_plan	plan;
775 	SPIPlanPtr	result;
776 
777 	if (src == NULL || nargs < 0 || (nargs > 0 && argtypes == NULL))
778 	{
779 		SPI_result = SPI_ERROR_ARGUMENT;
780 		return NULL;
781 	}
782 
783 	SPI_result = _SPI_begin_call(true);
784 	if (SPI_result < 0)
785 		return NULL;
786 
787 	memset(&plan, 0, sizeof(_SPI_plan));
788 	plan.magic = _SPI_PLAN_MAGIC;
789 	plan.parse_mode = RAW_PARSE_DEFAULT;
790 	plan.cursor_options = cursorOptions;
791 	plan.nargs = nargs;
792 	plan.argtypes = argtypes;
793 	plan.parserSetup = NULL;
794 	plan.parserSetupArg = NULL;
795 
796 	_SPI_prepare_plan(src, &plan);
797 
798 	/* copy plan to procedure context */
799 	result = _SPI_make_plan_non_temp(&plan);
800 
801 	_SPI_end_call(true);
802 
803 	return result;
804 }
805 
806 SPIPlanPtr
SPI_prepare_extended(const char * src,const SPIPrepareOptions * options)807 SPI_prepare_extended(const char *src,
808 					 const SPIPrepareOptions *options)
809 {
810 	_SPI_plan	plan;
811 	SPIPlanPtr	result;
812 
813 	if (src == NULL || options == NULL)
814 	{
815 		SPI_result = SPI_ERROR_ARGUMENT;
816 		return NULL;
817 	}
818 
819 	SPI_result = _SPI_begin_call(true);
820 	if (SPI_result < 0)
821 		return NULL;
822 
823 	memset(&plan, 0, sizeof(_SPI_plan));
824 	plan.magic = _SPI_PLAN_MAGIC;
825 	plan.parse_mode = options->parseMode;
826 	plan.cursor_options = options->cursorOptions;
827 	plan.nargs = 0;
828 	plan.argtypes = NULL;
829 	plan.parserSetup = options->parserSetup;
830 	plan.parserSetupArg = options->parserSetupArg;
831 
832 	_SPI_prepare_plan(src, &plan);
833 
834 	/* copy plan to procedure context */
835 	result = _SPI_make_plan_non_temp(&plan);
836 
837 	_SPI_end_call(true);
838 
839 	return result;
840 }
841 
842 SPIPlanPtr
SPI_prepare_params(const char * src,ParserSetupHook parserSetup,void * parserSetupArg,int cursorOptions)843 SPI_prepare_params(const char *src,
844 				   ParserSetupHook parserSetup,
845 				   void *parserSetupArg,
846 				   int cursorOptions)
847 {
848 	_SPI_plan	plan;
849 	SPIPlanPtr	result;
850 
851 	if (src == NULL)
852 	{
853 		SPI_result = SPI_ERROR_ARGUMENT;
854 		return NULL;
855 	}
856 
857 	SPI_result = _SPI_begin_call(true);
858 	if (SPI_result < 0)
859 		return NULL;
860 
861 	memset(&plan, 0, sizeof(_SPI_plan));
862 	plan.magic = _SPI_PLAN_MAGIC;
863 	plan.parse_mode = RAW_PARSE_DEFAULT;
864 	plan.cursor_options = cursorOptions;
865 	plan.nargs = 0;
866 	plan.argtypes = NULL;
867 	plan.parserSetup = parserSetup;
868 	plan.parserSetupArg = parserSetupArg;
869 
870 	_SPI_prepare_plan(src, &plan);
871 
872 	/* copy plan to procedure context */
873 	result = _SPI_make_plan_non_temp(&plan);
874 
875 	_SPI_end_call(true);
876 
877 	return result;
878 }
879 
880 int
SPI_keepplan(SPIPlanPtr plan)881 SPI_keepplan(SPIPlanPtr plan)
882 {
883 	ListCell   *lc;
884 
885 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC ||
886 		plan->saved || plan->oneshot)
887 		return SPI_ERROR_ARGUMENT;
888 
889 	/*
890 	 * Mark it saved, reparent it under CacheMemoryContext, and mark all the
891 	 * component CachedPlanSources as saved.  This sequence cannot fail
892 	 * partway through, so there's no risk of long-term memory leakage.
893 	 */
894 	plan->saved = true;
895 	MemoryContextSetParent(plan->plancxt, CacheMemoryContext);
896 
897 	foreach(lc, plan->plancache_list)
898 	{
899 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
900 
901 		SaveCachedPlan(plansource);
902 	}
903 
904 	return 0;
905 }
906 
907 SPIPlanPtr
SPI_saveplan(SPIPlanPtr plan)908 SPI_saveplan(SPIPlanPtr plan)
909 {
910 	SPIPlanPtr	newplan;
911 
912 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
913 	{
914 		SPI_result = SPI_ERROR_ARGUMENT;
915 		return NULL;
916 	}
917 
918 	SPI_result = _SPI_begin_call(false);	/* don't change context */
919 	if (SPI_result < 0)
920 		return NULL;
921 
922 	newplan = _SPI_save_plan(plan);
923 
924 	SPI_result = _SPI_end_call(false);
925 
926 	return newplan;
927 }
928 
929 int
SPI_freeplan(SPIPlanPtr plan)930 SPI_freeplan(SPIPlanPtr plan)
931 {
932 	ListCell   *lc;
933 
934 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
935 		return SPI_ERROR_ARGUMENT;
936 
937 	/* Release the plancache entries */
938 	foreach(lc, plan->plancache_list)
939 	{
940 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
941 
942 		DropCachedPlan(plansource);
943 	}
944 
945 	/* Now get rid of the _SPI_plan and subsidiary data in its plancxt */
946 	MemoryContextDelete(plan->plancxt);
947 
948 	return 0;
949 }
950 
951 HeapTuple
SPI_copytuple(HeapTuple tuple)952 SPI_copytuple(HeapTuple tuple)
953 {
954 	MemoryContext oldcxt;
955 	HeapTuple	ctuple;
956 
957 	if (tuple == NULL)
958 	{
959 		SPI_result = SPI_ERROR_ARGUMENT;
960 		return NULL;
961 	}
962 
963 	if (_SPI_current == NULL)
964 	{
965 		SPI_result = SPI_ERROR_UNCONNECTED;
966 		return NULL;
967 	}
968 
969 	oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
970 
971 	ctuple = heap_copytuple(tuple);
972 
973 	MemoryContextSwitchTo(oldcxt);
974 
975 	return ctuple;
976 }
977 
978 HeapTupleHeader
SPI_returntuple(HeapTuple tuple,TupleDesc tupdesc)979 SPI_returntuple(HeapTuple tuple, TupleDesc tupdesc)
980 {
981 	MemoryContext oldcxt;
982 	HeapTupleHeader dtup;
983 
984 	if (tuple == NULL || tupdesc == NULL)
985 	{
986 		SPI_result = SPI_ERROR_ARGUMENT;
987 		return NULL;
988 	}
989 
990 	if (_SPI_current == NULL)
991 	{
992 		SPI_result = SPI_ERROR_UNCONNECTED;
993 		return NULL;
994 	}
995 
996 	/* For RECORD results, make sure a typmod has been assigned */
997 	if (tupdesc->tdtypeid == RECORDOID &&
998 		tupdesc->tdtypmod < 0)
999 		assign_record_type_typmod(tupdesc);
1000 
1001 	oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
1002 
1003 	dtup = DatumGetHeapTupleHeader(heap_copy_tuple_as_datum(tuple, tupdesc));
1004 
1005 	MemoryContextSwitchTo(oldcxt);
1006 
1007 	return dtup;
1008 }
1009 
1010 HeapTuple
SPI_modifytuple(Relation rel,HeapTuple tuple,int natts,int * attnum,Datum * Values,const char * Nulls)1011 SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum,
1012 				Datum *Values, const char *Nulls)
1013 {
1014 	MemoryContext oldcxt;
1015 	HeapTuple	mtuple;
1016 	int			numberOfAttributes;
1017 	Datum	   *v;
1018 	bool	   *n;
1019 	int			i;
1020 
1021 	if (rel == NULL || tuple == NULL || natts < 0 || attnum == NULL || Values == NULL)
1022 	{
1023 		SPI_result = SPI_ERROR_ARGUMENT;
1024 		return NULL;
1025 	}
1026 
1027 	if (_SPI_current == NULL)
1028 	{
1029 		SPI_result = SPI_ERROR_UNCONNECTED;
1030 		return NULL;
1031 	}
1032 
1033 	oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
1034 
1035 	SPI_result = 0;
1036 
1037 	numberOfAttributes = rel->rd_att->natts;
1038 	v = (Datum *) palloc(numberOfAttributes * sizeof(Datum));
1039 	n = (bool *) palloc(numberOfAttributes * sizeof(bool));
1040 
1041 	/* fetch old values and nulls */
1042 	heap_deform_tuple(tuple, rel->rd_att, v, n);
1043 
1044 	/* replace values and nulls */
1045 	for (i = 0; i < natts; i++)
1046 	{
1047 		if (attnum[i] <= 0 || attnum[i] > numberOfAttributes)
1048 			break;
1049 		v[attnum[i] - 1] = Values[i];
1050 		n[attnum[i] - 1] = (Nulls && Nulls[i] == 'n') ? true : false;
1051 	}
1052 
1053 	if (i == natts)				/* no errors in *attnum */
1054 	{
1055 		mtuple = heap_form_tuple(rel->rd_att, v, n);
1056 
1057 		/*
1058 		 * copy the identification info of the old tuple: t_ctid, t_self, and
1059 		 * OID (if any)
1060 		 */
1061 		mtuple->t_data->t_ctid = tuple->t_data->t_ctid;
1062 		mtuple->t_self = tuple->t_self;
1063 		mtuple->t_tableOid = tuple->t_tableOid;
1064 	}
1065 	else
1066 	{
1067 		mtuple = NULL;
1068 		SPI_result = SPI_ERROR_NOATTRIBUTE;
1069 	}
1070 
1071 	pfree(v);
1072 	pfree(n);
1073 
1074 	MemoryContextSwitchTo(oldcxt);
1075 
1076 	return mtuple;
1077 }
1078 
1079 int
SPI_fnumber(TupleDesc tupdesc,const char * fname)1080 SPI_fnumber(TupleDesc tupdesc, const char *fname)
1081 {
1082 	int			res;
1083 	const FormData_pg_attribute *sysatt;
1084 
1085 	for (res = 0; res < tupdesc->natts; res++)
1086 	{
1087 		Form_pg_attribute attr = TupleDescAttr(tupdesc, res);
1088 
1089 		if (namestrcmp(&attr->attname, fname) == 0 &&
1090 			!attr->attisdropped)
1091 			return res + 1;
1092 	}
1093 
1094 	sysatt = SystemAttributeByName(fname);
1095 	if (sysatt != NULL)
1096 		return sysatt->attnum;
1097 
1098 	/* SPI_ERROR_NOATTRIBUTE is different from all sys column numbers */
1099 	return SPI_ERROR_NOATTRIBUTE;
1100 }
1101 
1102 char *
SPI_fname(TupleDesc tupdesc,int fnumber)1103 SPI_fname(TupleDesc tupdesc, int fnumber)
1104 {
1105 	const FormData_pg_attribute *att;
1106 
1107 	SPI_result = 0;
1108 
1109 	if (fnumber > tupdesc->natts || fnumber == 0 ||
1110 		fnumber <= FirstLowInvalidHeapAttributeNumber)
1111 	{
1112 		SPI_result = SPI_ERROR_NOATTRIBUTE;
1113 		return NULL;
1114 	}
1115 
1116 	if (fnumber > 0)
1117 		att = TupleDescAttr(tupdesc, fnumber - 1);
1118 	else
1119 		att = SystemAttributeDefinition(fnumber);
1120 
1121 	return pstrdup(NameStr(att->attname));
1122 }
1123 
1124 char *
SPI_getvalue(HeapTuple tuple,TupleDesc tupdesc,int fnumber)1125 SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
1126 {
1127 	Datum		val;
1128 	bool		isnull;
1129 	Oid			typoid,
1130 				foutoid;
1131 	bool		typisvarlena;
1132 
1133 	SPI_result = 0;
1134 
1135 	if (fnumber > tupdesc->natts || fnumber == 0 ||
1136 		fnumber <= FirstLowInvalidHeapAttributeNumber)
1137 	{
1138 		SPI_result = SPI_ERROR_NOATTRIBUTE;
1139 		return NULL;
1140 	}
1141 
1142 	val = heap_getattr(tuple, fnumber, tupdesc, &isnull);
1143 	if (isnull)
1144 		return NULL;
1145 
1146 	if (fnumber > 0)
1147 		typoid = TupleDescAttr(tupdesc, fnumber - 1)->atttypid;
1148 	else
1149 		typoid = (SystemAttributeDefinition(fnumber))->atttypid;
1150 
1151 	getTypeOutputInfo(typoid, &foutoid, &typisvarlena);
1152 
1153 	return OidOutputFunctionCall(foutoid, val);
1154 }
1155 
1156 Datum
SPI_getbinval(HeapTuple tuple,TupleDesc tupdesc,int fnumber,bool * isnull)1157 SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
1158 {
1159 	SPI_result = 0;
1160 
1161 	if (fnumber > tupdesc->natts || fnumber == 0 ||
1162 		fnumber <= FirstLowInvalidHeapAttributeNumber)
1163 	{
1164 		SPI_result = SPI_ERROR_NOATTRIBUTE;
1165 		*isnull = true;
1166 		return (Datum) NULL;
1167 	}
1168 
1169 	return heap_getattr(tuple, fnumber, tupdesc, isnull);
1170 }
1171 
1172 char *
SPI_gettype(TupleDesc tupdesc,int fnumber)1173 SPI_gettype(TupleDesc tupdesc, int fnumber)
1174 {
1175 	Oid			typoid;
1176 	HeapTuple	typeTuple;
1177 	char	   *result;
1178 
1179 	SPI_result = 0;
1180 
1181 	if (fnumber > tupdesc->natts || fnumber == 0 ||
1182 		fnumber <= FirstLowInvalidHeapAttributeNumber)
1183 	{
1184 		SPI_result = SPI_ERROR_NOATTRIBUTE;
1185 		return NULL;
1186 	}
1187 
1188 	if (fnumber > 0)
1189 		typoid = TupleDescAttr(tupdesc, fnumber - 1)->atttypid;
1190 	else
1191 		typoid = (SystemAttributeDefinition(fnumber))->atttypid;
1192 
1193 	typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typoid));
1194 
1195 	if (!HeapTupleIsValid(typeTuple))
1196 	{
1197 		SPI_result = SPI_ERROR_TYPUNKNOWN;
1198 		return NULL;
1199 	}
1200 
1201 	result = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(typeTuple))->typname));
1202 	ReleaseSysCache(typeTuple);
1203 	return result;
1204 }
1205 
1206 /*
1207  * Get the data type OID for a column.
1208  *
1209  * There's nothing similar for typmod and typcollation.  The rare consumers
1210  * thereof should inspect the TupleDesc directly.
1211  */
1212 Oid
SPI_gettypeid(TupleDesc tupdesc,int fnumber)1213 SPI_gettypeid(TupleDesc tupdesc, int fnumber)
1214 {
1215 	SPI_result = 0;
1216 
1217 	if (fnumber > tupdesc->natts || fnumber == 0 ||
1218 		fnumber <= FirstLowInvalidHeapAttributeNumber)
1219 	{
1220 		SPI_result = SPI_ERROR_NOATTRIBUTE;
1221 		return InvalidOid;
1222 	}
1223 
1224 	if (fnumber > 0)
1225 		return TupleDescAttr(tupdesc, fnumber - 1)->atttypid;
1226 	else
1227 		return (SystemAttributeDefinition(fnumber))->atttypid;
1228 }
1229 
1230 char *
SPI_getrelname(Relation rel)1231 SPI_getrelname(Relation rel)
1232 {
1233 	return pstrdup(RelationGetRelationName(rel));
1234 }
1235 
1236 char *
SPI_getnspname(Relation rel)1237 SPI_getnspname(Relation rel)
1238 {
1239 	return get_namespace_name(RelationGetNamespace(rel));
1240 }
1241 
1242 void *
SPI_palloc(Size size)1243 SPI_palloc(Size size)
1244 {
1245 	if (_SPI_current == NULL)
1246 		elog(ERROR, "SPI_palloc called while not connected to SPI");
1247 
1248 	return MemoryContextAlloc(_SPI_current->savedcxt, size);
1249 }
1250 
1251 void *
SPI_repalloc(void * pointer,Size size)1252 SPI_repalloc(void *pointer, Size size)
1253 {
1254 	/* No longer need to worry which context chunk was in... */
1255 	return repalloc(pointer, size);
1256 }
1257 
/*
 * SPI_pfree
 *		Release a chunk; a plain pass-through to pfree().
 */
void
SPI_pfree(void *pointer)
{
	/* The chunk's owning context no longer needs special handling here */
	pfree(pointer);
}
1264 
1265 Datum
SPI_datumTransfer(Datum value,bool typByVal,int typLen)1266 SPI_datumTransfer(Datum value, bool typByVal, int typLen)
1267 {
1268 	MemoryContext oldcxt;
1269 	Datum		result;
1270 
1271 	if (_SPI_current == NULL)
1272 		elog(ERROR, "SPI_datumTransfer called while not connected to SPI");
1273 
1274 	oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
1275 
1276 	result = datumTransfer(value, typByVal, typLen);
1277 
1278 	MemoryContextSwitchTo(oldcxt);
1279 
1280 	return result;
1281 }
1282 
1283 void
SPI_freetuple(HeapTuple tuple)1284 SPI_freetuple(HeapTuple tuple)
1285 {
1286 	/* No longer need to worry which context tuple was in... */
1287 	heap_freetuple(tuple);
1288 }
1289 
1290 void
SPI_freetuptable(SPITupleTable * tuptable)1291 SPI_freetuptable(SPITupleTable *tuptable)
1292 {
1293 	bool		found = false;
1294 
1295 	/* ignore call if NULL pointer */
1296 	if (tuptable == NULL)
1297 		return;
1298 
1299 	/*
1300 	 * Search only the topmost SPI context for a matching tuple table.
1301 	 */
1302 	if (_SPI_current != NULL)
1303 	{
1304 		slist_mutable_iter siter;
1305 
1306 		/* find tuptable in active list, then remove it */
1307 		slist_foreach_modify(siter, &_SPI_current->tuptables)
1308 		{
1309 			SPITupleTable *tt;
1310 
1311 			tt = slist_container(SPITupleTable, next, siter.cur);
1312 			if (tt == tuptable)
1313 			{
1314 				slist_delete_current(&siter);
1315 				found = true;
1316 				break;
1317 			}
1318 		}
1319 	}
1320 
1321 	/*
1322 	 * Refuse the deletion if we didn't find it in the topmost SPI context.
1323 	 * This is primarily a guard against double deletion, but might prevent
1324 	 * other errors as well.  Since the worst consequence of not deleting a
1325 	 * tuptable would be a transient memory leak, this is just a WARNING.
1326 	 */
1327 	if (!found)
1328 	{
1329 		elog(WARNING, "attempt to delete invalid SPITupleTable %p", tuptable);
1330 		return;
1331 	}
1332 
1333 	/* for safety, reset global variables that might point at tuptable */
1334 	if (tuptable == _SPI_current->tuptable)
1335 		_SPI_current->tuptable = NULL;
1336 	if (tuptable == SPI_tuptable)
1337 		SPI_tuptable = NULL;
1338 
1339 	/* release all memory belonging to tuptable */
1340 	MemoryContextDelete(tuptable->tuptabcxt);
1341 }
1342 
1343 
1344 /*
1345  * SPI_cursor_open()
1346  *
1347  *	Open a prepared SPI plan as a portal
1348  */
1349 Portal
SPI_cursor_open(const char * name,SPIPlanPtr plan,Datum * Values,const char * Nulls,bool read_only)1350 SPI_cursor_open(const char *name, SPIPlanPtr plan,
1351 				Datum *Values, const char *Nulls,
1352 				bool read_only)
1353 {
1354 	Portal		portal;
1355 	ParamListInfo paramLI;
1356 
1357 	/* build transient ParamListInfo in caller's context */
1358 	paramLI = _SPI_convert_params(plan->nargs, plan->argtypes,
1359 								  Values, Nulls);
1360 
1361 	portal = SPI_cursor_open_internal(name, plan, paramLI, read_only);
1362 
1363 	/* done with the transient ParamListInfo */
1364 	if (paramLI)
1365 		pfree(paramLI);
1366 
1367 	return portal;
1368 }
1369 
1370 
1371 /*
1372  * SPI_cursor_open_with_args()
1373  *
1374  * Parse and plan a query and open it as a portal.
1375  */
1376 Portal
SPI_cursor_open_with_args(const char * name,const char * src,int nargs,Oid * argtypes,Datum * Values,const char * Nulls,bool read_only,int cursorOptions)1377 SPI_cursor_open_with_args(const char *name,
1378 						  const char *src,
1379 						  int nargs, Oid *argtypes,
1380 						  Datum *Values, const char *Nulls,
1381 						  bool read_only, int cursorOptions)
1382 {
1383 	Portal		result;
1384 	_SPI_plan	plan;
1385 	ParamListInfo paramLI;
1386 
1387 	if (src == NULL || nargs < 0)
1388 		elog(ERROR, "SPI_cursor_open_with_args called with invalid arguments");
1389 
1390 	if (nargs > 0 && (argtypes == NULL || Values == NULL))
1391 		elog(ERROR, "SPI_cursor_open_with_args called with missing parameters");
1392 
1393 	SPI_result = _SPI_begin_call(true);
1394 	if (SPI_result < 0)
1395 		elog(ERROR, "SPI_cursor_open_with_args called while not connected");
1396 
1397 	memset(&plan, 0, sizeof(_SPI_plan));
1398 	plan.magic = _SPI_PLAN_MAGIC;
1399 	plan.parse_mode = RAW_PARSE_DEFAULT;
1400 	plan.cursor_options = cursorOptions;
1401 	plan.nargs = nargs;
1402 	plan.argtypes = argtypes;
1403 	plan.parserSetup = NULL;
1404 	plan.parserSetupArg = NULL;
1405 
1406 	/* build transient ParamListInfo in executor context */
1407 	paramLI = _SPI_convert_params(nargs, argtypes,
1408 								  Values, Nulls);
1409 
1410 	_SPI_prepare_plan(src, &plan);
1411 
1412 	/* We needn't copy the plan; SPI_cursor_open_internal will do so */
1413 
1414 	result = SPI_cursor_open_internal(name, &plan, paramLI, read_only);
1415 
1416 	/* And clean up */
1417 	_SPI_end_call(true);
1418 
1419 	return result;
1420 }
1421 
1422 
1423 /*
1424  * SPI_cursor_open_with_paramlist()
1425  *
1426  *	Same as SPI_cursor_open except that parameters (if any) are passed
1427  *	as a ParamListInfo, which supports dynamic parameter set determination
1428  */
1429 Portal
SPI_cursor_open_with_paramlist(const char * name,SPIPlanPtr plan,ParamListInfo params,bool read_only)1430 SPI_cursor_open_with_paramlist(const char *name, SPIPlanPtr plan,
1431 							   ParamListInfo params, bool read_only)
1432 {
1433 	return SPI_cursor_open_internal(name, plan, params, read_only);
1434 }
1435 
1436 /* Parse a query and open it as a cursor */
1437 Portal
SPI_cursor_parse_open(const char * name,const char * src,const SPIParseOpenOptions * options)1438 SPI_cursor_parse_open(const char *name,
1439 					  const char *src,
1440 					  const SPIParseOpenOptions *options)
1441 {
1442 	Portal		result;
1443 	_SPI_plan	plan;
1444 
1445 	if (src == NULL || options == NULL)
1446 		elog(ERROR, "SPI_cursor_parse_open called with invalid arguments");
1447 
1448 	SPI_result = _SPI_begin_call(true);
1449 	if (SPI_result < 0)
1450 		elog(ERROR, "SPI_cursor_parse_open called while not connected");
1451 
1452 	memset(&plan, 0, sizeof(_SPI_plan));
1453 	plan.magic = _SPI_PLAN_MAGIC;
1454 	plan.parse_mode = RAW_PARSE_DEFAULT;
1455 	plan.cursor_options = options->cursorOptions;
1456 	if (options->params)
1457 	{
1458 		plan.parserSetup = options->params->parserSetup;
1459 		plan.parserSetupArg = options->params->parserSetupArg;
1460 	}
1461 
1462 	_SPI_prepare_plan(src, &plan);
1463 
1464 	/* We needn't copy the plan; SPI_cursor_open_internal will do so */
1465 
1466 	result = SPI_cursor_open_internal(name, &plan,
1467 									  options->params, options->read_only);
1468 
1469 	/* And clean up */
1470 	_SPI_end_call(true);
1471 
1472 	return result;
1473 }
1474 
1475 
1476 /*
1477  * SPI_cursor_open_internal()
1478  *
1479  *	Common code for SPI_cursor_open variants
1480  */
1481 static Portal
SPI_cursor_open_internal(const char * name,SPIPlanPtr plan,ParamListInfo paramLI,bool read_only)1482 SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
1483 						 ParamListInfo paramLI, bool read_only)
1484 {
1485 	CachedPlanSource *plansource;
1486 	CachedPlan *cplan;
1487 	List	   *stmt_list;
1488 	char	   *query_string;
1489 	Snapshot	snapshot;
1490 	MemoryContext oldcontext;
1491 	Portal		portal;
1492 	SPICallbackArg spicallbackarg;
1493 	ErrorContextCallback spierrcontext;
1494 
1495 	/*
1496 	 * Check that the plan is something the Portal code will special-case as
1497 	 * returning one tupleset.
1498 	 */
1499 	if (!SPI_is_cursor_plan(plan))
1500 	{
1501 		/* try to give a good error message */
1502 		const char *cmdtag;
1503 
1504 		if (list_length(plan->plancache_list) != 1)
1505 			ereport(ERROR,
1506 					(errcode(ERRCODE_INVALID_CURSOR_DEFINITION),
1507 					 errmsg("cannot open multi-query plan as cursor")));
1508 		plansource = (CachedPlanSource *) linitial(plan->plancache_list);
1509 		/* A SELECT that fails SPI_is_cursor_plan() must be SELECT INTO */
1510 		if (plansource->commandTag == CMDTAG_SELECT)
1511 			cmdtag = "SELECT INTO";
1512 		else
1513 			cmdtag = GetCommandTagName(plansource->commandTag);
1514 		ereport(ERROR,
1515 				(errcode(ERRCODE_INVALID_CURSOR_DEFINITION),
1516 		/* translator: %s is name of a SQL command, eg INSERT */
1517 				 errmsg("cannot open %s query as cursor", cmdtag)));
1518 	}
1519 
1520 	Assert(list_length(plan->plancache_list) == 1);
1521 	plansource = (CachedPlanSource *) linitial(plan->plancache_list);
1522 
1523 	/* Push the SPI stack */
1524 	if (_SPI_begin_call(true) < 0)
1525 		elog(ERROR, "SPI_cursor_open called while not connected");
1526 
1527 	/* Reset SPI result (note we deliberately don't touch lastoid) */
1528 	SPI_processed = 0;
1529 	SPI_tuptable = NULL;
1530 	_SPI_current->processed = 0;
1531 	_SPI_current->tuptable = NULL;
1532 
1533 	/* Create the portal */
1534 	if (name == NULL || name[0] == '\0')
1535 	{
1536 		/* Use a random nonconflicting name */
1537 		portal = CreateNewPortal();
1538 	}
1539 	else
1540 	{
1541 		/* In this path, error if portal of same name already exists */
1542 		portal = CreatePortal(name, false, false);
1543 	}
1544 
1545 	/* Copy the plan's query string into the portal */
1546 	query_string = MemoryContextStrdup(portal->portalContext,
1547 									   plansource->query_string);
1548 
1549 	/*
1550 	 * Setup error traceback support for ereport(), in case GetCachedPlan
1551 	 * throws an error.
1552 	 */
1553 	spicallbackarg.query = plansource->query_string;
1554 	spicallbackarg.mode = plan->parse_mode;
1555 	spierrcontext.callback = _SPI_error_callback;
1556 	spierrcontext.arg = &spicallbackarg;
1557 	spierrcontext.previous = error_context_stack;
1558 	error_context_stack = &spierrcontext;
1559 
1560 	/*
1561 	 * Note: for a saved plan, we mustn't have any failure occur between
1562 	 * GetCachedPlan and PortalDefineQuery; that would result in leaking our
1563 	 * plancache refcount.
1564 	 */
1565 
1566 	/* Replan if needed, and increment plan refcount for portal */
1567 	cplan = GetCachedPlan(plansource, paramLI, NULL, _SPI_current->queryEnv);
1568 	stmt_list = cplan->stmt_list;
1569 
1570 	if (!plan->saved)
1571 	{
1572 		/*
1573 		 * We don't want the portal to depend on an unsaved CachedPlanSource,
1574 		 * so must copy the plan into the portal's context.  An error here
1575 		 * will result in leaking our refcount on the plan, but it doesn't
1576 		 * matter because the plan is unsaved and hence transient anyway.
1577 		 */
1578 		oldcontext = MemoryContextSwitchTo(portal->portalContext);
1579 		stmt_list = copyObject(stmt_list);
1580 		MemoryContextSwitchTo(oldcontext);
1581 		ReleaseCachedPlan(cplan, NULL);
1582 		cplan = NULL;			/* portal shouldn't depend on cplan */
1583 	}
1584 
1585 	/*
1586 	 * Set up the portal.
1587 	 */
1588 	PortalDefineQuery(portal,
1589 					  NULL,		/* no statement name */
1590 					  query_string,
1591 					  plansource->commandTag,
1592 					  stmt_list,
1593 					  cplan);
1594 
1595 	/*
1596 	 * Set up options for portal.  Default SCROLL type is chosen the same way
1597 	 * as PerformCursorOpen does it.
1598 	 */
1599 	portal->cursorOptions = plan->cursor_options;
1600 	if (!(portal->cursorOptions & (CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL)))
1601 	{
1602 		if (list_length(stmt_list) == 1 &&
1603 			linitial_node(PlannedStmt, stmt_list)->commandType != CMD_UTILITY &&
1604 			linitial_node(PlannedStmt, stmt_list)->rowMarks == NIL &&
1605 			ExecSupportsBackwardScan(linitial_node(PlannedStmt, stmt_list)->planTree))
1606 			portal->cursorOptions |= CURSOR_OPT_SCROLL;
1607 		else
1608 			portal->cursorOptions |= CURSOR_OPT_NO_SCROLL;
1609 	}
1610 
1611 	/*
1612 	 * Disallow SCROLL with SELECT FOR UPDATE.  This is not redundant with the
1613 	 * check in transformDeclareCursorStmt because the cursor options might
1614 	 * not have come through there.
1615 	 */
1616 	if (portal->cursorOptions & CURSOR_OPT_SCROLL)
1617 	{
1618 		if (list_length(stmt_list) == 1 &&
1619 			linitial_node(PlannedStmt, stmt_list)->commandType != CMD_UTILITY &&
1620 			linitial_node(PlannedStmt, stmt_list)->rowMarks != NIL)
1621 			ereport(ERROR,
1622 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1623 					 errmsg("DECLARE SCROLL CURSOR ... FOR UPDATE/SHARE is not supported"),
1624 					 errdetail("Scrollable cursors must be READ ONLY.")));
1625 	}
1626 
1627 	/* Make current query environment available to portal at execution time. */
1628 	portal->queryEnv = _SPI_current->queryEnv;
1629 
1630 	/*
1631 	 * If told to be read-only, we'd better check for read-only queries. This
1632 	 * can't be done earlier because we need to look at the finished, planned
1633 	 * queries.  (In particular, we don't want to do it between GetCachedPlan
1634 	 * and PortalDefineQuery, because throwing an error between those steps
1635 	 * would result in leaking our plancache refcount.)
1636 	 */
1637 	if (read_only)
1638 	{
1639 		ListCell   *lc;
1640 
1641 		foreach(lc, stmt_list)
1642 		{
1643 			PlannedStmt *pstmt = lfirst_node(PlannedStmt, lc);
1644 
1645 			if (!CommandIsReadOnly(pstmt))
1646 				ereport(ERROR,
1647 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1648 				/* translator: %s is a SQL statement name */
1649 						 errmsg("%s is not allowed in a non-volatile function",
1650 								CreateCommandName((Node *) pstmt))));
1651 		}
1652 	}
1653 
1654 	/* Set up the snapshot to use. */
1655 	if (read_only)
1656 		snapshot = GetActiveSnapshot();
1657 	else
1658 	{
1659 		CommandCounterIncrement();
1660 		snapshot = GetTransactionSnapshot();
1661 	}
1662 
1663 	/*
1664 	 * If the plan has parameters, copy them into the portal.  Note that this
1665 	 * must be done after revalidating the plan, because in dynamic parameter
1666 	 * cases the set of parameters could have changed during re-parsing.
1667 	 */
1668 	if (paramLI)
1669 	{
1670 		oldcontext = MemoryContextSwitchTo(portal->portalContext);
1671 		paramLI = copyParamList(paramLI);
1672 		MemoryContextSwitchTo(oldcontext);
1673 	}
1674 
1675 	/*
1676 	 * Start portal execution.
1677 	 */
1678 	PortalStart(portal, paramLI, 0, snapshot);
1679 
1680 	Assert(portal->strategy != PORTAL_MULTI_QUERY);
1681 
1682 	/* Pop the error context stack */
1683 	error_context_stack = spierrcontext.previous;
1684 
1685 	/* Pop the SPI stack */
1686 	_SPI_end_call(true);
1687 
1688 	/* Return the created portal */
1689 	return portal;
1690 }
1691 
1692 
1693 /*
1694  * SPI_cursor_find()
1695  *
1696  *	Find the portal of an existing open cursor
1697  */
1698 Portal
SPI_cursor_find(const char * name)1699 SPI_cursor_find(const char *name)
1700 {
1701 	return GetPortalByName(name);
1702 }
1703 
1704 
1705 /*
1706  * SPI_cursor_fetch()
1707  *
1708  *	Fetch rows in a cursor
1709  */
1710 void
SPI_cursor_fetch(Portal portal,bool forward,long count)1711 SPI_cursor_fetch(Portal portal, bool forward, long count)
1712 {
1713 	_SPI_cursor_operation(portal,
1714 						  forward ? FETCH_FORWARD : FETCH_BACKWARD, count,
1715 						  CreateDestReceiver(DestSPI));
1716 	/* we know that the DestSPI receiver doesn't need a destroy call */
1717 }
1718 
1719 
1720 /*
1721  * SPI_cursor_move()
1722  *
1723  *	Move in a cursor
1724  */
1725 void
SPI_cursor_move(Portal portal,bool forward,long count)1726 SPI_cursor_move(Portal portal, bool forward, long count)
1727 {
1728 	_SPI_cursor_operation(portal,
1729 						  forward ? FETCH_FORWARD : FETCH_BACKWARD, count,
1730 						  None_Receiver);
1731 }
1732 
1733 
1734 /*
1735  * SPI_scroll_cursor_fetch()
1736  *
1737  *	Fetch rows in a scrollable cursor
1738  */
1739 void
SPI_scroll_cursor_fetch(Portal portal,FetchDirection direction,long count)1740 SPI_scroll_cursor_fetch(Portal portal, FetchDirection direction, long count)
1741 {
1742 	_SPI_cursor_operation(portal,
1743 						  direction, count,
1744 						  CreateDestReceiver(DestSPI));
1745 	/* we know that the DestSPI receiver doesn't need a destroy call */
1746 }
1747 
1748 
1749 /*
1750  * SPI_scroll_cursor_move()
1751  *
1752  *	Move in a scrollable cursor
1753  */
1754 void
SPI_scroll_cursor_move(Portal portal,FetchDirection direction,long count)1755 SPI_scroll_cursor_move(Portal portal, FetchDirection direction, long count)
1756 {
1757 	_SPI_cursor_operation(portal, direction, count, None_Receiver);
1758 }
1759 
1760 
1761 /*
1762  * SPI_cursor_close()
1763  *
1764  *	Close a cursor
1765  */
1766 void
SPI_cursor_close(Portal portal)1767 SPI_cursor_close(Portal portal)
1768 {
1769 	if (!PortalIsValid(portal))
1770 		elog(ERROR, "invalid portal in SPI cursor operation");
1771 
1772 	PortalDrop(portal, false);
1773 }
1774 
1775 /*
1776  * Returns the Oid representing the type id for argument at argIndex. First
1777  * parameter is at index zero.
1778  */
1779 Oid
SPI_getargtypeid(SPIPlanPtr plan,int argIndex)1780 SPI_getargtypeid(SPIPlanPtr plan, int argIndex)
1781 {
1782 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC ||
1783 		argIndex < 0 || argIndex >= plan->nargs)
1784 	{
1785 		SPI_result = SPI_ERROR_ARGUMENT;
1786 		return InvalidOid;
1787 	}
1788 	return plan->argtypes[argIndex];
1789 }
1790 
1791 /*
1792  * Returns the number of arguments for the prepared plan.
1793  */
1794 int
SPI_getargcount(SPIPlanPtr plan)1795 SPI_getargcount(SPIPlanPtr plan)
1796 {
1797 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
1798 	{
1799 		SPI_result = SPI_ERROR_ARGUMENT;
1800 		return -1;
1801 	}
1802 	return plan->nargs;
1803 }
1804 
1805 /*
1806  * Returns true if the plan contains exactly one command
1807  * and that command returns tuples to the caller (eg, SELECT or
1808  * INSERT ... RETURNING, but not SELECT ... INTO). In essence,
1809  * the result indicates if the command can be used with SPI_cursor_open
1810  *
1811  * Parameters
1812  *	  plan: A plan previously prepared using SPI_prepare
1813  */
1814 bool
SPI_is_cursor_plan(SPIPlanPtr plan)1815 SPI_is_cursor_plan(SPIPlanPtr plan)
1816 {
1817 	CachedPlanSource *plansource;
1818 
1819 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
1820 	{
1821 		SPI_result = SPI_ERROR_ARGUMENT;
1822 		return false;
1823 	}
1824 
1825 	if (list_length(plan->plancache_list) != 1)
1826 	{
1827 		SPI_result = 0;
1828 		return false;			/* not exactly 1 pre-rewrite command */
1829 	}
1830 	plansource = (CachedPlanSource *) linitial(plan->plancache_list);
1831 
1832 	/*
1833 	 * We used to force revalidation of the cached plan here, but that seems
1834 	 * unnecessary: invalidation could mean a change in the rowtype of the
1835 	 * tuples returned by a plan, but not whether it returns tuples at all.
1836 	 */
1837 	SPI_result = 0;
1838 
1839 	/* Does it return tuples? */
1840 	if (plansource->resultDesc)
1841 		return true;
1842 
1843 	return false;
1844 }
1845 
1846 /*
1847  * SPI_plan_is_valid --- test whether a SPI plan is currently valid
1848  * (that is, not marked as being in need of revalidation).
1849  *
1850  * See notes for CachedPlanIsValid before using this.
1851  */
1852 bool
SPI_plan_is_valid(SPIPlanPtr plan)1853 SPI_plan_is_valid(SPIPlanPtr plan)
1854 {
1855 	ListCell   *lc;
1856 
1857 	Assert(plan->magic == _SPI_PLAN_MAGIC);
1858 
1859 	foreach(lc, plan->plancache_list)
1860 	{
1861 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
1862 
1863 		if (!CachedPlanIsValid(plansource))
1864 			return false;
1865 	}
1866 	return true;
1867 }
1868 
1869 /*
1870  * SPI_result_code_string --- convert any SPI return code to a string
1871  *
1872  * This is often useful in error messages.  Most callers will probably
1873  * only pass negative (error-case) codes, but for generality we recognize
1874  * the success codes too.
1875  */
1876 const char *
SPI_result_code_string(int code)1877 SPI_result_code_string(int code)
1878 {
1879 	static char buf[64];
1880 
1881 	switch (code)
1882 	{
1883 		case SPI_ERROR_CONNECT:
1884 			return "SPI_ERROR_CONNECT";
1885 		case SPI_ERROR_COPY:
1886 			return "SPI_ERROR_COPY";
1887 		case SPI_ERROR_OPUNKNOWN:
1888 			return "SPI_ERROR_OPUNKNOWN";
1889 		case SPI_ERROR_UNCONNECTED:
1890 			return "SPI_ERROR_UNCONNECTED";
1891 		case SPI_ERROR_ARGUMENT:
1892 			return "SPI_ERROR_ARGUMENT";
1893 		case SPI_ERROR_PARAM:
1894 			return "SPI_ERROR_PARAM";
1895 		case SPI_ERROR_TRANSACTION:
1896 			return "SPI_ERROR_TRANSACTION";
1897 		case SPI_ERROR_NOATTRIBUTE:
1898 			return "SPI_ERROR_NOATTRIBUTE";
1899 		case SPI_ERROR_NOOUTFUNC:
1900 			return "SPI_ERROR_NOOUTFUNC";
1901 		case SPI_ERROR_TYPUNKNOWN:
1902 			return "SPI_ERROR_TYPUNKNOWN";
1903 		case SPI_ERROR_REL_DUPLICATE:
1904 			return "SPI_ERROR_REL_DUPLICATE";
1905 		case SPI_ERROR_REL_NOT_FOUND:
1906 			return "SPI_ERROR_REL_NOT_FOUND";
1907 		case SPI_OK_CONNECT:
1908 			return "SPI_OK_CONNECT";
1909 		case SPI_OK_FINISH:
1910 			return "SPI_OK_FINISH";
1911 		case SPI_OK_FETCH:
1912 			return "SPI_OK_FETCH";
1913 		case SPI_OK_UTILITY:
1914 			return "SPI_OK_UTILITY";
1915 		case SPI_OK_SELECT:
1916 			return "SPI_OK_SELECT";
1917 		case SPI_OK_SELINTO:
1918 			return "SPI_OK_SELINTO";
1919 		case SPI_OK_INSERT:
1920 			return "SPI_OK_INSERT";
1921 		case SPI_OK_DELETE:
1922 			return "SPI_OK_DELETE";
1923 		case SPI_OK_UPDATE:
1924 			return "SPI_OK_UPDATE";
1925 		case SPI_OK_CURSOR:
1926 			return "SPI_OK_CURSOR";
1927 		case SPI_OK_INSERT_RETURNING:
1928 			return "SPI_OK_INSERT_RETURNING";
1929 		case SPI_OK_DELETE_RETURNING:
1930 			return "SPI_OK_DELETE_RETURNING";
1931 		case SPI_OK_UPDATE_RETURNING:
1932 			return "SPI_OK_UPDATE_RETURNING";
1933 		case SPI_OK_REWRITTEN:
1934 			return "SPI_OK_REWRITTEN";
1935 		case SPI_OK_REL_REGISTER:
1936 			return "SPI_OK_REL_REGISTER";
1937 		case SPI_OK_REL_UNREGISTER:
1938 			return "SPI_OK_REL_UNREGISTER";
1939 	}
1940 	/* Unrecognized code ... return something useful ... */
1941 	sprintf(buf, "Unrecognized SPI code %d", code);
1942 	return buf;
1943 }
1944 
1945 /*
1946  * SPI_plan_get_plan_sources --- get a SPI plan's underlying list of
1947  * CachedPlanSources.
1948  *
1949  * This is exported so that PL/pgSQL can use it (this beats letting PL/pgSQL
1950  * look directly into the SPIPlan for itself).  It's not documented in
1951  * spi.sgml because we'd just as soon not have too many places using this.
1952  */
1953 List *
SPI_plan_get_plan_sources(SPIPlanPtr plan)1954 SPI_plan_get_plan_sources(SPIPlanPtr plan)
1955 {
1956 	Assert(plan->magic == _SPI_PLAN_MAGIC);
1957 	return plan->plancache_list;
1958 }
1959 
1960 /*
1961  * SPI_plan_get_cached_plan --- get a SPI plan's generic CachedPlan,
1962  * if the SPI plan contains exactly one CachedPlanSource.  If not,
1963  * return NULL.
1964  *
1965  * The plan's refcount is incremented (and logged in CurrentResourceOwner,
1966  * if it's a saved plan).  Caller is responsible for doing ReleaseCachedPlan.
1967  *
1968  * This is exported so that PL/pgSQL can use it (this beats letting PL/pgSQL
1969  * look directly into the SPIPlan for itself).  It's not documented in
1970  * spi.sgml because we'd just as soon not have too many places using this.
1971  */
1972 CachedPlan *
SPI_plan_get_cached_plan(SPIPlanPtr plan)1973 SPI_plan_get_cached_plan(SPIPlanPtr plan)
1974 {
1975 	CachedPlanSource *plansource;
1976 	CachedPlan *cplan;
1977 	SPICallbackArg spicallbackarg;
1978 	ErrorContextCallback spierrcontext;
1979 
1980 	Assert(plan->magic == _SPI_PLAN_MAGIC);
1981 
1982 	/* Can't support one-shot plans here */
1983 	if (plan->oneshot)
1984 		return NULL;
1985 
1986 	/* Must have exactly one CachedPlanSource */
1987 	if (list_length(plan->plancache_list) != 1)
1988 		return NULL;
1989 	plansource = (CachedPlanSource *) linitial(plan->plancache_list);
1990 
1991 	/* Setup error traceback support for ereport() */
1992 	spicallbackarg.query = plansource->query_string;
1993 	spicallbackarg.mode = plan->parse_mode;
1994 	spierrcontext.callback = _SPI_error_callback;
1995 	spierrcontext.arg = &spicallbackarg;
1996 	spierrcontext.previous = error_context_stack;
1997 	error_context_stack = &spierrcontext;
1998 
1999 	/* Get the generic plan for the query */
2000 	cplan = GetCachedPlan(plansource, NULL,
2001 						  plan->saved ? CurrentResourceOwner : NULL,
2002 						  _SPI_current->queryEnv);
2003 	Assert(cplan == plansource->gplan);
2004 
2005 	/* Pop the error context stack */
2006 	error_context_stack = spierrcontext.previous;
2007 
2008 	return cplan;
2009 }
2010 
2011 
2012 /* =================== private functions =================== */
2013 
2014 /*
2015  * spi_dest_startup
2016  *		Initialize to receive tuples from Executor into SPITupleTable
2017  *		of current SPI procedure
2018  */
2019 void
spi_dest_startup(DestReceiver * self,int operation,TupleDesc typeinfo)2020 spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
2021 {
2022 	SPITupleTable *tuptable;
2023 	MemoryContext oldcxt;
2024 	MemoryContext tuptabcxt;
2025 
2026 	if (_SPI_current == NULL)
2027 		elog(ERROR, "spi_dest_startup called while not connected to SPI");
2028 
2029 	if (_SPI_current->tuptable != NULL)
2030 		elog(ERROR, "improper call to spi_dest_startup");
2031 
2032 	/* We create the tuple table context as a child of procCxt */
2033 
2034 	oldcxt = _SPI_procmem();	/* switch to procedure memory context */
2035 
2036 	tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
2037 									  "SPI TupTable",
2038 									  ALLOCSET_DEFAULT_SIZES);
2039 	MemoryContextSwitchTo(tuptabcxt);
2040 
2041 	_SPI_current->tuptable = tuptable = (SPITupleTable *)
2042 		palloc0(sizeof(SPITupleTable));
2043 	tuptable->tuptabcxt = tuptabcxt;
2044 	tuptable->subid = GetCurrentSubTransactionId();
2045 
2046 	/*
2047 	 * The tuptable is now valid enough to be freed by AtEOSubXact_SPI, so put
2048 	 * it onto the SPI context's tuptables list.  This will ensure it's not
2049 	 * leaked even in the unlikely event the following few lines fail.
2050 	 */
2051 	slist_push_head(&_SPI_current->tuptables, &tuptable->next);
2052 
2053 	/* set up initial allocations */
2054 	tuptable->alloced = 128;
2055 	tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
2056 	tuptable->numvals = 0;
2057 	tuptable->tupdesc = CreateTupleDescCopy(typeinfo);
2058 
2059 	MemoryContextSwitchTo(oldcxt);
2060 }
2061 
2062 /*
2063  * spi_printtup
2064  *		store tuple retrieved by Executor into SPITupleTable
2065  *		of current SPI procedure
2066  */
2067 bool
spi_printtup(TupleTableSlot * slot,DestReceiver * self)2068 spi_printtup(TupleTableSlot *slot, DestReceiver *self)
2069 {
2070 	SPITupleTable *tuptable;
2071 	MemoryContext oldcxt;
2072 
2073 	if (_SPI_current == NULL)
2074 		elog(ERROR, "spi_printtup called while not connected to SPI");
2075 
2076 	tuptable = _SPI_current->tuptable;
2077 	if (tuptable == NULL)
2078 		elog(ERROR, "improper call to spi_printtup");
2079 
2080 	oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);
2081 
2082 	if (tuptable->numvals >= tuptable->alloced)
2083 	{
2084 		/* Double the size of the pointer array */
2085 		uint64		newalloced = tuptable->alloced * 2;
2086 
2087 		tuptable->vals = (HeapTuple *) repalloc_huge(tuptable->vals,
2088 													 newalloced * sizeof(HeapTuple));
2089 		tuptable->alloced = newalloced;
2090 	}
2091 
2092 	tuptable->vals[tuptable->numvals] = ExecCopySlotHeapTuple(slot);
2093 	(tuptable->numvals)++;
2094 
2095 	MemoryContextSwitchTo(oldcxt);
2096 
2097 	return true;
2098 }
2099 
2100 /*
2101  * Static functions
2102  */
2103 
2104 /*
2105  * Parse and analyze a querystring.
2106  *
2107  * At entry, plan->argtypes and plan->nargs (or alternatively plan->parserSetup
2108  * and plan->parserSetupArg) must be valid, as must plan->parse_mode and
2109  * plan->cursor_options.
2110  *
2111  * Results are stored into *plan (specifically, plan->plancache_list).
2112  * Note that the result data is all in CurrentMemoryContext or child contexts
2113  * thereof; in practice this means it is in the SPI executor context, and
2114  * what we are creating is a "temporary" SPIPlan.  Cruft generated during
2115  * parsing is also left in CurrentMemoryContext.
2116  */
2117 static void
_SPI_prepare_plan(const char * src,SPIPlanPtr plan)2118 _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
2119 {
2120 	List	   *raw_parsetree_list;
2121 	List	   *plancache_list;
2122 	ListCell   *list_item;
2123 	SPICallbackArg spicallbackarg;
2124 	ErrorContextCallback spierrcontext;
2125 
2126 	/*
2127 	 * Setup error traceback support for ereport()
2128 	 */
2129 	spicallbackarg.query = src;
2130 	spicallbackarg.mode = plan->parse_mode;
2131 	spierrcontext.callback = _SPI_error_callback;
2132 	spierrcontext.arg = &spicallbackarg;
2133 	spierrcontext.previous = error_context_stack;
2134 	error_context_stack = &spierrcontext;
2135 
2136 	/*
2137 	 * Parse the request string into a list of raw parse trees.
2138 	 */
2139 	raw_parsetree_list = raw_parser(src, plan->parse_mode);
2140 
2141 	/*
2142 	 * Do parse analysis and rule rewrite for each raw parsetree, storing the
2143 	 * results into unsaved plancache entries.
2144 	 */
2145 	plancache_list = NIL;
2146 
2147 	foreach(list_item, raw_parsetree_list)
2148 	{
2149 		RawStmt    *parsetree = lfirst_node(RawStmt, list_item);
2150 		List	   *stmt_list;
2151 		CachedPlanSource *plansource;
2152 
2153 		/*
2154 		 * Create the CachedPlanSource before we do parse analysis, since it
2155 		 * needs to see the unmodified raw parse tree.
2156 		 */
2157 		plansource = CreateCachedPlan(parsetree,
2158 									  src,
2159 									  CreateCommandTag(parsetree->stmt));
2160 
2161 		/*
2162 		 * Parameter datatypes are driven by parserSetup hook if provided,
2163 		 * otherwise we use the fixed parameter list.
2164 		 */
2165 		if (plan->parserSetup != NULL)
2166 		{
2167 			Assert(plan->nargs == 0);
2168 			stmt_list = pg_analyze_and_rewrite_params(parsetree,
2169 													  src,
2170 													  plan->parserSetup,
2171 													  plan->parserSetupArg,
2172 													  _SPI_current->queryEnv);
2173 		}
2174 		else
2175 		{
2176 			stmt_list = pg_analyze_and_rewrite(parsetree,
2177 											   src,
2178 											   plan->argtypes,
2179 											   plan->nargs,
2180 											   _SPI_current->queryEnv);
2181 		}
2182 
2183 		/* Finish filling in the CachedPlanSource */
2184 		CompleteCachedPlan(plansource,
2185 						   stmt_list,
2186 						   NULL,
2187 						   plan->argtypes,
2188 						   plan->nargs,
2189 						   plan->parserSetup,
2190 						   plan->parserSetupArg,
2191 						   plan->cursor_options,
2192 						   false);	/* not fixed result */
2193 
2194 		plancache_list = lappend(plancache_list, plansource);
2195 	}
2196 
2197 	plan->plancache_list = plancache_list;
2198 	plan->oneshot = false;
2199 
2200 	/*
2201 	 * Pop the error context stack
2202 	 */
2203 	error_context_stack = spierrcontext.previous;
2204 }
2205 
2206 /*
2207  * Parse, but don't analyze, a querystring.
2208  *
2209  * This is a stripped-down version of _SPI_prepare_plan that only does the
2210  * initial raw parsing.  It creates "one shot" CachedPlanSources
2211  * that still require parse analysis before execution is possible.
2212  *
2213  * The advantage of using the "one shot" form of CachedPlanSource is that
2214  * we eliminate data copying and invalidation overhead.  Postponing parse
2215  * analysis also prevents issues if some of the raw parsetrees are DDL
2216  * commands that affect validity of later parsetrees.  Both of these
2217  * attributes are good things for SPI_execute() and similar cases.
2218  *
2219  * Results are stored into *plan (specifically, plan->plancache_list).
2220  * Note that the result data is all in CurrentMemoryContext or child contexts
2221  * thereof; in practice this means it is in the SPI executor context, and
2222  * what we are creating is a "temporary" SPIPlan.  Cruft generated during
2223  * parsing is also left in CurrentMemoryContext.
2224  */
2225 static void
_SPI_prepare_oneshot_plan(const char * src,SPIPlanPtr plan)2226 _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
2227 {
2228 	List	   *raw_parsetree_list;
2229 	List	   *plancache_list;
2230 	ListCell   *list_item;
2231 	SPICallbackArg spicallbackarg;
2232 	ErrorContextCallback spierrcontext;
2233 
2234 	/*
2235 	 * Setup error traceback support for ereport()
2236 	 */
2237 	spicallbackarg.query = src;
2238 	spicallbackarg.mode = plan->parse_mode;
2239 	spierrcontext.callback = _SPI_error_callback;
2240 	spierrcontext.arg = &spicallbackarg;
2241 	spierrcontext.previous = error_context_stack;
2242 	error_context_stack = &spierrcontext;
2243 
2244 	/*
2245 	 * Parse the request string into a list of raw parse trees.
2246 	 */
2247 	raw_parsetree_list = raw_parser(src, plan->parse_mode);
2248 
2249 	/*
2250 	 * Construct plancache entries, but don't do parse analysis yet.
2251 	 */
2252 	plancache_list = NIL;
2253 
2254 	foreach(list_item, raw_parsetree_list)
2255 	{
2256 		RawStmt    *parsetree = lfirst_node(RawStmt, list_item);
2257 		CachedPlanSource *plansource;
2258 
2259 		plansource = CreateOneShotCachedPlan(parsetree,
2260 											 src,
2261 											 CreateCommandTag(parsetree->stmt));
2262 
2263 		plancache_list = lappend(plancache_list, plansource);
2264 	}
2265 
2266 	plan->plancache_list = plancache_list;
2267 	plan->oneshot = true;
2268 
2269 	/*
2270 	 * Pop the error context stack
2271 	 */
2272 	error_context_stack = spierrcontext.previous;
2273 }
2274 
2275 /*
2276  * _SPI_execute_plan: execute the given plan with the given options
2277  *
2278  * options contains options accessible from outside SPI:
2279  * params: parameter values to pass to query
2280  * read_only: true for read-only execution (no CommandCounterIncrement)
2281  * allow_nonatomic: true to allow nonatomic CALL/DO execution
2282  * must_return_tuples: throw error if query doesn't return tuples
2283  * tcount: execution tuple-count limit, or 0 for none
2284  * dest: DestReceiver to receive output, or NULL for normal SPI output
2285  * owner: ResourceOwner that will be used to hold refcount on plan;
2286  *		if NULL, CurrentResourceOwner is used (ignored for non-saved plan)
2287  *
2288  * Additional, only-internally-accessible options:
2289  * snapshot: query snapshot to use, or InvalidSnapshot for the normal
2290  *		behavior of taking a new snapshot for each query.
2291  * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
2292  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
2293  *		false means any AFTER triggers are postponed to end of outer query
2294  */
2295 static int
_SPI_execute_plan(SPIPlanPtr plan,const SPIExecuteOptions * options,Snapshot snapshot,Snapshot crosscheck_snapshot,bool fire_triggers)2296 _SPI_execute_plan(SPIPlanPtr plan, const SPIExecuteOptions *options,
2297 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
2298 				  bool fire_triggers)
2299 {
2300 	int			my_res = 0;
2301 	uint64		my_processed = 0;
2302 	SPITupleTable *my_tuptable = NULL;
2303 	int			res = 0;
2304 	bool		pushed_active_snap = false;
2305 	ResourceOwner plan_owner = options->owner;
2306 	SPICallbackArg spicallbackarg;
2307 	ErrorContextCallback spierrcontext;
2308 	CachedPlan *cplan = NULL;
2309 	ListCell   *lc1;
2310 
2311 	/*
2312 	 * Setup error traceback support for ereport()
2313 	 */
2314 	spicallbackarg.query = NULL;	/* we'll fill this below */
2315 	spicallbackarg.mode = plan->parse_mode;
2316 	spierrcontext.callback = _SPI_error_callback;
2317 	spierrcontext.arg = &spicallbackarg;
2318 	spierrcontext.previous = error_context_stack;
2319 	error_context_stack = &spierrcontext;
2320 
2321 	/*
2322 	 * We support four distinct snapshot management behaviors:
2323 	 *
2324 	 * snapshot != InvalidSnapshot, read_only = true: use exactly the given
2325 	 * snapshot.
2326 	 *
2327 	 * snapshot != InvalidSnapshot, read_only = false: use the given snapshot,
2328 	 * modified by advancing its command ID before each querytree.
2329 	 *
2330 	 * snapshot == InvalidSnapshot, read_only = true: use the entry-time
2331 	 * ActiveSnapshot, if any (if there isn't one, we run with no snapshot).
2332 	 *
2333 	 * snapshot == InvalidSnapshot, read_only = false: take a full new
2334 	 * snapshot for each user command, and advance its command ID before each
2335 	 * querytree within the command.
2336 	 *
2337 	 * In the first two cases, we can just push the snap onto the stack once
2338 	 * for the whole plan list.
2339 	 *
2340 	 * Note that snapshot != InvalidSnapshot implies an atomic execution
2341 	 * context.
2342 	 */
2343 	if (snapshot != InvalidSnapshot)
2344 	{
2345 		Assert(!options->allow_nonatomic);
2346 		if (options->read_only)
2347 		{
2348 			PushActiveSnapshot(snapshot);
2349 			pushed_active_snap = true;
2350 		}
2351 		else
2352 		{
2353 			/* Make sure we have a private copy of the snapshot to modify */
2354 			PushCopiedSnapshot(snapshot);
2355 			pushed_active_snap = true;
2356 		}
2357 	}
2358 
2359 	/*
2360 	 * Ensure that we have a resource owner if plan is saved, and not if it
2361 	 * isn't.
2362 	 */
2363 	if (!plan->saved)
2364 		plan_owner = NULL;
2365 	else if (plan_owner == NULL)
2366 		plan_owner = CurrentResourceOwner;
2367 
2368 	/*
2369 	 * We interpret must_return_tuples as "there must be at least one query,
2370 	 * and all of them must return tuples".  This is a bit laxer than
2371 	 * SPI_is_cursor_plan's check, but there seems no reason to enforce that
2372 	 * there be only one query.
2373 	 */
2374 	if (options->must_return_tuples && plan->plancache_list == NIL)
2375 		ereport(ERROR,
2376 				(errcode(ERRCODE_SYNTAX_ERROR),
2377 				 errmsg("empty query does not return tuples")));
2378 
2379 	foreach(lc1, plan->plancache_list)
2380 	{
2381 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
2382 		List	   *stmt_list;
2383 		ListCell   *lc2;
2384 
2385 		spicallbackarg.query = plansource->query_string;
2386 
2387 		/*
2388 		 * If this is a one-shot plan, we still need to do parse analysis.
2389 		 */
2390 		if (plan->oneshot)
2391 		{
2392 			RawStmt    *parsetree = plansource->raw_parse_tree;
2393 			const char *src = plansource->query_string;
2394 			List	   *stmt_list;
2395 
2396 			/*
2397 			 * Parameter datatypes are driven by parserSetup hook if provided,
2398 			 * otherwise we use the fixed parameter list.
2399 			 */
2400 			if (parsetree == NULL)
2401 				stmt_list = NIL;
2402 			else if (plan->parserSetup != NULL)
2403 			{
2404 				Assert(plan->nargs == 0);
2405 				stmt_list = pg_analyze_and_rewrite_params(parsetree,
2406 														  src,
2407 														  plan->parserSetup,
2408 														  plan->parserSetupArg,
2409 														  _SPI_current->queryEnv);
2410 			}
2411 			else
2412 			{
2413 				stmt_list = pg_analyze_and_rewrite(parsetree,
2414 												   src,
2415 												   plan->argtypes,
2416 												   plan->nargs,
2417 												   _SPI_current->queryEnv);
2418 			}
2419 
2420 			/* Finish filling in the CachedPlanSource */
2421 			CompleteCachedPlan(plansource,
2422 							   stmt_list,
2423 							   NULL,
2424 							   plan->argtypes,
2425 							   plan->nargs,
2426 							   plan->parserSetup,
2427 							   plan->parserSetupArg,
2428 							   plan->cursor_options,
2429 							   false);	/* not fixed result */
2430 		}
2431 
2432 		/*
2433 		 * If asked to, complain when query does not return tuples.
2434 		 * (Replanning can't change this, so we can check it before that.
2435 		 * However, we can't check it till after parse analysis, so in the
2436 		 * case of a one-shot plan this is the earliest we could check.)
2437 		 */
2438 		if (options->must_return_tuples && !plansource->resultDesc)
2439 		{
2440 			/* try to give a good error message */
2441 			const char *cmdtag;
2442 
2443 			/* A SELECT without resultDesc must be SELECT INTO */
2444 			if (plansource->commandTag == CMDTAG_SELECT)
2445 				cmdtag = "SELECT INTO";
2446 			else
2447 				cmdtag = GetCommandTagName(plansource->commandTag);
2448 			ereport(ERROR,
2449 					(errcode(ERRCODE_SYNTAX_ERROR),
2450 			/* translator: %s is name of a SQL command, eg INSERT */
2451 					 errmsg("%s query does not return tuples", cmdtag)));
2452 		}
2453 
2454 		/*
2455 		 * Replan if needed, and increment plan refcount.  If it's a saved
2456 		 * plan, the refcount must be backed by the plan_owner.
2457 		 */
2458 		cplan = GetCachedPlan(plansource, options->params,
2459 							  plan_owner, _SPI_current->queryEnv);
2460 
2461 		stmt_list = cplan->stmt_list;
2462 
2463 		/*
2464 		 * If we weren't given a specific snapshot to use, and the statement
2465 		 * list requires a snapshot, set that up.
2466 		 */
2467 		if (snapshot == InvalidSnapshot &&
2468 			(list_length(stmt_list) > 1 ||
2469 			 (list_length(stmt_list) == 1 &&
2470 			  PlannedStmtRequiresSnapshot(linitial_node(PlannedStmt,
2471 														stmt_list)))))
2472 		{
2473 			/*
2474 			 * First, ensure there's a Portal-level snapshot.  This back-fills
2475 			 * the snapshot stack in case the previous operation was a COMMIT
2476 			 * or ROLLBACK inside a procedure or DO block.  (We can't put back
2477 			 * the Portal snapshot any sooner, or we'd break cases like doing
2478 			 * SET or LOCK just after COMMIT.)  It's enough to check once per
2479 			 * statement list, since COMMIT/ROLLBACK/CALL/DO can't appear
2480 			 * within a multi-statement list.
2481 			 */
2482 			EnsurePortalSnapshotExists();
2483 
2484 			/*
2485 			 * In the default non-read-only case, get a new per-statement-list
2486 			 * snapshot, replacing any that we pushed in a previous cycle.
2487 			 * Skip it when doing non-atomic execution, though (we rely
2488 			 * entirely on the Portal snapshot in that case).
2489 			 */
2490 			if (!options->read_only && !options->allow_nonatomic)
2491 			{
2492 				if (pushed_active_snap)
2493 					PopActiveSnapshot();
2494 				PushActiveSnapshot(GetTransactionSnapshot());
2495 				pushed_active_snap = true;
2496 			}
2497 		}
2498 
2499 		foreach(lc2, stmt_list)
2500 		{
2501 			PlannedStmt *stmt = lfirst_node(PlannedStmt, lc2);
2502 			bool		canSetTag = stmt->canSetTag;
2503 			DestReceiver *dest;
2504 
2505 			/*
2506 			 * Reset output state.  (Note that if a non-SPI receiver is used,
2507 			 * _SPI_current->processed will stay zero, and that's what we'll
2508 			 * report to the caller.  It's the receiver's job to count tuples
2509 			 * in that case.)
2510 			 */
2511 			_SPI_current->processed = 0;
2512 			_SPI_current->tuptable = NULL;
2513 
2514 			/* Check for unsupported cases. */
2515 			if (stmt->utilityStmt)
2516 			{
2517 				if (IsA(stmt->utilityStmt, CopyStmt))
2518 				{
2519 					CopyStmt   *cstmt = (CopyStmt *) stmt->utilityStmt;
2520 
2521 					if (cstmt->filename == NULL)
2522 					{
2523 						my_res = SPI_ERROR_COPY;
2524 						goto fail;
2525 					}
2526 				}
2527 				else if (IsA(stmt->utilityStmt, TransactionStmt))
2528 				{
2529 					my_res = SPI_ERROR_TRANSACTION;
2530 					goto fail;
2531 				}
2532 			}
2533 
2534 			if (options->read_only && !CommandIsReadOnly(stmt))
2535 				ereport(ERROR,
2536 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2537 				/* translator: %s is a SQL statement name */
2538 						 errmsg("%s is not allowed in a non-volatile function",
2539 								CreateCommandName((Node *) stmt))));
2540 
2541 			/*
2542 			 * If not read-only mode, advance the command counter before each
2543 			 * command and update the snapshot.  (But skip it if the snapshot
2544 			 * isn't under our control.)
2545 			 */
2546 			if (!options->read_only && pushed_active_snap)
2547 			{
2548 				CommandCounterIncrement();
2549 				UpdateActiveSnapshotCommandId();
2550 			}
2551 
2552 			/*
2553 			 * Select appropriate tuple receiver.  Output from non-canSetTag
2554 			 * subqueries always goes to the bit bucket.
2555 			 */
2556 			if (!canSetTag)
2557 				dest = CreateDestReceiver(DestNone);
2558 			else if (options->dest)
2559 				dest = options->dest;
2560 			else
2561 				dest = CreateDestReceiver(DestSPI);
2562 
2563 			if (stmt->utilityStmt == NULL)
2564 			{
2565 				QueryDesc  *qdesc;
2566 				Snapshot	snap;
2567 
2568 				if (ActiveSnapshotSet())
2569 					snap = GetActiveSnapshot();
2570 				else
2571 					snap = InvalidSnapshot;
2572 
2573 				qdesc = CreateQueryDesc(stmt,
2574 										plansource->query_string,
2575 										snap, crosscheck_snapshot,
2576 										dest,
2577 										options->params,
2578 										_SPI_current->queryEnv,
2579 										0);
2580 				res = _SPI_pquery(qdesc, fire_triggers,
2581 								  canSetTag ? options->tcount : 0);
2582 				FreeQueryDesc(qdesc);
2583 			}
2584 			else
2585 			{
2586 				ProcessUtilityContext context;
2587 				QueryCompletion qc;
2588 
2589 				/*
2590 				 * If the SPI context is atomic, or we were not told to allow
2591 				 * nonatomic operations, tell ProcessUtility this is an atomic
2592 				 * execution context.
2593 				 */
2594 				if (_SPI_current->atomic || !options->allow_nonatomic)
2595 					context = PROCESS_UTILITY_QUERY;
2596 				else
2597 					context = PROCESS_UTILITY_QUERY_NONATOMIC;
2598 
2599 				InitializeQueryCompletion(&qc);
2600 				ProcessUtility(stmt,
2601 							   plansource->query_string,
2602 							   true,	/* protect plancache's node tree */
2603 							   context,
2604 							   options->params,
2605 							   _SPI_current->queryEnv,
2606 							   dest,
2607 							   &qc);
2608 
2609 				/* Update "processed" if stmt returned tuples */
2610 				if (_SPI_current->tuptable)
2611 					_SPI_current->processed = _SPI_current->tuptable->numvals;
2612 
2613 				res = SPI_OK_UTILITY;
2614 
2615 				/*
2616 				 * Some utility statements return a row count, even though the
2617 				 * tuples are not returned to the caller.
2618 				 */
2619 				if (IsA(stmt->utilityStmt, CreateTableAsStmt))
2620 				{
2621 					CreateTableAsStmt *ctastmt = (CreateTableAsStmt *) stmt->utilityStmt;
2622 
2623 					if (qc.commandTag == CMDTAG_SELECT)
2624 						_SPI_current->processed = qc.nprocessed;
2625 					else
2626 					{
2627 						/*
2628 						 * Must be an IF NOT EXISTS that did nothing, or a
2629 						 * CREATE ... WITH NO DATA.
2630 						 */
2631 						Assert(ctastmt->if_not_exists ||
2632 							   ctastmt->into->skipData);
2633 						_SPI_current->processed = 0;
2634 					}
2635 
2636 					/*
2637 					 * For historical reasons, if CREATE TABLE AS was spelled
2638 					 * as SELECT INTO, return a special return code.
2639 					 */
2640 					if (ctastmt->is_select_into)
2641 						res = SPI_OK_SELINTO;
2642 				}
2643 				else if (IsA(stmt->utilityStmt, CopyStmt))
2644 				{
2645 					Assert(qc.commandTag == CMDTAG_COPY);
2646 					_SPI_current->processed = qc.nprocessed;
2647 				}
2648 			}
2649 
2650 			/*
2651 			 * The last canSetTag query sets the status values returned to the
2652 			 * caller.  Be careful to free any tuptables not returned, to
2653 			 * avoid intra-transaction memory leak.
2654 			 */
2655 			if (canSetTag)
2656 			{
2657 				my_processed = _SPI_current->processed;
2658 				SPI_freetuptable(my_tuptable);
2659 				my_tuptable = _SPI_current->tuptable;
2660 				my_res = res;
2661 			}
2662 			else
2663 			{
2664 				SPI_freetuptable(_SPI_current->tuptable);
2665 				_SPI_current->tuptable = NULL;
2666 			}
2667 
2668 			/*
2669 			 * We don't issue a destroy call to the receiver.  The SPI and
2670 			 * None receivers would ignore it anyway, while if the caller
2671 			 * supplied a receiver, it's not our job to destroy it.
2672 			 */
2673 
2674 			if (res < 0)
2675 			{
2676 				my_res = res;
2677 				goto fail;
2678 			}
2679 		}
2680 
2681 		/* Done with this plan, so release refcount */
2682 		ReleaseCachedPlan(cplan, plan_owner);
2683 		cplan = NULL;
2684 
2685 		/*
2686 		 * If not read-only mode, advance the command counter after the last
2687 		 * command.  This ensures that its effects are visible, in case it was
2688 		 * DDL that would affect the next CachedPlanSource.
2689 		 */
2690 		if (!options->read_only)
2691 			CommandCounterIncrement();
2692 	}
2693 
2694 fail:
2695 
2696 	/* Pop the snapshot off the stack if we pushed one */
2697 	if (pushed_active_snap)
2698 		PopActiveSnapshot();
2699 
2700 	/* We no longer need the cached plan refcount, if any */
2701 	if (cplan)
2702 		ReleaseCachedPlan(cplan, plan_owner);
2703 
2704 	/*
2705 	 * Pop the error context stack
2706 	 */
2707 	error_context_stack = spierrcontext.previous;
2708 
2709 	/* Save results for caller */
2710 	SPI_processed = my_processed;
2711 	SPI_tuptable = my_tuptable;
2712 
2713 	/* tuptable now is caller's responsibility, not SPI's */
2714 	_SPI_current->tuptable = NULL;
2715 
2716 	/*
2717 	 * If none of the queries had canSetTag, return SPI_OK_REWRITTEN. Prior to
2718 	 * 8.4, we used return the last query's result code, but not its auxiliary
2719 	 * results, but that's confusing.
2720 	 */
2721 	if (my_res == 0)
2722 		my_res = SPI_OK_REWRITTEN;
2723 
2724 	return my_res;
2725 }
2726 
2727 /*
2728  * Convert arrays of query parameters to form wanted by planner and executor
2729  */
2730 static ParamListInfo
_SPI_convert_params(int nargs,Oid * argtypes,Datum * Values,const char * Nulls)2731 _SPI_convert_params(int nargs, Oid *argtypes,
2732 					Datum *Values, const char *Nulls)
2733 {
2734 	ParamListInfo paramLI;
2735 
2736 	if (nargs > 0)
2737 	{
2738 		paramLI = makeParamList(nargs);
2739 
2740 		for (int i = 0; i < nargs; i++)
2741 		{
2742 			ParamExternData *prm = &paramLI->params[i];
2743 
2744 			prm->value = Values[i];
2745 			prm->isnull = (Nulls && Nulls[i] == 'n');
2746 			prm->pflags = PARAM_FLAG_CONST;
2747 			prm->ptype = argtypes[i];
2748 		}
2749 	}
2750 	else
2751 		paramLI = NULL;
2752 	return paramLI;
2753 }
2754 
2755 static int
_SPI_pquery(QueryDesc * queryDesc,bool fire_triggers,uint64 tcount)2756 _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
2757 {
2758 	int			operation = queryDesc->operation;
2759 	int			eflags;
2760 	int			res;
2761 
2762 	switch (operation)
2763 	{
2764 		case CMD_SELECT:
2765 			if (queryDesc->dest->mydest == DestNone)
2766 			{
2767 				/* Don't return SPI_OK_SELECT if we're discarding result */
2768 				res = SPI_OK_UTILITY;
2769 			}
2770 			else
2771 				res = SPI_OK_SELECT;
2772 			break;
2773 		case CMD_INSERT:
2774 			if (queryDesc->plannedstmt->hasReturning)
2775 				res = SPI_OK_INSERT_RETURNING;
2776 			else
2777 				res = SPI_OK_INSERT;
2778 			break;
2779 		case CMD_DELETE:
2780 			if (queryDesc->plannedstmt->hasReturning)
2781 				res = SPI_OK_DELETE_RETURNING;
2782 			else
2783 				res = SPI_OK_DELETE;
2784 			break;
2785 		case CMD_UPDATE:
2786 			if (queryDesc->plannedstmt->hasReturning)
2787 				res = SPI_OK_UPDATE_RETURNING;
2788 			else
2789 				res = SPI_OK_UPDATE;
2790 			break;
2791 		default:
2792 			return SPI_ERROR_OPUNKNOWN;
2793 	}
2794 
2795 #ifdef SPI_EXECUTOR_STATS
2796 	if (ShowExecutorStats)
2797 		ResetUsage();
2798 #endif
2799 
2800 	/* Select execution options */
2801 	if (fire_triggers)
2802 		eflags = 0;				/* default run-to-completion flags */
2803 	else
2804 		eflags = EXEC_FLAG_SKIP_TRIGGERS;
2805 
2806 	ExecutorStart(queryDesc, eflags);
2807 
2808 	ExecutorRun(queryDesc, ForwardScanDirection, tcount, true);
2809 
2810 	_SPI_current->processed = queryDesc->estate->es_processed;
2811 
2812 	if ((res == SPI_OK_SELECT || queryDesc->plannedstmt->hasReturning) &&
2813 		queryDesc->dest->mydest == DestSPI)
2814 	{
2815 		if (_SPI_checktuples())
2816 			elog(ERROR, "consistency check on SPI tuple count failed");
2817 	}
2818 
2819 	ExecutorFinish(queryDesc);
2820 	ExecutorEnd(queryDesc);
2821 	/* FreeQueryDesc is done by the caller */
2822 
2823 #ifdef SPI_EXECUTOR_STATS
2824 	if (ShowExecutorStats)
2825 		ShowUsage("SPI EXECUTOR STATS");
2826 #endif
2827 
2828 	return res;
2829 }
2830 
2831 /*
2832  * _SPI_error_callback
2833  *
2834  * Add context information when a query invoked via SPI fails
2835  */
2836 static void
_SPI_error_callback(void * arg)2837 _SPI_error_callback(void *arg)
2838 {
2839 	SPICallbackArg *carg = (SPICallbackArg *) arg;
2840 	const char *query = carg->query;
2841 	int			syntaxerrposition;
2842 
2843 	if (query == NULL)			/* in case arg wasn't set yet */
2844 		return;
2845 
2846 	/*
2847 	 * If there is a syntax error position, convert to internal syntax error;
2848 	 * otherwise treat the query as an item of context stack
2849 	 */
2850 	syntaxerrposition = geterrposition();
2851 	if (syntaxerrposition > 0)
2852 	{
2853 		errposition(0);
2854 		internalerrposition(syntaxerrposition);
2855 		internalerrquery(query);
2856 	}
2857 	else
2858 	{
2859 		/* Use the parse mode to decide how to describe the query */
2860 		switch (carg->mode)
2861 		{
2862 			case RAW_PARSE_PLPGSQL_EXPR:
2863 				errcontext("SQL expression \"%s\"", query);
2864 				break;
2865 			case RAW_PARSE_PLPGSQL_ASSIGN1:
2866 			case RAW_PARSE_PLPGSQL_ASSIGN2:
2867 			case RAW_PARSE_PLPGSQL_ASSIGN3:
2868 				errcontext("PL/pgSQL assignment \"%s\"", query);
2869 				break;
2870 			default:
2871 				errcontext("SQL statement \"%s\"", query);
2872 				break;
2873 		}
2874 	}
2875 }
2876 
2877 /*
2878  * _SPI_cursor_operation()
2879  *
2880  *	Do a FETCH or MOVE in a cursor
2881  */
2882 static void
_SPI_cursor_operation(Portal portal,FetchDirection direction,long count,DestReceiver * dest)2883 _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
2884 					  DestReceiver *dest)
2885 {
2886 	uint64		nfetched;
2887 
2888 	/* Check that the portal is valid */
2889 	if (!PortalIsValid(portal))
2890 		elog(ERROR, "invalid portal in SPI cursor operation");
2891 
2892 	/* Push the SPI stack */
2893 	if (_SPI_begin_call(true) < 0)
2894 		elog(ERROR, "SPI cursor operation called while not connected");
2895 
2896 	/* Reset the SPI result (note we deliberately don't touch lastoid) */
2897 	SPI_processed = 0;
2898 	SPI_tuptable = NULL;
2899 	_SPI_current->processed = 0;
2900 	_SPI_current->tuptable = NULL;
2901 
2902 	/* Run the cursor */
2903 	nfetched = PortalRunFetch(portal,
2904 							  direction,
2905 							  count,
2906 							  dest);
2907 
2908 	/*
2909 	 * Think not to combine this store with the preceding function call. If
2910 	 * the portal contains calls to functions that use SPI, then _SPI_stack is
2911 	 * likely to move around while the portal runs.  When control returns,
2912 	 * _SPI_current will point to the correct stack entry... but the pointer
2913 	 * may be different than it was beforehand. So we must be sure to re-fetch
2914 	 * the pointer after the function call completes.
2915 	 */
2916 	_SPI_current->processed = nfetched;
2917 
2918 	if (dest->mydest == DestSPI && _SPI_checktuples())
2919 		elog(ERROR, "consistency check on SPI tuple count failed");
2920 
2921 	/* Put the result into place for access by caller */
2922 	SPI_processed = _SPI_current->processed;
2923 	SPI_tuptable = _SPI_current->tuptable;
2924 
2925 	/* tuptable now is caller's responsibility, not SPI's */
2926 	_SPI_current->tuptable = NULL;
2927 
2928 	/* Pop the SPI stack */
2929 	_SPI_end_call(true);
2930 }
2931 
2932 
2933 static MemoryContext
_SPI_execmem(void)2934 _SPI_execmem(void)
2935 {
2936 	return MemoryContextSwitchTo(_SPI_current->execCxt);
2937 }
2938 
2939 static MemoryContext
_SPI_procmem(void)2940 _SPI_procmem(void)
2941 {
2942 	return MemoryContextSwitchTo(_SPI_current->procCxt);
2943 }
2944 
2945 /*
2946  * _SPI_begin_call: begin a SPI operation within a connected procedure
2947  *
2948  * use_exec is true if we intend to make use of the procedure's execCxt
2949  * during this SPI operation.  We'll switch into that context, and arrange
2950  * for it to be cleaned up at _SPI_end_call or if an error occurs.
2951  */
2952 static int
_SPI_begin_call(bool use_exec)2953 _SPI_begin_call(bool use_exec)
2954 {
2955 	if (_SPI_current == NULL)
2956 		return SPI_ERROR_UNCONNECTED;
2957 
2958 	if (use_exec)
2959 	{
2960 		/* remember when the Executor operation started */
2961 		_SPI_current->execSubid = GetCurrentSubTransactionId();
2962 		/* switch to the Executor memory context */
2963 		_SPI_execmem();
2964 	}
2965 
2966 	return 0;
2967 }
2968 
2969 /*
2970  * _SPI_end_call: end a SPI operation within a connected procedure
2971  *
2972  * use_exec must be the same as in the previous _SPI_begin_call
2973  *
2974  * Note: this currently has no failure return cases, so callers don't check
2975  */
2976 static int
_SPI_end_call(bool use_exec)2977 _SPI_end_call(bool use_exec)
2978 {
2979 	if (use_exec)
2980 	{
2981 		/* switch to the procedure memory context */
2982 		_SPI_procmem();
2983 		/* mark Executor context no longer in use */
2984 		_SPI_current->execSubid = InvalidSubTransactionId;
2985 		/* and free Executor memory */
2986 		MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
2987 	}
2988 
2989 	return 0;
2990 }
2991 
2992 static bool
_SPI_checktuples(void)2993 _SPI_checktuples(void)
2994 {
2995 	uint64		processed = _SPI_current->processed;
2996 	SPITupleTable *tuptable = _SPI_current->tuptable;
2997 	bool		failed = false;
2998 
2999 	if (tuptable == NULL)		/* spi_dest_startup was not called */
3000 		failed = true;
3001 	else if (processed != tuptable->numvals)
3002 		failed = true;
3003 
3004 	return failed;
3005 }
3006 
3007 /*
3008  * Convert a "temporary" SPIPlan into an "unsaved" plan.
3009  *
3010  * The passed _SPI_plan struct is on the stack, and all its subsidiary data
3011  * is in or under the current SPI executor context.  Copy the plan into the
3012  * SPI procedure context so it will survive _SPI_end_call().  To minimize
3013  * data copying, this destructively modifies the input plan, by taking the
3014  * plancache entries away from it and reparenting them to the new SPIPlan.
3015  */
3016 static SPIPlanPtr
_SPI_make_plan_non_temp(SPIPlanPtr plan)3017 _SPI_make_plan_non_temp(SPIPlanPtr plan)
3018 {
3019 	SPIPlanPtr	newplan;
3020 	MemoryContext parentcxt = _SPI_current->procCxt;
3021 	MemoryContext plancxt;
3022 	MemoryContext oldcxt;
3023 	ListCell   *lc;
3024 
3025 	/* Assert the input is a temporary SPIPlan */
3026 	Assert(plan->magic == _SPI_PLAN_MAGIC);
3027 	Assert(plan->plancxt == NULL);
3028 	/* One-shot plans can't be saved */
3029 	Assert(!plan->oneshot);
3030 
3031 	/*
3032 	 * Create a memory context for the plan, underneath the procedure context.
3033 	 * We don't expect the plan to be very large.
3034 	 */
3035 	plancxt = AllocSetContextCreate(parentcxt,
3036 									"SPI Plan",
3037 									ALLOCSET_SMALL_SIZES);
3038 	oldcxt = MemoryContextSwitchTo(plancxt);
3039 
3040 	/* Copy the _SPI_plan struct and subsidiary data into the new context */
3041 	newplan = (SPIPlanPtr) palloc0(sizeof(_SPI_plan));
3042 	newplan->magic = _SPI_PLAN_MAGIC;
3043 	newplan->plancxt = plancxt;
3044 	newplan->parse_mode = plan->parse_mode;
3045 	newplan->cursor_options = plan->cursor_options;
3046 	newplan->nargs = plan->nargs;
3047 	if (plan->nargs > 0)
3048 	{
3049 		newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
3050 		memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid));
3051 	}
3052 	else
3053 		newplan->argtypes = NULL;
3054 	newplan->parserSetup = plan->parserSetup;
3055 	newplan->parserSetupArg = plan->parserSetupArg;
3056 
3057 	/*
3058 	 * Reparent all the CachedPlanSources into the procedure context.  In
3059 	 * theory this could fail partway through due to the pallocs, but we don't
3060 	 * care too much since both the procedure context and the executor context
3061 	 * would go away on error.
3062 	 */
3063 	foreach(lc, plan->plancache_list)
3064 	{
3065 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
3066 
3067 		CachedPlanSetParentContext(plansource, parentcxt);
3068 
3069 		/* Build new list, with list cells in plancxt */
3070 		newplan->plancache_list = lappend(newplan->plancache_list, plansource);
3071 	}
3072 
3073 	MemoryContextSwitchTo(oldcxt);
3074 
3075 	/* For safety, unlink the CachedPlanSources from the temporary plan */
3076 	plan->plancache_list = NIL;
3077 
3078 	return newplan;
3079 }
3080 
3081 /*
3082  * Make a "saved" copy of the given plan.
3083  */
3084 static SPIPlanPtr
_SPI_save_plan(SPIPlanPtr plan)3085 _SPI_save_plan(SPIPlanPtr plan)
3086 {
3087 	SPIPlanPtr	newplan;
3088 	MemoryContext plancxt;
3089 	MemoryContext oldcxt;
3090 	ListCell   *lc;
3091 
3092 	/* One-shot plans can't be saved */
3093 	Assert(!plan->oneshot);
3094 
3095 	/*
3096 	 * Create a memory context for the plan.  We don't expect the plan to be
3097 	 * very large, so use smaller-than-default alloc parameters.  It's a
3098 	 * transient context until we finish copying everything.
3099 	 */
3100 	plancxt = AllocSetContextCreate(CurrentMemoryContext,
3101 									"SPI Plan",
3102 									ALLOCSET_SMALL_SIZES);
3103 	oldcxt = MemoryContextSwitchTo(plancxt);
3104 
3105 	/* Copy the SPI plan into its own context */
3106 	newplan = (SPIPlanPtr) palloc0(sizeof(_SPI_plan));
3107 	newplan->magic = _SPI_PLAN_MAGIC;
3108 	newplan->plancxt = plancxt;
3109 	newplan->parse_mode = plan->parse_mode;
3110 	newplan->cursor_options = plan->cursor_options;
3111 	newplan->nargs = plan->nargs;
3112 	if (plan->nargs > 0)
3113 	{
3114 		newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
3115 		memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid));
3116 	}
3117 	else
3118 		newplan->argtypes = NULL;
3119 	newplan->parserSetup = plan->parserSetup;
3120 	newplan->parserSetupArg = plan->parserSetupArg;
3121 
3122 	/* Copy all the plancache entries */
3123 	foreach(lc, plan->plancache_list)
3124 	{
3125 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
3126 		CachedPlanSource *newsource;
3127 
3128 		newsource = CopyCachedPlan(plansource);
3129 		newplan->plancache_list = lappend(newplan->plancache_list, newsource);
3130 	}
3131 
3132 	MemoryContextSwitchTo(oldcxt);
3133 
3134 	/*
3135 	 * Mark it saved, reparent it under CacheMemoryContext, and mark all the
3136 	 * component CachedPlanSources as saved.  This sequence cannot fail
3137 	 * partway through, so there's no risk of long-term memory leakage.
3138 	 */
3139 	newplan->saved = true;
3140 	MemoryContextSetParent(newplan->plancxt, CacheMemoryContext);
3141 
3142 	foreach(lc, newplan->plancache_list)
3143 	{
3144 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
3145 
3146 		SaveCachedPlan(plansource);
3147 	}
3148 
3149 	return newplan;
3150 }
3151 
3152 /*
3153  * Internal lookup of ephemeral named relation by name.
3154  */
3155 static EphemeralNamedRelation
_SPI_find_ENR_by_name(const char * name)3156 _SPI_find_ENR_by_name(const char *name)
3157 {
3158 	/* internal static function; any error is bug in SPI itself */
3159 	Assert(name != NULL);
3160 
3161 	/* fast exit if no tuplestores have been added */
3162 	if (_SPI_current->queryEnv == NULL)
3163 		return NULL;
3164 
3165 	return get_ENR(_SPI_current->queryEnv, name);
3166 }
3167 
3168 /*
3169  * Register an ephemeral named relation for use by the planner and executor on
3170  * subsequent calls using this SPI connection.
3171  */
3172 int
SPI_register_relation(EphemeralNamedRelation enr)3173 SPI_register_relation(EphemeralNamedRelation enr)
3174 {
3175 	EphemeralNamedRelation match;
3176 	int			res;
3177 
3178 	if (enr == NULL || enr->md.name == NULL)
3179 		return SPI_ERROR_ARGUMENT;
3180 
3181 	res = _SPI_begin_call(false);	/* keep current memory context */
3182 	if (res < 0)
3183 		return res;
3184 
3185 	match = _SPI_find_ENR_by_name(enr->md.name);
3186 	if (match)
3187 		res = SPI_ERROR_REL_DUPLICATE;
3188 	else
3189 	{
3190 		if (_SPI_current->queryEnv == NULL)
3191 			_SPI_current->queryEnv = create_queryEnv();
3192 
3193 		register_ENR(_SPI_current->queryEnv, enr);
3194 		res = SPI_OK_REL_REGISTER;
3195 	}
3196 
3197 	_SPI_end_call(false);
3198 
3199 	return res;
3200 }
3201 
3202 /*
3203  * Unregister an ephemeral named relation by name.  This will probably be a
3204  * rarely used function, since SPI_finish will clear it automatically.
3205  */
3206 int
SPI_unregister_relation(const char * name)3207 SPI_unregister_relation(const char *name)
3208 {
3209 	EphemeralNamedRelation match;
3210 	int			res;
3211 
3212 	if (name == NULL)
3213 		return SPI_ERROR_ARGUMENT;
3214 
3215 	res = _SPI_begin_call(false);	/* keep current memory context */
3216 	if (res < 0)
3217 		return res;
3218 
3219 	match = _SPI_find_ENR_by_name(name);
3220 	if (match)
3221 	{
3222 		unregister_ENR(_SPI_current->queryEnv, match->md.name);
3223 		res = SPI_OK_REL_UNREGISTER;
3224 	}
3225 	else
3226 		res = SPI_ERROR_REL_NOT_FOUND;
3227 
3228 	_SPI_end_call(false);
3229 
3230 	return res;
3231 }
3232 
3233 /*
3234  * Register the transient relations from 'tdata' using this SPI connection.
3235  * This should be called by PL implementations' trigger handlers after
3236  * connecting, in order to make transition tables visible to any queries run
3237  * in this connection.
3238  */
3239 int
SPI_register_trigger_data(TriggerData * tdata)3240 SPI_register_trigger_data(TriggerData *tdata)
3241 {
3242 	if (tdata == NULL)
3243 		return SPI_ERROR_ARGUMENT;
3244 
3245 	if (tdata->tg_newtable)
3246 	{
3247 		EphemeralNamedRelation enr =
3248 		palloc(sizeof(EphemeralNamedRelationData));
3249 		int			rc;
3250 
3251 		enr->md.name = tdata->tg_trigger->tgnewtable;
3252 		enr->md.reliddesc = tdata->tg_relation->rd_id;
3253 		enr->md.tupdesc = NULL;
3254 		enr->md.enrtype = ENR_NAMED_TUPLESTORE;
3255 		enr->md.enrtuples = tuplestore_tuple_count(tdata->tg_newtable);
3256 		enr->reldata = tdata->tg_newtable;
3257 		rc = SPI_register_relation(enr);
3258 		if (rc != SPI_OK_REL_REGISTER)
3259 			return rc;
3260 	}
3261 
3262 	if (tdata->tg_oldtable)
3263 	{
3264 		EphemeralNamedRelation enr =
3265 		palloc(sizeof(EphemeralNamedRelationData));
3266 		int			rc;
3267 
3268 		enr->md.name = tdata->tg_trigger->tgoldtable;
3269 		enr->md.reliddesc = tdata->tg_relation->rd_id;
3270 		enr->md.tupdesc = NULL;
3271 		enr->md.enrtype = ENR_NAMED_TUPLESTORE;
3272 		enr->md.enrtuples = tuplestore_tuple_count(tdata->tg_oldtable);
3273 		enr->reldata = tdata->tg_oldtable;
3274 		rc = SPI_register_relation(enr);
3275 		if (rc != SPI_OK_REL_REGISTER)
3276 			return rc;
3277 	}
3278 
3279 	return SPI_OK_TD_REGISTER;
3280 }
3281