1 /*-------------------------------------------------------------------------
2 *
3 * execAsync.c
4 * Support routines for asynchronous execution
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execAsync.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "executor/execAsync.h"
18 #include "executor/executor.h"
19 #include "executor/nodeAppend.h"
20 #include "executor/nodeForeignscan.h"
21
22 /*
23 * Asynchronously request a tuple from a designed async-capable node.
24 */
25 void
ExecAsyncRequest(AsyncRequest * areq)26 ExecAsyncRequest(AsyncRequest *areq)
27 {
28 if (areq->requestee->chgParam != NULL) /* something changed? */
29 ExecReScan(areq->requestee); /* let ReScan handle this */
30
31 /* must provide our own instrumentation support */
32 if (areq->requestee->instrument)
33 InstrStartNode(areq->requestee->instrument);
34
35 switch (nodeTag(areq->requestee))
36 {
37 case T_ForeignScanState:
38 ExecAsyncForeignScanRequest(areq);
39 break;
40 default:
41 /* If the node doesn't support async, caller messed up. */
42 elog(ERROR, "unrecognized node type: %d",
43 (int) nodeTag(areq->requestee));
44 }
45
46 ExecAsyncResponse(areq);
47
48 /* must provide our own instrumentation support */
49 if (areq->requestee->instrument)
50 InstrStopNode(areq->requestee->instrument,
51 TupIsNull(areq->result) ? 0.0 : 1.0);
52 }
53
54 /*
55 * Give the asynchronous node a chance to configure the file descriptor event
56 * for which it wishes to wait. We expect the node-type specific callback to
57 * make a single call of the following form:
58 *
59 * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60 */
61 void
ExecAsyncConfigureWait(AsyncRequest * areq)62 ExecAsyncConfigureWait(AsyncRequest *areq)
63 {
64 /* must provide our own instrumentation support */
65 if (areq->requestee->instrument)
66 InstrStartNode(areq->requestee->instrument);
67
68 switch (nodeTag(areq->requestee))
69 {
70 case T_ForeignScanState:
71 ExecAsyncForeignScanConfigureWait(areq);
72 break;
73 default:
74 /* If the node doesn't support async, caller messed up. */
75 elog(ERROR, "unrecognized node type: %d",
76 (int) nodeTag(areq->requestee));
77 }
78
79 /* must provide our own instrumentation support */
80 if (areq->requestee->instrument)
81 InstrStopNode(areq->requestee->instrument, 0.0);
82 }
83
84 /*
85 * Call the asynchronous node back when a relevant event has occurred.
86 */
87 void
ExecAsyncNotify(AsyncRequest * areq)88 ExecAsyncNotify(AsyncRequest *areq)
89 {
90 /* must provide our own instrumentation support */
91 if (areq->requestee->instrument)
92 InstrStartNode(areq->requestee->instrument);
93
94 switch (nodeTag(areq->requestee))
95 {
96 case T_ForeignScanState:
97 ExecAsyncForeignScanNotify(areq);
98 break;
99 default:
100 /* If the node doesn't support async, caller messed up. */
101 elog(ERROR, "unrecognized node type: %d",
102 (int) nodeTag(areq->requestee));
103 }
104
105 ExecAsyncResponse(areq);
106
107 /* must provide our own instrumentation support */
108 if (areq->requestee->instrument)
109 InstrStopNode(areq->requestee->instrument,
110 TupIsNull(areq->result) ? 0.0 : 1.0);
111 }
112
113 /*
114 * Call the requestor back when an asynchronous node has produced a result.
115 */
116 void
ExecAsyncResponse(AsyncRequest * areq)117 ExecAsyncResponse(AsyncRequest *areq)
118 {
119 switch (nodeTag(areq->requestor))
120 {
121 case T_AppendState:
122 ExecAsyncAppendResponse(areq);
123 break;
124 default:
125 /* If the node doesn't support async, caller messed up. */
126 elog(ERROR, "unrecognized node type: %d",
127 (int) nodeTag(areq->requestor));
128 }
129 }
130
131 /*
132 * A requestee node should call this function to deliver the tuple to its
133 * requestor node. The requestee node can call this from its ExecAsyncRequest
134 * or ExecAsyncNotify callback.
135 */
136 void
ExecAsyncRequestDone(AsyncRequest * areq,TupleTableSlot * result)137 ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138 {
139 areq->request_complete = true;
140 areq->result = result;
141 }
142
143 /*
144 * A requestee node should call this function to indicate that it is pending
145 * for a callback. The requestee node can call this from its ExecAsyncRequest
146 * or ExecAsyncNotify callback.
147 */
148 void
ExecAsyncRequestPending(AsyncRequest * areq)149 ExecAsyncRequestPending(AsyncRequest *areq)
150 {
151 areq->callback_pending = true;
152 areq->request_complete = false;
153 areq->result = NULL;
154 }
155