1 /*-------------------------------------------------------------------------
2  *
3  * execAsync.c
4  *	  Support routines for asynchronous execution
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/executor/execAsync.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "executor/execAsync.h"
18 #include "executor/executor.h"
19 #include "executor/nodeAppend.h"
20 #include "executor/nodeForeignscan.h"
21 
22 /*
23  * Asynchronously request a tuple from a designed async-capable node.
24  */
25 void
ExecAsyncRequest(AsyncRequest * areq)26 ExecAsyncRequest(AsyncRequest *areq)
27 {
28 	if (areq->requestee->chgParam != NULL)	/* something changed? */
29 		ExecReScan(areq->requestee);	/* let ReScan handle this */
30 
31 	/* must provide our own instrumentation support */
32 	if (areq->requestee->instrument)
33 		InstrStartNode(areq->requestee->instrument);
34 
35 	switch (nodeTag(areq->requestee))
36 	{
37 		case T_ForeignScanState:
38 			ExecAsyncForeignScanRequest(areq);
39 			break;
40 		default:
41 			/* If the node doesn't support async, caller messed up. */
42 			elog(ERROR, "unrecognized node type: %d",
43 				 (int) nodeTag(areq->requestee));
44 	}
45 
46 	ExecAsyncResponse(areq);
47 
48 	/* must provide our own instrumentation support */
49 	if (areq->requestee->instrument)
50 		InstrStopNode(areq->requestee->instrument,
51 					  TupIsNull(areq->result) ? 0.0 : 1.0);
52 }
53 
54 /*
55  * Give the asynchronous node a chance to configure the file descriptor event
56  * for which it wishes to wait.  We expect the node-type specific callback to
57  * make a single call of the following form:
58  *
59  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60  */
61 void
ExecAsyncConfigureWait(AsyncRequest * areq)62 ExecAsyncConfigureWait(AsyncRequest *areq)
63 {
64 	/* must provide our own instrumentation support */
65 	if (areq->requestee->instrument)
66 		InstrStartNode(areq->requestee->instrument);
67 
68 	switch (nodeTag(areq->requestee))
69 	{
70 		case T_ForeignScanState:
71 			ExecAsyncForeignScanConfigureWait(areq);
72 			break;
73 		default:
74 			/* If the node doesn't support async, caller messed up. */
75 			elog(ERROR, "unrecognized node type: %d",
76 				 (int) nodeTag(areq->requestee));
77 	}
78 
79 	/* must provide our own instrumentation support */
80 	if (areq->requestee->instrument)
81 		InstrStopNode(areq->requestee->instrument, 0.0);
82 }
83 
84 /*
85  * Call the asynchronous node back when a relevant event has occurred.
86  */
87 void
ExecAsyncNotify(AsyncRequest * areq)88 ExecAsyncNotify(AsyncRequest *areq)
89 {
90 	/* must provide our own instrumentation support */
91 	if (areq->requestee->instrument)
92 		InstrStartNode(areq->requestee->instrument);
93 
94 	switch (nodeTag(areq->requestee))
95 	{
96 		case T_ForeignScanState:
97 			ExecAsyncForeignScanNotify(areq);
98 			break;
99 		default:
100 			/* If the node doesn't support async, caller messed up. */
101 			elog(ERROR, "unrecognized node type: %d",
102 				 (int) nodeTag(areq->requestee));
103 	}
104 
105 	ExecAsyncResponse(areq);
106 
107 	/* must provide our own instrumentation support */
108 	if (areq->requestee->instrument)
109 		InstrStopNode(areq->requestee->instrument,
110 					  TupIsNull(areq->result) ? 0.0 : 1.0);
111 }
112 
113 /*
114  * Call the requestor back when an asynchronous node has produced a result.
115  */
116 void
ExecAsyncResponse(AsyncRequest * areq)117 ExecAsyncResponse(AsyncRequest *areq)
118 {
119 	switch (nodeTag(areq->requestor))
120 	{
121 		case T_AppendState:
122 			ExecAsyncAppendResponse(areq);
123 			break;
124 		default:
125 			/* If the node doesn't support async, caller messed up. */
126 			elog(ERROR, "unrecognized node type: %d",
127 				 (int) nodeTag(areq->requestor));
128 	}
129 }
130 
131 /*
132  * A requestee node should call this function to deliver the tuple to its
133  * requestor node.  The requestee node can call this from its ExecAsyncRequest
134  * or ExecAsyncNotify callback.
135  */
136 void
ExecAsyncRequestDone(AsyncRequest * areq,TupleTableSlot * result)137 ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138 {
139 	areq->request_complete = true;
140 	areq->result = result;
141 }
142 
143 /*
144  * A requestee node should call this function to indicate that it is pending
145  * for a callback.  The requestee node can call this from its ExecAsyncRequest
146  * or ExecAsyncNotify callback.
147  */
148 void
ExecAsyncRequestPending(AsyncRequest * areq)149 ExecAsyncRequestPending(AsyncRequest *areq)
150 {
151 	areq->callback_pending = true;
152 	areq->request_complete = false;
153 	areq->result = NULL;
154 }
155