1 /*------------------------------------------------------------------------- 2 * 3 * pqmq.c 4 * Use the frontend/backend protocol for communication over a shm_mq 5 * 6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group 7 * Portions Copyright (c) 1994, Regents of the University of California 8 * 9 * src/backend/libpq/pqmq.c 10 * 11 *------------------------------------------------------------------------- 12 */ 13 14 #include "postgres.h" 15 16 #include "libpq/libpq.h" 17 #include "libpq/pqformat.h" 18 #include "libpq/pqmq.h" 19 #include "miscadmin.h" 20 #include "pgstat.h" 21 #include "tcop/tcopprot.h" 22 #include "utils/builtins.h" 23 24 static shm_mq_handle *pq_mq_handle; 25 static bool pq_mq_busy = false; 26 static pid_t pq_mq_parallel_leader_pid = 0; 27 static pid_t pq_mq_parallel_leader_backend_id = InvalidBackendId; 28 29 static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg); 30 static void mq_comm_reset(void); 31 static int mq_flush(void); 32 static int mq_flush_if_writable(void); 33 static bool mq_is_send_pending(void); 34 static int mq_putmessage(char msgtype, const char *s, size_t len); 35 static void mq_putmessage_noblock(char msgtype, const char *s, size_t len); 36 37 static const PQcommMethods PqCommMqMethods = { 38 mq_comm_reset, 39 mq_flush, 40 mq_flush_if_writable, 41 mq_is_send_pending, 42 mq_putmessage, 43 mq_putmessage_noblock 44 }; 45 46 /* 47 * Arrange to redirect frontend/backend protocol messages to a shared-memory 48 * message queue. 49 */ 50 void 51 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) 52 { 53 PqCommMethods = &PqCommMqMethods; 54 pq_mq_handle = mqh; 55 whereToSendOutput = DestRemote; 56 FrontendProtocol = PG_PROTOCOL_LATEST; 57 on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); 58 } 59 60 /* 61 * When the DSM that contains our shm_mq goes away, we need to stop sending 62 * messages to it. 63 */ 64 static void 65 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg) 66 { 67 pq_mq_handle = NULL; 68 whereToSendOutput = DestNone; 69 } 70 71 /* 72 * Arrange to SendProcSignal() to the parallel leader each time we transmit 73 * message data via the shm_mq. 74 */ 75 void 76 pq_set_parallel_leader(pid_t pid, BackendId backend_id) 77 { 78 Assert(PqCommMethods == &PqCommMqMethods); 79 pq_mq_parallel_leader_pid = pid; 80 pq_mq_parallel_leader_backend_id = backend_id; 81 } 82 83 static void 84 mq_comm_reset(void) 85 { 86 /* Nothing to do. */ 87 } 88 89 static int 90 mq_flush(void) 91 { 92 /* Nothing to do. */ 93 return 0; 94 } 95 96 static int 97 mq_flush_if_writable(void) 98 { 99 /* Nothing to do. */ 100 return 0; 101 } 102 103 static bool 104 mq_is_send_pending(void) 105 { 106 /* There's never anything pending. */ 107 return 0; 108 } 109 110 /* 111 * Transmit a libpq protocol message to the shared memory message queue 112 * selected via pq_mq_handle. We don't include a length word, because the 113 * receiver will know the length of the message from shm_mq_receive(). 114 */ 115 static int 116 mq_putmessage(char msgtype, const char *s, size_t len) 117 { 118 shm_mq_iovec iov[2]; 119 shm_mq_result result; 120 121 /* 122 * If we're sending a message, and we have to wait because the queue is 123 * full, and then we get interrupted, and that interrupt results in trying 124 * to send another message, we respond by detaching the queue. There's no 125 * way to return to the original context, but even if there were, just 126 * queueing the message would amount to indefinitely postponing the 127 * response to the interrupt. So we do this instead. 128 */ 129 if (pq_mq_busy) 130 { 131 if (pq_mq_handle != NULL) 132 shm_mq_detach(pq_mq_handle); 133 pq_mq_handle = NULL; 134 return EOF; 135 } 136 137 /* 138 * If the message queue is already gone, just ignore the message. This 139 * doesn't necessarily indicate a problem; for example, DEBUG messages can 140 * be generated late in the shutdown sequence, after all DSMs have already 141 * been detached. 142 */ 143 if (pq_mq_handle == NULL) 144 return 0; 145 146 pq_mq_busy = true; 147 148 iov[0].data = &msgtype; 149 iov[0].len = 1; 150 iov[1].data = s; 151 iov[1].len = len; 152 153 Assert(pq_mq_handle != NULL); 154 155 for (;;) 156 { 157 result = shm_mq_sendv(pq_mq_handle, iov, 2, true); 158 159 if (pq_mq_parallel_leader_pid != 0) 160 SendProcSignal(pq_mq_parallel_leader_pid, 161 PROCSIG_PARALLEL_MESSAGE, 162 pq_mq_parallel_leader_backend_id); 163 164 if (result != SHM_MQ_WOULD_BLOCK) 165 break; 166 167 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, 168 WAIT_EVENT_MQ_PUT_MESSAGE); 169 ResetLatch(MyLatch); 170 CHECK_FOR_INTERRUPTS(); 171 } 172 173 pq_mq_busy = false; 174 175 Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); 176 if (result != SHM_MQ_SUCCESS) 177 return EOF; 178 return 0; 179 } 180 181 static void 182 mq_putmessage_noblock(char msgtype, const char *s, size_t len) 183 { 184 /* 185 * While the shm_mq machinery does support sending a message in 186 * non-blocking mode, there's currently no way to try sending beginning to 187 * send the message that doesn't also commit us to completing the 188 * transmission. This could be improved in the future, but for now we 189 * don't need it. 190 */ 191 elog(ERROR, "not currently supported"); 192 } 193 194 /* 195 * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData 196 * structure with the results. 197 */ 198 void 199 pq_parse_errornotice(StringInfo msg, ErrorData *edata) 200 { 201 /* Initialize edata with reasonable defaults. */ 202 MemSet(edata, 0, sizeof(ErrorData)); 203 edata->elevel = ERROR; 204 edata->assoc_context = CurrentMemoryContext; 205 206 /* Loop over fields and extract each one. */ 207 for (;;) 208 { 209 char code = pq_getmsgbyte(msg); 210 const char *value; 211 212 if (code == '\0') 213 { 214 pq_getmsgend(msg); 215 break; 216 } 217 value = pq_getmsgrawstring(msg); 218 219 switch (code) 220 { 221 case PG_DIAG_SEVERITY: 222 /* ignore, trusting we'll get a nonlocalized version */ 223 break; 224 case PG_DIAG_SEVERITY_NONLOCALIZED: 225 if (strcmp(value, "DEBUG") == 0) 226 { 227 /* 228 * We can't reconstruct the exact DEBUG level, but 229 * presumably it was >= client_min_messages, so select 230 * DEBUG1 to ensure we'll pass it on to the client. 231 */ 232 edata->elevel = DEBUG1; 233 } 234 else if (strcmp(value, "LOG") == 0) 235 { 236 /* 237 * It can't be LOG_SERVER_ONLY, or the worker wouldn't 238 * have sent it to us; so LOG is the correct value. 239 */ 240 edata->elevel = LOG; 241 } 242 else if (strcmp(value, "INFO") == 0) 243 edata->elevel = INFO; 244 else if (strcmp(value, "NOTICE") == 0) 245 edata->elevel = NOTICE; 246 else if (strcmp(value, "WARNING") == 0) 247 edata->elevel = WARNING; 248 else if (strcmp(value, "ERROR") == 0) 249 edata->elevel = ERROR; 250 else if (strcmp(value, "FATAL") == 0) 251 edata->elevel = FATAL; 252 else if (strcmp(value, "PANIC") == 0) 253 edata->elevel = PANIC; 254 else 255 elog(ERROR, "unrecognized error severity: \"%s\"", value); 256 break; 257 case PG_DIAG_SQLSTATE: 258 if (strlen(value) != 5) 259 elog(ERROR, "invalid SQLSTATE: \"%s\"", value); 260 edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2], 261 value[3], value[4]); 262 break; 263 case PG_DIAG_MESSAGE_PRIMARY: 264 edata->message = pstrdup(value); 265 break; 266 case PG_DIAG_MESSAGE_DETAIL: 267 edata->detail = pstrdup(value); 268 break; 269 case PG_DIAG_MESSAGE_HINT: 270 edata->hint = pstrdup(value); 271 break; 272 case PG_DIAG_STATEMENT_POSITION: 273 edata->cursorpos = pg_strtoint32(value); 274 break; 275 case PG_DIAG_INTERNAL_POSITION: 276 edata->internalpos = pg_strtoint32(value); 277 break; 278 case PG_DIAG_INTERNAL_QUERY: 279 edata->internalquery = pstrdup(value); 280 break; 281 case PG_DIAG_CONTEXT: 282 edata->context = pstrdup(value); 283 break; 284 case PG_DIAG_SCHEMA_NAME: 285 edata->schema_name = pstrdup(value); 286 break; 287 case PG_DIAG_TABLE_NAME: 288 edata->table_name = pstrdup(value); 289 break; 290 case PG_DIAG_COLUMN_NAME: 291 edata->column_name = pstrdup(value); 292 break; 293 case PG_DIAG_DATATYPE_NAME: 294 edata->datatype_name = pstrdup(value); 295 break; 296 case PG_DIAG_CONSTRAINT_NAME: 297 edata->constraint_name = pstrdup(value); 298 break; 299 case PG_DIAG_SOURCE_FILE: 300 edata->filename = pstrdup(value); 301 break; 302 case PG_DIAG_SOURCE_LINE: 303 edata->lineno = pg_strtoint32(value); 304 break; 305 case PG_DIAG_SOURCE_FUNCTION: 306 edata->funcname = pstrdup(value); 307 break; 308 default: 309 elog(ERROR, "unrecognized error field code: %d", (int) code); 310 break; 311 } 312 } 313 } 314