1 /*-------------------------------------------------------------------------
2  *
3  * pqmq.c
4  *	  Use the frontend/backend protocol for communication over a shm_mq
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *	src/backend/libpq/pqmq.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "libpq/libpq.h"
17 #include "libpq/pqformat.h"
18 #include "libpq/pqmq.h"
19 #include "miscadmin.h"
20 #include "pgstat.h"
21 #include "tcop/tcopprot.h"
22 #include "utils/builtins.h"
23 
24 static shm_mq_handle *pq_mq_handle;
25 static bool pq_mq_busy = false;
26 static pid_t pq_mq_parallel_master_pid = 0;
27 static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
28 
29 static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
30 static void mq_comm_reset(void);
31 static int	mq_flush(void);
32 static int	mq_flush_if_writable(void);
33 static bool mq_is_send_pending(void);
34 static int	mq_putmessage(char msgtype, const char *s, size_t len);
35 static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
36 static void mq_startcopyout(void);
37 static void mq_endcopyout(bool errorAbort);
38 
39 static const PQcommMethods PqCommMqMethods = {
40 	mq_comm_reset,
41 	mq_flush,
42 	mq_flush_if_writable,
43 	mq_is_send_pending,
44 	mq_putmessage,
45 	mq_putmessage_noblock,
46 	mq_startcopyout,
47 	mq_endcopyout
48 };
49 
50 /*
51  * Arrange to redirect frontend/backend protocol messages to a shared-memory
52  * message queue.
53  */
54 void
pq_redirect_to_shm_mq(dsm_segment * seg,shm_mq_handle * mqh)55 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
56 {
57 	PqCommMethods = &PqCommMqMethods;
58 	pq_mq_handle = mqh;
59 	whereToSendOutput = DestRemote;
60 	FrontendProtocol = PG_PROTOCOL_LATEST;
61 	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
62 }
63 
64 /*
65  * When the DSM that contains our shm_mq goes away, we need to stop sending
66  * messages to it.
67  */
68 static void
pq_cleanup_redirect_to_shm_mq(dsm_segment * seg,Datum arg)69 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
70 {
71 	pq_mq_handle = NULL;
72 	whereToSendOutput = DestNone;
73 }
74 
75 /*
76  * Arrange to SendProcSignal() to the parallel master each time we transmit
77  * message data via the shm_mq.
78  */
79 void
pq_set_parallel_master(pid_t pid,BackendId backend_id)80 pq_set_parallel_master(pid_t pid, BackendId backend_id)
81 {
82 	Assert(PqCommMethods == &PqCommMqMethods);
83 	pq_mq_parallel_master_pid = pid;
84 	pq_mq_parallel_master_backend_id = backend_id;
85 }
86 
87 static void
mq_comm_reset(void)88 mq_comm_reset(void)
89 {
90 	/* Nothing to do. */
91 }
92 
93 static int
mq_flush(void)94 mq_flush(void)
95 {
96 	/* Nothing to do. */
97 	return 0;
98 }
99 
100 static int
mq_flush_if_writable(void)101 mq_flush_if_writable(void)
102 {
103 	/* Nothing to do. */
104 	return 0;
105 }
106 
107 static bool
mq_is_send_pending(void)108 mq_is_send_pending(void)
109 {
110 	/* There's never anything pending. */
111 	return 0;
112 }
113 
114 /*
115  * Transmit a libpq protocol message to the shared memory message queue
116  * selected via pq_mq_handle.  We don't include a length word, because the
117  * receiver will know the length of the message from shm_mq_receive().
118  */
119 static int
mq_putmessage(char msgtype,const char * s,size_t len)120 mq_putmessage(char msgtype, const char *s, size_t len)
121 {
122 	shm_mq_iovec iov[2];
123 	shm_mq_result result;
124 
125 	/*
126 	 * If we're sending a message, and we have to wait because the queue is
127 	 * full, and then we get interrupted, and that interrupt results in trying
128 	 * to send another message, we respond by detaching the queue.  There's no
129 	 * way to return to the original context, but even if there were, just
130 	 * queueing the message would amount to indefinitely postponing the
131 	 * response to the interrupt.  So we do this instead.
132 	 */
133 	if (pq_mq_busy)
134 	{
135 		if (pq_mq_handle != NULL)
136 			shm_mq_detach(pq_mq_handle);
137 		pq_mq_handle = NULL;
138 		return EOF;
139 	}
140 
141 	/*
142 	 * If the message queue is already gone, just ignore the message. This
143 	 * doesn't necessarily indicate a problem; for example, DEBUG messages can
144 	 * be generated late in the shutdown sequence, after all DSMs have already
145 	 * been detached.
146 	 */
147 	if (pq_mq_handle == NULL)
148 		return 0;
149 
150 	pq_mq_busy = true;
151 
152 	iov[0].data = &msgtype;
153 	iov[0].len = 1;
154 	iov[1].data = s;
155 	iov[1].len = len;
156 
157 	Assert(pq_mq_handle != NULL);
158 
159 	for (;;)
160 	{
161 		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
162 
163 		if (pq_mq_parallel_master_pid != 0)
164 			SendProcSignal(pq_mq_parallel_master_pid,
165 						   PROCSIG_PARALLEL_MESSAGE,
166 						   pq_mq_parallel_master_backend_id);
167 
168 		if (result != SHM_MQ_WOULD_BLOCK)
169 			break;
170 
171 		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
172 						 WAIT_EVENT_MQ_PUT_MESSAGE);
173 		ResetLatch(MyLatch);
174 		CHECK_FOR_INTERRUPTS();
175 	}
176 
177 	pq_mq_busy = false;
178 
179 	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
180 	if (result != SHM_MQ_SUCCESS)
181 		return EOF;
182 	return 0;
183 }
184 
185 static void
mq_putmessage_noblock(char msgtype,const char * s,size_t len)186 mq_putmessage_noblock(char msgtype, const char *s, size_t len)
187 {
188 	/*
189 	 * While the shm_mq machinery does support sending a message in
190 	 * non-blocking mode, there's currently no way to try sending beginning to
191 	 * send the message that doesn't also commit us to completing the
192 	 * transmission.  This could be improved in the future, but for now we
193 	 * don't need it.
194 	 */
195 	elog(ERROR, "not currently supported");
196 }
197 
198 static void
mq_startcopyout(void)199 mq_startcopyout(void)
200 {
201 	/* Nothing to do. */
202 }
203 
204 static void
mq_endcopyout(bool errorAbort)205 mq_endcopyout(bool errorAbort)
206 {
207 	/* Nothing to do. */
208 }
209 
210 /*
211  * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
212  * structure with the results.
213  */
214 void
pq_parse_errornotice(StringInfo msg,ErrorData * edata)215 pq_parse_errornotice(StringInfo msg, ErrorData *edata)
216 {
217 	/* Initialize edata with reasonable defaults. */
218 	MemSet(edata, 0, sizeof(ErrorData));
219 	edata->elevel = ERROR;
220 	edata->assoc_context = CurrentMemoryContext;
221 
222 	/* Loop over fields and extract each one. */
223 	for (;;)
224 	{
225 		char		code = pq_getmsgbyte(msg);
226 		const char *value;
227 
228 		if (code == '\0')
229 		{
230 			pq_getmsgend(msg);
231 			break;
232 		}
233 		value = pq_getmsgrawstring(msg);
234 
235 		switch (code)
236 		{
237 			case PG_DIAG_SEVERITY:
238 				/* ignore, trusting we'll get a nonlocalized version */
239 				break;
240 			case PG_DIAG_SEVERITY_NONLOCALIZED:
241 				if (strcmp(value, "DEBUG") == 0)
242 				{
243 					/*
244 					 * We can't reconstruct the exact DEBUG level, but
245 					 * presumably it was >= client_min_messages, so select
246 					 * DEBUG1 to ensure we'll pass it on to the client.
247 					 */
248 					edata->elevel = DEBUG1;
249 				}
250 				else if (strcmp(value, "LOG") == 0)
251 				{
252 					/*
253 					 * It can't be LOG_SERVER_ONLY, or the worker wouldn't
254 					 * have sent it to us; so LOG is the correct value.
255 					 */
256 					edata->elevel = LOG;
257 				}
258 				else if (strcmp(value, "INFO") == 0)
259 					edata->elevel = INFO;
260 				else if (strcmp(value, "NOTICE") == 0)
261 					edata->elevel = NOTICE;
262 				else if (strcmp(value, "WARNING") == 0)
263 					edata->elevel = WARNING;
264 				else if (strcmp(value, "ERROR") == 0)
265 					edata->elevel = ERROR;
266 				else if (strcmp(value, "FATAL") == 0)
267 					edata->elevel = FATAL;
268 				else if (strcmp(value, "PANIC") == 0)
269 					edata->elevel = PANIC;
270 				else
271 					elog(ERROR, "unrecognized error severity: \"%s\"", value);
272 				break;
273 			case PG_DIAG_SQLSTATE:
274 				if (strlen(value) != 5)
275 					elog(ERROR, "invalid SQLSTATE: \"%s\"", value);
276 				edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
277 												  value[3], value[4]);
278 				break;
279 			case PG_DIAG_MESSAGE_PRIMARY:
280 				edata->message = pstrdup(value);
281 				break;
282 			case PG_DIAG_MESSAGE_DETAIL:
283 				edata->detail = pstrdup(value);
284 				break;
285 			case PG_DIAG_MESSAGE_HINT:
286 				edata->hint = pstrdup(value);
287 				break;
288 			case PG_DIAG_STATEMENT_POSITION:
289 				edata->cursorpos = pg_strtoint32(value);
290 				break;
291 			case PG_DIAG_INTERNAL_POSITION:
292 				edata->internalpos = pg_strtoint32(value);
293 				break;
294 			case PG_DIAG_INTERNAL_QUERY:
295 				edata->internalquery = pstrdup(value);
296 				break;
297 			case PG_DIAG_CONTEXT:
298 				edata->context = pstrdup(value);
299 				break;
300 			case PG_DIAG_SCHEMA_NAME:
301 				edata->schema_name = pstrdup(value);
302 				break;
303 			case PG_DIAG_TABLE_NAME:
304 				edata->table_name = pstrdup(value);
305 				break;
306 			case PG_DIAG_COLUMN_NAME:
307 				edata->column_name = pstrdup(value);
308 				break;
309 			case PG_DIAG_DATATYPE_NAME:
310 				edata->datatype_name = pstrdup(value);
311 				break;
312 			case PG_DIAG_CONSTRAINT_NAME:
313 				edata->constraint_name = pstrdup(value);
314 				break;
315 			case PG_DIAG_SOURCE_FILE:
316 				edata->filename = pstrdup(value);
317 				break;
318 			case PG_DIAG_SOURCE_LINE:
319 				edata->lineno = pg_strtoint32(value);
320 				break;
321 			case PG_DIAG_SOURCE_FUNCTION:
322 				edata->funcname = pstrdup(value);
323 				break;
324 			default:
325 				elog(ERROR, "unrecognized error field code: %d", (int) code);
326 				break;
327 		}
328 	}
329 }
330