1 /*-------------------------------------------------------------------------
2  *
3  * pqmq.c
4  *	  Use the frontend/backend protocol for communication over a shm_mq
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *	src/backend/libpq/pqmq.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "libpq/libpq.h"
17 #include "libpq/pqformat.h"
18 #include "libpq/pqmq.h"
19 #include "miscadmin.h"
20 #include "tcop/tcopprot.h"
21 #include "utils/builtins.h"
22 
23 static shm_mq *pq_mq;
24 static shm_mq_handle *pq_mq_handle;
25 static bool pq_mq_busy = false;
26 static pid_t pq_mq_parallel_master_pid = 0;
27 static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
28 
29 static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
30 static void mq_comm_reset(void);
31 static int	mq_flush(void);
32 static int	mq_flush_if_writable(void);
33 static bool mq_is_send_pending(void);
34 static int	mq_putmessage(char msgtype, const char *s, size_t len);
35 static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
36 static void mq_startcopyout(void);
37 static void mq_endcopyout(bool errorAbort);
38 
39 static PQcommMethods PqCommMqMethods = {
40 	mq_comm_reset,
41 	mq_flush,
42 	mq_flush_if_writable,
43 	mq_is_send_pending,
44 	mq_putmessage,
45 	mq_putmessage_noblock,
46 	mq_startcopyout,
47 	mq_endcopyout
48 };
49 
50 /*
51  * Arrange to redirect frontend/backend protocol messages to a shared-memory
52  * message queue.
53  */
54 void
pq_redirect_to_shm_mq(dsm_segment * seg,shm_mq_handle * mqh)55 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
56 {
57 	PqCommMethods = &PqCommMqMethods;
58 	pq_mq = shm_mq_get_queue(mqh);
59 	pq_mq_handle = mqh;
60 	whereToSendOutput = DestRemote;
61 	FrontendProtocol = PG_PROTOCOL_LATEST;
62 	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
63 }
64 
65 /*
66  * When the DSM that contains our shm_mq goes away, we need to stop sending
67  * messages to it.
68  */
69 static void
pq_cleanup_redirect_to_shm_mq(dsm_segment * seg,Datum arg)70 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
71 {
72 	pq_mq = NULL;
73 	pq_mq_handle = NULL;
74 	whereToSendOutput = DestNone;
75 }
76 
77 /*
78  * Arrange to SendProcSignal() to the parallel master each time we transmit
79  * message data via the shm_mq.
80  */
81 void
pq_set_parallel_master(pid_t pid,BackendId backend_id)82 pq_set_parallel_master(pid_t pid, BackendId backend_id)
83 {
84 	Assert(PqCommMethods == &PqCommMqMethods);
85 	pq_mq_parallel_master_pid = pid;
86 	pq_mq_parallel_master_backend_id = backend_id;
87 }
88 
89 static void
mq_comm_reset(void)90 mq_comm_reset(void)
91 {
92 	/* Nothing to do. */
93 }
94 
95 static int
mq_flush(void)96 mq_flush(void)
97 {
98 	/* Nothing to do. */
99 	return 0;
100 }
101 
102 static int
mq_flush_if_writable(void)103 mq_flush_if_writable(void)
104 {
105 	/* Nothing to do. */
106 	return 0;
107 }
108 
109 static bool
mq_is_send_pending(void)110 mq_is_send_pending(void)
111 {
112 	/* There's never anything pending. */
113 	return 0;
114 }
115 
116 /*
117  * Transmit a libpq protocol message to the shared memory message queue
118  * selected via pq_mq_handle.  We don't include a length word, because the
119  * receiver will know the length of the message from shm_mq_receive().
120  */
121 static int
mq_putmessage(char msgtype,const char * s,size_t len)122 mq_putmessage(char msgtype, const char *s, size_t len)
123 {
124 	shm_mq_iovec iov[2];
125 	shm_mq_result result;
126 
127 	/*
128 	 * If we're sending a message, and we have to wait because the queue is
129 	 * full, and then we get interrupted, and that interrupt results in trying
130 	 * to send another message, we respond by detaching the queue.  There's no
131 	 * way to return to the original context, but even if there were, just
132 	 * queueing the message would amount to indefinitely postponing the
133 	 * response to the interrupt.  So we do this instead.
134 	 */
135 	if (pq_mq_busy)
136 	{
137 		if (pq_mq != NULL)
138 			shm_mq_detach(pq_mq);
139 		pq_mq = NULL;
140 		pq_mq_handle = NULL;
141 		return EOF;
142 	}
143 
144 	/*
145 	 * If the message queue is already gone, just ignore the message. This
146 	 * doesn't necessarily indicate a problem; for example, DEBUG messages can
147 	 * be generated late in the shutdown sequence, after all DSMs have already
148 	 * been detached.
149 	 */
150 	if (pq_mq == NULL)
151 		return 0;
152 
153 	pq_mq_busy = true;
154 
155 	iov[0].data = &msgtype;
156 	iov[0].len = 1;
157 	iov[1].data = s;
158 	iov[1].len = len;
159 
160 	Assert(pq_mq_handle != NULL);
161 
162 	for (;;)
163 	{
164 		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
165 
166 		if (pq_mq_parallel_master_pid != 0)
167 			SendProcSignal(pq_mq_parallel_master_pid,
168 						   PROCSIG_PARALLEL_MESSAGE,
169 						   pq_mq_parallel_master_backend_id);
170 
171 		if (result != SHM_MQ_WOULD_BLOCK)
172 			break;
173 
174 		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
175 		ResetLatch(&MyProc->procLatch);
176 		CHECK_FOR_INTERRUPTS();
177 	}
178 
179 	pq_mq_busy = false;
180 
181 	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
182 	if (result != SHM_MQ_SUCCESS)
183 		return EOF;
184 	return 0;
185 }
186 
187 static void
mq_putmessage_noblock(char msgtype,const char * s,size_t len)188 mq_putmessage_noblock(char msgtype, const char *s, size_t len)
189 {
190 	/*
191 	 * While the shm_mq machinery does support sending a message in
192 	 * non-blocking mode, there's currently no way to try sending beginning to
193 	 * send the message that doesn't also commit us to completing the
194 	 * transmission.  This could be improved in the future, but for now we
195 	 * don't need it.
196 	 */
197 	elog(ERROR, "not currently supported");
198 }
199 
200 static void
mq_startcopyout(void)201 mq_startcopyout(void)
202 {
203 	/* Nothing to do. */
204 }
205 
206 static void
mq_endcopyout(bool errorAbort)207 mq_endcopyout(bool errorAbort)
208 {
209 	/* Nothing to do. */
210 }
211 
212 /*
213  * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
214  * structure with the results.
215  */
216 void
pq_parse_errornotice(StringInfo msg,ErrorData * edata)217 pq_parse_errornotice(StringInfo msg, ErrorData *edata)
218 {
219 	/* Initialize edata with reasonable defaults. */
220 	MemSet(edata, 0, sizeof(ErrorData));
221 	edata->elevel = ERROR;
222 	edata->assoc_context = CurrentMemoryContext;
223 
224 	/* Loop over fields and extract each one. */
225 	for (;;)
226 	{
227 		char		code = pq_getmsgbyte(msg);
228 		const char *value;
229 
230 		if (code == '\0')
231 		{
232 			pq_getmsgend(msg);
233 			break;
234 		}
235 		value = pq_getmsgrawstring(msg);
236 
237 		switch (code)
238 		{
239 			case PG_DIAG_SEVERITY:
240 				/* ignore, trusting we'll get a nonlocalized version */
241 				break;
242 			case PG_DIAG_SEVERITY_NONLOCALIZED:
243 				if (strcmp(value, "DEBUG") == 0)
244 				{
245 					/*
246 					 * We can't reconstruct the exact DEBUG level, but
247 					 * presumably it was >= client_min_messages, so select
248 					 * DEBUG1 to ensure we'll pass it on to the client.
249 					 */
250 					edata->elevel = DEBUG1;
251 				}
252 				else if (strcmp(value, "LOG") == 0)
253 				{
254 					/*
255 					 * It can't be LOG_SERVER_ONLY, or the worker wouldn't
256 					 * have sent it to us; so LOG is the correct value.
257 					 */
258 					edata->elevel = LOG;
259 				}
260 				else if (strcmp(value, "INFO") == 0)
261 					edata->elevel = INFO;
262 				else if (strcmp(value, "NOTICE") == 0)
263 					edata->elevel = NOTICE;
264 				else if (strcmp(value, "WARNING") == 0)
265 					edata->elevel = WARNING;
266 				else if (strcmp(value, "ERROR") == 0)
267 					edata->elevel = ERROR;
268 				else if (strcmp(value, "FATAL") == 0)
269 					edata->elevel = FATAL;
270 				else if (strcmp(value, "PANIC") == 0)
271 					edata->elevel = PANIC;
272 				else
273 					elog(ERROR, "unrecognized error severity: \"%s\"", value);
274 				break;
275 			case PG_DIAG_SQLSTATE:
276 				if (strlen(value) != 5)
277 					elog(ERROR, "invalid SQLSTATE: \"%s\"", value);
278 				edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
279 												  value[3], value[4]);
280 				break;
281 			case PG_DIAG_MESSAGE_PRIMARY:
282 				edata->message = pstrdup(value);
283 				break;
284 			case PG_DIAG_MESSAGE_DETAIL:
285 				edata->detail = pstrdup(value);
286 				break;
287 			case PG_DIAG_MESSAGE_HINT:
288 				edata->hint = pstrdup(value);
289 				break;
290 			case PG_DIAG_STATEMENT_POSITION:
291 				edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
292 				break;
293 			case PG_DIAG_INTERNAL_POSITION:
294 				edata->internalpos = pg_atoi(value, sizeof(int), '\0');
295 				break;
296 			case PG_DIAG_INTERNAL_QUERY:
297 				edata->internalquery = pstrdup(value);
298 				break;
299 			case PG_DIAG_CONTEXT:
300 				edata->context = pstrdup(value);
301 				break;
302 			case PG_DIAG_SCHEMA_NAME:
303 				edata->schema_name = pstrdup(value);
304 				break;
305 			case PG_DIAG_TABLE_NAME:
306 				edata->table_name = pstrdup(value);
307 				break;
308 			case PG_DIAG_COLUMN_NAME:
309 				edata->column_name = pstrdup(value);
310 				break;
311 			case PG_DIAG_DATATYPE_NAME:
312 				edata->datatype_name = pstrdup(value);
313 				break;
314 			case PG_DIAG_CONSTRAINT_NAME:
315 				edata->constraint_name = pstrdup(value);
316 				break;
317 			case PG_DIAG_SOURCE_FILE:
318 				edata->filename = pstrdup(value);
319 				break;
320 			case PG_DIAG_SOURCE_LINE:
321 				edata->lineno = pg_atoi(value, sizeof(int), '\0');
322 				break;
323 			case PG_DIAG_SOURCE_FUNCTION:
324 				edata->funcname = pstrdup(value);
325 				break;
326 			default:
327 				elog(ERROR, "unrecognized error field code: %d", (int) code);
328 				break;
329 		}
330 	}
331 }
332