/*-------------------------------------------------------------------------
 *
 * portalmem.c
 *	  backend portal memory management
 *
 * Portals are objects representing the execution state of a query.
 * This module provides memory management services for portals, but it
 * doesn't actually run the executor for them.
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/mmgr/portalmem.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/xact.h"
#include "catalog/pg_type.h"
#include "commands/portalcmds.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"

/*
 * Estimate of the maximum number of open portals a user would have,
 * used in initially sizing the PortalHashTable in EnablePortalManager().
 * Since the hash table can expand, there's no need to make this overly
 * generous, and keeping it small avoids unnecessary overhead in the
 * hash_seq_search() calls executed during transaction end.
 */
#define PORTALS_PER_USER	   16


/* ----------------
 *		Global state
 * ----------------
 */

#define MAX_PORTALNAME_LEN		NAMEDATALEN

typedef struct portalhashent
{
	char		portalname[MAX_PORTALNAME_LEN];
	Portal		portal;
} PortalHashEnt;

static HTAB *PortalHashTable = NULL;

#define PortalHashTableLookup(NAME, PORTAL) \
do { \
	PortalHashEnt *hentry; \
	\
	hentry = (PortalHashEnt *) hash_search(PortalHashTable, \
										   (NAME), HASH_FIND, NULL); \
	if (hentry) \
		PORTAL = hentry->portal; \
	else \
		PORTAL = NULL; \
} while(0)

#define PortalHashTableInsert(PORTAL, NAME) \
do { \
	PortalHashEnt *hentry; bool found; \
	\
	hentry = (PortalHashEnt *) hash_search(PortalHashTable, \
										   (NAME), HASH_ENTER, &found); \
	if (found) \
		elog(ERROR, "duplicate portal name"); \
	hentry->portal = PORTAL; \
	/* To avoid duplicate storage, make PORTAL->name point to htab entry */ \
	PORTAL->name = hentry->portalname; \
} while(0)

#define PortalHashTableDelete(PORTAL) \
do { \
	PortalHashEnt *hentry; \
	\
	hentry = (PortalHashEnt *) hash_search(PortalHashTable, \
										   PORTAL->name, HASH_REMOVE, NULL); \
	if (hentry == NULL) \
		elog(WARNING, "trying to delete portal name that does not exist"); \
} while(0)

static MemoryContext TopPortalContext = NULL;


/* ----------------------------------------------------------------
 *				   public portal interface functions
 * ----------------------------------------------------------------
 */

/*
 * EnablePortalManager
 *		Enables the portal management module at backend startup.
 */
void
EnablePortalManager(void)
{
	HASHCTL		ctl;

	Assert(TopPortalContext == NULL);

	TopPortalContext = AllocSetContextCreate(TopMemoryContext,
											 "TopPortalContext",
											 ALLOCSET_DEFAULT_SIZES);

	ctl.keysize = MAX_PORTALNAME_LEN;
	ctl.entrysize = sizeof(PortalHashEnt);

	/*
	 * use PORTALS_PER_USER as a guess of how many hash table entries to
	 * create, initially
	 */
	PortalHashTable = hash_create("Portal hash", PORTALS_PER_USER,
								  &ctl, HASH_ELEM | HASH_STRINGS);
}

/*
 * GetPortalByName
 *		Returns a portal given a portal name, or NULL if name not found.
 */
Portal
GetPortalByName(const char *name)
{
	Portal		portal;

	if (PointerIsValid(name))
		PortalHashTableLookup(name, portal);
	else
		portal = NULL;

	return portal;
}
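
/*
 * Illustrative only (not part of the original file): callers generally
 * validate the lookup result before use, as the cursor commands in
 * portalcmds.c do:
 *
 *		Portal		portal = GetPortalByName(stmt->portalname);
 *
 *		if (!PortalIsValid(portal))
 *			ereport(ERROR,
 *					(errcode(ERRCODE_UNDEFINED_CURSOR),
 *					 errmsg("cursor \"%s\" does not exist",
 *							stmt->portalname)));
 */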

/*
 * PortalGetPrimaryStmt
 *		Get the "primary" stmt within a portal, ie, the one marked canSetTag.
 *
 * Returns NULL if no such stmt.  If multiple PlannedStmt structs within the
 * portal are marked canSetTag, returns the first one.  Neither of these
 * cases should occur in present usages of this function.
 */
PlannedStmt *
PortalGetPrimaryStmt(Portal portal)
{
	ListCell   *lc;

	foreach(lc, portal->stmts)
	{
		PlannedStmt *stmt = lfirst_node(PlannedStmt, lc);

		if (stmt->canSetTag)
			return stmt;
	}
	return NULL;
}

/*
 * CreatePortal
 *		Returns a new portal given a name.
 *
 * allowDup: if true, automatically drop any pre-existing portal of the
 * same name (if false, an error is raised).
 *
 * dupSilent: if true, don't even emit a WARNING.
 */
Portal
CreatePortal(const char *name, bool allowDup, bool dupSilent)
{
	Portal		portal;

	AssertArg(PointerIsValid(name));

	portal = GetPortalByName(name);
	if (PortalIsValid(portal))
	{
		if (!allowDup)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_CURSOR),
					 errmsg("cursor \"%s\" already exists", name)));
		if (!dupSilent)
			ereport(WARNING,
					(errcode(ERRCODE_DUPLICATE_CURSOR),
					 errmsg("closing existing cursor \"%s\"",
							name)));
		PortalDrop(portal, false);
	}

	/* make new portal structure */
	portal = (Portal) MemoryContextAllocZero(TopPortalContext, sizeof *portal);

	/* initialize portal context; typically it won't store much */
	portal->portalContext = AllocSetContextCreate(TopPortalContext,
												  "PortalContext",
												  ALLOCSET_SMALL_SIZES);

	/* create a resource owner for the portal */
	portal->resowner = ResourceOwnerCreate(CurTransactionResourceOwner,
										   "Portal");

	/* initialize portal fields that don't start off zero */
	portal->status = PORTAL_NEW;
	portal->cleanup = PortalCleanup;
	portal->createSubid = GetCurrentSubTransactionId();
	portal->activeSubid = portal->createSubid;
	portal->createLevel = GetCurrentTransactionNestLevel();
	portal->strategy = PORTAL_MULTI_QUERY;
	portal->cursorOptions = CURSOR_OPT_NO_SCROLL;
	portal->atStart = true;
	portal->atEnd = true;		/* disallow fetches until query is set */
	portal->visible = true;
	portal->creation_time = GetCurrentStatementStartTimestamp();

	/* put portal in table (sets portal->name) */
	PortalHashTableInsert(portal, name);

	/* for named portals reuse portal->name copy */
	MemoryContextSetIdentifier(portal->portalContext, portal->name[0] ?
							   portal->name : "<unnamed>");

	return portal;
}
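
/*
 * Illustrative sketch (not part of the original file): the typical portal
 * lifecycle, as driven by callers such as exec_simple_query in postgres.c
 * together with the portal-running code in pquery.c, is roughly
 *
 *		portal = CreatePortal("", true, true);
 *		PortalDefineQuery(portal, NULL, query_string,
 *						  commandTag, plantree_list, NULL);
 *		PortalStart(portal, NULL, 0, InvalidSnapshot);
 *		(void) PortalRun(portal, FETCH_ALL, true, true,
 *						 receiver, receiver, &qc);
 *		PortalDrop(portal, false);
 *
 * The variable names above stand in for the caller's own state.
 */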

/*
 * CreateNewPortal
 *		Create a new portal, assigning it a random nonconflicting name.
 */
Portal
CreateNewPortal(void)
{
	static unsigned int unnamed_portal_count = 0;

	char		portalname[MAX_PORTALNAME_LEN];

	/* Select a nonconflicting name */
	for (;;)
	{
		unnamed_portal_count++;
		sprintf(portalname, "<unnamed portal %u>", unnamed_portal_count);
		if (GetPortalByName(portalname) == NULL)
			break;
	}

	return CreatePortal(portalname, false, false);
}

/*
 * PortalDefineQuery
 *		A simple subroutine to establish a portal's query.
 *
 * Notes: as of PG 8.4, caller MUST supply a sourceText string; it is not
 * allowed anymore to pass NULL.  (If you really don't have source text,
 * you can pass a constant string, perhaps "(query not available)".)
 *
 * commandTag shall be CMDTAG_UNKNOWN if and only if the original query
 * string (before rewriting) was an empty string.
 *
 * If cplan is provided, then it is a cached plan containing the stmts, and
 * the caller must have done GetCachedPlan(), causing a refcount increment.
 * The refcount will be released when the portal is destroyed.
 *
 * If cplan is NULL, then it is the caller's responsibility to ensure that
 * the passed plan trees have adequate lifetime.  Typically this is done by
 * copying them into the portal's context.
 *
 * The caller is also responsible for ensuring that the passed prepStmtName
 * (if not NULL) and sourceText have adequate lifetime.
 *
 * NB: this function mustn't do much beyond storing the passed values; in
 * particular don't do anything that risks elog(ERROR).  If that were to
 * happen here before storing the cplan reference, we'd leak the plancache
 * refcount that the caller is trying to hand off to us.
 */
void
PortalDefineQuery(Portal portal,
				  const char *prepStmtName,
				  const char *sourceText,
				  CommandTag commandTag,
				  List *stmts,
				  CachedPlan *cplan)
{
	AssertArg(PortalIsValid(portal));
	AssertState(portal->status == PORTAL_NEW);

	AssertArg(sourceText != NULL);
	AssertArg(commandTag != CMDTAG_UNKNOWN || stmts == NIL);

	portal->prepStmtName = prepStmtName;
	portal->sourceText = sourceText;
	portal->qc.commandTag = commandTag;
	portal->qc.nprocessed = 0;
	portal->commandTag = commandTag;
	portal->stmts = stmts;
	portal->cplan = cplan;
	portal->status = PORTAL_DEFINED;
}
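
/*
 * Illustrative sketch (not part of the original file): for a prepared
 * statement, the caller hands off a plancache refcount along these lines
 * (cf. exec_bind_message in postgres.c; the GetCachedPlan() signature
 * varies across versions, so treat this as a sketch):
 *
 *		cplan = GetCachedPlan(psrc, params, NULL, NULL);
 *		PortalDefineQuery(portal,
 *						  saved_stmt_name,
 *						  psrc->query_string,
 *						  psrc->commandTag,
 *						  cplan->stmt_list,
 *						  cplan);
 */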

/*
 * PortalReleaseCachedPlan
 *		Release a portal's reference to its cached plan, if any.
 */
static void
PortalReleaseCachedPlan(Portal portal)
{
	if (portal->cplan)
	{
		ReleaseCachedPlan(portal->cplan, NULL);
		portal->cplan = NULL;

		/*
		 * We must also clear portal->stmts which is now a dangling reference
		 * to the cached plan's plan list.  This protects any code that might
		 * try to examine the Portal later.
		 */
		portal->stmts = NIL;
	}
}

/*
 * PortalCreateHoldStore
 *		Create the tuplestore for a portal.
 */
void
PortalCreateHoldStore(Portal portal)
{
	MemoryContext oldcxt;

	Assert(portal->holdContext == NULL);
	Assert(portal->holdStore == NULL);
	Assert(portal->holdSnapshot == NULL);

	/*
	 * Create the memory context that is used for storage of the tuple set.
	 * Note this is NOT a child of the portal's portalContext.
	 */
	portal->holdContext =
		AllocSetContextCreate(TopPortalContext,
							  "PortalHoldContext",
							  ALLOCSET_DEFAULT_SIZES);

	/*
	 * Create the tuple store, selecting cross-transaction temp files, and
	 * enabling random access only if cursor requires scrolling.
	 *
	 * XXX: Should maintenance_work_mem be used for the portal size?
	 */
	oldcxt = MemoryContextSwitchTo(portal->holdContext);

	portal->holdStore =
		tuplestore_begin_heap(portal->cursorOptions & CURSOR_OPT_SCROLL,
							  true, work_mem);

	MemoryContextSwitchTo(oldcxt);
}

/*
 * PinPortal
 *		Protect a portal from dropping.
 *
 * A pinned portal is still unpinned and dropped at transaction or
 * subtransaction abort.
 */
void
PinPortal(Portal portal)
{
	if (portal->portalPinned)
		elog(ERROR, "portal already pinned");

	portal->portalPinned = true;
}

/*
 * UnpinPortal
 *		Undo a prior PinPortal, allowing the portal to be dropped again.
 */
void
UnpinPortal(Portal portal)
{
	if (!portal->portalPinned)
		elog(ERROR, "portal not pinned");

	portal->portalPinned = false;
}
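
/*
 * Hedged sketch (not part of the original file): PL/pgSQL pins the portal
 * behind a cursor FOR-loop so code inside the loop cannot drop it; roughly
 *
 *		PinPortal(portal);
 *		... fetch from the portal and run the loop body ...
 *		UnpinPortal(portal);
 *
 * with abort recovery relying on AtCleanup_Portals/AtSubCleanup_Portals to
 * forcibly unpin.  See exec_for_query in pl_exec.c for the real pattern.
 */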

/*
 * MarkPortalActive
 *		Transition a portal from READY to ACTIVE state.
 *
 * NOTE: never set portal->status = PORTAL_ACTIVE directly; call this instead.
 */
void
MarkPortalActive(Portal portal)
{
	/* For safety, this is a runtime test not just an Assert */
	if (portal->status != PORTAL_READY)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("portal \"%s\" cannot be run", portal->name)));
	/* Perform the state transition */
	portal->status = PORTAL_ACTIVE;
	portal->activeSubid = GetCurrentSubTransactionId();
}

/*
 * MarkPortalDone
 *		Transition a portal from ACTIVE to DONE state.
 *
 * NOTE: never set portal->status = PORTAL_DONE directly; call this instead.
 */
void
MarkPortalDone(Portal portal)
{
	/* Perform the state transition */
	Assert(portal->status == PORTAL_ACTIVE);
	portal->status = PORTAL_DONE;

	/*
	 * Allow portalcmds.c to clean up the state it knows about.  We might as
	 * well do that now, since the portal can't be executed any more.
	 *
	 * In some cases involving execution of a ROLLBACK command in an already
	 * aborted transaction, this is necessary, or we'd reach AtCleanup_Portals
	 * with the cleanup hook still unexecuted.
	 */
	if (PointerIsValid(portal->cleanup))
	{
		portal->cleanup(portal);
		portal->cleanup = NULL;
	}
}

/*
 * MarkPortalFailed
 *		Transition a portal into FAILED state.
 *
 * NOTE: never set portal->status = PORTAL_FAILED directly; call this instead.
 */
void
MarkPortalFailed(Portal portal)
{
	/* Perform the state transition */
	Assert(portal->status != PORTAL_DONE);
	portal->status = PORTAL_FAILED;

	/*
	 * Allow portalcmds.c to clean up the state it knows about.  We might as
	 * well do that now, since the portal can't be executed any more.
	 *
	 * In some cases involving cleanup of an already aborted transaction, this
	 * is necessary, or we'd reach AtCleanup_Portals with the cleanup hook
	 * still unexecuted.
	 */
	if (PointerIsValid(portal->cleanup))
	{
		portal->cleanup(portal);
		portal->cleanup = NULL;
	}
}
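
/*
 * Editorial summary (not part of the original file) of the portal state
 * machine implied by the functions above.  PORTAL_READY is set in pquery.c,
 * both by PortalStart() and again when a fetch suspends an ACTIVE portal:
 *
 *		PORTAL_NEW ------PortalDefineQuery()----> PORTAL_DEFINED
 *		PORTAL_DEFINED ------PortalStart()------> PORTAL_READY
 *		PORTAL_READY ------MarkPortalActive()---> PORTAL_ACTIVE
 *		PORTAL_ACTIVE ------MarkPortalDone()----> PORTAL_DONE
 *		any state but DONE --MarkPortalFailed()-> PORTAL_FAILED
 *
 * DONE and FAILED portals are disposed of only by PortalDrop().
 */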

/*
 * PortalDrop
 *		Destroy the portal.
 */
void
PortalDrop(Portal portal, bool isTopCommit)
{
	AssertArg(PortalIsValid(portal));

	/*
	 * Don't allow dropping a pinned portal, it's still needed by whoever
	 * pinned it.
	 */
	if (portal->portalPinned)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_CURSOR_STATE),
				 errmsg("cannot drop pinned portal \"%s\"", portal->name)));

	/*
	 * Not sure if the PORTAL_ACTIVE case can validly happen or not...
	 */
	if (portal->status == PORTAL_ACTIVE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_CURSOR_STATE),
				 errmsg("cannot drop active portal \"%s\"", portal->name)));

	/*
	 * Allow portalcmds.c to clean up the state it knows about, in particular
	 * shutting down the executor if still active.  This step potentially runs
	 * user-defined code so failure has to be expected.  It's the cleanup
	 * hook's responsibility to not try to do that more than once, in the case
	 * that failure occurs and then we come back to drop the portal again
	 * during transaction abort.
	 *
	 * Note: in most paths of control, this will have been done already in
	 * MarkPortalDone or MarkPortalFailed.  We're just making sure.
	 */
	if (PointerIsValid(portal->cleanup))
	{
		portal->cleanup(portal);
		portal->cleanup = NULL;
	}

	/* There shouldn't be an active snapshot anymore, except after error */
	Assert(portal->portalSnapshot == NULL || !isTopCommit);

	/*
	 * Remove portal from hash table.  Because we do this here, we will not
	 * come back to try to remove the portal again if there's any error in the
	 * subsequent steps.  Better to leak a little memory than to get into an
	 * infinite error-recovery loop.
	 */
	PortalHashTableDelete(portal);

	/* drop cached plan reference, if any */
	PortalReleaseCachedPlan(portal);

	/*
	 * If portal has a snapshot protecting its data, release that.  This needs
	 * a little care since the registration will be attached to the portal's
	 * resowner; if the portal failed, we will already have released the
	 * resowner (and the snapshot) during transaction abort.
	 */
	if (portal->holdSnapshot)
	{
		if (portal->resowner)
			UnregisterSnapshotFromOwner(portal->holdSnapshot,
										portal->resowner);
		portal->holdSnapshot = NULL;
	}

	/*
	 * Release any resources still attached to the portal.  There are several
	 * cases being covered here:
	 *
	 * Top transaction commit (indicated by isTopCommit): normally we should
	 * do nothing here and let the regular end-of-transaction resource
	 * releasing mechanism handle these resources too.  However, if we have a
	 * FAILED portal (eg, a cursor that got an error), we'd better clean up
	 * its resources to avoid resource-leakage warning messages.
	 *
	 * Sub transaction commit: never comes here at all, since we don't kill
	 * any portals in AtSubCommit_Portals().
	 *
	 * Main or sub transaction abort: we will do nothing here because
	 * portal->resowner was already set NULL; the resources were already
	 * cleaned up in transaction abort.
	 *
	 * Ordinary portal drop: must release resources.  However, if the portal
	 * is not FAILED then we do not release its locks.  The locks become the
	 * responsibility of the transaction's ResourceOwner (since it is the
	 * parent of the portal's owner) and will be released when the transaction
	 * eventually ends.
	 */
	if (portal->resowner &&
		(!isTopCommit || portal->status == PORTAL_FAILED))
	{
		bool		isCommit = (portal->status != PORTAL_FAILED);

		ResourceOwnerRelease(portal->resowner,
							 RESOURCE_RELEASE_BEFORE_LOCKS,
							 isCommit, false);
		ResourceOwnerRelease(portal->resowner,
							 RESOURCE_RELEASE_LOCKS,
							 isCommit, false);
		ResourceOwnerRelease(portal->resowner,
							 RESOURCE_RELEASE_AFTER_LOCKS,
							 isCommit, false);
		ResourceOwnerDelete(portal->resowner);
	}
	portal->resowner = NULL;

	/*
	 * Delete tuplestore if present.  We should do this even under error
	 * conditions; since the tuplestore would have been using cross-
	 * transaction storage, its temp files need to be explicitly deleted.
	 */
	if (portal->holdStore)
	{
		MemoryContext oldcontext;

		oldcontext = MemoryContextSwitchTo(portal->holdContext);
		tuplestore_end(portal->holdStore);
		MemoryContextSwitchTo(oldcontext);
		portal->holdStore = NULL;
	}

	/* delete tuplestore storage, if any */
	if (portal->holdContext)
		MemoryContextDelete(portal->holdContext);

	/* release subsidiary storage */
	MemoryContextDelete(portal->portalContext);

	/* release portal struct (it's in TopPortalContext) */
	pfree(portal);
}

/*
 * Delete all declared cursors.
 *
 * Used by commands: CLOSE ALL, DISCARD ALL
 */
void
PortalHashTableDeleteAll(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	if (PortalHashTable == NULL)
		return;

	hash_seq_init(&status, PortalHashTable);
	while ((hentry = hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		/* Can't close the active portal (the one running the command) */
		if (portal->status == PORTAL_ACTIVE)
			continue;

		PortalDrop(portal, false);

		/* Restart the iteration in case that led to other drops */
		hash_seq_term(&status);
		hash_seq_init(&status, PortalHashTable);
	}
}
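
/*
 * Illustrative only (not part of the original file): at the SQL level this
 * function implements, e.g.,
 *
 *		DECLARE c1 CURSOR WITH HOLD FOR SELECT * FROM pg_class;
 *		DECLARE c2 CURSOR WITH HOLD FOR SELECT * FROM pg_attribute;
 *		CLOSE ALL;		-- drops c1 and c2
 */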

/*
 * "Hold" a portal.  Prepare it for access by later transactions.
 */
static void
HoldPortal(Portal portal)
{
	/*
	 * Note that PersistHoldablePortal() must release all resources used by
	 * the portal that are local to the creating transaction.
	 */
	PortalCreateHoldStore(portal);
	PersistHoldablePortal(portal);

	/* drop cached plan reference, if any */
	PortalReleaseCachedPlan(portal);

	/*
	 * Any resources belonging to the portal will be released in the upcoming
	 * transaction-wide cleanup; the portal will no longer have its own
	 * resources.
	 */
	portal->resowner = NULL;

	/*
	 * Having successfully exported the holdable cursor, mark it as not
	 * belonging to this transaction.
	 */
	portal->createSubid = InvalidSubTransactionId;
	portal->activeSubid = InvalidSubTransactionId;
	portal->createLevel = 0;
}
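
/*
 * Illustrative only (not part of the original file): holding is what lets a
 * WITH HOLD cursor outlive its creating transaction, e.g.
 *
 *		BEGIN;
 *		DECLARE c CURSOR WITH HOLD FOR SELECT * FROM pg_class;
 *		COMMIT;				-- PreCommit_Portals() calls HoldPortal() for c
 *		FETCH 10 FROM c;	-- reads from the materialized hold store
 */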

/*
 * Pre-commit processing for portals.
 *
 * Holdable cursors created in this transaction need to be converted to
 * materialized form, since we are going to close down the executor and
 * release locks.  Non-holdable portals created in this transaction are
 * simply removed.  Portals remaining from prior transactions should be
 * left untouched.
 *
 * Returns true if any portals changed state (possibly causing user-defined
 * code to be run), false if not.
 */
bool
PreCommit_Portals(bool isPrepare)
{
	bool		result = false;
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		/*
		 * There should be no pinned portals anymore. Complain if someone
		 * leaked one. Auto-held portals are allowed; we assume that whoever
		 * pinned them is managing them.
		 */
		if (portal->portalPinned && !portal->autoHeld)
			elog(ERROR, "cannot commit while a portal is pinned");

		/*
		 * Do not touch active portals --- this can only happen in the case of
		 * a multi-transaction utility command, such as VACUUM, or a commit in
		 * a procedure.
		 *
		 * Note however that any resource owner attached to such a portal is
		 * still going to go away, so don't leave a dangling pointer.  Also
		 * unregister any snapshots held by the portal, mainly to avoid
		 * snapshot leak warnings from ResourceOwnerRelease().
		 */
		if (portal->status == PORTAL_ACTIVE)
		{
			if (portal->holdSnapshot)
			{
				if (portal->resowner)
					UnregisterSnapshotFromOwner(portal->holdSnapshot,
												portal->resowner);
				portal->holdSnapshot = NULL;
			}
			portal->resowner = NULL;
			/* Clear portalSnapshot too, for cleanliness */
			portal->portalSnapshot = NULL;
			continue;
		}

		/* Is it a holdable portal created in the current xact? */
		if ((portal->cursorOptions & CURSOR_OPT_HOLD) &&
			portal->createSubid != InvalidSubTransactionId &&
			portal->status == PORTAL_READY)
		{
			/*
			 * We are exiting the transaction that created a holdable cursor.
			 * Instead of dropping the portal, prepare it for access by later
			 * transactions.
			 *
			 * However, if this is PREPARE TRANSACTION rather than COMMIT,
			 * refuse PREPARE, because the semantics seem pretty unclear.
			 */
			if (isPrepare)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("cannot PREPARE a transaction that has created a cursor WITH HOLD")));

			HoldPortal(portal);

			/* Report we changed state */
			result = true;
		}
		else if (portal->createSubid == InvalidSubTransactionId)
		{
			/*
			 * Do nothing to cursors held over from a previous transaction
			 * (including ones we just froze in a previous cycle of this loop)
			 */
			continue;
		}
		else
		{
			/* Zap all non-holdable portals */
			PortalDrop(portal, true);

			/* Report we changed state */
			result = true;
		}

		/*
		 * After either freezing or dropping a portal, we have to restart the
		 * iteration, because we could have invoked user-defined code that
		 * caused a drop of the next portal in the hash chain.
		 */
		hash_seq_term(&status);
		hash_seq_init(&status, PortalHashTable);
	}

	return result;
}

/*
 * Abort processing for portals.
 *
 * At this point we run the cleanup hook if present, but we can't release the
 * portal's memory until the cleanup call.
 */
void
AtAbort_Portals(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		/*
		 * When elog(FATAL) is in progress, we need to set the active portal
		 * to failed, so that PortalCleanup() doesn't run the executor
		 * shutdown.
		 */
		if (portal->status == PORTAL_ACTIVE && shmem_exit_inprogress)
			MarkPortalFailed(portal);

		/*
		 * Do nothing else to cursors held over from a previous transaction.
		 */
		if (portal->createSubid == InvalidSubTransactionId)
			continue;

		/*
		 * Do nothing to auto-held cursors.  This is similar to the case of a
		 * cursor from a previous transaction, but it could also be that the
		 * cursor was auto-held in this transaction, so it wants to live on.
		 */
		if (portal->autoHeld)
			continue;

		/*
		 * If it was created in the current transaction, we can't do normal
		 * shutdown on a READY portal either; it might refer to objects
		 * created in the failed transaction.  See comments in
		 * AtSubAbort_Portals.
		 */
		if (portal->status == PORTAL_READY)
			MarkPortalFailed(portal);

		/*
		 * Allow portalcmds.c to clean up the state it knows about, if we
		 * haven't already.
		 */
		if (PointerIsValid(portal->cleanup))
		{
			portal->cleanup(portal);
			portal->cleanup = NULL;
		}

		/* drop cached plan reference, if any */
		PortalReleaseCachedPlan(portal);

		/*
		 * Any resources belonging to the portal will be released in the
		 * upcoming transaction-wide cleanup; they will be gone before we run
		 * PortalDrop.
		 */
		portal->resowner = NULL;

		/*
		 * Although we can't delete the portal data structure proper, we can
		 * release any memory in subsidiary contexts, such as executor state.
		 * The cleanup hook was the last thing that might have needed data
		 * there.  But leave active portals alone.
		 */
		if (portal->status != PORTAL_ACTIVE)
			MemoryContextDeleteChildren(portal->portalContext);
	}
}

/*
 * Post-abort cleanup for portals.
 *
 * Delete all portals not held over from prior transactions.
 */
void
AtCleanup_Portals(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		/*
		 * Do not touch active portals --- this can only happen in the case of
		 * a multi-transaction command.
		 */
		if (portal->status == PORTAL_ACTIVE)
			continue;

		/*
		 * Do nothing to cursors held over from a previous transaction or
		 * auto-held ones.
		 */
		if (portal->createSubid == InvalidSubTransactionId || portal->autoHeld)
		{
			Assert(portal->status != PORTAL_ACTIVE);
			Assert(portal->resowner == NULL);
			continue;
		}

		/*
		 * If a portal is still pinned, forcibly unpin it. PortalDrop will not
		 * let us drop the portal otherwise. Whoever pinned the portal was
		 * interrupted by the abort too and won't try to use it anymore.
		 */
		if (portal->portalPinned)
			portal->portalPinned = false;

		/*
		 * We had better not call any user-defined code during cleanup, so if
		 * the cleanup hook hasn't been run yet, too bad; we'll just skip it.
		 */
		if (PointerIsValid(portal->cleanup))
		{
			elog(WARNING, "skipping cleanup for portal \"%s\"", portal->name);
			portal->cleanup = NULL;
		}

		/* Zap it. */
		PortalDrop(portal, false);
	}
}

/*
 * Portal-related cleanup when we return to the main loop on error.
 *
 * This is different from the cleanup at transaction abort.  Auto-held portals
 * are cleaned up on error but not on transaction abort.
 */
void
PortalErrorCleanup(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		if (portal->autoHeld)
		{
			portal->portalPinned = false;
			PortalDrop(portal, false);
		}
	}
}

/*
 * Pre-subcommit processing for portals.
 *
 * Reassign portals created or used in the current subtransaction to the
 * parent subtransaction.
 */
void
AtSubCommit_Portals(SubTransactionId mySubid,
					SubTransactionId parentSubid,
					int parentLevel,
					ResourceOwner parentXactOwner)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		if (portal->createSubid == mySubid)
		{
			portal->createSubid = parentSubid;
			portal->createLevel = parentLevel;
			if (portal->resowner)
				ResourceOwnerNewParent(portal->resowner, parentXactOwner);
		}
		if (portal->activeSubid == mySubid)
			portal->activeSubid = parentSubid;
	}
}
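
/*
 * Illustrative only (not part of the original file): this reassignment is
 * what lets a cursor outlive the subtransaction that created it, e.g.
 *
 *		BEGIN;
 *		SAVEPOINT s1;
 *		DECLARE c CURSOR FOR SELECT * FROM pg_class;
 *		RELEASE SAVEPOINT s1;	-- c is reassigned to the parent here
 *		FETCH 1 FROM c;			-- still valid
 */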

/*
 * Subtransaction abort handling for portals.
 *
 * Deactivate portals created or used during the failed subtransaction.
 * Note that per AtSubCommit_Portals, this will catch portals created/used
 * in descendants of the subtransaction too.
 *
 * We don't destroy any portals here; that's done in AtSubCleanup_Portals.
 */
void
AtSubAbort_Portals(SubTransactionId mySubid,
				   SubTransactionId parentSubid,
				   ResourceOwner myXactOwner,
				   ResourceOwner parentXactOwner)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		/* Was it created in this subtransaction? */
		if (portal->createSubid != mySubid)
		{
			/* No, but maybe it was used in this subtransaction? */
			if (portal->activeSubid == mySubid)
			{
				/* Maintain activeSubid until the portal is removed */
				portal->activeSubid = parentSubid;

				/*
				 * A MarkPortalActive() caller ran an upper-level portal in
				 * this subtransaction and left the portal ACTIVE.  This can't
				 * happen, but force the portal into FAILED state for the same
				 * reasons discussed below.
				 *
				 * We assume we can get away without forcing upper-level READY
				 * portals to fail, even if they were run and then suspended.
				 * In theory a suspended upper-level portal could have
				 * acquired some references to objects that are about to be
				 * destroyed, but there should be sufficient defenses against
				 * such cases: the portal's original query cannot contain such
				 * references, and any references within, say, cached plans of
				 * PL/pgSQL functions are not from active queries and should
				 * be protected by revalidation logic.
				 */
				if (portal->status == PORTAL_ACTIVE)
					MarkPortalFailed(portal);

				/*
				 * Also, if we failed it during the current subtransaction
				 * (either just above, or earlier), reattach its resource
				 * owner to the current subtransaction's resource owner, so
				 * that any resources it still holds will be released while
				 * cleaning up this subtransaction.  This prevents some corner
				 * cases wherein we might get Asserts or worse while cleaning
				 * up objects created during the current subtransaction
				 * (because they're still referenced within this portal).
				 */
				if (portal->status == PORTAL_FAILED && portal->resowner)
				{
					ResourceOwnerNewParent(portal->resowner, myXactOwner);
					portal->resowner = NULL;
				}
			}
			/* Done if it wasn't created in this subtransaction */
			continue;
		}

		/*
		 * Force any live portals of my own subtransaction into FAILED state.
		 * We have to do this because they might refer to objects created or
		 * changed in the failed subtransaction, leading to crashes within
		 * ExecutorEnd when portalcmds.c tries to close down the portal.
		 * Currently, every MarkPortalActive() caller ensures it updates the
		 * portal status again before relinquishing control, so ACTIVE can't
		 * happen here.  If it does happen, dispose the portal like existing
		 * MarkPortalActive() callers would.
		 */
		if (portal->status == PORTAL_READY ||
			portal->status == PORTAL_ACTIVE)
			MarkPortalFailed(portal);

		/*
		 * Allow portalcmds.c to clean up the state it knows about, if we
		 * haven't already.
		 */
		if (PointerIsValid(portal->cleanup))
		{
			portal->cleanup(portal);
			portal->cleanup = NULL;
		}

		/* drop cached plan reference, if any */
		PortalReleaseCachedPlan(portal);

		/*
		 * Any resources belonging to the portal will be released in the
		 * upcoming transaction-wide cleanup; they will be gone before we run
		 * PortalDrop.
		 */
		portal->resowner = NULL;

		/*
		 * Although we can't delete the portal data structure proper, we can
		 * release any memory in subsidiary contexts, such as executor state.
		 * The cleanup hook was the last thing that might have needed data
		 * there.
		 */
		MemoryContextDeleteChildren(portal->portalContext);
	}
}

/*
 * Post-subabort cleanup for portals.
 *
 * Drop all portals created in the failed subtransaction (but note that
 * we will not drop any that were reassigned to the parent above).
 */
void
AtSubCleanup_Portals(SubTransactionId mySubid)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		if (portal->createSubid != mySubid)
			continue;

		/*
		 * If a portal is still pinned, forcibly unpin it. PortalDrop will not
		 * let us drop the portal otherwise. Whoever pinned the portal was
		 * interrupted by the abort too and won't try to use it anymore.
		 */
		if (portal->portalPinned)
			portal->portalPinned = false;

		/*
		 * We had better not call any user-defined code during cleanup, so if
		 * the cleanup hook hasn't been run yet, too bad; we'll just skip it.
		 */
		if (PointerIsValid(portal->cleanup))
		{
			elog(WARNING, "skipping cleanup for portal \"%s\"", portal->name);
			portal->cleanup = NULL;
		}

		/* Zap it. */
		PortalDrop(portal, false);
	}
}

/* Find all available cursors */
Datum
pg_cursor(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	HASH_SEQ_STATUS hash_seq;
	PortalHashEnt *hentry;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not allowed in this context")));

	/* need to build tuplestore in query context */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	/*
	 * build tupdesc for result tuples. This must match the definition of the
	 * pg_cursors view in system_views.sql
	 */
	tupdesc = CreateTemplateTupleDesc(6);
	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "name",
					   TEXTOID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "statement",
					   TEXTOID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "is_holdable",
					   BOOLOID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "is_binary",
					   BOOLOID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "is_scrollable",
					   BOOLOID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "creation_time",
					   TIMESTAMPTZOID, -1, 0);

	/*
	 * We put all the tuples into a tuplestore in one scan of the hashtable.
	 * This avoids any issue of the hashtable possibly changing between calls.
	 */
	tupstore =
		tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
							  false, work_mem);

	/* generate junk in short-term context */
	MemoryContextSwitchTo(oldcontext);

	hash_seq_init(&hash_seq, PortalHashTable);
	while ((hentry = hash_seq_search(&hash_seq)) != NULL)
	{
		Portal		portal = hentry->portal;
		Datum		values[6];
		bool		nulls[6];

		/* report only "visible" entries */
		if (!portal->visible)
			continue;

		MemSet(nulls, 0, sizeof(nulls));

		values[0] = CStringGetTextDatum(portal->name);
		values[1] = CStringGetTextDatum(portal->sourceText);
		values[2] = BoolGetDatum(portal->cursorOptions & CURSOR_OPT_HOLD);
		values[3] = BoolGetDatum(portal->cursorOptions & CURSOR_OPT_BINARY);
		values[4] = BoolGetDatum(portal->cursorOptions & CURSOR_OPT_SCROLL);
		values[5] = TimestampTzGetDatum(portal->creation_time);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);

	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	return (Datum) 0;
}
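
/*
 * Illustrative only (not part of the original file): pg_cursor() backs the
 * pg_cursors system view, so open cursors can be inspected with, e.g.
 *
 *		BEGIN;
 *		DECLARE c SCROLL CURSOR FOR SELECT * FROM pg_class;
 *		SELECT name, is_holdable, is_scrollable FROM pg_cursors;
 */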

bool
ThereAreNoReadyPortals(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		if (portal->status == PORTAL_READY)
			return false;
	}

	return true;
}

/*
 * Hold all pinned portals.
 *
 * When initiating a COMMIT or ROLLBACK inside a procedure, this must be
 * called to protect internally-generated cursors from being dropped during
 * the transaction shutdown.  Currently, SPI calls this automatically; PLs
 * that initiate COMMIT or ROLLBACK some other way are on the hook to do it
 * themselves.  (Note that we couldn't do this in, say, AtAbort_Portals
 * because we need to run user-defined code while persisting a portal.
 * It's too late to do that once transaction abort has started.)
 *
 * We protect such portals by converting them to held cursors.  We mark them
 * as "auto-held" so that exception exit knows to clean them up.  (In normal,
 * non-exception code paths, the PL needs to clean such portals itself, since
 * transaction end won't do it anymore; but that should be normal practice
 * anyway.)
 */
void
HoldPinnedPortals(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;

	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		if (portal->portalPinned && !portal->autoHeld)
		{
			/*
			 * Doing transaction control, especially abort, inside a cursor
			 * loop that is not read-only, for example using UPDATE ...
			 * RETURNING, has weird semantics issues.  Also, this
			 * implementation wouldn't work, because such portals cannot be
			 * held.  (The core grammar enforces that only SELECT statements
			 * can drive a cursor, but for example PL/pgSQL does not restrict
			 * it.)
			 */
			if (portal->strategy != PORTAL_ONE_SELECT)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
						 errmsg("cannot perform transaction commands inside a cursor loop that is not read-only")));

			/* Verify it's in a suitable state to be held */
			if (portal->status != PORTAL_READY)
				elog(ERROR, "pinned portal is not ready to be auto-held");

			HoldPortal(portal);
			portal->autoHeld = true;
		}
	}
}
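
/*
 * Illustrative only (not part of the original file): the case this protects
 * is a committing cursor loop in a procedure, e.g. in PL/pgSQL
 *
 *		CREATE PROCEDURE p() LANGUAGE plpgsql AS $$
 *		DECLARE
 *			r record;
 *		BEGIN
 *			FOR r IN SELECT * FROM pg_class LOOP
 *				COMMIT;		-- SPI calls HoldPinnedPortals() first, so the
 *							-- pinned loop cursor is auto-held, not dropped
 *			END LOOP;
 *		END;
 *		$$;
 */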

/*
 * Drop the outer active snapshots for all portals, so that no snapshots
 * remain active.
 *
 * Like HoldPinnedPortals, this must be called when initiating a COMMIT or
 * ROLLBACK inside a procedure.  This has to be separate from that since it
 * should not be run until we're done with steps that are likely to fail.
 *
 * It's tempting to fold this into PreCommit_Portals, but to do so, we'd
 * need to clean up snapshot management in VACUUM and perhaps other places.
 */
void
ForgetPortalSnapshots(void)
{
	HASH_SEQ_STATUS status;
	PortalHashEnt *hentry;
	int			numPortalSnaps = 0;
	int			numActiveSnaps = 0;

	/* First, scan PortalHashTable and clear portalSnapshot fields */
	hash_seq_init(&status, PortalHashTable);

	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
	{
		Portal		portal = hentry->portal;

		if (portal->portalSnapshot != NULL)
		{
			portal->portalSnapshot = NULL;
			numPortalSnaps++;
		}
		/* portal->holdSnapshot will be cleaned up in PreCommit_Portals */
	}

	/*
	 * Now, pop all the active snapshots, which should be just those that were
	 * portal snapshots.  Ideally we'd drive this directly off the portal
	 * scan, but there's no good way to visit the portals in the correct
	 * order.  So just cross-check after the fact.
	 */
	while (ActiveSnapshotSet())
	{
		PopActiveSnapshot();
		numActiveSnaps++;
	}

	if (numPortalSnaps != numActiveSnaps)
		elog(ERROR, "portal snapshots (%d) did not account for all active snapshots (%d)",
			 numPortalSnaps, numActiveSnaps);
}