1 /*
2 * dbutils.c - Database connection/management functions
3 *
4 * Copyright (c) 2ndQuadrant, 2010-2020
5 *
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include <unistd.h>
22 #include <time.h>
23 #include <sys/time.h>
24 #include <sys/stat.h>
25 #include <dirent.h>
26 #include <arpa/inet.h>
27
28 #include "repmgr.h"
29 #include "dbutils.h"
30 #include "controldata.h"
31 #include "dirutil.h"
32
33 #define NODE_RECORD_PARAM_COUNT 11
34
35
36 static void log_db_error(PGconn *conn, const char *query_text, const char *fmt,...)
37 __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
38
39 static bool _is_server_available(const char *conninfo, bool quiet);
40
41 static PGconn *_establish_db_connection(const char *conninfo,
42 const bool exit_on_error,
43 const bool log_notice,
44 const bool verbose_only);
45
46 static PGconn * _establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser);
47
48 static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out, bool quiet);
49
50 static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
51 static bool _get_pg_setting(PGconn *conn, const char *setting, char *str_output, bool *bool_output, int *int_output);
52
53 static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults);
54 static void _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults);
55
56 static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
57
58 static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
59
60 static ReplSlotStatus _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
61
62 static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
63
64 /*
65 * This provides a standardized way of logging database errors. Note
66 * that the provided PGconn can be a normal or a replication connection;
67 * no attempt is made to write to the database, only to report the output
68 * of PQerrorMessage().
69 */
70 void
log_db_error(PGconn * conn,const char * query_text,const char * fmt,...)71 log_db_error(PGconn *conn, const char *query_text, const char *fmt,...)
72 {
73 va_list ap;
74 char buf[MAXLEN];
75 int retval;
76
77 va_start(ap, fmt);
78 retval = vsnprintf(buf, MAXLEN, fmt, ap);
79 va_end(ap);
80
81 if (retval < MAXLEN)
82 log_error("%s", buf);
83
84 if (conn != NULL)
85 {
86 log_detail("\n%s", PQerrorMessage(conn));
87 }
88
89 if (query_text != NULL)
90 {
91 log_detail("query text is:\n%s", query_text);
92 }
93 }
94
95 /* ================= */
96 /* utility functions */
97 /* ================= */
98
99 XLogRecPtr
parse_lsn(const char * str)100 parse_lsn(const char *str)
101 {
102 XLogRecPtr ptr = InvalidXLogRecPtr;
103 uint32 high,
104 low;
105
106 if (sscanf(str, "%x/%x", &high, &low) == 2)
107 ptr = (((XLogRecPtr) high) << 32) + (XLogRecPtr) low;
108
109 return ptr;
110 }
111
112
113 /* ==================== */
114 /* Connection functions */
115 /* ==================== */
116
117 /*
118 * _establish_db_connection()
119 *
120 * Connect to a database using a conninfo string.
121 *
122 * NOTE: *do not* use this for replication connections; instead use:
123 * establish_db_connection_by_params()
124 */
125
126 static PGconn *
_establish_db_connection(const char * conninfo,const bool exit_on_error,const bool log_notice,const bool verbose_only)127 _establish_db_connection(const char *conninfo, const bool exit_on_error, const bool log_notice, const bool verbose_only)
128 {
129 PGconn *conn = NULL;
130 char *connection_string = NULL;
131 char *errmsg = NULL;
132
133 t_conninfo_param_list conninfo_params = T_CONNINFO_PARAM_LIST_INITIALIZER;
134 bool is_replication_connection = false;
135 bool parse_success = false;
136
137 initialize_conninfo_params(&conninfo_params, false);
138
139 parse_success = parse_conninfo_string(conninfo, &conninfo_params, &errmsg, false);
140
141 if (parse_success == false)
142 {
143 log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);
144 log_detail("%s", errmsg);
145 free_conninfo_params(&conninfo_params);
146 return NULL;
147 }
148
149 /* set some default values if not explicitly provided */
150 param_set_ine(&conninfo_params, "connect_timeout", "2");
151 param_set_ine(&conninfo_params, "fallback_application_name", "repmgr");
152
153 if (param_get(&conninfo_params, "replication") != NULL)
154 is_replication_connection = true;
155
156 /* use a secure search_path */
157 param_set(&conninfo_params, "options", "-csearch_path=");
158
159 connection_string = param_list_to_string(&conninfo_params);
160
161 log_debug(_("connecting to: \"%s\""), connection_string);
162
163 conn = PQconnectdb(connection_string);
164
165 /* Check to see that the backend connection was successfully made */
166 if ((PQstatus(conn) != CONNECTION_OK))
167 {
168 bool emit_log = true;
169
170 if (verbose_only == true && verbose_logging == false)
171 emit_log = false;
172
173 if (emit_log)
174 {
175 if (log_notice)
176 {
177 log_notice(_("connection to database failed"));
178 log_detail("\n%s", PQerrorMessage(conn));
179 }
180 else
181 {
182 log_error(_("connection to database failed"));
183 log_detail("\n%s", PQerrorMessage(conn));
184 }
185 log_detail(_("attempted to connect using:\n %s"),
186 connection_string);
187 }
188
189 if (exit_on_error)
190 {
191 PQfinish(conn);
192 free_conninfo_params(&conninfo_params);
193 exit(ERR_DB_CONN);
194 }
195 }
196
197 /*
198 * set "synchronous_commit" to "local" in case synchronous replication is
199 * in use
200 *
201 * XXX set this explicitly before any write operations
202 */
203
204 else if (is_replication_connection == false &&
205 set_config(conn, "synchronous_commit", "local") == false)
206 {
207 if (exit_on_error)
208 {
209 PQfinish(conn);
210 free_conninfo_params(&conninfo_params);
211 exit(ERR_DB_CONN);
212 }
213 }
214
215 pfree(connection_string);
216 free_conninfo_params(&conninfo_params);
217
218 return conn;
219 }
220
221
222 /*
223 * Establish a database connection, optionally exit on error
224 */
225 PGconn *
establish_db_connection(const char * conninfo,const bool exit_on_error)226 establish_db_connection(const char *conninfo, const bool exit_on_error)
227 {
228 return _establish_db_connection(conninfo, exit_on_error, false, false);
229 }
230
231 /*
232 * Attempt to establish a database connection, never exit on error, only
233 * output error messages if --verbose option used
234 */
235 PGconn *
establish_db_connection_quiet(const char * conninfo)236 establish_db_connection_quiet(const char *conninfo)
237 {
238 return _establish_db_connection(conninfo, false, false, true);
239 }
240
241
242 PGconn *
establish_db_connection_with_replacement_param(const char * conninfo,const char * param,const char * value,const bool exit_on_error)243 establish_db_connection_with_replacement_param(const char *conninfo,
244 const char *param,
245 const char *value,
246 const bool exit_on_error)
247 {
248 t_conninfo_param_list node_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
249 char *errmsg = NULL;
250 bool parse_success = false;
251 PGconn *conn = NULL;
252
253 initialize_conninfo_params(&node_conninfo, false);
254
255 parse_success = parse_conninfo_string(conninfo,
256 &node_conninfo,
257 &errmsg, false);
258
259 if (parse_success == false)
260 {
261 log_error(_("unable to parse conninfo string \"%s\" for local node"),
262 conninfo);
263 log_detail("%s", errmsg);
264
265 if (exit_on_error == true)
266 exit(ERR_BAD_CONFIG);
267
268 return NULL;
269 }
270
271 param_set(&node_conninfo,
272 param,
273 value);
274
275 conn = establish_db_connection_by_params(&node_conninfo, exit_on_error);
276
277 free_conninfo_params(&node_conninfo);
278
279 return conn;
280 }
281
282 PGconn *
establish_primary_db_connection(PGconn * conn,const bool exit_on_error)283 establish_primary_db_connection(PGconn *conn,
284 const bool exit_on_error)
285 {
286 t_node_info primary_node_info = T_NODE_INFO_INITIALIZER;
287 bool primary_record_found = get_primary_node_record(conn, &primary_node_info);
288
289 if (primary_record_found == false)
290 {
291 return NULL;
292 }
293
294 return establish_db_connection(primary_node_info.conninfo,
295 exit_on_error);
296 }
297
298
299 PGconn *
establish_db_connection_by_params(t_conninfo_param_list * param_list,const bool exit_on_error)300 establish_db_connection_by_params(t_conninfo_param_list *param_list,
301 const bool exit_on_error)
302 {
303 PGconn *conn = NULL;
304
305 /* set some default values if not explicitly provided */
306 param_set_ine(param_list, "connect_timeout", "2");
307 param_set_ine(param_list, "fallback_application_name", "repmgr");
308
309 /* use a secure search_path */
310 param_set(param_list, "options", "-csearch_path=");
311
312 /* Connect to the database using the provided parameters */
313 conn = PQconnectdbParams((const char **) param_list->keywords, (const char **) param_list->values, true);
314
315 /* Check to see that the backend connection was successfully made */
316 if ((PQstatus(conn) != CONNECTION_OK))
317 {
318 log_error(_("connection to database failed"));
319 log_detail("\n%s", PQerrorMessage(conn));
320
321 if (exit_on_error)
322 {
323 PQfinish(conn);
324 exit(ERR_DB_CONN);
325 }
326 }
327 else
328 {
329 bool is_replication_connection = false;
330 int i;
331
332 /*
333 * set "synchronous_commit" to "local" in case synchronous replication
334 * is in use (provided this is not a replication connection)
335 */
336
337 for (i = 0; param_list->keywords[i]; i++)
338 {
339 if (strcmp(param_list->keywords[i], "replication") == 0)
340 is_replication_connection = true;
341 }
342
343 if (is_replication_connection == false && set_config(conn, "synchronous_commit", "local") == false)
344 {
345 if (exit_on_error)
346 {
347 PQfinish(conn);
348 exit(ERR_DB_CONN);
349 }
350 }
351 }
352
353 return conn;
354 }
355
356
357 /*
358 * Given an existing active connection and the name of a replication
359 * user, extract the connection parameters from that connection and
360 * attempt to return a replication connection.
361 */
362 PGconn *
establish_replication_connection_from_conn(PGconn * conn,const char * repluser)363 establish_replication_connection_from_conn(PGconn *conn, const char *repluser)
364 {
365 return _establish_replication_connection_from_params(conn, NULL, repluser);
366 }
367
368
369 PGconn *
establish_replication_connection_from_conninfo(const char * conninfo,const char * repluser)370 establish_replication_connection_from_conninfo(const char *conninfo, const char *repluser)
371 {
372 return _establish_replication_connection_from_params(NULL, conninfo, repluser);
373 }
374
375
376 static PGconn *
_establish_replication_connection_from_params(PGconn * conn,const char * conninfo,const char * repluser)377 _establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser)
378 {
379 t_conninfo_param_list repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
380 PGconn *repl_conn = NULL;
381
382 initialize_conninfo_params(&repl_conninfo, false);
383
384 if (conn != NULL)
385 conn_to_param_list(conn, &repl_conninfo);
386 else if (conninfo != NULL)
387 parse_conninfo_string(conninfo, &repl_conninfo, NULL, false);
388
389 /* Set the provided replication user */
390 param_set(&repl_conninfo, "user", repluser);
391 param_set(&repl_conninfo, "replication", "1");
392 param_set(&repl_conninfo, "dbname", "replication");
393
394 repl_conn = establish_db_connection_by_params(&repl_conninfo, false);
395 free_conninfo_params(&repl_conninfo);
396
397 return repl_conn;
398 }
399
400
401 PGconn *
get_primary_connection(PGconn * conn,int * primary_id,char * primary_conninfo_out)402 get_primary_connection(PGconn *conn,
403 int *primary_id, char *primary_conninfo_out)
404 {
405 return _get_primary_connection(conn, primary_id, primary_conninfo_out, false);
406 }
407
408
409 PGconn *
get_primary_connection_quiet(PGconn * conn,int * primary_id,char * primary_conninfo_out)410 get_primary_connection_quiet(PGconn *conn,
411 int *primary_id, char *primary_conninfo_out)
412 {
413 return _get_primary_connection(conn, primary_id, primary_conninfo_out, true);
414 }
415
416 PGconn *
duplicate_connection(PGconn * conn,const char * user,bool replication)417 duplicate_connection(PGconn *conn, const char *user, bool replication)
418 {
419 t_conninfo_param_list conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
420 PGconn *duplicate_conn = NULL;
421
422 initialize_conninfo_params(&conninfo, false);
423 conn_to_param_list(conn, &conninfo);
424
425 if (user != NULL)
426 param_set(&conninfo, "user", user);
427
428 if (replication == true)
429 param_set(&conninfo, "replication", "1");
430
431 duplicate_conn = establish_db_connection_by_params(&conninfo, false);
432
433 free_conninfo_params(&conninfo);
434
435 return duplicate_conn;
436 }
437
438
439
440 void
close_connection(PGconn ** conn)441 close_connection(PGconn **conn)
442 {
443 if (*conn == NULL)
444 return;
445
446 PQfinish(*conn);
447
448 *conn = NULL;
449 }
450
451
452 /* =============================== */
453 /* conninfo manipulation functions */
454 /* =============================== */
455
456
457 /*
458 * get_conninfo_value()
459 *
460 * Extract the value represented by 'keyword' in 'conninfo' and copy
461 * it to the 'output' buffer.
462 *
463 * Returns true on success, or false on failure (conninfo string could
464 * not be parsed, or provided keyword not found).
465 */
466
467 bool
get_conninfo_value(const char * conninfo,const char * keyword,char * output)468 get_conninfo_value(const char *conninfo, const char *keyword, char *output)
469 {
470 PQconninfoOption *conninfo_options = NULL;
471 PQconninfoOption *conninfo_option = NULL;
472
473 conninfo_options = PQconninfoParse(conninfo, NULL);
474
475 if (conninfo_options == NULL)
476 {
477 log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);
478 return false;
479 }
480
481 for (conninfo_option = conninfo_options; conninfo_option->keyword != NULL; conninfo_option++)
482 {
483 if (strcmp(conninfo_option->keyword, keyword) == 0)
484 {
485 if (conninfo_option->val != NULL && conninfo_option->val[0] != '\0')
486 {
487 strncpy(output, conninfo_option->val, MAXLEN);
488 break;
489 }
490 }
491 }
492
493 PQconninfoFree(conninfo_options);
494
495 return true;
496 }
497
498
499 /*
500 * Get a default conninfo value for the provided parameter, and copy
501 * it to the 'output' buffer.
502 *
503 * Returns true on success, or false on failure (provided keyword not found).
504 *
505 */
506 bool
get_conninfo_default_value(const char * param,char * output,int maxlen)507 get_conninfo_default_value(const char *param, char *output, int maxlen)
508 {
509 PQconninfoOption *defs = NULL;
510 PQconninfoOption *def = NULL;
511 bool found = false;
512
513 defs = PQconndefaults();
514
515 for (def = defs; def->keyword; def++)
516 {
517 if (strncmp(def->keyword, param, maxlen) == 0)
518 {
519 strncpy(output, def->val, maxlen);
520 found = true;
521 }
522 }
523
524 PQconninfoFree(defs);
525
526 return found;
527 }
528
529
530 void
initialize_conninfo_params(t_conninfo_param_list * param_list,bool set_defaults)531 initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults)
532 {
533 PQconninfoOption *defs = NULL;
534 PQconninfoOption *def = NULL;
535 int c;
536
537 defs = PQconndefaults();
538 param_list->size = 0;
539
540 /* Count maximum number of parameters */
541 for (def = defs; def->keyword; def++)
542 param_list->size++;
543
544 /* Initialize our internal parameter list */
545 param_list->keywords = pg_malloc0(sizeof(char *) * (param_list->size + 1));
546 param_list->values = pg_malloc0(sizeof(char *) * (param_list->size + 1));
547
548 for (c = 0; c < param_list->size; c++)
549 {
550 param_list->keywords[c] = NULL;
551 param_list->values[c] = NULL;
552 }
553
554 if (set_defaults == true)
555 {
556 /* Pre-set any defaults */
557
558 for (def = defs; def->keyword; def++)
559 {
560 if (def->val != NULL && def->val[0] != '\0')
561 {
562 param_set(param_list, def->keyword, def->val);
563 }
564 }
565 }
566
567 PQconninfoFree(defs);
568 }
569
570
571 void
free_conninfo_params(t_conninfo_param_list * param_list)572 free_conninfo_params(t_conninfo_param_list *param_list)
573 {
574 int c;
575
576 for (c = 0; c < param_list->size; c++)
577 {
578 if (param_list->keywords != NULL && param_list->keywords[c] != NULL)
579 pfree(param_list->keywords[c]);
580
581 if (param_list->values != NULL && param_list->values[c] != NULL)
582 pfree(param_list->values[c]);
583 }
584
585 if (param_list->keywords != NULL)
586 pfree(param_list->keywords);
587
588 if (param_list->values != NULL)
589 pfree(param_list->values);
590 }
591
592
593
594 void
copy_conninfo_params(t_conninfo_param_list * dest_list,t_conninfo_param_list * source_list)595 copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list)
596 {
597 int c;
598
599 for (c = 0; c < source_list->size && source_list->keywords[c] != NULL; c++)
600 {
601 if (source_list->values[c] != NULL && source_list->values[c][0] != '\0')
602 {
603 param_set(dest_list, source_list->keywords[c], source_list->values[c]);
604 }
605 }
606 }
607
608 void
param_set(t_conninfo_param_list * param_list,const char * param,const char * value)609 param_set(t_conninfo_param_list *param_list, const char *param, const char *value)
610 {
611 int c;
612 int value_len = strlen(value) + 1;
613 int param_len;
614
615 /*
616 * Scan array to see if the parameter is already set - if not, replace it
617 */
618 for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
619 {
620 if (strcmp(param_list->keywords[c], param) == 0)
621 {
622 if (param_list->values[c] != NULL)
623 pfree(param_list->values[c]);
624
625 param_list->values[c] = pg_malloc0(value_len);
626 strncpy(param_list->values[c], value, value_len);
627
628 return;
629 }
630 }
631
632 /*
633 * Sanity-check that the caller is not trying to overflow the array;
634 * in practice this is highly unlikely, and if it ever happens, this means
635 * something is highly wrong.
636 */
637 Assert(c < param_list->size);
638
639 /*
640 * Parameter not in array - add it and its associated value
641 */
642 param_len = strlen(param) + 1;
643
644 param_list->keywords[c] = pg_malloc0(param_len);
645 param_list->values[c] = pg_malloc0(value_len);
646
647 strncpy(param_list->keywords[c], param, param_len);
648 strncpy(param_list->values[c], value, value_len);
649 }
650
651
652 /*
653 * Like param_set(), but will only set the parameter if it doesn't exist
654 */
655 void
param_set_ine(t_conninfo_param_list * param_list,const char * param,const char * value)656 param_set_ine(t_conninfo_param_list *param_list, const char *param, const char *value)
657 {
658 int c;
659 int value_len = strlen(value) + 1;
660 int param_len;
661
662 /*
663 * Scan array to see if the parameter is already set - if so, do nothing
664 */
665 for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
666 {
667 if (strcmp(param_list->keywords[c], param) == 0)
668 {
669 /* parameter exists, do nothing */
670 return;
671 }
672 }
673
674 /*
675 * Sanity-check that the caller is not trying to overflow the array;
676 * in practice this is highly unlikely, and if it ever happens, this means
677 * something is highly wrong.
678 */
679 Assert(c < param_list->size);
680
681 /*
682 * Parameter not in array - add it and its associated value
683 */
684 param_len = strlen(param) + 1;
685
686 param_list->keywords[c] = pg_malloc0(param_len);
687 param_list->values[c] = pg_malloc0(value_len);
688
689 strncpy(param_list->keywords[c], param, param_len);
690 strncpy(param_list->values[c], value, value_len);
691 }
692
693
694 char *
param_get(t_conninfo_param_list * param_list,const char * param)695 param_get(t_conninfo_param_list *param_list, const char *param)
696 {
697 int c;
698
699 for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
700 {
701 if (strcmp(param_list->keywords[c], param) == 0)
702 {
703 if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
704 return param_list->values[c];
705 else
706 return NULL;
707 }
708 }
709
710 return NULL;
711 }
712
713
714 /*
715 * Validate a conninfo string by attempting to parse it.
716 *
717 * "errmsg": passed to PQconninfoParse(), may be NULL
718 *
719 * NOTE: PQconninfoParse() verifies the string format and checks for
720 * valid options but does not sanity check values.
721 */
722
723 bool
validate_conninfo_string(const char * conninfo_str,char ** errmsg)724 validate_conninfo_string(const char *conninfo_str, char **errmsg)
725 {
726 PQconninfoOption *connOptions = NULL;
727
728 connOptions = PQconninfoParse(conninfo_str, errmsg);
729
730 if (connOptions == NULL)
731 return false;
732
733 return true;
734 }
735
736
737 /*
738 * Parse a conninfo string into a t_conninfo_param_list
739 *
740 * See conn_to_param_list() to do the same for a PGconn.
741 *
742 * "errmsg": passed to PQconninfoParse(), may be NULL
743 *
744 * "ignore_local_params": ignores those parameters specific
745 * to a local installation, i.e. when parsing an upstream
746 * node's conninfo string for inclusion into "primary_conninfo",
747 * don't copy that node's values
748 */
749 bool
parse_conninfo_string(const char * conninfo_str,t_conninfo_param_list * param_list,char ** errmsg,bool ignore_local_params)750 parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char **errmsg, bool ignore_local_params)
751 {
752 PQconninfoOption *connOptions = NULL;
753 PQconninfoOption *option = NULL;
754
755 connOptions = PQconninfoParse(conninfo_str, errmsg);
756
757 if (connOptions == NULL)
758 return false;
759
760 for (option = connOptions; option && option->keyword; option++)
761 {
762 /* Ignore non-set or blank parameter values */
763 if (option->val == NULL || option->val[0] == '\0')
764 continue;
765
766 /* Ignore settings specific to the upstream node */
767 if (ignore_local_params == true)
768 {
769 if (strcmp(option->keyword, "application_name") == 0)
770 continue;
771 if (strcmp(option->keyword, "passfile") == 0)
772 continue;
773 if (strcmp(option->keyword, "servicefile") == 0)
774 continue;
775 }
776 param_set(param_list, option->keyword, option->val);
777 }
778
779 PQconninfoFree(connOptions);
780
781 return true;
782 }
783
784
785 /*
786 * Parse a PGconn into a t_conninfo_param_list
787 *
788 * See parse_conninfo_string() to do the same for a conninfo string
789 *
790 * NOTE: the current use case for this is to take an active connection,
791 * replace the existing username (typically replacing it with the superuser
792 * or replication user name), and make a new connection as that user.
793 * If the "password" field is set, it will cause any connection made with
794 * these parameters to fail (unless of course the password happens to be the
795 * same). Therefore we remove the password altogether, and rely on it being
796 * available via .pgpass.
797 */
798 void
conn_to_param_list(PGconn * conn,t_conninfo_param_list * param_list)799 conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
800 {
801 PQconninfoOption *connOptions = NULL;
802 PQconninfoOption *option = NULL;
803
804 connOptions = PQconninfo(conn);
805 for (option = connOptions; option && option->keyword; option++)
806 {
807 /* Ignore non-set or blank parameter values */
808 if (option->val == NULL || option->val[0] == '\0')
809 continue;
810
811 /* Ignore "password" */
812 if (strcmp(option->keyword, "password") == 0)
813 continue;
814
815 param_set(param_list, option->keyword, option->val);
816 }
817
818 PQconninfoFree(connOptions);
819 }
820
821
822 /*
823 * Converts param list to string; caller must free returned pointer
824 */
825 char *
param_list_to_string(t_conninfo_param_list * param_list)826 param_list_to_string(t_conninfo_param_list *param_list)
827 {
828 int c;
829 PQExpBufferData conninfo_buf;
830 char *conninfo_str = NULL;
831 int len = 0;
832
833 initPQExpBuffer(&conninfo_buf);
834
835 for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
836 {
837 if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
838 {
839 if (c > 0)
840 appendPQExpBufferChar(&conninfo_buf, ' ');
841
842 /* XXX escape value */
843 appendPQExpBuffer(&conninfo_buf,
844 "%s=%s",
845 param_list->keywords[c],
846 param_list->values[c]);
847 }
848 }
849
850 len = strlen(conninfo_buf.data) + 1;
851 conninfo_str = pg_malloc0(len);
852
853 strncpy(conninfo_str, conninfo_buf.data, len);
854
855 termPQExpBuffer(&conninfo_buf);
856
857 return conninfo_str;
858 }
859
860
861 /*
862 * Run a conninfo string through the parser, and pass it back as a normal
863 * conninfo string. This is mainly intended for converting connection URIs
864 * to parameter/value conninfo strings.
865 *
866 * Caller must free returned pointer.
867 */
868
869 char *
normalize_conninfo_string(const char * conninfo_str)870 normalize_conninfo_string(const char *conninfo_str)
871 {
872 t_conninfo_param_list conninfo_params = T_CONNINFO_PARAM_LIST_INITIALIZER;
873 bool parse_success = false;
874 char *normalized_string = NULL;
875 char *errmsg = NULL;
876
877 initialize_conninfo_params(&conninfo_params, false);
878
879 parse_success = parse_conninfo_string(conninfo_str, &conninfo_params, &errmsg, false);
880
881 if (parse_success == false)
882 {
883 log_error(_("unable to parse provided conninfo string \"%s\""), conninfo_str);
884 log_detail("%s", errmsg);
885 free_conninfo_params(&conninfo_params);
886 return NULL;
887 }
888
889
890 normalized_string = param_list_to_string(&conninfo_params);
891 free_conninfo_params(&conninfo_params);
892
893 return normalized_string;
894 }
895
896 /*
897 * check whether the libpq version in use recognizes the "passfile" parameter
898 * (should be 9.6 and later)
899 */
900 bool
has_passfile(void)901 has_passfile(void)
902 {
903 PQconninfoOption *defs = PQconndefaults();
904 PQconninfoOption *def = NULL;
905 bool has_passfile = false;
906
907 for (def = defs; def->keyword; def++)
908 {
909 if (strcmp(def->keyword, "passfile") == 0)
910 {
911 has_passfile = true;
912 break;
913 }
914 }
915
916 PQconninfoFree(defs);
917
918 return has_passfile;
919 }
920
921
922
923 /* ===================== */
924 /* transaction functions */
925 /* ===================== */
926
927 bool
begin_transaction(PGconn * conn)928 begin_transaction(PGconn *conn)
929 {
930 PGresult *res = NULL;
931
932 log_verbose(LOG_DEBUG, "begin_transaction()");
933
934 res = PQexec(conn, "BEGIN");
935
936 if (PQresultStatus(res) != PGRES_COMMAND_OK)
937 {
938 log_error(_("unable to begin transaction"));
939 log_detail("%s", PQerrorMessage(conn));
940
941 PQclear(res);
942 return false;
943 }
944
945 PQclear(res);
946
947 return true;
948 }
949
950
951 bool
commit_transaction(PGconn * conn)952 commit_transaction(PGconn *conn)
953 {
954 PGresult *res = NULL;
955
956 log_verbose(LOG_DEBUG, "commit_transaction()");
957
958 res = PQexec(conn, "COMMIT");
959
960 if (PQresultStatus(res) != PGRES_COMMAND_OK)
961 {
962 log_error(_("unable to commit transaction"));
963 log_detail("%s", PQerrorMessage(conn));
964 PQclear(res);
965
966 return false;
967 }
968
969 PQclear(res);
970
971 return true;
972 }
973
974
975 bool
rollback_transaction(PGconn * conn)976 rollback_transaction(PGconn *conn)
977 {
978 PGresult *res = NULL;
979
980 log_verbose(LOG_DEBUG, "rollback_transaction()");
981
982 res = PQexec(conn, "ROLLBACK");
983
984 if (PQresultStatus(res) != PGRES_COMMAND_OK)
985 {
986 log_error(_("unable to rollback transaction"));
987 log_detail("%s", PQerrorMessage(conn));
988 PQclear(res);
989
990 return false;
991 }
992
993 PQclear(res);
994
995 return true;
996 }
997
998
999 /* ========================== */
1000 /* GUC manipulation functions */
1001 /* ========================== */
1002
1003 static bool
_set_config(PGconn * conn,const char * config_param,const char * sqlquery)1004 _set_config(PGconn *conn, const char *config_param, const char *sqlquery)
1005 {
1006 bool success = true;
1007 PGresult *res = PQexec(conn, sqlquery);
1008
1009 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1010 {
1011 log_db_error(conn, sqlquery, "_set_config(): unable to set \"%s\"", config_param);
1012 success = false;
1013 }
1014
1015 PQclear(res);
1016
1017 return success;
1018 }
1019
1020
1021 bool
set_config(PGconn * conn,const char * config_param,const char * config_value)1022 set_config(PGconn *conn, const char *config_param, const char *config_value)
1023 {
1024 PQExpBufferData query;
1025 bool result = false;
1026
1027 initPQExpBuffer(&query);
1028 appendPQExpBuffer(&query,
1029 "SET %s TO '%s'",
1030 config_param,
1031 config_value);
1032
1033 log_verbose(LOG_DEBUG, "set_config():\n %s", query.data);
1034
1035 result = _set_config(conn, config_param, query.data);
1036
1037 termPQExpBuffer(&query);
1038
1039 return result;
1040 }
1041
1042 bool
set_config_bool(PGconn * conn,const char * config_param,bool state)1043 set_config_bool(PGconn *conn, const char *config_param, bool state)
1044 {
1045 PQExpBufferData query;
1046 bool result = false;
1047
1048 initPQExpBuffer(&query);
1049 appendPQExpBuffer(&query,
1050 "SET %s TO %s",
1051 config_param,
1052 state ? "TRUE" : "FALSE");
1053
1054 log_verbose(LOG_DEBUG, "set_config_bool():\n %s", query.data);
1055
1056
1057 result = _set_config(conn, config_param, query.data);
1058
1059 termPQExpBuffer(&query);
1060
1061 return result;
1062 }
1063
1064
1065 int
guc_set(PGconn * conn,const char * parameter,const char * op,const char * value)1066 guc_set(PGconn *conn, const char *parameter, const char *op,
1067 const char *value)
1068 {
1069 PQExpBufferData query;
1070 PGresult *res = NULL;
1071 int retval = 1;
1072
1073 char *escaped_parameter = escape_string(conn, parameter);
1074 char *escaped_value = escape_string(conn, value);
1075
1076 initPQExpBuffer(&query);
1077 appendPQExpBuffer(&query,
1078 "SELECT true FROM pg_catalog.pg_settings "
1079 " WHERE name = '%s' AND setting %s '%s'",
1080 escaped_parameter, op, escaped_value);
1081
1082 log_verbose(LOG_DEBUG, "guc_set():\n%s", query.data);
1083
1084 res = PQexec(conn, query.data);
1085
1086 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1087 {
1088 log_db_error(conn, query.data, _("guc_set(): unable to execute query"));
1089 retval = -1;
1090 }
1091 else if (PQntuples(res) == 0)
1092 {
1093 retval = 0;
1094 }
1095
1096 pfree(escaped_parameter);
1097 pfree(escaped_value);
1098 termPQExpBuffer(&query);
1099 PQclear(res);
1100
1101 return retval;
1102 }
1103
1104
1105 bool
get_pg_setting(PGconn * conn,const char * setting,char * output)1106 get_pg_setting(PGconn *conn, const char *setting, char *output)
1107 {
1108 bool success = _get_pg_setting(conn, setting, output, NULL, NULL);
1109
1110 if (success == true)
1111 {
1112 log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""), output);
1113 }
1114
1115 return success;
1116 }
1117
1118 bool
get_pg_setting_bool(PGconn * conn,const char * setting,bool * output)1119 get_pg_setting_bool(PGconn *conn, const char *setting, bool *output)
1120 {
1121 bool success = _get_pg_setting(conn, setting, NULL, output, NULL);
1122
1123 if (success == true)
1124 {
1125 log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""),
1126 *output == true ? "TRUE" : "FALSE");
1127 }
1128
1129 return success;
1130 }
1131
1132 bool
get_pg_setting_int(PGconn * conn,const char * setting,int * output)1133 get_pg_setting_int(PGconn *conn, const char *setting, int *output)
1134 {
1135 bool success = _get_pg_setting(conn, setting, NULL, NULL, output);
1136
1137 if (success == true)
1138 {
1139 log_verbose(LOG_DEBUG, _("get_pg_setting_int(): returned value is \"%i\""), *output);
1140 }
1141
1142 return success;
1143 }
1144
1145
1146 bool
_get_pg_setting(PGconn * conn,const char * setting,char * str_output,bool * bool_output,int * int_output)1147 _get_pg_setting(PGconn *conn, const char *setting, char *str_output, bool *bool_output, int *int_output)
1148 {
1149 PQExpBufferData query;
1150 PGresult *res = NULL;
1151 int i;
1152 bool success = false;
1153
1154 char *escaped_setting = escape_string(conn, setting);
1155
1156 if (escaped_setting == NULL)
1157 {
1158 log_error(_("unable to escape setting \"%s\""), setting);
1159 return false;
1160 }
1161
1162 initPQExpBuffer(&query);
1163 appendPQExpBuffer(&query,
1164 "SELECT name, setting "
1165 " FROM pg_catalog.pg_settings WHERE name = '%s'",
1166 escaped_setting);
1167
1168 log_verbose(LOG_DEBUG, "get_pg_setting():\n %s", query.data);
1169
1170 res = PQexec(conn, query.data);
1171
1172 pfree(escaped_setting);
1173
1174 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1175 {
1176 log_db_error(conn, query.data, _("get_pg_setting() - unable to execute query"));
1177
1178 termPQExpBuffer(&query);
1179 PQclear(res);
1180
1181 return false;
1182 }
1183
1184 for (i = 0; i < PQntuples(res); i++)
1185 {
1186 if (strcmp(PQgetvalue(res, i, 0), setting) == 0)
1187 {
1188 if (str_output != NULL)
1189 {
1190 snprintf(str_output, MAXLEN, "%s", PQgetvalue(res, i, 1));
1191 }
1192 else if (bool_output != NULL)
1193 {
1194 /*
1195 * Note we assume the caller is sure this is a boolean parameter
1196 */
1197 if (strncmp(PQgetvalue(res, i, 1), "on", MAXLEN) == 0)
1198 *bool_output = true;
1199 else
1200 *bool_output = false;
1201 }
1202 else if (int_output != NULL)
1203 {
1204 *int_output = atoi(PQgetvalue(res, i, 1));
1205 }
1206
1207 success = true;
1208 break;
1209 }
1210 else
1211 {
1212 /* highly unlikely this would ever happen */
1213 log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0));
1214 }
1215 }
1216
1217
1218 termPQExpBuffer(&query);
1219 PQclear(res);
1220
1221 return success;
1222 }
1223
1224
1225
1226 bool
alter_system_int(PGconn * conn,const char * name,int value)1227 alter_system_int(PGconn *conn, const char *name, int value)
1228 {
1229 PQExpBufferData query;
1230 PGresult *res = NULL;
1231 bool success = true;
1232
1233 initPQExpBuffer(&query);
1234 appendPQExpBuffer(&query,
1235 "ALTER SYSTEM SET %s = %i",
1236 name, value);
1237
1238 res = PQexec(conn, query.data);
1239
1240 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1241 {
1242 log_db_error(conn, query.data, _("alter_system_int() - unable to execute query"));
1243
1244 success = false;
1245 }
1246
1247 termPQExpBuffer(&query);
1248 PQclear(res);
1249
1250 return success;
1251 }
1252
1253
1254 bool
pg_reload_conf(PGconn * conn)1255 pg_reload_conf(PGconn *conn)
1256 {
1257 PGresult *res = NULL;
1258 bool success = false;
1259
1260 res = PQexec(conn, "SELECT pg_catalog.pg_reload_conf()");
1261
1262 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1263 {
1264 log_db_error(conn, NULL, _("pg_reload_conf() - unable to execute query"));
1265
1266 success = false;
1267 }
1268
1269 PQclear(res);
1270
1271 return success;
1272 }
1273
1274
1275 /* ============================ */
1276 /* Server information functions */
1277 /* ============================ */
1278
1279
1280 bool
get_cluster_size(PGconn * conn,char * size)1281 get_cluster_size(PGconn *conn, char *size)
1282 {
1283 PQExpBufferData query;
1284 PGresult *res = NULL;
1285 bool success = true;
1286
1287 initPQExpBuffer(&query);
1288 appendPQExpBufferStr(&query,
1289 "SELECT pg_catalog.pg_size_pretty(pg_catalog.sum(pg_catalog.pg_database_size(oid))::bigint) "
1290 " FROM pg_catalog.pg_database ");
1291
1292 log_verbose(LOG_DEBUG, "get_cluster_size():\n%s", query.data);
1293
1294 res = PQexec(conn, query.data);
1295
1296 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1297 {
1298 log_db_error(conn, query.data, _("get_cluster_size(): unable to execute query"));
1299 success = false;
1300 }
1301 else
1302 {
1303 snprintf(size, MAXLEN, "%s", PQgetvalue(res, 0, 0));
1304 }
1305
1306 termPQExpBuffer(&query);
1307 PQclear(res);
1308
1309 return success;
1310 }
1311
1312
1313 /*
1314 * Return the server version number for the connection provided
1315 */
1316 int
get_server_version(PGconn * conn,char * server_version_buf)1317 get_server_version(PGconn *conn, char *server_version_buf)
1318 {
1319 PGresult *res = NULL;
1320 int _server_version_num = UNKNOWN_SERVER_VERSION_NUM;
1321
1322 const char *sqlquery =
1323 "SELECT pg_catalog.current_setting('server_version_num'), "
1324 " pg_catalog.current_setting('server_version')";
1325
1326 res = PQexec(conn, sqlquery);
1327
1328 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1329 {
1330 log_db_error(conn, sqlquery, _("unable to determine server version number"));
1331 PQclear(res);
1332
1333 return UNKNOWN_SERVER_VERSION_NUM;
1334 }
1335
1336 _server_version_num = atoi(PQgetvalue(res, 0, 0));
1337
1338 if (server_version_buf != NULL)
1339 {
1340 int i;
1341 char _server_version_buf[MAXVERSIONSTR] = "";
1342
1343 memset(_server_version_buf, 0, MAXVERSIONSTR);
1344
1345 /*
1346 * Some distributions may add extra info after the actual version number,
1347 * e.g. "10.4 (Debian 10.4-2.pgdg90+1)", so copy everything up until the
1348 * first space.
1349 */
1350
1351 snprintf(_server_version_buf, MAXVERSIONSTR, "%s", PQgetvalue(res, 0, 1));
1352
1353 for (i = 0; i < MAXVERSIONSTR; i++)
1354 {
1355 if (_server_version_buf[i] == ' ')
1356 break;
1357
1358 *server_version_buf++ = _server_version_buf[i];
1359 }
1360 }
1361
1362 PQclear(res);
1363
1364 return _server_version_num;
1365 }
1366
1367
1368 RecoveryType
get_recovery_type(PGconn * conn)1369 get_recovery_type(PGconn *conn)
1370 {
1371 PGresult *res = NULL;
1372 RecoveryType recovery_type = RECTYPE_UNKNOWN;
1373
1374 const char *sqlquery = "SELECT pg_catalog.pg_is_in_recovery()";
1375
1376 log_verbose(LOG_DEBUG, "get_recovery_type(): %s", sqlquery);
1377
1378 res = PQexec(conn, sqlquery);
1379
1380 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1381 {
1382 log_db_error(conn,
1383 sqlquery,
1384 _("unable to determine if server is in recovery"));
1385
1386 recovery_type = RECTYPE_UNKNOWN;
1387 }
1388 else if (PQntuples(res) == 1)
1389 {
1390 if (strcmp(PQgetvalue(res, 0, 0), "f") == 0)
1391 {
1392 recovery_type = RECTYPE_PRIMARY;
1393 }
1394 else
1395 {
1396 recovery_type = RECTYPE_STANDBY;
1397 }
1398 }
1399
1400 PQclear(res);
1401
1402 return recovery_type;
1403 }
1404
1405 /*
1406 * Read the node list from the provided connection and attempt to connect to each node
1407 * in turn to definitely establish if it's the cluster primary.
1408 *
1409 * The node list is returned in the order which makes it likely that the
1410 * current primary will be returned first, reducing the number of speculative
1411 * connections which need to be made to other nodes.
1412 *
1413 * If primary_conninfo_out points to allocated memory of MAXCONNINFO in length,
1414 * the primary server's conninfo string will be copied there.
1415 */
1416
1417 PGconn *
_get_primary_connection(PGconn * conn,int * primary_id,char * primary_conninfo_out,bool quiet)1418 _get_primary_connection(PGconn *conn,
1419 int *primary_id, char *primary_conninfo_out, bool quiet)
1420 {
1421 PQExpBufferData query;
1422
1423 PGconn *remote_conn = NULL;
1424 PGresult *res = NULL;
1425
1426 char remote_conninfo_stack[MAXCONNINFO];
1427 char *remote_conninfo = &*remote_conninfo_stack;
1428
1429 int i,
1430 node_id;
1431
1432 /*
1433 * If the caller wanted to get a copy of the connection info string, sub
1434 * out the local stack pointer for the pointer passed by the caller.
1435 */
1436 if (primary_conninfo_out != NULL)
1437 remote_conninfo = primary_conninfo_out;
1438
1439 if (primary_id != NULL)
1440 {
1441 *primary_id = NODE_NOT_FOUND;
1442 }
1443
1444 /* find all registered nodes */
1445 log_verbose(LOG_INFO, _("searching for primary node"));
1446
1447 initPQExpBuffer(&query);
1448 appendPQExpBufferStr(&query,
1449 " SELECT node_id, conninfo, "
1450 " CASE WHEN type = 'primary' THEN 1 ELSE 2 END AS type_priority"
1451 " FROM repmgr.nodes "
1452 " WHERE active IS TRUE "
1453 " AND type != 'witness' "
1454 "ORDER BY active DESC, type_priority, priority, node_id");
1455
1456 log_verbose(LOG_DEBUG, "get_primary_connection():\n%s", query.data);
1457
1458 res = PQexec(conn, query.data);
1459
1460 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1461 {
1462 log_db_error(conn, query.data, _("_get_primary_connection(): unable to retrieve node records"));
1463
1464 termPQExpBuffer(&query);
1465 PQclear(res);
1466
1467 return NULL;
1468 }
1469
1470 termPQExpBuffer(&query);
1471
1472 for (i = 0; i < PQntuples(res); i++)
1473 {
1474 RecoveryType recovery_type;
1475
1476 /* initialize with the values of the current node being processed */
1477 node_id = atoi(PQgetvalue(res, i, 0));
1478 snprintf(remote_conninfo, MAXCONNINFO, "%s", PQgetvalue(res, i, 1));
1479
1480 log_verbose(LOG_INFO,
1481 _("checking if node %i is primary"),
1482 node_id);
1483
1484 if (quiet)
1485 {
1486 remote_conn = establish_db_connection_quiet(remote_conninfo);
1487 }
1488 else
1489 {
1490 remote_conn = establish_db_connection(remote_conninfo, false);
1491 }
1492
1493 if (PQstatus(remote_conn) != CONNECTION_OK)
1494 {
1495 PQfinish(remote_conn);
1496 remote_conn = NULL;
1497 continue;
1498 }
1499
1500 recovery_type = get_recovery_type(remote_conn);
1501
1502 if (recovery_type == RECTYPE_UNKNOWN)
1503 {
1504 log_warning(_("unable to retrieve recovery state from node %i"),
1505 node_id);
1506
1507 PQfinish(remote_conn);
1508 continue;
1509 }
1510
1511 if (recovery_type == RECTYPE_PRIMARY)
1512 {
1513 PQclear(res);
1514 log_verbose(LOG_INFO, _("current primary node is %i"), node_id);
1515
1516 if (primary_id != NULL)
1517 {
1518 *primary_id = node_id;
1519 }
1520
1521 return remote_conn;
1522 }
1523
1524 PQfinish(remote_conn);
1525 }
1526
1527 PQclear(res);
1528 return NULL;
1529 }
1530
1531
1532
1533 /*
1534 * Return the id of the active primary node, or NODE_NOT_FOUND if no
1535 * record available.
1536 *
1537 * This reports the value stored in the database only and
1538 * does not verify whether the node is actually available
1539 */
1540 int
get_primary_node_id(PGconn * conn)1541 get_primary_node_id(PGconn *conn)
1542 {
1543 PQExpBufferData query;
1544 PGresult *res = NULL;
1545 int retval = NODE_NOT_FOUND;
1546
1547 initPQExpBuffer(&query);
1548 appendPQExpBufferStr(&query,
1549 "SELECT node_id "
1550 " FROM repmgr.nodes "
1551 " WHERE type = 'primary' "
1552 " AND active IS TRUE ");
1553
1554 log_verbose(LOG_DEBUG, "get_primary_node_id():\n%s", query.data);
1555
1556 res = PQexec(conn, query.data);
1557
1558 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1559 {
1560 log_db_error(conn, query.data, _("get_primary_node_id(): unable to execute query"));
1561 retval = UNKNOWN_NODE_ID;
1562 }
1563 else if (PQntuples(res) == 0)
1564 {
1565 log_verbose(LOG_WARNING, _("get_primary_node_id(): no active primary found"));
1566 retval = NODE_NOT_FOUND;
1567 }
1568 else
1569 {
1570 retval = atoi(PQgetvalue(res, 0, 0));
1571 }
1572
1573 termPQExpBuffer(&query);
1574 PQclear(res);
1575
1576 return retval;
1577 }
1578
1579
1580
1581
1582 int
get_ready_archive_files(PGconn * conn,const char * data_directory)1583 get_ready_archive_files(PGconn *conn, const char *data_directory)
1584 {
1585 char archive_status_dir[MAXPGPATH] = "";
1586 struct stat statbuf;
1587 struct dirent *arcdir_ent;
1588 DIR *arcdir;
1589
1590 int ready_count = 0;
1591
1592 if (PQserverVersion(conn) >= 100000)
1593 {
1594 snprintf(archive_status_dir, MAXPGPATH,
1595 "%s/pg_wal/archive_status",
1596 data_directory);
1597 }
1598 else
1599 {
1600 snprintf(archive_status_dir, MAXPGPATH,
1601 "%s/pg_xlog/archive_status",
1602 data_directory);
1603 }
1604
1605 /* sanity-check directory path */
1606 if (stat(archive_status_dir, &statbuf) == -1)
1607 {
1608 log_error(_("unable to access archive_status directory \"%s\""),
1609 archive_status_dir);
1610 log_detail("%s", strerror(errno));
1611
1612 return ARCHIVE_STATUS_DIR_ERROR;
1613 }
1614
1615 arcdir = opendir(archive_status_dir);
1616
1617 if (arcdir == NULL)
1618 {
1619 log_error(_("unable to open archive directory \"%s\""),
1620 archive_status_dir);
1621 log_detail("%s", strerror(errno));
1622
1623 return ARCHIVE_STATUS_DIR_ERROR;
1624 }
1625
1626 while ((arcdir_ent = readdir(arcdir)) != NULL)
1627 {
1628 struct stat statbuf;
1629 char file_path[MAXPGPATH + sizeof(arcdir_ent->d_name)];
1630 int basenamelen = 0;
1631
1632 snprintf(file_path, sizeof(file_path),
1633 "%s/%s",
1634 archive_status_dir,
1635 arcdir_ent->d_name);
1636
1637 /* skip non-files */
1638 if (stat(file_path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1639 {
1640 continue;
1641 }
1642
1643 basenamelen = (int) strlen(arcdir_ent->d_name) - 6;
1644
1645 /*
1646 * count anything ending in ".ready"; for a more precise
1647 * implementation see: src/backend/postmaster/pgarch.c
1648 */
1649 if (strcmp(arcdir_ent->d_name + basenamelen, ".ready") == 0)
1650 ready_count++;
1651 }
1652
1653 closedir(arcdir);
1654
1655 return ready_count;
1656 }
1657
1658
1659 bool
identify_system(PGconn * repl_conn,t_system_identification * identification)1660 identify_system(PGconn *repl_conn, t_system_identification *identification)
1661 {
1662 PGresult *res = NULL;
1663
1664 /* semicolon required here */
1665 res = PQexec(repl_conn, "IDENTIFY_SYSTEM;");
1666
1667 if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
1668 {
1669 log_db_error(repl_conn, NULL, _("unable to execute IDENTIFY_SYSTEM"));
1670
1671 PQclear(res);
1672 return false;
1673 }
1674
1675 #if defined(__i386__) || defined(__i386)
1676 identification->system_identifier = atoll(PQgetvalue(res, 0, 0));
1677 #else
1678 identification->system_identifier = atol(PQgetvalue(res, 0, 0));
1679 #endif
1680
1681 identification->timeline = atoi(PQgetvalue(res, 0, 1));
1682 identification->xlogpos = parse_lsn(PQgetvalue(res, 0, 2));
1683
1684 PQclear(res);
1685 return true;
1686 }
1687
1688
1689 /*
1690 * Return the system identifier by querying pg_control_system().
1691 *
1692 * Note there is a similar function in controldata.c ("get_system_identifier()")
1693 * which reads the control file.
1694 */
1695 uint64
system_identifier(PGconn * conn)1696 system_identifier(PGconn *conn)
1697 {
1698 uint64 system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
1699 PGresult *res = NULL;
1700
1701 /*
1702 * pg_control_system() was introduced in PostgreSQL 9.6
1703 */
1704 if (PQserverVersion(conn) < 90600)
1705 {
1706 return UNKNOWN_SYSTEM_IDENTIFIER;
1707 }
1708
1709 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
1710
1711 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1712 {
1713 log_db_error(conn, NULL, _("system_identifier(): unable to query pg_control_system()"));
1714 }
1715 else
1716 {
1717 #if defined(__i386__) || defined(__i386)
1718 system_identifier = atoll(PQgetvalue(res, 0, 0));
1719 #else
1720 system_identifier = atol(PQgetvalue(res, 0, 0));
1721 #endif
1722 }
1723
1724 PQclear(res);
1725
1726 return system_identifier;
1727 }
1728
1729
1730 TimeLineHistoryEntry *
get_timeline_history(PGconn * repl_conn,TimeLineID tli)1731 get_timeline_history(PGconn *repl_conn, TimeLineID tli)
1732 {
1733 PQExpBufferData query;
1734 PGresult *res = NULL;
1735
1736 PQExpBufferData result;
1737 char *resptr;
1738
1739 TimeLineHistoryEntry *history;
1740 TimeLineID file_tli = UNKNOWN_TIMELINE_ID;
1741 uint32 switchpoint_hi;
1742 uint32 switchpoint_lo;
1743
1744 initPQExpBuffer(&query);
1745
1746 appendPQExpBuffer(&query,
1747 "TIMELINE_HISTORY %i",
1748 (int)tli);
1749
1750 res = PQexec(repl_conn, query.data);
1751 log_verbose(LOG_DEBUG, "get_timeline_history():\n%s", query.data);
1752
1753 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1754 {
1755 log_db_error(repl_conn, query.data, _("get_timeline_history(): unable to execute query"));
1756 termPQExpBuffer(&query);
1757 PQclear(res);
1758 return NULL;
1759 }
1760
1761 termPQExpBuffer(&query);
1762
1763 if (PQntuples(res) != 1 || PQnfields(res) != 2)
1764 {
1765 log_error(_("unexpected response to TIMELINE_HISTORY command"));
1766 log_detail(_("got %i rows and %i fields, expected %i rows and %i fields"),
1767 PQntuples(res), PQnfields(res), 1, 2);
1768 PQclear(res);
1769 return NULL;
1770 }
1771
1772 initPQExpBuffer(&result);
1773 appendPQExpBufferStr(&result, PQgetvalue(res, 0, 1));
1774 PQclear(res);
1775
1776 resptr = result.data;
1777
1778 while (*resptr)
1779 {
1780 char buf[MAXLEN];
1781 char *bufptr = buf;
1782
1783 if (*resptr != '\n')
1784 {
1785 int len = 0;
1786
1787 memset(buf, 0, MAXLEN);
1788
1789 while (*resptr && *resptr != '\n' && len < MAXLEN)
1790 {
1791 *bufptr++ = *resptr++;
1792 len++;
1793 }
1794
1795 if (buf[0])
1796 {
1797 int nfields = sscanf(buf,
1798 "%u\t%X/%X",
1799 &file_tli, &switchpoint_hi, &switchpoint_lo);
1800 if (nfields == 3 && file_tli == tli - 1)
1801 break;
1802 }
1803 }
1804
1805 if (*resptr)
1806 resptr++;
1807 }
1808
1809 termPQExpBuffer(&result);
1810
1811 if (file_tli == UNKNOWN_TIMELINE_ID || file_tli != tli - 1)
1812 {
1813 log_error(_("timeline %i not found in timeline history file content"), tli);
1814 log_detail(_("content is: \"%s\""), result.data);
1815 return NULL;
1816 }
1817
1818 history = (TimeLineHistoryEntry *) palloc(sizeof(TimeLineHistoryEntry));
1819 history->tli = file_tli;
1820 history->begin = InvalidXLogRecPtr; /* we don't care about this */
1821 history->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo;
1822
1823 return history;
1824 }
1825
1826
1827 /* =============================== */
1828 /* user/role information functions */
1829 /* =============================== */
1830
1831
1832 bool
can_execute_pg_promote(PGconn * conn)1833 can_execute_pg_promote(PGconn *conn)
1834 {
1835 PQExpBufferData query;
1836 PGresult *res;
1837 bool has_pg_promote= false;
1838
1839 /* pg_promote() available from PostgreSQL 12 */
1840 if (PQserverVersion(conn) < 120000)
1841 return false;
1842
1843 initPQExpBuffer(&query);
1844 appendPQExpBufferStr(&query,
1845 " SELECT pg_catalog.has_function_privilege( "
1846 " CURRENT_USER, "
1847 " 'pg_catalog.pg_promote(bool,int)', "
1848 " 'execute' "
1849 " )");
1850
1851 res = PQexec(conn, query.data);
1852
1853 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1854 {
1855 log_db_error(conn, query.data,
1856 _("can_execute_pg_promote(): unable to query user function privilege"));
1857 }
1858 else
1859 {
1860 has_pg_promote = atobool(PQgetvalue(res, 0, 0));
1861 }
1862 termPQExpBuffer(&query);
1863
1864 return has_pg_promote;
1865 }
1866
1867
1868 /*
1869 * Determine if the user associated with the current connection is
1870 * a member of the "pg_monitor" default role, or optionally one
1871 * of its three constituent "subroles".
1872 */
1873 bool
connection_has_pg_monitor_role(PGconn * conn,const char * subrole)1874 connection_has_pg_monitor_role(PGconn *conn, const char *subrole)
1875 {
1876 PQExpBufferData query;
1877 PGresult *res;
1878 bool has_pg_monitor_role = false;
1879
1880 /* superusers can read anything, no role check needed */
1881 if (is_superuser_connection(conn, NULL) == true)
1882 return true;
1883
1884 /* pg_monitor and associated "subroles" introduced in PostgreSQL 10 */
1885 if (PQserverVersion(conn) < 100000)
1886 return false;
1887
1888 initPQExpBuffer(&query);
1889 appendPQExpBufferStr(&query,
1890 " SELECT CASE "
1891 " WHEN pg_catalog.pg_has_role('pg_monitor','MEMBER') "
1892 " THEN TRUE ");
1893
1894 if (subrole != NULL)
1895 {
1896 appendPQExpBuffer(&query,
1897 " WHEN pg_catalog.pg_has_role('%s','MEMBER') "
1898 " THEN TRUE ",
1899 subrole);
1900 }
1901
1902 appendPQExpBufferStr(&query,
1903 " ELSE FALSE "
1904 " END AS has_pg_monitor");
1905
1906 res = PQexec(conn, query.data);
1907
1908 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1909 {
1910 log_db_error(conn, query.data,
1911 _("connection_has_pg_monitor_role(): unable to query user roles"));
1912 }
1913 else
1914 {
1915 has_pg_monitor_role = atobool(PQgetvalue(res, 0, 0));
1916 }
1917 termPQExpBuffer(&query);
1918 PQclear(res);
1919
1920 return has_pg_monitor_role;
1921 }
1922
1923
1924 bool
is_replication_role(PGconn * conn,char * rolname)1925 is_replication_role(PGconn *conn, char *rolname)
1926 {
1927 PQExpBufferData query;
1928 PGresult *res;
1929 bool is_replication_role = false;
1930
1931 initPQExpBuffer(&query);
1932
1933 appendPQExpBufferStr(&query,
1934 " SELECT rolreplication "
1935 " FROM pg_catalog.pg_roles "
1936 " WHERE rolname = ");
1937
1938 if (rolname != NULL)
1939 {
1940 appendPQExpBuffer(&query,
1941 "'%s'",
1942 rolname);
1943 }
1944 else
1945 {
1946 appendPQExpBufferStr(&query,
1947 "CURRENT_USER");
1948 }
1949
1950 res = PQexec(conn, query.data);
1951
1952 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1953 {
1954 log_db_error(conn, query.data,
1955 _("is_replication_role(): unable to query user roles"));
1956 }
1957 else
1958 {
1959 is_replication_role = atobool(PQgetvalue(res, 0, 0));
1960 }
1961
1962 termPQExpBuffer(&query);
1963 PQclear(res);
1964
1965 return is_replication_role;
1966 }
1967
1968
1969 bool
is_superuser_connection(PGconn * conn,t_connection_user * userinfo)1970 is_superuser_connection(PGconn *conn, t_connection_user *userinfo)
1971 {
1972 bool is_superuser = false;
1973 const char *current_user = PQuser(conn);
1974 const char *superuser_status = PQparameterStatus(conn, "is_superuser");
1975
1976 is_superuser = (strcmp(superuser_status, "on") == 0) ? true : false;
1977
1978 if (userinfo != NULL)
1979 {
1980 snprintf(userinfo->username,
1981 sizeof(userinfo->username),
1982 "%s", current_user);
1983 userinfo->is_superuser = is_superuser;
1984 }
1985
1986 return is_superuser;
1987 }
1988
1989
1990 /* =============================== */
1991 /* repmgrd shared memory functions */
1992 /* =============================== */
1993
1994 bool
repmgrd_set_local_node_id(PGconn * conn,int local_node_id)1995 repmgrd_set_local_node_id(PGconn *conn, int local_node_id)
1996 {
1997 PQExpBufferData query;
1998 PGresult *res = NULL;
1999 bool success = true;
2000
2001 initPQExpBuffer(&query);
2002
2003 appendPQExpBuffer(&query,
2004 "SELECT repmgr.set_local_node_id(%i)",
2005 local_node_id);
2006
2007 res = PQexec(conn, query.data);
2008
2009 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2010 {
2011 log_db_error(conn, query.data, _("repmgrd_set_local_node_id(): unable to execute query"));
2012
2013 success = false;
2014 }
2015
2016 termPQExpBuffer(&query);
2017 PQclear(res);
2018
2019 return success;
2020 }
2021
2022
2023 int
repmgrd_get_local_node_id(PGconn * conn)2024 repmgrd_get_local_node_id(PGconn *conn)
2025 {
2026 PGresult *res = NULL;
2027 int local_node_id = UNKNOWN_NODE_ID;
2028
2029 const char *sqlquery = "SELECT repmgr.get_local_node_id()";
2030
2031 res = PQexec(conn, sqlquery);
2032
2033 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2034 {
2035 log_db_error(conn, sqlquery, _("repmgrd_get_local_node_id(): unable to execute query"));
2036 }
2037 else if (!PQgetisnull(res, 0, 0))
2038 {
2039 local_node_id = atoi(PQgetvalue(res, 0, 0));
2040 }
2041
2042 PQclear(res);
2043
2044 return local_node_id;
2045 }
2046
2047
2048 bool
repmgrd_check_local_node_id(PGconn * conn)2049 repmgrd_check_local_node_id(PGconn *conn)
2050 {
2051 PGresult *res = NULL;
2052 bool node_id_settable = true;
2053 const char *sqlquery = "SELECT repmgr.get_local_node_id()";
2054
2055 res = PQexec(conn, sqlquery);
2056
2057 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2058 {
2059 log_db_error(conn, sqlquery, _("repmgrd_get_local_node_id(): unable to execute query"));
2060 }
2061
2062 if (PQgetisnull(res, 0, 0))
2063 {
2064 node_id_settable = false;
2065 }
2066
2067 PQclear(res);
2068
2069 return node_id_settable;
2070 }
2071
2072
2073 /*
2074 * Function that checks if the primary is in exclusive backup mode.
2075 * We'll use this when executing an action can conflict with an exclusive
2076 * backup.
2077 */
2078 BackupState
server_in_exclusive_backup_mode(PGconn * conn)2079 server_in_exclusive_backup_mode(PGconn *conn)
2080 {
2081 BackupState backup_state = BACKUP_STATE_UNKNOWN;
2082 const char *sqlquery = "SELECT pg_catalog.pg_is_in_backup()";
2083 PGresult *res = PQexec(conn, sqlquery);
2084
2085 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2086 {
2087 log_db_error(conn, sqlquery, _("unable to retrieve information regarding backup mode of node"));
2088
2089 backup_state = BACKUP_STATE_UNKNOWN;
2090 }
2091 else if (atobool(PQgetvalue(res, 0, 0)) == true)
2092 {
2093 backup_state = BACKUP_STATE_IN_BACKUP;
2094 }
2095 else
2096 {
2097 backup_state = BACKUP_STATE_NO_BACKUP;
2098 }
2099
2100 PQclear(res);
2101
2102 return backup_state;
2103 }
2104
2105
2106 void
repmgrd_set_pid(PGconn * conn,pid_t repmgrd_pid,const char * pidfile)2107 repmgrd_set_pid(PGconn *conn, pid_t repmgrd_pid, const char *pidfile)
2108 {
2109 PQExpBufferData query;
2110 PGresult *res = NULL;
2111
2112 log_verbose(LOG_DEBUG, "repmgrd_set_pid(): pid is %i", (int) repmgrd_pid);
2113
2114 initPQExpBuffer(&query);
2115
2116 appendPQExpBuffer(&query,
2117 "SELECT repmgr.set_repmgrd_pid(%i, ",
2118 (int) repmgrd_pid);
2119
2120 if (pidfile != NULL)
2121 {
2122 appendPQExpBuffer(&query,
2123 " '%s')",
2124 pidfile);
2125 }
2126 else
2127 {
2128 appendPQExpBufferStr(&query,
2129 " NULL)");
2130 }
2131
2132 res = PQexec(conn, query.data);
2133 termPQExpBuffer(&query);
2134
2135 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2136 {
2137 log_error(_("unable to execute \"SELECT repmgr.set_repmgrd_pid()\""));
2138 log_detail("%s", PQerrorMessage(conn));
2139 }
2140
2141 PQclear(res);
2142
2143 return;
2144 }
2145
2146
2147 pid_t
repmgrd_get_pid(PGconn * conn)2148 repmgrd_get_pid(PGconn *conn)
2149 {
2150 PGresult *res = NULL;
2151 pid_t repmgrd_pid = UNKNOWN_PID;
2152
2153 res = PQexec(conn, "SELECT repmgr.get_repmgrd_pid()");
2154
2155 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2156 {
2157 log_error(_("unable to execute \"SELECT repmgr.get_repmgrd_pid()\""));
2158 log_detail("%s", PQerrorMessage(conn));
2159 }
2160 else if (!PQgetisnull(res, 0, 0))
2161 {
2162 repmgrd_pid = atoi(PQgetvalue(res, 0, 0));
2163 }
2164
2165 PQclear(res);
2166
2167 return repmgrd_pid;
2168 }
2169
2170
2171 bool
repmgrd_is_running(PGconn * conn)2172 repmgrd_is_running(PGconn *conn)
2173 {
2174 PGresult *res = NULL;
2175 bool is_running = false;
2176
2177 res = PQexec(conn, "SELECT repmgr.repmgrd_is_running()");
2178
2179 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2180 {
2181 log_error(_("unable to execute \"SELECT repmgr.repmgrd_is_running()\""));
2182 log_detail("%s", PQerrorMessage(conn));
2183 }
2184 else if (!PQgetisnull(res, 0, 0))
2185 {
2186 is_running = atobool(PQgetvalue(res, 0, 0));
2187 }
2188
2189 PQclear(res);
2190
2191 return is_running;
2192 }
2193
2194
2195 bool
repmgrd_is_paused(PGconn * conn)2196 repmgrd_is_paused(PGconn *conn)
2197 {
2198 PGresult *res = NULL;
2199 bool is_paused = false;
2200
2201 res = PQexec(conn, "SELECT repmgr.repmgrd_is_paused()");
2202
2203 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2204 {
2205 log_error(_("unable to execute \"SELECT repmgr.repmgrd_is_paused()\""));
2206 log_detail("%s", PQerrorMessage(conn));
2207 }
2208 else if (!PQgetisnull(res, 0, 0))
2209 {
2210 is_paused = atobool(PQgetvalue(res, 0, 0));
2211 }
2212
2213 PQclear(res);
2214
2215 return is_paused;
2216 }
2217
2218
2219 bool
repmgrd_pause(PGconn * conn,bool pause)2220 repmgrd_pause(PGconn *conn, bool pause)
2221 {
2222 PQExpBufferData query;
2223 PGresult *res = NULL;
2224 bool success = true;
2225
2226 initPQExpBuffer(&query);
2227
2228 appendPQExpBuffer(&query,
2229 "SELECT repmgr.repmgrd_pause(%s)",
2230 pause == true ? "TRUE" : "FALSE");
2231 res = PQexec(conn, query.data);
2232 termPQExpBuffer(&query);
2233
2234 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2235 {
2236 log_error(_("unable to execute \"SELECT repmgr.repmgrd_pause()\""));
2237 log_detail("%s", PQerrorMessage(conn));
2238
2239 success = false;
2240 }
2241
2242 PQclear(res);
2243
2244 return success;
2245 }
2246
2247 pid_t
get_wal_receiver_pid(PGconn * conn)2248 get_wal_receiver_pid(PGconn *conn)
2249 {
2250 PGresult *res = NULL;
2251 pid_t wal_receiver_pid = UNKNOWN_PID;
2252
2253 res = PQexec(conn, "SELECT repmgr.get_wal_receiver_pid()");
2254
2255 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2256 {
2257 log_error(_("unable to execute \"SELECT repmgr.get_wal_receiver_pid()\""));
2258 log_detail("%s", PQerrorMessage(conn));
2259 }
2260 else if (!PQgetisnull(res, 0, 0))
2261 {
2262 wal_receiver_pid = atoi(PQgetvalue(res, 0, 0));
2263 }
2264
2265 PQclear(res);
2266
2267 return wal_receiver_pid;
2268 }
2269
2270
2271 int
repmgrd_get_upstream_node_id(PGconn * conn)2272 repmgrd_get_upstream_node_id(PGconn *conn)
2273 {
2274 PGresult *res = NULL;
2275 int upstream_node_id = UNKNOWN_NODE_ID;
2276
2277 const char *sqlquery = "SELECT repmgr.get_upstream_node_id()";
2278
2279 res = PQexec(conn, sqlquery);
2280
2281 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2282 {
2283 log_db_error(conn, sqlquery, _("repmgrd_get_upstream_node_id(): unable to execute query"));
2284 }
2285 else if (!PQgetisnull(res, 0, 0))
2286 {
2287 upstream_node_id = atoi(PQgetvalue(res, 0, 0));
2288 }
2289
2290 PQclear(res);
2291
2292 return upstream_node_id;
2293 }
2294
2295
2296 bool
repmgrd_set_upstream_node_id(PGconn * conn,int node_id)2297 repmgrd_set_upstream_node_id(PGconn *conn, int node_id)
2298 {
2299 PQExpBufferData query;
2300 PGresult *res = NULL;
2301 bool success = true;
2302
2303 initPQExpBuffer(&query);
2304 appendPQExpBuffer(&query,
2305 " SELECT repmgr.set_upstream_node_id(%i) ",
2306 node_id);
2307
2308 log_verbose(LOG_DEBUG, "repmgrd_set_upstream_node_id():\n %s", query.data);
2309
2310 res = PQexec(conn, query.data);
2311
2312 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2313 {
2314 log_db_error(conn, query.data,
2315 _("repmgrd_set_upstream_node_id(): unable to set upstream node ID (provided value: %i)"), node_id);
2316 success = false;
2317 }
2318
2319 termPQExpBuffer(&query);
2320 PQclear(res);
2321
2322 return success;
2323 }
2324
2325 /* ================ */
2326 /* result functions */
2327 /* ================ */
2328
/*
 * Convert the text representation of a boolean query result to a bool.
 *
 * Returns true only if "value" is exactly "t"; any other string, and a
 * NULL pointer (as PQgetvalue() returns for an out-of-range cell), yields
 * false.
 */
bool
atobool(const char *value)
{
	if (value == NULL)
		return false;

	return (strcmp(value, "t") == 0)
		? true
		: false;
}
2336
2337
2338 /* =================== */
2339 /* extension functions */
2340 /* =================== */
2341
2342 ExtensionStatus
get_repmgr_extension_status(PGconn * conn,t_extension_versions * extversions)2343 get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions)
2344 {
2345 PQExpBufferData query;
2346 PGresult *res = NULL;
2347 ExtensionStatus status = REPMGR_UNKNOWN;
2348
2349 /* TODO: check version */
2350
2351 initPQExpBuffer(&query);
2352
2353 appendPQExpBufferStr(&query,
2354 " SELECT ae.name, e.extname, "
2355 " ae.default_version, "
2356 " (((FLOOR(ae.default_version::NUMERIC)::INT) * 10000) + (ae.default_version::NUMERIC - FLOOR(ae.default_version::NUMERIC)::INT) * 1000)::INT AS available, "
2357 " ae.installed_version, "
2358 " (((FLOOR(ae.installed_version::NUMERIC)::INT) * 10000) + (ae.installed_version::NUMERIC - FLOOR(ae.installed_version::NUMERIC)::INT) * 1000)::INT AS installed "
2359 " FROM pg_catalog.pg_available_extensions ae "
2360 "LEFT JOIN pg_catalog.pg_extension e "
2361 " ON e.extname=ae.name "
2362 " WHERE ae.name='repmgr' ");
2363
2364 res = PQexec(conn, query.data);
2365
2366 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2367 {
2368 log_db_error(conn, query.data, _("get_repmgr_extension_status(): unable to execute extension query"));
2369 status = REPMGR_UNKNOWN;
2370 }
2371
2372 /* 1. Check extension is actually available */
2373 else if (PQntuples(res) == 0)
2374 {
2375 status = REPMGR_UNAVAILABLE;
2376 }
2377
2378 /* 2. Check if extension installed */
2379 else if (PQgetisnull(res, 0, 1) == 0)
2380 {
2381 int available_version = atoi(PQgetvalue(res, 0, 3));
2382 int installed_version = atoi(PQgetvalue(res, 0, 5));
2383
2384 /* caller wants to know which versions are installed/available */
2385 if (extversions != NULL)
2386 {
2387 snprintf(extversions->default_version,
2388 sizeof(extversions->default_version),
2389 "%s", PQgetvalue(res, 0, 2));
2390 extversions->default_version_num = available_version;
2391 snprintf(extversions->installed_version,
2392 sizeof(extversions->installed_version),
2393 "%s", PQgetvalue(res, 0, 4));
2394 extversions->installed_version_num = installed_version;
2395 }
2396
2397 if (available_version > installed_version)
2398 {
2399 status = REPMGR_OLD_VERSION_INSTALLED;
2400 }
2401 else
2402 {
2403 status = REPMGR_INSTALLED;
2404 }
2405 }
2406 else
2407 {
2408 status = REPMGR_AVAILABLE;
2409 }
2410
2411 termPQExpBuffer(&query);
2412 PQclear(res);
2413
2414 return status;
2415 }
2416
2417 /* ========================= */
2418 /* node management functions */
2419 /* ========================= */
2420
2421 /* assumes superuser connection */
2422 void
checkpoint(PGconn * conn)2423 checkpoint(PGconn *conn)
2424 {
2425 PGresult *res = NULL;
2426
2427 res = PQexec(conn, "CHECKPOINT");
2428
2429 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2430 {
2431 log_db_error(conn, NULL, _("unable to execute CHECKPOINT"));
2432 }
2433
2434 PQclear(res);
2435 return;
2436 }
2437
2438
2439 bool
vacuum_table(PGconn * primary_conn,const char * table)2440 vacuum_table(PGconn *primary_conn, const char *table)
2441 {
2442 PQExpBufferData query;
2443 bool success = true;
2444 PGresult *res = NULL;
2445
2446 initPQExpBuffer(&query);
2447
2448 appendPQExpBuffer(&query, "VACUUM %s", table);
2449
2450 res = PQexec(primary_conn, query.data);
2451
2452 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2453 {
2454 log_db_error(primary_conn, NULL,
2455 _("unable to vacuum table \"%s\""), table);
2456 success = false;
2457 }
2458
2459 termPQExpBuffer(&query);
2460 PQclear(res);
2461
2462 return success;
2463 }
2464
2465 /*
2466 * For use in PostgreSQL 12 and later
2467 */
2468 bool
promote_standby(PGconn * conn,bool wait,int wait_seconds)2469 promote_standby(PGconn *conn, bool wait, int wait_seconds)
2470 {
2471 PQExpBufferData query;
2472 bool success = true;
2473 PGresult *res = NULL;
2474
2475 initPQExpBuffer(&query);
2476
2477 appendPQExpBuffer(&query,
2478 "SELECT pg_catalog.pg_promote(wait := %s",
2479 wait ? "TRUE" : "FALSE");
2480
2481 if (wait_seconds > 0)
2482 {
2483 appendPQExpBuffer(&query,
2484 ", wait_seconds := %i",
2485 wait_seconds);
2486 }
2487
2488 appendPQExpBufferStr(&query, ")");
2489
2490 res = PQexec(conn, query.data);
2491
2492 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2493 {
2494 log_db_error(conn, query.data, _("unable to execute pg_promote()"));
2495 success = false;
2496 }
2497 else
2498 {
2499 /* NOTE: if "wait" is false, pg_promote() will always return true */
2500 success = atobool(PQgetvalue(res, 0, 0));
2501 }
2502
2503 termPQExpBuffer(&query);
2504 PQclear(res);
2505
2506 return success;
2507 }
2508
2509
2510 bool
resume_wal_replay(PGconn * conn)2511 resume_wal_replay(PGconn *conn)
2512 {
2513 PGresult *res = NULL;
2514 PQExpBufferData query;
2515 bool success = true;
2516
2517 initPQExpBuffer(&query);
2518
2519 if (PQserverVersion(conn) >= 100000)
2520 {
2521 appendPQExpBufferStr(&query,
2522 "SELECT pg_catalog.pg_wal_replay_resume()");
2523 }
2524 else
2525 {
2526 appendPQExpBufferStr(&query,
2527 "SELECT pg_catalog.pg_xlog_replay_resume()");
2528 }
2529
2530 res = PQexec(conn, query.data);
2531
2532 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2533 {
2534 log_db_error(conn, query.data, _("resume_wal_replay(): unable to resume WAL replay"));
2535 success = false;
2536 }
2537
2538 termPQExpBuffer(&query);
2539 PQclear(res);
2540
2541 return success;
2542 }
2543
2544
2545 /* ===================== */
2546 /* Node record functions */
2547 /* ===================== */
2548
2549 /*
2550 * Note: init_defaults may only be false when the caller is refreshing a previously
2551 * populated record.
2552 */
2553 static RecordStatus
_get_node_record(PGconn * conn,char * sqlquery,t_node_info * node_info,bool init_defaults)2554 _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults)
2555 {
2556 int ntuples = 0;
2557 PGresult *res = PQexec(conn, sqlquery);
2558
2559 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2560 {
2561 log_db_error(conn, sqlquery, _("_get_node_record(): unable to execute query"));
2562
2563 PQclear(res);
2564 return RECORD_ERROR;
2565 }
2566
2567 ntuples = PQntuples(res);
2568
2569 if (ntuples == 0)
2570 {
2571 PQclear(res);
2572 return RECORD_NOT_FOUND;
2573 }
2574
2575 _populate_node_record(res, node_info, 0, init_defaults);
2576
2577 PQclear(res);
2578
2579 return RECORD_FOUND;
2580 }
2581
2582
2583 /*
2584 * Note: init_defaults may only be false when the caller is refreshing a previously
2585 * populated record.
2586 */
2587 static void
_populate_node_record(PGresult * res,t_node_info * node_info,int row,bool init_defaults)2588 _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults)
2589 {
2590 node_info->node_id = atoi(PQgetvalue(res, row, 0));
2591 node_info->type = parse_node_type(PQgetvalue(res, row, 1));
2592
2593 if (PQgetisnull(res, row, 2))
2594 {
2595 node_info->upstream_node_id = NO_UPSTREAM_NODE;
2596 }
2597 else
2598 {
2599 node_info->upstream_node_id = atoi(PQgetvalue(res, row, 2));
2600 }
2601
2602 snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", PQgetvalue(res, row, 3));
2603 snprintf(node_info->conninfo, sizeof(node_info->conninfo), "%s", PQgetvalue(res, row, 4));
2604 snprintf(node_info->repluser, sizeof(node_info->repluser), "%s", PQgetvalue(res, row, 5));
2605 snprintf(node_info->slot_name, sizeof(node_info->slot_name), "%s", PQgetvalue(res, row, 6));
2606 snprintf(node_info->location, sizeof(node_info->location), "%s", PQgetvalue(res, row, 7));
2607 node_info->priority = atoi(PQgetvalue(res, row, 8));
2608 node_info->active = atobool(PQgetvalue(res, row, 9));
2609 snprintf(node_info->config_file, sizeof(node_info->config_file), "%s", PQgetvalue(res, row, 10));
2610
2611 /* These are only set by certain queries */
2612 snprintf(node_info->upstream_node_name, sizeof(node_info->upstream_node_name), "%s", PQgetvalue(res, row, 11));
2613
2614 if (PQgetisnull(res, row, 12))
2615 {
2616 node_info->attached = NODE_ATTACHED_UNKNOWN;
2617 }
2618 else
2619 {
2620 node_info->attached = atobool(PQgetvalue(res, row, 12)) ? NODE_ATTACHED : NODE_DETACHED;
2621 }
2622
2623 /* Set remaining struct fields with default values */
2624
2625 if (init_defaults == true)
2626 {
2627 node_info->node_status = NODE_STATUS_UNKNOWN;
2628 node_info->recovery_type = RECTYPE_UNKNOWN;
2629 node_info->last_wal_receive_lsn = InvalidXLogRecPtr;
2630 node_info->monitoring_state = MS_NORMAL;
2631 node_info->conn = NULL;
2632 }
2633 }
2634
2635
2636 t_server_type
parse_node_type(const char * type)2637 parse_node_type(const char *type)
2638 {
2639 if (strcmp(type, "primary") == 0)
2640 {
2641 return PRIMARY;
2642 }
2643 else if (strcmp(type, "standby") == 0)
2644 {
2645 return STANDBY;
2646 }
2647 else if (strcmp(type, "witness") == 0)
2648 {
2649 return WITNESS;
2650 }
2651
2652 return UNKNOWN;
2653 }
2654
2655
2656 const char *
get_node_type_string(t_server_type type)2657 get_node_type_string(t_server_type type)
2658 {
2659 switch (type)
2660 {
2661 case PRIMARY:
2662 return "primary";
2663 case STANDBY:
2664 return "standby";
2665 case WITNESS:
2666 return "witness";
2667 /* this should never happen */
2668 case UNKNOWN:
2669 default:
2670 log_error(_("unknown node type %i"), type);
2671 return "unknown";
2672 }
2673 }
2674
2675
2676 RecordStatus
get_node_record(PGconn * conn,int node_id,t_node_info * node_info)2677 get_node_record(PGconn *conn, int node_id, t_node_info *node_info)
2678 {
2679 PQExpBufferData query;
2680 RecordStatus result;
2681
2682 initPQExpBuffer(&query);
2683 appendPQExpBuffer(&query,
2684 "SELECT " REPMGR_NODES_COLUMNS
2685 " FROM repmgr.nodes n "
2686 " WHERE n.node_id = %i",
2687 node_id);
2688
2689 log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
2690
2691 result = _get_node_record(conn, query.data, node_info, true);
2692 termPQExpBuffer(&query);
2693
2694 if (result == RECORD_NOT_FOUND)
2695 {
2696 log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
2697 }
2698
2699 return result;
2700 }
2701
2702
2703 RecordStatus
refresh_node_record(PGconn * conn,int node_id,t_node_info * node_info)2704 refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info)
2705 {
2706 PQExpBufferData query;
2707 RecordStatus result;
2708
2709 initPQExpBuffer(&query);
2710 appendPQExpBuffer(&query,
2711 "SELECT " REPMGR_NODES_COLUMNS
2712 " FROM repmgr.nodes n "
2713 " WHERE n.node_id = %i",
2714 node_id);
2715
2716 log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
2717
2718 result = _get_node_record(conn, query.data, node_info, false);
2719 termPQExpBuffer(&query);
2720
2721 if (result == RECORD_NOT_FOUND)
2722 {
2723 log_verbose(LOG_DEBUG, "refresh_node_record(): no record found for node %i", node_id);
2724 }
2725
2726 return result;
2727 }
2728
2729
2730 RecordStatus
get_node_record_with_upstream(PGconn * conn,int node_id,t_node_info * node_info)2731 get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info)
2732 {
2733 PQExpBufferData query;
2734 RecordStatus result;
2735
2736 initPQExpBuffer(&query);
2737 appendPQExpBuffer(&query,
2738 " SELECT " REPMGR_NODES_COLUMNS_WITH_UPSTREAM
2739 " FROM repmgr.nodes n "
2740 " LEFT JOIN repmgr.nodes un "
2741 " ON un.node_id = n.upstream_node_id"
2742 " WHERE n.node_id = %i",
2743 node_id);
2744
2745 log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
2746
2747 result = _get_node_record(conn, query.data, node_info, true);
2748 termPQExpBuffer(&query);
2749
2750 if (result == RECORD_NOT_FOUND)
2751 {
2752 log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
2753 }
2754
2755 return result;
2756 }
2757
2758
2759 RecordStatus
get_node_record_by_name(PGconn * conn,const char * node_name,t_node_info * node_info)2760 get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info)
2761 {
2762 PQExpBufferData query;
2763 RecordStatus record_status = RECORD_NOT_FOUND;
2764
2765 initPQExpBuffer(&query);
2766
2767 appendPQExpBuffer(&query,
2768 "SELECT " REPMGR_NODES_COLUMNS
2769 " FROM repmgr.nodes n "
2770 " WHERE n.node_name = '%s' ",
2771 node_name);
2772
2773 log_verbose(LOG_DEBUG, "get_node_record_by_name():\n %s", query.data);
2774
2775 record_status = _get_node_record(conn, query.data, node_info, true);
2776
2777 termPQExpBuffer(&query);
2778
2779 if (record_status == RECORD_NOT_FOUND)
2780 {
2781 log_verbose(LOG_DEBUG, "get_node_record_by_name(): no record found for node \"%s\"",
2782 node_name);
2783 }
2784
2785 return record_status;
2786 }
2787
2788
2789 t_node_info *
get_node_record_pointer(PGconn * conn,int node_id)2790 get_node_record_pointer(PGconn *conn, int node_id)
2791 {
2792 t_node_info *node_info = pg_malloc0(sizeof(t_node_info));
2793 RecordStatus record_status = RECORD_NOT_FOUND;
2794
2795 record_status = get_node_record(conn, node_id, node_info);
2796
2797 if (record_status != RECORD_FOUND)
2798 {
2799 pfree(node_info);
2800 return NULL;
2801 }
2802
2803 return node_info;
2804 }
2805
2806
2807 bool
get_primary_node_record(PGconn * conn,t_node_info * node_info)2808 get_primary_node_record(PGconn *conn, t_node_info *node_info)
2809 {
2810 RecordStatus record_status = RECORD_NOT_FOUND;
2811
2812 int primary_node_id = get_primary_node_id(conn);
2813
2814 if (primary_node_id == UNKNOWN_NODE_ID)
2815 {
2816 return false;
2817 }
2818
2819 record_status = get_node_record(conn, primary_node_id, node_info);
2820
2821 return record_status == RECORD_FOUND ? true : false;
2822 }
2823
2824
2825 /*
2826 * Get the local node record; if this fails, exit. Many operations
2827 * depend on this being available, so we'll centralize the check
2828 * and failure messages here.
2829 */
2830 bool
get_local_node_record(PGconn * conn,int node_id,t_node_info * node_info)2831 get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info)
2832 {
2833 RecordStatus record_status = get_node_record(conn, node_id, node_info);
2834
2835 if (record_status != RECORD_FOUND)
2836 {
2837 log_error(_("unable to retrieve record for local node"));
2838 log_detail(_("local node id is %i"), node_id);
2839 log_hint(_("check this node was correctly registered"));
2840
2841 PQfinish(conn);
2842 exit(ERR_BAD_CONFIG);
2843 }
2844
2845 return true;
2846 }
2847
2848
2849 static
2850 void
_populate_node_records(PGresult * res,NodeInfoList * node_list)2851 _populate_node_records(PGresult *res, NodeInfoList *node_list)
2852 {
2853 int i;
2854
2855 clear_node_info_list(node_list);
2856
2857 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2858 {
2859 return;
2860 }
2861
2862 for (i = 0; i < PQntuples(res); i++)
2863 {
2864 NodeInfoListCell *cell;
2865
2866 cell = (NodeInfoListCell *) pg_malloc0(sizeof(NodeInfoListCell));
2867
2868 cell->node_info = pg_malloc0(sizeof(t_node_info));
2869
2870 _populate_node_record(res, cell->node_info, i, true);
2871
2872 if (node_list->tail)
2873 node_list->tail->next = cell;
2874 else
2875 node_list->head = cell;
2876
2877 node_list->tail = cell;
2878 node_list->node_count++;
2879 }
2880
2881 return;
2882 }
2883
2884
2885 bool
get_all_node_records(PGconn * conn,NodeInfoList * node_list)2886 get_all_node_records(PGconn *conn, NodeInfoList *node_list)
2887 {
2888 PQExpBufferData query;
2889 PGresult *res = NULL;
2890 bool success = true;
2891 initPQExpBuffer(&query);
2892
2893 appendPQExpBufferStr(&query,
2894 " SELECT " REPMGR_NODES_COLUMNS
2895 " FROM repmgr.nodes n "
2896 "ORDER BY n.node_id ");
2897
2898 log_verbose(LOG_DEBUG, "get_all_node_records():\n%s", query.data);
2899
2900 res = PQexec(conn, query.data);
2901
2902 /* this will return an empty list if there was an error executing the query */
2903 _populate_node_records(res, node_list);
2904
2905 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2906 {
2907 log_db_error(conn, query.data, _("get_all_node_records(): unable to execute query"));
2908 success = false;
2909 }
2910
2911 PQclear(res);
2912 termPQExpBuffer(&query);
2913
2914 return success;
2915 }
2916
2917 bool
get_all_nodes_count(PGconn * conn,int * count)2918 get_all_nodes_count(PGconn *conn, int *count)
2919 {
2920 PQExpBufferData query;
2921 PGresult *res = NULL;
2922 bool success = true;
2923 initPQExpBuffer(&query);
2924
2925 appendPQExpBufferStr(&query,
2926 " SELECT count(*) "
2927 " FROM repmgr.nodes n ");
2928
2929 log_verbose(LOG_DEBUG, "get_all_nodes_count():\n%s", query.data);
2930
2931 res = PQexec(conn, query.data);
2932
2933 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2934 {
2935 log_db_error(conn, query.data, _("get_all_nodes_count(): unable to execute query"));
2936 success = false;
2937 }
2938 else
2939 {
2940 *count = atoi(PQgetvalue(res, 0, 0));
2941 }
2942
2943 PQclear(res);
2944 termPQExpBuffer(&query);
2945
2946 return success;
2947 }
2948
2949 void
get_downstream_node_records(PGconn * conn,int node_id,NodeInfoList * node_list)2950 get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
2951 {
2952 PQExpBufferData query;
2953 PGresult *res = NULL;
2954
2955 initPQExpBuffer(&query);
2956
2957 appendPQExpBuffer(&query,
2958 " SELECT " REPMGR_NODES_COLUMNS
2959 " FROM repmgr.nodes n "
2960 " WHERE n.upstream_node_id = %i "
2961 "ORDER BY n.node_id ",
2962 node_id);
2963
2964 log_verbose(LOG_DEBUG, "get_downstream_node_records():\n%s", query.data);
2965
2966 res = PQexec(conn, query.data);
2967
2968 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2969 {
2970 log_db_error(conn, query.data, _("get_downstream_node_records(): unable to execute query"));
2971 }
2972
2973 termPQExpBuffer(&query);
2974
2975 /* this will return an empty list if there was an error executing the query */
2976 _populate_node_records(res, node_list);
2977
2978 PQclear(res);
2979
2980 return;
2981 }
2982
2983
2984 void
get_active_sibling_node_records(PGconn * conn,int node_id,int upstream_node_id,NodeInfoList * node_list)2985 get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list)
2986 {
2987 PQExpBufferData query;
2988 PGresult *res = NULL;
2989
2990 initPQExpBuffer(&query);
2991
2992 appendPQExpBuffer(&query,
2993 " SELECT " REPMGR_NODES_COLUMNS
2994 " FROM repmgr.nodes n "
2995 " WHERE n.upstream_node_id = %i "
2996 " AND n.node_id != %i "
2997 " AND n.active IS TRUE "
2998 "ORDER BY n.node_id ",
2999 upstream_node_id,
3000 node_id);
3001
3002 log_verbose(LOG_DEBUG, "get_active_sibling_node_records():\n%s", query.data);
3003
3004 res = PQexec(conn, query.data);
3005
3006 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3007 {
3008 log_db_error(conn, query.data, _("get_active_sibling_records(): unable to execute query"));
3009 }
3010
3011 termPQExpBuffer(&query);
3012
3013 /* this will return an empty list if there was an error executing the query */
3014 _populate_node_records(res, node_list);
3015
3016 PQclear(res);
3017
3018 return;
3019 }
3020
3021 bool
get_child_nodes(PGconn * conn,int node_id,NodeInfoList * node_list)3022 get_child_nodes(PGconn *conn, int node_id, NodeInfoList *node_list)
3023 {
3024 PQExpBufferData query;
3025 PGresult *res = NULL;
3026 bool success = true;
3027
3028 initPQExpBuffer(&query);
3029
3030 appendPQExpBuffer(&query,
3031 " SELECT n.node_id, n.type, n.upstream_node_id, n.node_name, n.conninfo, n.repluser, "
3032 " n.slot_name, n.location, n.priority, n.active, n.config_file, "
3033 " '' AS upstream_node_name, "
3034 " CASE WHEN sr.application_name IS NULL THEN FALSE ELSE TRUE END AS attached "
3035 " FROM repmgr.nodes n "
3036 " LEFT JOIN pg_catalog.pg_stat_replication sr "
3037 " ON sr.application_name = n.node_name "
3038 " WHERE n.upstream_node_id = %i ",
3039 node_id);
3040
3041 log_verbose(LOG_DEBUG, "get_child_nodes():\n%s", query.data);
3042
3043 res = PQexec(conn, query.data);
3044
3045 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3046 {
3047 log_db_error(conn, query.data, _("get_active_sibling_records(): unable to execute query"));
3048 success = false;
3049 }
3050
3051 termPQExpBuffer(&query);
3052
3053 /* this will return an empty list if there was an error executing the query */
3054 _populate_node_records(res, node_list);
3055
3056 PQclear(res);
3057
3058 return success;
3059 }
3060
3061
3062 void
get_node_records_by_priority(PGconn * conn,NodeInfoList * node_list)3063 get_node_records_by_priority(PGconn *conn, NodeInfoList *node_list)
3064 {
3065 PQExpBufferData query;
3066 PGresult *res = NULL;
3067
3068 initPQExpBuffer(&query);
3069
3070 appendPQExpBufferStr(&query,
3071 " SELECT " REPMGR_NODES_COLUMNS
3072 " FROM repmgr.nodes n "
3073 "ORDER BY n.priority DESC, n.node_name ");
3074
3075 log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data);
3076
3077 res = PQexec(conn, query.data);
3078
3079 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3080 {
3081 log_db_error(conn, query.data, _("get_node_records_by_priority(): unable to execute query"));
3082 }
3083
3084 termPQExpBuffer(&query);
3085
3086 /* this will return an empty list if there was an error executing the query */
3087 _populate_node_records(res, node_list);
3088
3089 PQclear(res);
3090
3091 return;
3092 }
3093
3094 /*
3095 * return all node records together with their upstream's node name,
3096 * if available.
3097 */
3098 bool
get_all_node_records_with_upstream(PGconn * conn,NodeInfoList * node_list)3099 get_all_node_records_with_upstream(PGconn *conn, NodeInfoList *node_list)
3100 {
3101 PQExpBufferData query;
3102 PGresult *res = NULL;
3103 bool success = true;
3104
3105 initPQExpBuffer(&query);
3106
3107 appendPQExpBufferStr(&query,
3108 " SELECT " REPMGR_NODES_COLUMNS_WITH_UPSTREAM
3109 " FROM repmgr.nodes n "
3110 " LEFT JOIN repmgr.nodes un "
3111 " ON un.node_id = n.upstream_node_id"
3112 " ORDER BY n.node_id ");
3113
3114 log_verbose(LOG_DEBUG, "get_all_node_records_with_upstream():\n%s", query.data);
3115
3116 res = PQexec(conn, query.data);
3117
3118 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3119 {
3120 log_db_error(conn, query.data, _("get_all_node_records_with_upstream(): unable to retrieve node records"));
3121 success = false;
3122 }
3123
3124
3125 /* this will return an empty list if there was an error executing the query */
3126 _populate_node_records(res, node_list);
3127
3128 termPQExpBuffer(&query);
3129 PQclear(res);
3130
3131 return success;
3132 }
3133
3134
3135
3136 bool
get_downstream_nodes_with_missing_slot(PGconn * conn,int this_node_id,NodeInfoList * node_list)3137 get_downstream_nodes_with_missing_slot(PGconn *conn, int this_node_id, NodeInfoList *node_list)
3138 {
3139 PQExpBufferData query;
3140 PGresult *res = NULL;
3141 bool success = true;
3142
3143 initPQExpBuffer(&query);
3144
3145 appendPQExpBuffer(&query,
3146 " SELECT " REPMGR_NODES_COLUMNS
3147 " FROM repmgr.nodes n "
3148 "LEFT JOIN pg_catalog.pg_replication_slots rs "
3149 " ON rs.slot_name = n.slot_name "
3150 " WHERE n.slot_name IS NOT NULL"
3151 " AND rs.slot_name IS NULL "
3152 " AND n.upstream_node_id = %i "
3153 " AND n.type = 'standby'",
3154 this_node_id);
3155
3156 log_verbose(LOG_DEBUG, "get_all_node_records_with_missing_slot():\n%s", query.data);
3157
3158 res = PQexec(conn, query.data);
3159
3160 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3161 {
3162 log_db_error(conn, query.data, _("get_downstream_nodes_with_missing_slot(): unable to retrieve node records"));
3163 success = false;
3164 }
3165
3166 /* this will return an empty list if there was an error executing the query */
3167 _populate_node_records(res, node_list);
3168
3169 termPQExpBuffer(&query);
3170 PQclear(res);
3171
3172 return success;
3173 }
3174
3175 bool
create_node_record(PGconn * conn,char * repmgr_action,t_node_info * node_info)3176 create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
3177 {
3178 if (repmgr_action != NULL)
3179 log_verbose(LOG_DEBUG, "create_node_record(): action is \"%s\"", repmgr_action);
3180
3181 return _create_update_node_record(conn, "create", node_info);
3182 }
3183
3184
3185 bool
update_node_record(PGconn * conn,char * repmgr_action,t_node_info * node_info)3186 update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
3187 {
3188 if (repmgr_action != NULL)
3189 log_verbose(LOG_DEBUG, "update_node_record(): action is \"%s\"", repmgr_action);
3190
3191 return _create_update_node_record(conn, "update", node_info);
3192 }
3193
3194
3195 static bool
_create_update_node_record(PGconn * conn,char * action,t_node_info * node_info)3196 _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info)
3197 {
3198 PQExpBufferData query;
3199 char node_id[MAXLEN] = "";
3200 char priority[MAXLEN] = "";
3201
3202 char upstream_node_id[MAXLEN] = "";
3203 char *upstream_node_id_ptr = NULL;
3204
3205 char *slot_name_ptr = NULL;
3206
3207 int param_count = NODE_RECORD_PARAM_COUNT;
3208 const char *param_values[NODE_RECORD_PARAM_COUNT];
3209
3210 PGresult *res;
3211 bool success = true;
3212
3213 maxlen_snprintf(node_id, "%i", node_info->node_id);
3214 maxlen_snprintf(priority, "%i", node_info->priority);
3215
3216 if (node_info->upstream_node_id == NO_UPSTREAM_NODE && node_info->type == STANDBY)
3217 {
3218 /*
3219 * No explicit upstream node id provided for standby - attempt to get
3220 * primary node id
3221 */
3222 int primary_node_id = get_primary_node_id(conn);
3223
3224 maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
3225 upstream_node_id_ptr = upstream_node_id;
3226 }
3227 else if (node_info->upstream_node_id != NO_UPSTREAM_NODE)
3228 {
3229 maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id);
3230 upstream_node_id_ptr = upstream_node_id;
3231 }
3232
3233 if (node_info->slot_name[0] != '\0')
3234 {
3235 slot_name_ptr = node_info->slot_name;
3236 }
3237
3238
3239 param_values[0] = get_node_type_string(node_info->type);
3240 param_values[1] = upstream_node_id_ptr;
3241 param_values[2] = node_info->node_name;
3242 param_values[3] = node_info->conninfo;
3243 param_values[4] = node_info->repluser;
3244 param_values[5] = slot_name_ptr;
3245 param_values[6] = node_info->location;
3246 param_values[7] = priority;
3247 param_values[8] = node_info->active == true ? "TRUE" : "FALSE";
3248 param_values[9] = node_info->config_file;
3249 param_values[10] = node_id;
3250
3251 initPQExpBuffer(&query);
3252
3253 if (strcmp(action, "create") == 0)
3254 {
3255 appendPQExpBufferStr(&query,
3256 "INSERT INTO repmgr.nodes "
3257 " (node_id, type, upstream_node_id, "
3258 " node_name, conninfo, repluser, slot_name, "
3259 " location, priority, active, config_file) "
3260 "VALUES ($11, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ");
3261 }
3262 else
3263 {
3264 appendPQExpBufferStr(&query,
3265 "UPDATE repmgr.nodes SET "
3266 " type = $1, "
3267 " upstream_node_id = $2, "
3268 " node_name = $3, "
3269 " conninfo = $4, "
3270 " repluser = $5, "
3271 " slot_name = $6, "
3272 " location = $7, "
3273 " priority = $8, "
3274 " active = $9, "
3275 " config_file = $10 "
3276 " WHERE node_id = $11 ");
3277 }
3278
3279 res = PQexecParams(conn,
3280 query.data,
3281 param_count,
3282 NULL,
3283 param_values,
3284 NULL,
3285 NULL,
3286 0);
3287
3288 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3289 {
3290 log_db_error(conn, query.data,
3291 _("_create_update_node_record(): unable to %s node record for node \"%s\" (ID: %i)"),
3292 action,
3293 node_info->node_name,
3294 node_info->node_id);
3295
3296 success = false;
3297 }
3298
3299 termPQExpBuffer(&query);
3300 PQclear(res);
3301
3302 return success;
3303 }
3304
3305
3306 bool
update_node_record_set_active(PGconn * conn,int this_node_id,bool active)3307 update_node_record_set_active(PGconn *conn, int this_node_id, bool active)
3308 {
3309 PQExpBufferData query;
3310 PGresult *res = NULL;
3311 bool success = true;
3312
3313 initPQExpBuffer(&query);
3314
3315 appendPQExpBuffer(&query,
3316 "UPDATE repmgr.nodes SET active = %s "
3317 " WHERE node_id = %i",
3318 active == true ? "TRUE" : "FALSE",
3319 this_node_id);
3320
3321 log_verbose(LOG_DEBUG, "update_node_record_set_active():\n %s", query.data);
3322
3323 res = PQexec(conn, query.data);
3324
3325 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3326 {
3327 log_db_error(conn, query.data,
3328 _("update_node_record_set_active(): unable to update node record"));
3329 success = false;
3330 }
3331
3332 termPQExpBuffer(&query);
3333 PQclear(res);
3334
3335 return success;
3336 }
3337
3338
3339 bool
update_node_record_set_active_standby(PGconn * conn,int this_node_id)3340 update_node_record_set_active_standby(PGconn *conn, int this_node_id)
3341 {
3342 PQExpBufferData query;
3343 PGresult *res = NULL;
3344 bool success = true;
3345
3346 initPQExpBuffer(&query);
3347
3348 appendPQExpBuffer(&query,
3349 "UPDATE repmgr.nodes "
3350 " SET type = 'standby', "
3351 " active = TRUE "
3352 " WHERE node_id = %i",
3353 this_node_id);
3354
3355 log_verbose(LOG_DEBUG, "update_node_record_set_active_standby():\n %s", query.data);
3356
3357 res = PQexec(conn, query.data);
3358
3359 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3360 {
3361 log_db_error(conn, query.data, _("update_node_record_set_active_standby(): unable to update node record"));
3362 success = false;
3363 }
3364
3365 termPQExpBuffer(&query);
3366 PQclear(res);
3367
3368 return success;
3369 }
3370
3371
3372 bool
update_node_record_set_primary(PGconn * conn,int this_node_id)3373 update_node_record_set_primary(PGconn *conn, int this_node_id)
3374 {
3375 PQExpBufferData query;
3376 PGresult *res = NULL;
3377
3378 log_debug(_("setting node %i as primary and marking existing primary as failed"),
3379 this_node_id);
3380
3381 begin_transaction(conn);
3382
3383 initPQExpBuffer(&query);
3384
3385 appendPQExpBuffer(&query,
3386 " UPDATE repmgr.nodes "
3387 " SET active = FALSE "
3388 " WHERE type = 'primary' "
3389 " AND active IS TRUE "
3390 " AND node_id != %i ",
3391 this_node_id);
3392
3393 res = PQexec(conn, query.data);
3394
3395 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3396 {
3397 log_db_error(conn, query.data,
3398 _("update_node_record_set_primary(): unable to set old primary node as inactive"));
3399
3400 termPQExpBuffer(&query);
3401 PQclear(res);
3402
3403 rollback_transaction(conn);
3404
3405 return false;
3406 }
3407
3408 termPQExpBuffer(&query);
3409 PQclear(res);
3410
3411 initPQExpBuffer(&query);
3412
3413 appendPQExpBuffer(&query,
3414 " UPDATE repmgr.nodes"
3415 " SET type = 'primary', "
3416 " upstream_node_id = NULL, "
3417 " active = TRUE "
3418 " WHERE node_id = %i ",
3419 this_node_id);
3420
3421 res = PQexec(conn, query.data);
3422
3423 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3424 {
3425 log_db_error(conn, query.data,
3426 _("unable to set current node %i as active primary"),
3427 this_node_id);
3428
3429 termPQExpBuffer(&query);
3430 PQclear(res);
3431
3432 rollback_transaction(conn);
3433
3434 return false;
3435 }
3436
3437 termPQExpBuffer(&query);
3438 PQclear(res);
3439
3440 return commit_transaction(conn);
3441 }
3442
3443
3444 bool
update_node_record_set_upstream(PGconn * conn,int this_node_id,int new_upstream_node_id)3445 update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id)
3446 {
3447 PQExpBufferData query;
3448 PGresult *res = NULL;
3449 bool success = true;
3450
3451 log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i"),
3452 this_node_id, new_upstream_node_id);
3453
3454 initPQExpBuffer(&query);
3455
3456 appendPQExpBuffer(&query,
3457 " UPDATE repmgr.nodes "
3458 " SET upstream_node_id = %i "
3459 " WHERE node_id = %i ",
3460 new_upstream_node_id,
3461 this_node_id);
3462
3463 log_verbose(LOG_DEBUG, "update_node_record_set_upstream():\n%s", query.data);
3464
3465 res = PQexec(conn, query.data);
3466
3467 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3468 {
3469 log_db_error(conn, query.data, _("update_node_record_set_upstream(): unable to set new upstream node id"));
3470
3471 success = false;
3472 }
3473
3474 termPQExpBuffer(&query);
3475 PQclear(res);
3476
3477 return success;
3478 }
3479
3480
3481 /*
3482 * Update node record following change of status
3483 * (e.g. inactive primary converted to standby)
3484 */
3485 bool
update_node_record_status(PGconn * conn,int this_node_id,char * type,int upstream_node_id,bool active)3486 update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active)
3487 {
3488 PQExpBufferData query;
3489 PGresult *res = NULL;
3490 bool success = true;
3491
3492 initPQExpBuffer(&query);
3493
3494 appendPQExpBuffer(&query,
3495 " UPDATE repmgr.nodes "
3496 " SET type = '%s', "
3497 " upstream_node_id = %i, "
3498 " active = %s "
3499 " WHERE node_id = %i ",
3500 type,
3501 upstream_node_id,
3502 active ? "TRUE" : "FALSE",
3503 this_node_id);
3504
3505 log_verbose(LOG_DEBUG, "update_node_record_status():\n %s", query.data);
3506
3507 res = PQexec(conn, query.data);
3508
3509 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3510 {
3511 log_db_error(conn, query.data,
3512 _("update_node_record_status(): unable to update node record status for node %i"),
3513 this_node_id);
3514
3515 success = false;
3516 }
3517
3518 termPQExpBuffer(&query);
3519 PQclear(res);
3520
3521 return success;
3522 }
3523
3524
3525 /*
3526 * Update node record's "conninfo" and "priority" fields. Called by repmgrd
3527 * following a configuration file reload.
3528 */
3529 bool
update_node_record_conn_priority(PGconn * conn,t_configuration_options * options)3530 update_node_record_conn_priority(PGconn *conn, t_configuration_options *options)
3531 {
3532 PQExpBufferData query;
3533 PGresult *res = NULL;
3534 bool success = true;
3535
3536 initPQExpBuffer(&query);
3537
3538 appendPQExpBuffer(&query,
3539 "UPDATE repmgr.nodes "
3540 " SET conninfo = '%s', "
3541 " priority = %d "
3542 " WHERE node_id = %d ",
3543 options->conninfo,
3544 options->priority,
3545 options->node_id);
3546
3547 res = PQexec(conn, query.data);
3548
3549 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3550 {
3551 log_db_error(conn, query.data, _("update_node_record_conn_priority(): unable to execute query"));
3552 success = false;
3553 }
3554
3555 termPQExpBuffer(&query);
3556
3557 PQclear(res);
3558
3559 return success;
3560 }
3561
3562
3563 /*
3564 * Copy node records from primary to witness servers.
3565 *
3566 * This is used when initially registering a witness server, and
3567 * by repmgrd to update the node records when required.
3568 */
3569
3570 bool
witness_copy_node_records(PGconn * primary_conn,PGconn * witness_conn)3571 witness_copy_node_records(PGconn *primary_conn, PGconn *witness_conn)
3572 {
3573 PGresult *res = NULL;
3574 NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER;
3575 NodeInfoListCell *cell = NULL;
3576
3577 begin_transaction(witness_conn);
3578
3579 /* Defer constraints */
3580
3581 res = PQexec(witness_conn, "SET CONSTRAINTS ALL DEFERRED");
3582
3583 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3584 {
3585 log_db_error(witness_conn, NULL, ("witness_copy_node_records(): unable to defer constraints"));
3586
3587 rollback_transaction(witness_conn);
3588 PQclear(res);
3589
3590 return false;
3591 }
3592
3593 PQclear(res);
3594
3595 /* truncate existing records */
3596
3597 if (truncate_node_records(witness_conn) == false)
3598 {
3599 rollback_transaction(witness_conn);
3600
3601 return false;
3602 }
3603
3604 if (get_all_node_records(primary_conn, &nodes) == false)
3605 {
3606 rollback_transaction(witness_conn);
3607
3608 return false;
3609 }
3610
3611 for (cell = nodes.head; cell; cell = cell->next)
3612 {
3613 if (create_node_record(witness_conn, NULL, cell->node_info) == false)
3614 {
3615 rollback_transaction(witness_conn);
3616
3617 return false;
3618 }
3619 }
3620
3621 /* and done */
3622 commit_transaction(witness_conn);
3623
3624 clear_node_info_list(&nodes);
3625
3626 return true;
3627 }
3628
3629
3630 bool
delete_node_record(PGconn * conn,int node)3631 delete_node_record(PGconn *conn, int node)
3632 {
3633 PQExpBufferData query;
3634 PGresult *res = NULL;
3635 bool success = true;
3636
3637 initPQExpBuffer(&query);
3638
3639 appendPQExpBuffer(&query,
3640 "DELETE FROM repmgr.nodes "
3641 " WHERE node_id = %i",
3642 node);
3643
3644 log_verbose(LOG_DEBUG, "delete_node_record():\n %s", query.data);
3645
3646 res = PQexec(conn, query.data);
3647
3648 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3649 {
3650 log_db_error(conn, query.data, _("delete_node_record(): unable to delete node record"));
3651
3652 success = false;
3653 }
3654
3655 termPQExpBuffer(&query);
3656 PQclear(res);
3657
3658 return success;
3659 }
3660
3661 bool
truncate_node_records(PGconn * conn)3662 truncate_node_records(PGconn *conn)
3663 {
3664 PGresult *res = NULL;
3665 bool success = true;
3666
3667 res = PQexec(conn, "TRUNCATE TABLE repmgr.nodes");
3668
3669 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3670 {
3671 log_db_error(conn, NULL, _("truncate_node_records(): unable to truncate table \"repmgr.nodes\""));
3672
3673 success = false;
3674 }
3675
3676 PQclear(res);
3677
3678 return success;
3679 }
3680
3681
3682 bool
update_node_record_slot_name(PGconn * primary_conn,int node_id,char * slot_name)3683 update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name)
3684 {
3685 PQExpBufferData query;
3686 PGresult *res = NULL;
3687 bool success = true;
3688
3689 initPQExpBuffer(&query);
3690
3691 appendPQExpBuffer(&query,
3692 " UPDATE repmgr.nodes "
3693 " SET slot_name = '%s' "
3694 " WHERE node_id = %i ",
3695 slot_name,
3696 node_id);
3697
3698 res = PQexec(primary_conn, query.data);
3699
3700 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3701 {
3702 log_db_error(primary_conn, query.data, _("unable to set node record slot name"));
3703
3704 success = false;
3705 }
3706
3707 termPQExpBuffer(&query);
3708 PQclear(res);
3709
3710 return success;
3711 }
3712
3713
3714
3715
3716 void
clear_node_info_list(NodeInfoList * nodes)3717 clear_node_info_list(NodeInfoList *nodes)
3718 {
3719 NodeInfoListCell *cell = NULL;
3720 NodeInfoListCell *next_cell = NULL;
3721
3722 log_verbose(LOG_DEBUG, "clear_node_info_list() - closing open connections");
3723
3724 /* close any open connections */
3725 for (cell = nodes->head; cell; cell = cell->next)
3726 {
3727
3728 if (PQstatus(cell->node_info->conn) == CONNECTION_OK)
3729 {
3730 PQfinish(cell->node_info->conn);
3731 cell->node_info->conn = NULL;
3732 }
3733 }
3734
3735 log_verbose(LOG_DEBUG, "clear_node_info_list() - unlinking");
3736
3737 cell = nodes->head;
3738
3739 while (cell != NULL)
3740 {
3741 next_cell = cell->next;
3742
3743 if (cell->node_info->replication_info != NULL)
3744 pfree(cell->node_info->replication_info);
3745
3746 pfree(cell->node_info);
3747 pfree(cell);
3748 cell = next_cell;
3749 }
3750
3751 nodes->head = NULL;
3752 nodes->tail = NULL;
3753 nodes->node_count = 0;
3754 }
3755
3756
3757 /* ================================================ */
3758 /* PostgreSQL configuration file location functions */
3759 /* ================================================ */
3760
3761 bool
get_datadir_configuration_files(PGconn * conn,KeyValueList * list)3762 get_datadir_configuration_files(PGconn *conn, KeyValueList *list)
3763 {
3764 PQExpBufferData query;
3765 PGresult *res = NULL;
3766 int i;
3767 bool success = true;
3768
3769 initPQExpBuffer(&query);
3770
3771 appendPQExpBufferStr(&query,
3772 "WITH files AS ( "
3773 " WITH dd AS ( "
3774 " SELECT setting "
3775 " FROM pg_catalog.pg_settings "
3776 " WHERE name = 'data_directory') "
3777 " SELECT distinct(sourcefile) AS config_file"
3778 " FROM dd, pg_catalog.pg_settings ps "
3779 " WHERE ps.sourcefile IS NOT NULL "
3780 " AND ps.sourcefile ~ ('^' || dd.setting) "
3781 " UNION "
3782 " SELECT ps.setting AS config_file"
3783 " FROM dd, pg_catalog.pg_settings ps "
3784 " WHERE ps.name IN ('config_file', 'hba_file', 'ident_file') "
3785 " AND ps.setting ~ ('^' || dd.setting) "
3786 ") "
3787 " SELECT config_file, "
3788 " pg_catalog.regexp_replace(config_file, '^.*\\/','') AS filename "
3789 " FROM files "
3790 "ORDER BY config_file");
3791
3792 res = PQexec(conn, query.data);
3793
3794 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3795 {
3796 log_db_error(conn, query.data,
3797 _("get_datadir_configuration_files(): unable to retrieve configuration file information"));
3798
3799 success = false;
3800 }
3801 else
3802 {
3803 for (i = 0; i < PQntuples(res); i++)
3804 {
3805 key_value_list_set(list,
3806 PQgetvalue(res, i, 1),
3807 PQgetvalue(res, i, 0));
3808 }
3809 }
3810
3811 termPQExpBuffer(&query);
3812 PQclear(res);
3813
3814 return success;
3815 }
3816
3817
3818 bool
get_configuration_file_locations(PGconn * conn,t_configfile_list * list)3819 get_configuration_file_locations(PGconn *conn, t_configfile_list *list)
3820 {
3821 PQExpBufferData query;
3822 PGresult *res = NULL;
3823 int i;
3824
3825 initPQExpBuffer(&query);
3826
3827 appendPQExpBufferStr(&query,
3828 " WITH dd AS ( "
3829 " SELECT setting AS data_directory"
3830 " FROM pg_catalog.pg_settings "
3831 " WHERE name = 'data_directory' "
3832 " ) "
3833 " SELECT DISTINCT(sourcefile), "
3834 " pg_catalog.regexp_replace(sourcefile, '^.*\\/', '') AS filename, "
3835 " sourcefile ~ ('^' || dd.data_directory) AS in_data_dir "
3836 " FROM dd, pg_catalog.pg_settings ps "
3837 " WHERE sourcefile IS NOT NULL "
3838 " ORDER BY 1 ");
3839
3840 log_verbose(LOG_DEBUG, "get_configuration_file_locations():\n %s",
3841 query.data);
3842
3843 res = PQexec(conn, query.data);
3844
3845 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3846 {
3847 log_db_error(conn, query.data,
3848 _("get_configuration_file_locations(): unable to retrieve configuration file locations"));
3849
3850 termPQExpBuffer(&query);
3851 PQclear(res);
3852
3853 return false;
3854 }
3855
3856 /*
3857 * allocate memory for config file array - number of rows returned from
3858 * above query + 2 for pg_hba.conf, pg_ident.conf
3859 */
3860
3861 config_file_list_init(list, PQntuples(res) + 2);
3862
3863 for (i = 0; i < PQntuples(res); i++)
3864 {
3865 config_file_list_add(list,
3866 PQgetvalue(res, i, 0),
3867 PQgetvalue(res, i, 1),
3868 atobool(PQgetvalue(res, i, 2)));
3869 }
3870
3871 termPQExpBuffer(&query);
3872 PQclear(res);
3873
3874 /* Fetch locations of pg_hba.conf and pg_ident.conf */
3875 initPQExpBuffer(&query);
3876
3877 appendPQExpBufferStr(&query,
3878 " WITH dd AS ( "
3879 " SELECT setting AS data_directory"
3880 " FROM pg_catalog.pg_settings "
3881 " WHERE name = 'data_directory' "
3882 " ) "
3883 " SELECT ps.setting, "
3884 " pg_catalog.regexp_replace(setting, '^.*\\/', '') AS filename, "
3885 " ps.setting ~ ('^' || dd.data_directory) AS in_data_dir "
3886 " FROM dd, pg_catalog.pg_settings ps "
3887 " WHERE ps.name IN ('hba_file', 'ident_file') "
3888 " ORDER BY 1 ");
3889
3890 log_verbose(LOG_DEBUG, "get_configuration_file_locations():\n %s",
3891 query.data);
3892
3893 res = PQexec(conn, query.data);
3894
3895 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3896 {
3897 log_db_error(conn, query.data,
3898 _("get_configuration_file_locations(): unable to retrieve configuration file locations"));
3899
3900 termPQExpBuffer(&query);
3901 PQclear(res);
3902
3903 return false;
3904 }
3905
3906 for (i = 0; i < PQntuples(res); i++)
3907 {
3908 config_file_list_add(list,
3909 PQgetvalue(res, i, 0),
3910 PQgetvalue(res, i, 1),
3911 atobool(PQgetvalue(res, i, 2)));
3912 }
3913
3914 termPQExpBuffer(&query);
3915 PQclear(res);
3916
3917 return true;
3918 }
3919
3920
3921 void
config_file_list_init(t_configfile_list * list,int max_size)3922 config_file_list_init(t_configfile_list *list, int max_size)
3923 {
3924 list->size = max_size;
3925 list->entries = 0;
3926 list->files = pg_malloc0(sizeof(t_configfile_info *) * max_size);
3927
3928 if (list->files == NULL)
3929 {
3930 log_error(_("config_file_list_init(): unable to allocate memory; terminating"));
3931 exit(ERR_OUT_OF_MEMORY);
3932 }
3933 }
3934
3935
3936 void
config_file_list_add(t_configfile_list * list,const char * file,const char * filename,bool in_data_dir)3937 config_file_list_add(t_configfile_list *list, const char *file, const char *filename, bool in_data_dir)
3938 {
3939 /* Failsafe to prevent entries being added beyond the end */
3940 if (list->entries == list->size)
3941 return;
3942
3943 list->files[list->entries] = pg_malloc0(sizeof(t_configfile_info));
3944
3945 if (list->files[list->entries] == NULL)
3946 {
3947 log_error(_("config_file_list_add(): unable to allocate memory; terminating"));
3948 exit(ERR_OUT_OF_MEMORY);
3949 }
3950
3951
3952 snprintf(list->files[list->entries]->filepath,
3953 sizeof(list->files[list->entries]->filepath),
3954 "%s", file);
3955 canonicalize_path(list->files[list->entries]->filepath);
3956
3957 snprintf(list->files[list->entries]->filename,
3958 sizeof(list->files[list->entries]->filename),
3959 "%s", filename);
3960
3961 list->files[list->entries]->in_data_directory = in_data_dir;
3962
3963 list->entries++;
3964 }
3965
3966
3967 /* ====================== */
3968 /* event record functions */
3969 /* ====================== */
3970
3971
3972 /*
3973 * create_event_record()
3974 *
3975 * Create a record in the "events" table, but don't execute the
3976 * "event_notification_command".
3977 */
3978
3979 bool
create_event_record(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details)3980 create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details)
3981 {
3982 /* create dummy t_event_info */
3983 t_event_info event_info = T_EVENT_INFO_INITIALIZER;
3984
3985 return _create_event(conn, options, node_id, event, successful, details, &event_info, false);
3986 }
3987
3988
3989 /*
3990 * create_event_notification()
3991 *
3992 * If `conn` is not NULL, insert a record into the events table.
3993 *
3994 * If configuration parameter "event_notification_command" is set, also
3995 * attempt to execute that command.
3996 *
3997 * Returns true if all operations succeeded, false if one or more failed.
3998 *
3999 * Note this function may be called with "conn" set to NULL in cases where
4000 * the primary node is not available and it's therefore not possible to write
4001 * an event record. In this case, if `event_notification_command` is set, a
4002 * user-defined notification to be generated; if not, this function will have
4003 * no effect.
4004 */
4005 bool
create_event_notification(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details)4006 create_event_notification(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details)
4007 {
4008 /* create dummy t_event_info */
4009 t_event_info event_info = T_EVENT_INFO_INITIALIZER;
4010
4011 return _create_event(conn, options, node_id, event, successful, details, &event_info, true);
4012 }
4013
4014
4015 /*
4016 * create_event_notification_extended()
4017 *
4018 * The caller may need to pass additional parameters to the event notification
4019 * command (currently only the conninfo string of another node)
4020
4021 */
4022 bool
create_event_notification_extended(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details,t_event_info * event_info)4023 create_event_notification_extended(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info)
4024 {
4025 return _create_event(conn, options, node_id, event, successful, details, event_info, true);
4026 }
4027
4028
4029 static bool
_create_event(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details,t_event_info * event_info,bool send_notification)4030 _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification)
4031 {
4032 PQExpBufferData query;
4033 PGresult *res = NULL;
4034 char event_timestamp[MAXLEN] = "";
4035 bool success = true;
4036
4037 log_verbose(LOG_DEBUG, "_create_event(): event is \"%s\" for node %i", event, node_id);
4038
4039 /*
4040 * Only attempt to write a record if a connection handle was provided,
4041 * and the connection handle points to a node which is not in recovery.
4042 */
4043 if (conn != NULL && PQstatus(conn) == CONNECTION_OK && get_recovery_type(conn) == RECTYPE_PRIMARY)
4044 {
4045 int n_node_id = htonl(node_id);
4046 char *t_successful = successful ? "TRUE" : "FALSE";
4047
4048 const char *values[4] = {(char *) &n_node_id,
4049 event,
4050 t_successful,
4051 details
4052 };
4053
4054 int lengths[4] = {sizeof(n_node_id),
4055 0,
4056 0,
4057 0
4058 };
4059
4060 int binary[4] = {1, 0, 0, 0};
4061
4062 initPQExpBuffer(&query);
4063 appendPQExpBufferStr(&query,
4064 " INSERT INTO repmgr.events ( "
4065 " node_id, "
4066 " event, "
4067 " successful, "
4068 " details "
4069 " ) "
4070 " VALUES ($1, $2, $3, $4) "
4071 " RETURNING event_timestamp ");
4072
4073 log_verbose(LOG_DEBUG, "_create_event():\n %s", query.data);
4074
4075 res = PQexecParams(conn,
4076 query.data,
4077 4,
4078 NULL,
4079 values,
4080 lengths,
4081 binary,
4082 0);
4083
4084
4085 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4086 {
4087 /* we don't treat this as a fatal error */
4088 log_warning(_("unable to create event record"));
4089 log_detail("%s", PQerrorMessage(conn));
4090 log_detail("%s", query.data);
4091
4092 success = false;
4093 }
4094 else
4095 {
4096 /* Store timestamp to send to the notification command */
4097 snprintf(event_timestamp, MAXLEN, "%s", PQgetvalue(res, 0, 0));
4098 }
4099
4100 termPQExpBuffer(&query);
4101 PQclear(res);
4102 }
4103
4104 /*
4105 * If no database connection provided, or the query failed, generate a
4106 * current timestamp ourselves. This isn't quite the same format as
4107 * PostgreSQL, but is close enough for diagnostic use.
4108 */
4109 if (!strlen(event_timestamp))
4110 {
4111 time_t now;
4112 struct tm ts;
4113
4114 time(&now);
4115 ts = *localtime(&now);
4116 strftime(event_timestamp, MAXLEN, "%Y-%m-%d %H:%M:%S%z", &ts);
4117 }
4118
4119 log_verbose(LOG_DEBUG, "_create_event(): Event timestamp is \"%s\"", event_timestamp);
4120
4121 /* an event notification command was provided - parse and execute it */
4122 if (send_notification == true && strlen(options->event_notification_command))
4123 {
4124 char parsed_command[MAXPGPATH] = "";
4125 const char *src_ptr = NULL;
4126 char *dst_ptr = NULL;
4127 char *end_ptr = NULL;
4128 int r = 0;
4129
4130 log_verbose(LOG_DEBUG, "_create_event(): command is '%s'", options->event_notification_command);
4131 /*
4132 * If configuration option 'event_notifications' was provided, check
4133 * if this event is one of the ones listed; if not listed, don't
4134 * execute the notification script.
4135 *
4136 * (If 'event_notifications' was not provided, we assume the script
4137 * should be executed for all events).
4138 */
4139 if (options->event_notifications.head != NULL)
4140 {
4141 EventNotificationListCell *cell = NULL;
4142 bool notify_ok = false;
4143
4144 for (cell = options->event_notifications.head; cell; cell = cell->next)
4145 {
4146 if (strcmp(event, cell->event_type) == 0)
4147 {
4148 notify_ok = true;
4149 break;
4150 }
4151 }
4152
4153 /*
4154 * Event type not found in the 'event_notifications' list - return
4155 * early
4156 */
4157 if (notify_ok == false)
4158 {
4159 log_debug(_("not executing notification script for event type \"%s\""), event);
4160 return success;
4161 }
4162 }
4163
4164 dst_ptr = parsed_command;
4165 end_ptr = parsed_command + MAXPGPATH - 1;
4166 *end_ptr = '\0';
4167
4168 for (src_ptr = options->event_notification_command; *src_ptr; src_ptr++)
4169 {
4170 if (*src_ptr == '%')
4171 {
4172 switch (src_ptr[1])
4173 {
4174 case '%':
4175 /* %%: replace with % */
4176 if (dst_ptr < end_ptr)
4177 {
4178 src_ptr++;
4179 *dst_ptr++ = *src_ptr;
4180 }
4181 break;
4182 case 'n':
4183 /* %n: node id */
4184 src_ptr++;
4185 snprintf(dst_ptr, end_ptr - dst_ptr, "%i", node_id);
4186 dst_ptr += strlen(dst_ptr);
4187 break;
4188 case 'a':
4189 /* %a: node name */
4190 src_ptr++;
4191 if (event_info->node_name != NULL)
4192 {
4193 log_verbose(LOG_DEBUG, "node_name: %s", event_info->node_name);
4194 strlcpy(dst_ptr, event_info->node_name, end_ptr - dst_ptr);
4195 dst_ptr += strlen(dst_ptr);
4196 }
4197 break;
4198 case 'e':
4199 /* %e: event type */
4200 src_ptr++;
4201 strlcpy(dst_ptr, event, end_ptr - dst_ptr);
4202 dst_ptr += strlen(dst_ptr);
4203 break;
4204 case 'd':
4205 /* %d: details */
4206 src_ptr++;
4207 if (details != NULL)
4208 {
4209 PQExpBufferData details_escaped;
4210 initPQExpBuffer(&details_escaped);
4211
4212 escape_double_quotes(details, &details_escaped);
4213
4214 strlcpy(dst_ptr, details_escaped.data, end_ptr - dst_ptr);
4215 dst_ptr += strlen(dst_ptr);
4216 termPQExpBuffer(&details_escaped);
4217 }
4218 break;
4219 case 's':
4220 /* %s: successful */
4221 src_ptr++;
4222 strlcpy(dst_ptr, successful ? "1" : "0", end_ptr - dst_ptr);
4223 dst_ptr += strlen(dst_ptr);
4224 break;
4225 case 't':
4226 /* %t: timestamp */
4227 src_ptr++;
4228 strlcpy(dst_ptr, event_timestamp, end_ptr - dst_ptr);
4229 dst_ptr += strlen(dst_ptr);
4230 break;
4231 case 'c':
4232 /* %c: conninfo for next available node */
4233 src_ptr++;
4234 if (event_info->conninfo_str != NULL)
4235 {
4236 log_debug("conninfo: %s", event_info->conninfo_str);
4237
4238 strlcpy(dst_ptr, event_info->conninfo_str, end_ptr - dst_ptr);
4239 dst_ptr += strlen(dst_ptr);
4240 }
4241 break;
4242 case 'p':
4243 /* %p: primary id ("standby_switchover": former primary id) */
4244 src_ptr++;
4245 if (event_info->node_id != UNKNOWN_NODE_ID)
4246 {
4247 PQExpBufferData node_id;
4248 initPQExpBuffer(&node_id);
4249 appendPQExpBuffer(&node_id,
4250 "%i", event_info->node_id);
4251 strlcpy(dst_ptr, node_id.data, end_ptr - dst_ptr);
4252 dst_ptr += strlen(dst_ptr);
4253 termPQExpBuffer(&node_id);
4254 }
4255 break;
4256 default:
4257 /* otherwise treat the % as not special */
4258 if (dst_ptr < end_ptr)
4259 *dst_ptr++ = *src_ptr;
4260 break;
4261 }
4262 }
4263 else
4264 {
4265 if (dst_ptr < end_ptr)
4266 *dst_ptr++ = *src_ptr;
4267 }
4268 }
4269
4270 *dst_ptr = '\0';
4271
4272 log_info(_("executing notification command for event \"%s\""),
4273 event);
4274
4275 log_detail(_("command is:\n %s"), parsed_command);
4276 r = system(parsed_command);
4277 if (r != 0)
4278 {
4279 log_warning(_("unable to execute event notification command"));
4280 log_detail(_("parsed event notification command was:\n %s"), parsed_command);
4281 success = false;
4282 }
4283 }
4284
4285 return success;
4286 }
4287
4288
4289 PGresult *
get_event_records(PGconn * conn,int node_id,const char * node_name,const char * event,bool all,int limit)4290 get_event_records(PGconn *conn, int node_id, const char *node_name, const char *event, bool all, int limit)
4291 {
4292 PGresult *res;
4293
4294 PQExpBufferData query;
4295 PQExpBufferData where_clause;
4296
4297
4298 initPQExpBuffer(&query);
4299 initPQExpBuffer(&where_clause);
4300
4301 /* LEFT JOIN used here as a node record may have been removed */
4302 appendPQExpBufferStr(&query,
4303 " SELECT e.node_id, n.node_name, e.event, e.successful, "
4304 " pg_catalog.to_char(e.event_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS timestamp, "
4305 " e.details "
4306 " FROM repmgr.events e "
4307 "LEFT JOIN repmgr.nodes n ON e.node_id = n.node_id ");
4308
4309 if (node_id != UNKNOWN_NODE_ID)
4310 {
4311 append_where_clause(&where_clause,
4312 "n.node_id=%i", node_id);
4313 }
4314 else if (node_name[0] != '\0')
4315 {
4316 char *escaped = escape_string(conn, node_name);
4317
4318 if (escaped == NULL)
4319 {
4320 log_error(_("unable to escape value provided for node name"));
4321 log_detail(_("node name is: \"%s\""), node_name);
4322 }
4323 else
4324 {
4325 append_where_clause(&where_clause,
4326 "n.node_name='%s'",
4327 escaped);
4328 pfree(escaped);
4329 }
4330 }
4331
4332 if (event[0] != '\0')
4333 {
4334 char *escaped = escape_string(conn, event);
4335
4336 if (escaped == NULL)
4337 {
4338 log_error(_("unable to escape value provided for event"));
4339 log_detail(_("event is: \"%s\""), event);
4340 }
4341 else
4342 {
4343 append_where_clause(&where_clause,
4344 "e.event='%s'",
4345 escaped);
4346 pfree(escaped);
4347 }
4348 }
4349
4350 appendPQExpBuffer(&query, "\n%s\n",
4351 where_clause.data);
4352
4353 appendPQExpBufferStr(&query,
4354 " ORDER BY e.event_timestamp DESC");
4355
4356 if (all == false && limit > 0)
4357 {
4358 appendPQExpBuffer(&query, " LIMIT %i",
4359 limit);
4360 }
4361
4362 log_debug("do_cluster_event():\n%s", query.data);
4363 res = PQexec(conn, query.data);
4364
4365 termPQExpBuffer(&query);
4366 termPQExpBuffer(&where_clause);
4367
4368 return res;
4369 }
4370
4371
4372 /* ========================== */
4373 /* replication slot functions */
4374 /* ========================== */
4375
4376
/*
 * create_slot_name()
 *
 * Write the replication slot name for node "node_id" into "slot_name",
 * in the form "repmgr_slot_<node_id>".
 */
void
create_slot_name(char *slot_name, int node_id)
{
	maxlen_snprintf(slot_name, "repmgr_slot_%i", node_id);
}
4382
4383
4384 static ReplSlotStatus
_verify_replication_slot(PGconn * conn,char * slot_name,PQExpBufferData * error_msg)4385 _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
4386 {
4387 RecordStatus record_status = RECORD_NOT_FOUND;
4388 t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
4389
4390 /*
4391 * Check whether slot exists already; if it exists and is active, that
4392 * means another active standby is using it, which creates an error
4393 * situation; if not we can reuse it as-is.
4394 */
4395 record_status = get_slot_record(conn, slot_name, &slot_info);
4396
4397 if (record_status == RECORD_FOUND)
4398 {
4399 if (strcmp(slot_info.slot_type, "physical") != 0)
4400 {
4401 if (error_msg)
4402 appendPQExpBuffer(error_msg,
4403 _("slot \"%s\" exists and is not a physical slot\n"),
4404 slot_name);
4405 return SLOT_NOT_PHYSICAL;
4406 }
4407
4408 if (slot_info.active == false)
4409 {
4410 log_debug("replication slot \"%s\" exists but is inactive; reusing",
4411 slot_name);
4412
4413 return SLOT_INACTIVE;
4414 }
4415
4416 if (error_msg)
4417 appendPQExpBuffer(error_msg,
4418 _("slot \"%s\" already exists as an active slot\n"),
4419 slot_name);
4420 return SLOT_ACTIVE;
4421 }
4422
4423 return SLOT_NOT_FOUND;
4424 }
4425
4426
4427 bool
create_replication_slot_replprot(PGconn * conn,PGconn * repl_conn,char * slot_name,PQExpBufferData * error_msg)4428 create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg)
4429 {
4430 PQExpBufferData query;
4431 PGresult *res = NULL;
4432 bool success = true;
4433
4434 ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
4435
4436 /* Replication slot is unusable */
4437 if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
4438 return false;
4439
4440 /* Replication slot can be reused */
4441 if (slot_status == SLOT_INACTIVE)
4442 return true;
4443
4444 initPQExpBuffer(&query);
4445
4446 appendPQExpBuffer(&query,
4447 "CREATE_REPLICATION_SLOT %s PHYSICAL",
4448 slot_name);
4449
4450 /* In 9.6 and later, reserve the LSN straight away */
4451 if (PQserverVersion(conn) >= 90600)
4452 {
4453 appendPQExpBufferStr(&query,
4454 " RESERVE_WAL");
4455 }
4456
4457 appendPQExpBufferChar(&query, ';');
4458
4459 res = PQexec(repl_conn, query.data);
4460
4461
4462 if ((PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) && error_msg != NULL)
4463 {
4464 appendPQExpBuffer(error_msg,
4465 _("unable to create replication slot \"%s\" on the upstream node: %s\n"),
4466 slot_name,
4467 PQerrorMessage(conn));
4468 success = false;
4469 }
4470
4471 PQclear(res);
4472 return success;
4473 }
4474
4475
4476 bool
create_replication_slot_sql(PGconn * conn,char * slot_name,PQExpBufferData * error_msg)4477 create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
4478 {
4479 PQExpBufferData query;
4480 PGresult *res = NULL;
4481 bool success = true;
4482
4483 ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
4484
4485 /* Replication slot is unusable */
4486 if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
4487 return false;
4488
4489 /* Replication slot can be reused */
4490 if (slot_status == SLOT_INACTIVE)
4491 return true;
4492
4493 initPQExpBuffer(&query);
4494
4495 /* In 9.6 and later, reserve the LSN straight away */
4496 if (PQserverVersion(conn) >= 90600)
4497 {
4498 appendPQExpBuffer(&query,
4499 "SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s', TRUE)",
4500 slot_name);
4501 }
4502 else
4503 {
4504 appendPQExpBuffer(&query,
4505 "SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s')",
4506 slot_name);
4507 }
4508
4509 log_debug(_("create_replication_slot_sql(): creating slot \"%s\" on upstream"), slot_name);
4510 log_verbose(LOG_DEBUG, "create_replication_slot_sql():\n%s", query.data);
4511
4512 res = PQexec(conn, query.data);
4513 termPQExpBuffer(&query);
4514
4515 if (PQresultStatus(res) != PGRES_TUPLES_OK && error_msg != NULL)
4516 {
4517 appendPQExpBuffer(error_msg,
4518 _("unable to create replication slot \"%s\" on the upstream node: %s\n"),
4519 slot_name,
4520 PQerrorMessage(conn));
4521 success = false;
4522 }
4523
4524 PQclear(res);
4525 return success;
4526 }
4527
4528
4529 bool
drop_replication_slot_sql(PGconn * conn,char * slot_name)4530 drop_replication_slot_sql(PGconn *conn, char *slot_name)
4531 {
4532 PQExpBufferData query;
4533 PGresult *res = NULL;
4534 bool success = true;
4535
4536 initPQExpBuffer(&query);
4537
4538 appendPQExpBuffer(&query,
4539 "SELECT pg_catalog.pg_drop_replication_slot('%s')",
4540 slot_name);
4541
4542 log_verbose(LOG_DEBUG, "drop_replication_slot_sql():\n %s", query.data);
4543
4544 res = PQexec(conn, query.data);
4545
4546 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4547 {
4548 log_db_error(conn, query.data,
4549 _("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
4550 slot_name);
4551
4552 success = false;
4553 }
4554 else
4555 {
4556 log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
4557 slot_name);
4558 }
4559
4560 termPQExpBuffer(&query);
4561 PQclear(res);
4562
4563 return success;
4564 }
4565
4566
4567 bool
drop_replication_slot_replprot(PGconn * repl_conn,char * slot_name)4568 drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name)
4569 {
4570 PQExpBufferData query;
4571 PGresult *res = NULL;
4572 bool success = true;
4573
4574 initPQExpBuffer(&query);
4575
4576 appendPQExpBuffer(&query,
4577 "DROP_REPLICATION_SLOT %s",
4578 slot_name);
4579
4580 log_verbose(LOG_DEBUG, "drop_replication_slot_replprot():\n %s", query.data);
4581
4582 res = PQexec(repl_conn, query.data);
4583
4584 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4585 {
4586 log_db_error(repl_conn, query.data,
4587 _("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
4588 slot_name);
4589
4590 success = false;
4591 }
4592 else
4593 {
4594 log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
4595 slot_name);
4596 }
4597
4598 termPQExpBuffer(&query);
4599 PQclear(res);
4600
4601 return success;
4602 }
4603
4604
4605 RecordStatus
get_slot_record(PGconn * conn,char * slot_name,t_replication_slot * record)4606 get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
4607 {
4608 PQExpBufferData query;
4609 PGresult *res = NULL;
4610 RecordStatus record_status = RECORD_FOUND;
4611
4612 initPQExpBuffer(&query);
4613
4614 appendPQExpBuffer(&query,
4615 "SELECT slot_name, slot_type, active "
4616 " FROM pg_catalog.pg_replication_slots "
4617 " WHERE slot_name = '%s' ",
4618 slot_name);
4619
4620 log_verbose(LOG_DEBUG, "get_slot_record():\n%s", query.data);
4621
4622 res = PQexec(conn, query.data);
4623
4624 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4625 {
4626 log_db_error(conn, query.data,
4627 _("get_slot_record(): unable to query pg_replication_slots"));
4628
4629 record_status = RECORD_ERROR;
4630 }
4631 else if (!PQntuples(res))
4632 {
4633 record_status = RECORD_NOT_FOUND;
4634 }
4635 else
4636 {
4637 snprintf(record->slot_name,
4638 sizeof(record->slot_name),
4639 "%s", PQgetvalue(res, 0, 0));
4640 snprintf(record->slot_type,
4641 sizeof(record->slot_type),
4642 "%s", PQgetvalue(res, 0, 1));
4643 record->active = atobool(PQgetvalue(res, 0, 2));
4644 }
4645
4646 termPQExpBuffer(&query);
4647 PQclear(res);
4648
4649 return record_status;
4650 }
4651
4652
4653 int
get_free_replication_slot_count(PGconn * conn,int * max_replication_slots)4654 get_free_replication_slot_count(PGconn *conn, int *max_replication_slots)
4655 {
4656 PQExpBufferData query;
4657 PGresult *res = NULL;
4658 int free_slots = 0;
4659
4660 initPQExpBuffer(&query);
4661
4662 appendPQExpBufferStr(&query,
4663 " SELECT pg_catalog.current_setting('max_replication_slots')::INT - "
4664 " pg_catalog.count(*) "
4665 " AS free_slots, "
4666 " pg_catalog.current_setting('max_replication_slots')::INT "
4667 " AS max_replication_slots "
4668 " FROM pg_catalog.pg_replication_slots s"
4669 " WHERE s.slot_type = 'physical'");
4670
4671 res = PQexec(conn, query.data);
4672
4673 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4674 {
4675 log_db_error(conn, query.data,
4676 _("get_free_replication_slot_count(): unable to execute replication slot query"));
4677
4678 free_slots = UNKNOWN_VALUE;
4679 }
4680 else if (PQntuples(res) == 0)
4681 {
4682 free_slots = UNKNOWN_VALUE;
4683 }
4684 else
4685 {
4686 free_slots = atoi(PQgetvalue(res, 0, 0));
4687 if (max_replication_slots != NULL)
4688 *max_replication_slots = atoi(PQgetvalue(res, 0, 1));
4689 }
4690
4691 termPQExpBuffer(&query);
4692 PQclear(res);
4693
4694 return free_slots;
4695 }
4696
4697
4698 int
get_inactive_replication_slots(PGconn * conn,KeyValueList * list)4699 get_inactive_replication_slots(PGconn *conn, KeyValueList *list)
4700 {
4701 PQExpBufferData query;
4702 PGresult *res = NULL;
4703 int i, inactive_slots = 0;
4704
4705 initPQExpBuffer(&query);
4706
4707 appendPQExpBufferStr(&query,
4708 " SELECT slot_name, slot_type "
4709 " FROM pg_catalog.pg_replication_slots "
4710 " WHERE active IS FALSE "
4711 " AND slot_type = 'physical' "
4712 " ORDER BY slot_name ");
4713
4714 res = PQexec(conn, query.data);
4715
4716 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4717 {
4718 log_db_error(conn, query.data,
4719 _("get_inactive_replication_slots(): unable to execute replication slot query"));
4720
4721 inactive_slots = -1;
4722 }
4723 else
4724 {
4725 inactive_slots = PQntuples(res);
4726
4727 for (i = 0; i < inactive_slots; i++)
4728 {
4729 key_value_list_set(list,
4730 PQgetvalue(res, i, 0),
4731 PQgetvalue(res, i, 1));
4732 }
4733 }
4734
4735 termPQExpBuffer(&query);
4736 PQclear(res);
4737
4738 return inactive_slots;
4739 }
4740
4741
4742
4743 /* ==================== */
4744 /* tablespace functions */
4745 /* ==================== */
4746
4747 bool
get_tablespace_name_by_location(PGconn * conn,const char * location,char * name)4748 get_tablespace_name_by_location(PGconn *conn, const char *location, char *name)
4749 {
4750 PQExpBufferData query;
4751 PGresult *res = NULL;
4752 bool success = true;
4753
4754 initPQExpBuffer(&query);
4755
4756 appendPQExpBuffer(&query,
4757 "SELECT spcname "
4758 " FROM pg_catalog.pg_tablespace "
4759 " WHERE pg_catalog.pg_tablespace_location(oid) = '%s'",
4760 location);
4761
4762 log_verbose(LOG_DEBUG, "get_tablespace_name_by_location():\n%s", query.data);
4763
4764 res = PQexec(conn, query.data);
4765
4766 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4767 {
4768 log_db_error(conn, query.data,
4769 _("get_tablespace_name_by_location(): unable to execute tablespace query"));
4770 success = false;
4771 }
4772 else if (PQntuples(res) == 0)
4773 {
4774 success = false;
4775 }
4776 else
4777 {
4778 snprintf(name, MAXLEN,
4779 "%s", PQgetvalue(res, 0, 0));
4780 }
4781
4782 termPQExpBuffer(&query);
4783 PQclear(res);
4784
4785 return success;
4786 }
4787
4788 /* ============================ */
4789 /* asynchronous query functions */
4790 /* ============================ */
4791
4792 bool
cancel_query(PGconn * conn,int timeout)4793 cancel_query(PGconn *conn, int timeout)
4794 {
4795 char errbuf[ERRBUFF_SIZE] = "";
4796 PGcancel *pgcancel = NULL;
4797
4798 if (wait_connection_availability(conn, timeout) != 1)
4799 return false;
4800
4801 pgcancel = PQgetCancel(conn);
4802
4803 if (pgcancel == NULL)
4804 return false;
4805
4806 /*
4807 * PQcancel can only return 0 if socket()/connect()/send() fails, in any
4808 * of those cases we can assume something bad happened to the connection
4809 */
4810 if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
4811 {
4812 log_warning(_("unable to cancel current query"));
4813 log_detail("\n%s", errbuf);
4814 PQfreeCancel(pgcancel);
4815 return false;
4816 }
4817
4818 PQfreeCancel(pgcancel);
4819
4820 return true;
4821 }
4822
4823
4824 /*
4825 * Wait until current query finishes, ignoring any results.
4826 * Usually this will be an async query or query cancellation.
4827 *
4828 * Returns 1 for success; 0 if any error ocurred; -1 if timeout reached.
4829 */
4830 int
wait_connection_availability(PGconn * conn,int timeout)4831 wait_connection_availability(PGconn *conn, int timeout)
4832 {
4833 PGresult *res = NULL;
4834 fd_set read_set;
4835 int sock = PQsocket(conn);
4836 struct timeval tmout,
4837 before,
4838 after;
4839 struct timezone tz;
4840 long long timeout_ms;
4841
4842 /* calculate timeout in microseconds */
4843 timeout_ms = (long long) timeout * 1000000;
4844
4845 while (timeout_ms > 0)
4846 {
4847 if (PQconsumeInput(conn) == 0)
4848 {
4849 log_warning(_("wait_connection_availability(): unable to receive data from connection"));
4850 log_detail("%s", PQerrorMessage(conn));
4851 return 0;
4852 }
4853
4854 if (PQisBusy(conn) == 0)
4855 {
4856 do
4857 {
4858 res = PQgetResult(conn);
4859 PQclear(res);
4860 } while (res != NULL);
4861
4862 break;
4863 }
4864
4865 tmout.tv_sec = 0;
4866 tmout.tv_usec = 250000;
4867
4868 FD_ZERO(&read_set);
4869 FD_SET(sock, &read_set);
4870
4871 gettimeofday(&before, &tz);
4872 if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
4873 {
4874 log_warning(_("wait_connection_availability(): select() returned with error"));
4875 log_detail("%s", strerror(errno));
4876 return -1;
4877 }
4878
4879 gettimeofday(&after, &tz);
4880
4881 timeout_ms -= (after.tv_sec * 1000000 + after.tv_usec) -
4882 (before.tv_sec * 1000000 + before.tv_usec);
4883 }
4884
4885
4886 if (timeout_ms >= 0)
4887 {
4888 return 1;
4889 }
4890
4891 log_warning(_("wait_connection_availability(): timeout (%i secs) reached"), timeout);
4892 return -1;
4893 }
4894
4895
4896 /* =========================== */
4897 /* node availability functions */
4898 /* =========================== */
4899
/*
 * Ping the server described by "conninfo"; a warning is logged if the
 * ping does not succeed.
 */
bool
is_server_available(const char *conninfo)
{
	const bool	quiet = false;

	return _is_server_available(conninfo, quiet);
}
4905
4906
/*
 * Ping the server described by "conninfo" without logging a warning if
 * the ping does not succeed.
 */
bool
is_server_available_quiet(const char *conninfo)
{
	const bool	quiet = true;

	return _is_server_available(conninfo, quiet);
}
4912
4913
4914 static bool
_is_server_available(const char * conninfo,bool quiet)4915 _is_server_available(const char *conninfo, bool quiet)
4916 {
4917 PGPing status = PQping(conninfo);
4918
4919 log_verbose(LOG_DEBUG, "is_server_available(): ping status for \"%s\" is %s", conninfo, print_pqping_status(status));
4920 if (status == PQPING_OK)
4921 return true;
4922
4923 if (quiet == false)
4924 {
4925 log_warning(_("unable to ping \"%s\""), conninfo);
4926 log_detail(_("PQping() returned \"%s\""), print_pqping_status(status));
4927 }
4928
4929 return false;
4930 }
4931
4932
4933 bool
is_server_available_params(t_conninfo_param_list * param_list)4934 is_server_available_params(t_conninfo_param_list *param_list)
4935 {
4936 PGPing status = PQpingParams((const char **) param_list->keywords,
4937 (const char **) param_list->values,
4938 false);
4939
4940 /* deparsing the param_list adds overhead, so only do it if needed */
4941 if (log_level == LOG_DEBUG || status != PQPING_OK)
4942 {
4943 char *conninfo_str = param_list_to_string(param_list);
4944 log_verbose(LOG_DEBUG, "is_server_available_params(): ping status for \"%s\" is %s", conninfo_str, print_pqping_status(status));
4945
4946 if (status != PQPING_OK)
4947 {
4948 log_warning(_("unable to ping \"%s\""), conninfo_str);
4949 log_detail(_("PQping() returned \"%s\""), print_pqping_status(status));
4950 }
4951
4952 pfree(conninfo_str);
4953 }
4954
4955 if (status == PQPING_OK)
4956 return true;
4957
4958 return false;
4959 }
4960
4961
4962
4963 /*
4964 * Simple throw-away query to stop a connection handle going stale.
4965 */
4966 ExecStatusType
connection_ping(PGconn * conn)4967 connection_ping(PGconn *conn)
4968 {
4969 PGresult *res = PQexec(conn, "SELECT TRUE");
4970 ExecStatusType ping_result;
4971
4972 log_verbose(LOG_DEBUG, "connection_ping(): result is %s", PQresStatus(PQresultStatus(res)));
4973
4974 ping_result = PQresultStatus(res);
4975 PQclear(res);
4976
4977 return ping_result;
4978 }
4979
4980
4981 ExecStatusType
connection_ping_reconnect(PGconn * conn)4982 connection_ping_reconnect(PGconn *conn)
4983 {
4984 ExecStatusType ping_result = connection_ping(conn);
4985
4986 if (PQstatus(conn) != CONNECTION_OK)
4987 {
4988 log_warning(_("connection error, attempting to reset"));
4989 log_detail("\n%s", PQerrorMessage(conn));
4990 PQreset(conn);
4991 ping_result = connection_ping(conn);
4992 }
4993
4994 log_verbose(LOG_DEBUG, "connection_ping_reconnect(): result is %s", PQresStatus(ping_result));
4995
4996 return ping_result;
4997 }
4998
4999
5000
5001 /* ==================== */
5002 /* monitoring functions */
5003 /* ==================== */
5004
5005 void
add_monitoring_record(PGconn * primary_conn,PGconn * local_conn,int primary_node_id,int local_node_id,char * monitor_standby_timestamp,XLogRecPtr primary_last_wal_location,XLogRecPtr last_wal_receive_lsn,char * last_xact_replay_timestamp,long long unsigned int replication_lag_bytes,long long unsigned int apply_lag_bytes)5006 add_monitoring_record(PGconn *primary_conn,
5007 PGconn *local_conn,
5008 int primary_node_id,
5009 int local_node_id,
5010 char *monitor_standby_timestamp,
5011 XLogRecPtr primary_last_wal_location,
5012 XLogRecPtr last_wal_receive_lsn,
5013 char *last_xact_replay_timestamp,
5014 long long unsigned int replication_lag_bytes,
5015 long long unsigned int apply_lag_bytes
5016 )
5017 {
5018 PQExpBufferData query;
5019
5020 initPQExpBuffer(&query);
5021
5022 appendPQExpBuffer(&query,
5023 "INSERT INTO repmgr.monitoring_history "
5024 " (primary_node_id, "
5025 " standby_node_id, "
5026 " last_monitor_time, "
5027 " last_apply_time, "
5028 " last_wal_primary_location, "
5029 " last_wal_standby_location, "
5030 " replication_lag, "
5031 " apply_lag ) "
5032 " VALUES(%i, "
5033 " %i, "
5034 " '%s'::TIMESTAMP WITH TIME ZONE, "
5035 " '%s'::TIMESTAMP WITH TIME ZONE, "
5036 " '%X/%X', "
5037 " '%X/%X', "
5038 " %llu, "
5039 " %llu) ",
5040 primary_node_id,
5041 local_node_id,
5042 monitor_standby_timestamp,
5043 last_xact_replay_timestamp,
5044 format_lsn(primary_last_wal_location),
5045 format_lsn(last_wal_receive_lsn),
5046 replication_lag_bytes,
5047 apply_lag_bytes);
5048
5049 log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data);
5050
5051 if (PQsendQuery(primary_conn, query.data) == 0)
5052 {
5053 log_warning(_("query could not be sent to primary:\n %s"),
5054 PQerrorMessage(primary_conn));
5055 }
5056 else
5057 {
5058 PGresult *res = PQexec(local_conn, "SELECT repmgr.standby_set_last_updated()");
5059
5060 /* not critical if the above query fails */
5061 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5062 log_warning(_("add_monitoring_record(): unable to set last_updated:\n %s"),
5063 PQerrorMessage(local_conn));
5064
5065 PQclear(res);
5066 }
5067
5068 termPQExpBuffer(&query);
5069
5070 return;
5071 }
5072
5073
5074 int
get_number_of_monitoring_records_to_delete(PGconn * primary_conn,int keep_history,int node_id)5075 get_number_of_monitoring_records_to_delete(PGconn *primary_conn, int keep_history, int node_id)
5076 {
5077 PQExpBufferData query;
5078 int record_count = -1;
5079 PGresult *res = NULL;
5080
5081 initPQExpBuffer(&query);
5082
5083 appendPQExpBuffer(&query,
5084 "SELECT pg_catalog.count(*) "
5085 " FROM repmgr.monitoring_history "
5086 " WHERE pg_catalog.age(pg_catalog.now(), last_monitor_time) >= '%d days'::interval",
5087 keep_history);
5088
5089 if (node_id != UNKNOWN_NODE_ID)
5090 {
5091 appendPQExpBuffer(&query,
5092 " AND standby_node_id = %i", node_id);
5093 }
5094
5095 log_verbose(LOG_DEBUG, "get_number_of_monitoring_records_to_delete():\n %s", query.data);
5096
5097 res = PQexec(primary_conn, query.data);
5098
5099 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5100 {
5101 log_db_error(primary_conn, query.data,
5102 _("get_number_of_monitoring_records_to_delete(): unable to query number of monitoring records to clean up"));
5103 }
5104 else
5105 {
5106 record_count = atoi(PQgetvalue(res, 0, 0));
5107 }
5108
5109 termPQExpBuffer(&query);
5110 PQclear(res);
5111
5112 return record_count;
5113 }
5114
5115
5116 bool
delete_monitoring_records(PGconn * primary_conn,int keep_history,int node_id)5117 delete_monitoring_records(PGconn *primary_conn, int keep_history, int node_id)
5118 {
5119 PQExpBufferData query;
5120 bool success = true;
5121 PGresult *res = NULL;
5122
5123 initPQExpBuffer(&query);
5124
5125 if (keep_history > 0 || node_id != UNKNOWN_NODE_ID)
5126 {
5127 appendPQExpBuffer(&query,
5128 "DELETE FROM repmgr.monitoring_history "
5129 " WHERE pg_catalog.age(pg_catalog.now(), last_monitor_time) >= '%d days'::INTERVAL ",
5130 keep_history);
5131
5132 if (node_id != UNKNOWN_NODE_ID)
5133 {
5134 appendPQExpBuffer(&query,
5135 " AND standby_node_id = %i", node_id);
5136 }
5137 }
5138 else
5139 {
5140 appendPQExpBufferStr(&query,
5141 "TRUNCATE TABLE repmgr.monitoring_history");
5142 }
5143
5144 res = PQexec(primary_conn, query.data);
5145
5146 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5147 {
5148 log_db_error(primary_conn, query.data,
5149 _("delete_monitoring_records(): unable to delete monitoring records"));
5150 success = false;
5151 }
5152
5153 termPQExpBuffer(&query);
5154 PQclear(res);
5155
5156 return success;
5157 }
5158
5159 /*
5160 * node voting functions
5161 *
5162 * These are intended to run under repmgrd and mainly rely on shared memory
5163 */
5164
5165 int
get_current_term(PGconn * conn)5166 get_current_term(PGconn *conn)
5167 {
5168 PGresult *res = NULL;
5169 int term = VOTING_TERM_NOT_SET;
5170
5171 res = PQexec(conn, "SELECT term FROM repmgr.voting_term");
5172
5173 /* it doesn't matter if for whatever reason the table has no rows */
5174
5175 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5176 {
5177 log_db_error(conn, NULL,
5178 _("get_current_term(): unable to query \"repmgr.voting_term\""));
5179 }
5180 else if (PQntuples(res) > 0)
5181 {
5182 term = atoi(PQgetvalue(res, 0, 0));
5183 }
5184
5185 PQclear(res);
5186 return term;
5187 }
5188
5189
5190 void
initialize_voting_term(PGconn * conn)5191 initialize_voting_term(PGconn *conn)
5192 {
5193 PGresult *res = NULL;
5194
5195 int current_term = get_current_term(conn);
5196
5197 if (current_term == VOTING_TERM_NOT_SET)
5198 {
5199 res = PQexec(conn, "INSERT INTO repmgr.voting_term (term) VALUES (1)");
5200 }
5201 else
5202 {
5203 res = PQexec(conn, "UPDATE repmgr.voting_term SET term = 1");
5204 }
5205
5206 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5207 {
5208 log_db_error(conn, NULL, _("unable to initialize repmgr.voting_term"));
5209 }
5210
5211 PQclear(res);
5212 return;
5213 }
5214
5215
5216 void
increment_current_term(PGconn * conn)5217 increment_current_term(PGconn *conn)
5218 {
5219 PGresult *res = NULL;
5220
5221 res = PQexec(conn, "UPDATE repmgr.voting_term SET term = term + 1");
5222
5223 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5224 {
5225 log_db_error(conn, NULL, _("unable to increment repmgr.voting_term"));
5226 }
5227
5228 PQclear(res);
5229 return;
5230 }
5231
5232
5233 bool
announce_candidature(PGconn * conn,t_node_info * this_node,t_node_info * other_node,int electoral_term)5234 announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term)
5235 {
5236 PQExpBufferData query;
5237 PGresult *res = NULL;
5238
5239 bool retval = false;
5240
5241 initPQExpBuffer(&query);
5242
5243 appendPQExpBuffer(&query,
5244 "SELECT repmgr.other_node_is_candidate(%i, %i)",
5245 this_node->node_id,
5246 electoral_term);
5247
5248 res = PQexec(conn, query.data);
5249
5250 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5251 {
5252 log_db_error(conn, query.data, _("announce_candidature(): unable to execute repmgr.other_node_is_candidate()"));
5253 }
5254 else
5255 {
5256 retval = atobool(PQgetvalue(res, 0, 0));
5257 }
5258
5259 termPQExpBuffer(&query);
5260 PQclear(res);
5261
5262 return retval;
5263 }
5264
5265
5266 void
notify_follow_primary(PGconn * conn,int primary_node_id)5267 notify_follow_primary(PGconn *conn, int primary_node_id)
5268 {
5269 PQExpBufferData query;
5270 PGresult *res = NULL;
5271
5272 initPQExpBuffer(&query);
5273
5274 appendPQExpBuffer(&query,
5275 "SELECT repmgr.notify_follow_primary(%i)",
5276 primary_node_id);
5277
5278 log_verbose(LOG_DEBUG, "notify_follow_primary():\n %s", query.data);
5279
5280 res = PQexec(conn, query.data);
5281
5282 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5283 {
5284 log_db_error(conn, query.data, _("unable to execute repmgr.notify_follow_primary()"));
5285 }
5286
5287 termPQExpBuffer(&query);
5288 PQclear(res);
5289
5290 return;
5291 }
5292
5293
5294 bool
get_new_primary(PGconn * conn,int * primary_node_id)5295 get_new_primary(PGconn *conn, int *primary_node_id)
5296 {
5297 PGresult *res = NULL;
5298 int new_primary_node_id = UNKNOWN_NODE_ID;
5299 bool success = true;
5300
5301 const char *sqlquery = "SELECT repmgr.get_new_primary()";
5302
5303 res = PQexec(conn, sqlquery);
5304
5305 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5306 {
5307 log_db_error(conn, sqlquery, _("unable to execute repmgr.get_new_primary()"));
5308 success = false;
5309 }
5310 else if (PQgetisnull(res, 0, 0))
5311 {
5312 success = false;
5313 }
5314 else
5315 {
5316 new_primary_node_id = atoi(PQgetvalue(res, 0, 0));
5317 }
5318
5319 PQclear(res);
5320
5321 /*
5322 * repmgr.get_new_primary() will return UNKNOWN_NODE_ID if
5323 * "follow_new_primary" is false
5324 */
5325 if (new_primary_node_id == UNKNOWN_NODE_ID)
5326 success = false;
5327
5328 *primary_node_id = new_primary_node_id;
5329
5330 return success;
5331 }
5332
5333
5334 void
reset_voting_status(PGconn * conn)5335 reset_voting_status(PGconn *conn)
5336 {
5337 PGresult *res = NULL;
5338
5339 const char *sqlquery = "SELECT repmgr.reset_voting_status()";
5340
5341 res = PQexec(conn, sqlquery);
5342
5343 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5344 {
5345 log_db_error(conn, sqlquery, _("unable to execute repmgr.reset_voting_status()"));
5346 }
5347
5348 PQclear(res);
5349 return;
5350 }
5351
5352
5353 /* ============================ */
5354 /* replication status functions */
5355 /* ============================ */
5356
5357 /*
5358 * Returns the current LSN on the primary.
5359 *
5360 * This just executes "pg_current_wal_lsn()".
5361 *
5362 * Function "get_node_current_lsn()" below will return the latest
5363 * LSN regardless of recovery state.
5364 */
5365 XLogRecPtr
get_primary_current_lsn(PGconn * conn)5366 get_primary_current_lsn(PGconn *conn)
5367 {
5368 PGresult *res = NULL;
5369 XLogRecPtr ptr = InvalidXLogRecPtr;
5370
5371 if (PQserverVersion(conn) >= 100000)
5372 {
5373 res = PQexec(conn, "SELECT pg_catalog.pg_current_wal_lsn()");
5374 }
5375 else
5376 {
5377 res = PQexec(conn, "SELECT pg_catalog.pg_current_xlog_location()");
5378 }
5379
5380 if (PQresultStatus(res) == PGRES_TUPLES_OK)
5381 {
5382 ptr = parse_lsn(PQgetvalue(res, 0, 0));
5383 }
5384 else
5385 {
5386 log_db_error(conn, NULL, _("unable to execute get_primary_current_lsn()"));
5387 }
5388
5389
5390 PQclear(res);
5391
5392 return ptr;
5393 }
5394
5395
5396 XLogRecPtr
get_last_wal_receive_location(PGconn * conn)5397 get_last_wal_receive_location(PGconn *conn)
5398 {
5399 PGresult *res = NULL;
5400 XLogRecPtr ptr = InvalidXLogRecPtr;
5401
5402 if (PQserverVersion(conn) >= 100000)
5403 {
5404 res = PQexec(conn, "SELECT pg_catalog.pg_last_wal_receive_lsn()");
5405 }
5406 else
5407 {
5408 res = PQexec(conn, "SELECT pg_catalog.pg_last_xlog_receive_location()");
5409 }
5410
5411 if (PQresultStatus(res) == PGRES_TUPLES_OK)
5412 {
5413 ptr = parse_lsn(PQgetvalue(res, 0, 0));
5414 }
5415 else
5416 {
5417 log_db_error(conn, NULL, _("unable to execute get_last_wal_receive_location()"));
5418 }
5419
5420 PQclear(res);
5421
5422 return ptr;
5423 }
5424
5425 /*
5426 * Returns the latest LSN for the node regardless of recovery state.
5427 */
5428 XLogRecPtr
get_node_current_lsn(PGconn * conn)5429 get_node_current_lsn(PGconn *conn)
5430 {
5431 PQExpBufferData query;
5432 PGresult *res = NULL;
5433 XLogRecPtr ptr = InvalidXLogRecPtr;
5434
5435 initPQExpBuffer(&query);
5436
5437 if (PQserverVersion(conn) >= 100000)
5438 {
5439 appendPQExpBufferStr(&query,
5440 " WITH lsn_states AS ( "
5441 " SELECT "
5442 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5443 " THEN pg_catalog.pg_current_wal_lsn() "
5444 " ELSE NULL "
5445 " END "
5446 " AS current_wal_lsn, "
5447 " CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5448 " THEN pg_catalog.pg_last_wal_receive_lsn() "
5449 " ELSE NULL "
5450 " END "
5451 " AS last_wal_receive_lsn, "
5452 " CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5453 " THEN pg_catalog.pg_last_wal_replay_lsn() "
5454 " ELSE NULL "
5455 " END "
5456 " AS last_wal_replay_lsn "
5457 " ) ");
5458 }
5459 else
5460 {
5461 appendPQExpBufferStr(&query,
5462 " WITH lsn_states AS ( "
5463 " SELECT "
5464 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5465 " THEN pg_catalog.pg_current_xlog_location() "
5466 " ELSE NULL "
5467 " END "
5468 " AS current_wal_lsn, "
5469 " CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5470 " THEN pg_catalog.pg_last_xlog_receive_location() "
5471 " ELSE NULL "
5472 " END "
5473 " AS last_wal_receive_lsn, "
5474 " CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5475 " THEN pg_catalog.pg_last_xlog_replay_location() "
5476 " ELSE NULL "
5477 " END "
5478 " AS last_wal_replay_lsn "
5479 " ) ");
5480 }
5481
5482 appendPQExpBufferStr(&query,
5483 " SELECT "
5484 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5485 " THEN current_wal_lsn "
5486 " ELSE "
5487 " CASE WHEN last_wal_receive_lsn IS NULL "
5488 " THEN last_wal_replay_lsn "
5489 " ELSE "
5490 " CASE WHEN last_wal_replay_lsn > last_wal_receive_lsn "
5491 " THEN last_wal_replay_lsn "
5492 " ELSE last_wal_receive_lsn "
5493 " END "
5494 " END "
5495 " END "
5496 " AS current_lsn "
5497 " FROM lsn_states ");
5498
5499 res = PQexec(conn, query.data);
5500
5501 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5502 {
5503 log_db_error(conn, query.data, _("unable to execute get_node_current_lsn()"));
5504 }
5505 else if (!PQgetisnull(res, 0, 0))
5506 {
5507 ptr = parse_lsn(PQgetvalue(res, 0, 0));
5508 }
5509
5510 termPQExpBuffer(&query);
5511 PQclear(res);
5512
5513 return ptr;
5514 }
5515
5516
5517 void
init_replication_info(ReplInfo * replication_info)5518 init_replication_info(ReplInfo *replication_info)
5519 {
5520 memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp));
5521 replication_info->in_recovery = false;
5522 replication_info->timeline_id = UNKNOWN_TIMELINE_ID;
5523 replication_info->last_wal_receive_lsn = InvalidXLogRecPtr;
5524 replication_info->last_wal_replay_lsn = InvalidXLogRecPtr;
5525 memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp));
5526 replication_info->replication_lag_time = 0;
5527 replication_info->receiving_streamed_wal = true;
5528 replication_info->wal_replay_paused = false;
5529 replication_info->upstream_last_seen = -1;
5530 replication_info->upstream_node_id = UNKNOWN_NODE_ID;
5531 }
5532
5533
5534 bool
get_replication_info(PGconn * conn,t_server_type node_type,ReplInfo * replication_info)5535 get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *replication_info)
5536 {
5537 PQExpBufferData query;
5538 PGresult *res = NULL;
5539 bool success = true;
5540
5541 initPQExpBuffer(&query);
5542 appendPQExpBufferStr(&query,
5543 " SELECT ts, "
5544 " in_recovery, "
5545 " last_wal_receive_lsn, "
5546 " last_wal_replay_lsn, "
5547 " last_xact_replay_timestamp, "
5548 " CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
5549 " THEN 0::INT "
5550 " ELSE "
5551 " CASE WHEN last_xact_replay_timestamp IS NULL "
5552 " THEN 0::INT "
5553 " ELSE "
5554 " EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT "
5555 " END "
5556 " END AS replication_lag_time, "
5557 " last_wal_receive_lsn >= last_wal_replay_lsn AS receiving_streamed_wal, "
5558 " wal_replay_paused, "
5559 " upstream_last_seen, "
5560 " upstream_node_id "
5561 " FROM ( "
5562 " SELECT CURRENT_TIMESTAMP AS ts, "
5563 " pg_catalog.pg_is_in_recovery() AS in_recovery, "
5564 " pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp, ");
5565
5566
5567 if (PQserverVersion(conn) >= 100000)
5568 {
5569 appendPQExpBufferStr(&query,
5570 " COALESCE(pg_catalog.pg_last_wal_receive_lsn(), '0/0'::PG_LSN) AS last_wal_receive_lsn, "
5571 " COALESCE(pg_catalog.pg_last_wal_replay_lsn(), '0/0'::PG_LSN) AS last_wal_replay_lsn, "
5572 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5573 " THEN FALSE "
5574 " ELSE pg_catalog.pg_is_wal_replay_paused() "
5575 " END AS wal_replay_paused, ");
5576 }
5577 else
5578 {
5579 appendPQExpBufferStr(&query,
5580 " COALESCE(pg_catalog.pg_last_xlog_receive_location(), '0/0'::PG_LSN) AS last_wal_receive_lsn, "
5581 " COALESCE(pg_catalog.pg_last_xlog_replay_location(), '0/0'::PG_LSN) AS last_wal_replay_lsn, "
5582 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5583 " THEN FALSE "
5584 " ELSE pg_catalog.pg_is_xlog_replay_paused() "
5585 " END AS wal_replay_paused, ");
5586 }
5587
5588 /* Add information about upstream node from shared memory */
5589 if (node_type == WITNESS)
5590 {
5591 appendPQExpBufferStr(&query,
5592 " repmgr.get_upstream_last_seen() AS upstream_last_seen, "
5593 " repmgr.get_upstream_node_id() AS upstream_node_id ");
5594 }
5595 else
5596 {
5597 appendPQExpBufferStr(&query,
5598 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5599 " THEN -1 "
5600 " ELSE repmgr.get_upstream_last_seen() "
5601 " END AS upstream_last_seen, ");
5602 appendPQExpBufferStr(&query,
5603 " CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5604 " THEN -1 "
5605 " ELSE repmgr.get_upstream_node_id() "
5606 " END AS upstream_node_id ");
5607 }
5608
5609 appendPQExpBufferStr(&query,
5610 " ) q ");
5611
5612 log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data);
5613
5614 res = PQexec(conn, query.data);
5615
5616 if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
5617 {
5618 log_db_error(conn, query.data, _("get_replication_info(): unable to execute query"));
5619
5620 success = false;
5621 }
5622 else
5623 {
5624 snprintf(replication_info->current_timestamp,
5625 sizeof(replication_info->current_timestamp),
5626 "%s", PQgetvalue(res, 0, 0));
5627 replication_info->in_recovery = atobool(PQgetvalue(res, 0, 1));
5628 replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 2));
5629 replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 3));
5630 snprintf(replication_info->last_xact_replay_timestamp,
5631 sizeof(replication_info->last_xact_replay_timestamp),
5632 "%s", PQgetvalue(res, 0, 4));
5633 replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 5));
5634 replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 6));
5635 replication_info->wal_replay_paused = atobool(PQgetvalue(res, 0, 7));
5636 replication_info->upstream_last_seen = atoi(PQgetvalue(res, 0, 8));
5637 replication_info->upstream_node_id = atoi(PQgetvalue(res, 0, 9));
5638 }
5639
5640 termPQExpBuffer(&query);
5641 PQclear(res);
5642
5643 return success;
5644 }
5645
5646
5647 int
get_replication_lag_seconds(PGconn * conn)5648 get_replication_lag_seconds(PGconn *conn)
5649 {
5650 PQExpBufferData query;
5651 PGresult *res = NULL;
5652 int lag_seconds = 0;
5653
5654 initPQExpBuffer(&query);
5655
5656 if (PQserverVersion(conn) >= 100000)
5657 {
5658 appendPQExpBufferStr(&query,
5659 " SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) ");
5660
5661 }
5662 else
5663 {
5664 appendPQExpBufferStr(&query,
5665 " SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) ");
5666 }
5667
5668 appendPQExpBufferStr(&query,
5669 " THEN 0 "
5670 " ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT "
5671 " END "
5672 " AS lag_seconds");
5673
5674 res = PQexec(conn, query.data);
5675 log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data);
5676 termPQExpBuffer(&query);
5677
5678 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5679 {
5680 log_warning("%s", PQerrorMessage(conn));
5681 PQclear(res);
5682
5683 return UNKNOWN_REPLICATION_LAG;
5684 }
5685
5686 if (!PQntuples(res))
5687 {
5688 return UNKNOWN_REPLICATION_LAG;
5689 }
5690
5691 lag_seconds = atoi(PQgetvalue(res, 0, 0));
5692
5693 PQclear(res);
5694 return lag_seconds;
5695 }
5696
5697
5698
5699 TimeLineID
get_node_timeline(PGconn * conn,char * timeline_id_str)5700 get_node_timeline(PGconn *conn, char *timeline_id_str)
5701 {
5702 TimeLineID timeline_id = UNKNOWN_TIMELINE_ID;
5703
5704 /*
5705 * PG_control_checkpoint() was introduced in PostgreSQL 9.6
5706 */
5707 if (PQserverVersion(conn) >= 90600)
5708 {
5709 PGresult *res = NULL;
5710
5711 res = PQexec(conn, "SELECT timeline_id FROM pg_catalog.pg_control_checkpoint()");
5712
5713 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5714 {
5715 log_db_error(conn, NULL, _("get_node_timeline(): unable to query pg_control_system()"));
5716 }
5717 else
5718 {
5719 timeline_id = atoi(PQgetvalue(res, 0, 0));
5720 }
5721
5722 PQclear(res);
5723 }
5724
5725 /* If requested, format the timeline ID as a string */
5726 if (timeline_id_str != NULL)
5727 {
5728 if (timeline_id == UNKNOWN_TIMELINE_ID)
5729 {
5730 strncpy(timeline_id_str, "?", MAXLEN);
5731 }
5732 else
5733 {
5734 snprintf(timeline_id_str, MAXLEN, "%i", timeline_id);
5735 }
5736 }
5737
5738 return timeline_id;
5739 }
5740
5741
5742 void
get_node_replication_stats(PGconn * conn,t_node_info * node_info)5743 get_node_replication_stats(PGconn *conn, t_node_info *node_info)
5744 {
5745 PQExpBufferData query;
5746 PGresult *res = NULL;
5747
5748 initPQExpBuffer(&query);
5749
5750 appendPQExpBufferStr(&query,
5751 " SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, "
5752 " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, "
5753 " current_setting('max_replication_slots')::INT AS max_replication_slots, "
5754 " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE slot_type='physical') AS total_replication_slots, "
5755 " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE AND slot_type='physical') AS active_replication_slots, "
5756 " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE AND slot_type='physical') AS inactive_replication_slots, "
5757 " pg_catalog.pg_is_in_recovery() AS in_recovery");
5758
5759 log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data);
5760
5761 res = PQexec(conn, query.data);
5762
5763 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5764 {
5765 log_warning(_("unable to retrieve node replication statistics"));
5766 log_detail("%s", PQerrorMessage(conn));
5767 log_detail("%s", query.data);
5768
5769 termPQExpBuffer(&query);
5770 PQclear(res);
5771
5772 return;
5773 }
5774
5775 node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0));
5776 node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1));
5777 node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2));
5778 node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3));
5779 node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4));
5780 node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5));
5781 node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY;
5782
5783 termPQExpBuffer(&query);
5784 PQclear(res);
5785
5786 return;
5787 }
5788
5789
5790 NodeAttached
is_downstream_node_attached(PGconn * conn,char * node_name,char ** node_state)5791 is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state)
5792 {
5793 PQExpBufferData query;
5794 PGresult *res = NULL;
5795
5796 initPQExpBuffer(&query);
5797
5798 appendPQExpBuffer(&query,
5799 " SELECT pid, state "
5800 " FROM pg_catalog.pg_stat_replication "
5801 " WHERE application_name = '%s'",
5802 node_name);
5803
5804 res = PQexec(conn, query.data);
5805
5806 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5807 {
5808 log_verbose(LOG_WARNING, _("unable to query pg_stat_replication"));
5809 log_detail("%s", PQerrorMessage(conn));
5810 log_detail("%s", query.data);
5811
5812 termPQExpBuffer(&query);
5813 PQclear(res);
5814
5815 return NODE_ATTACHED_UNKNOWN;
5816 }
5817
5818 termPQExpBuffer(&query);
5819
5820 /*
5821 * If there's more than one entry in pg_stat_application, there's no
5822 * way we can reliably determine which one belongs to the node we're
5823 * checking, so there's nothing more we can do.
5824 */
5825 if (PQntuples(res) > 1)
5826 {
5827 log_error(_("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
5828 node_name);
5829 log_hint(_("verify that a unique node name is configured for each node"));
5830
5831 PQclear(res);
5832
5833 return NODE_ATTACHED_UNKNOWN;
5834 }
5835
5836 if (PQntuples(res) == 0)
5837 {
5838 log_warning(_("node \"%s\" not found in \"pg_stat_replication\""), node_name);
5839
5840 PQclear(res);
5841
5842 return NODE_DETACHED;
5843 }
5844
5845 /*
5846 * If the connection is not a superuser or member of pg_read_all_stats, we
5847 * won't be able to retrieve the "state" column, so we'll assume
5848 * the node is attached.
5849 */
5850
5851 if (connection_has_pg_monitor_role(conn, "pg_read_all_stats"))
5852 {
5853 const char *state = PQgetvalue(res, 0, 1);
5854
5855 if (node_state != NULL)
5856 {
5857 int state_len = strlen(state);
5858 *node_state = palloc0(state_len + 1);
5859 strncpy(*node_state, state, state_len);
5860 }
5861
5862 if (strcmp(state, "streaming") != 0)
5863 {
5864 log_warning(_("node \"%s\" attached in state \"%s\""),
5865 node_name,
5866 state);
5867
5868 PQclear(res);
5869
5870 return NODE_NOT_ATTACHED;
5871 }
5872 }
5873 else if (node_state != NULL)
5874 {
5875 *node_state = palloc0(1);
5876 *node_state[0] = '\0';
5877 }
5878
5879 PQclear(res);
5880
5881 return NODE_ATTACHED;
5882 }
5883
5884
5885 void
set_upstream_last_seen(PGconn * conn,int upstream_node_id)5886 set_upstream_last_seen(PGconn *conn, int upstream_node_id)
5887 {
5888 PQExpBufferData query;
5889 PGresult *res = NULL;
5890
5891 initPQExpBuffer(&query);
5892
5893 appendPQExpBuffer(&query,
5894 "SELECT repmgr.set_upstream_last_seen(%i)",
5895 upstream_node_id);
5896
5897 res = PQexec(conn, query.data);
5898
5899 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5900 {
5901 log_db_error(conn, query.data, _("unable to execute repmgr.set_upstream_last_seen()"));
5902 }
5903
5904 termPQExpBuffer(&query);
5905 PQclear(res);
5906 }
5907
5908
5909 int
get_upstream_last_seen(PGconn * conn,t_server_type node_type)5910 get_upstream_last_seen(PGconn *conn, t_server_type node_type)
5911 {
5912 PQExpBufferData query;
5913 PGresult *res = NULL;
5914 int upstream_last_seen = -1;
5915
5916 initPQExpBuffer(&query);
5917
5918 if (node_type == WITNESS)
5919 {
5920 appendPQExpBufferStr(&query,
5921 "SELECT repmgr.get_upstream_last_seen()");
5922 }
5923 else
5924 {
5925 appendPQExpBufferStr(&query,
5926 "SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5927 " THEN -1 "
5928 " ELSE repmgr.get_upstream_last_seen() "
5929 " END AS upstream_last_seen ");
5930 }
5931
5932 res = PQexec(conn, query.data);
5933
5934 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5935 {
5936 log_db_error(conn, query.data, _("unable to execute repmgr.get_upstream_last_seen()"));
5937 }
5938 else
5939 {
5940 upstream_last_seen = atoi(PQgetvalue(res, 0, 0));
5941 }
5942
5943 termPQExpBuffer(&query);
5944 PQclear(res);
5945
5946 return upstream_last_seen;
5947 }
5948
5949
5950 bool
is_wal_replay_paused(PGconn * conn,bool check_pending_wal)5951 is_wal_replay_paused(PGconn *conn, bool check_pending_wal)
5952 {
5953 PQExpBufferData query;
5954 PGresult *res = NULL;
5955 bool is_paused = false;
5956
5957 initPQExpBuffer(&query);
5958
5959 appendPQExpBufferStr(&query,
5960 "SELECT paused.wal_replay_paused ");
5961
5962 if (PQserverVersion(conn) >= 100000)
5963 {
5964 if (check_pending_wal == true)
5965 {
5966 appendPQExpBufferStr(&query,
5967 " AND pg_catalog.pg_last_wal_replay_lsn() < pg_catalog.pg_last_wal_receive_lsn() ");
5968 }
5969
5970 appendPQExpBufferStr(&query,
5971 " FROM (SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5972 " THEN FALSE "
5973 " ELSE pg_catalog.pg_is_wal_replay_paused() "
5974 " END AS wal_replay_paused) paused ");
5975 }
5976 else
5977 {
5978 if (check_pending_wal == true)
5979 {
5980 appendPQExpBufferStr(&query,
5981 " AND pg_catalog.pg_last_xlog_replay_location() < pg_catalog.pg_last_xlog_receive_location() ");
5982 }
5983
5984 appendPQExpBufferStr(&query,
5985 " FROM (SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5986 " THEN FALSE "
5987 " ELSE pg_catalog.pg_is_xlog_replay_paused() "
5988 " END AS wal_replay_paused) paused ");
5989
5990 }
5991
5992 res = PQexec(conn, query.data);
5993
5994 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5995 {
5996 log_db_error(conn, query.data, _("unable to execute WAL replay pause query"));
5997 }
5998 else
5999 {
6000 is_paused = atobool(PQgetvalue(res, 0, 0));
6001 }
6002
6003 termPQExpBuffer(&query);
6004 PQclear(res);
6005
6006 return is_paused;
6007 }
6008
6009
6010 /* miscellaneous debugging functions */
6011
6012 const char *
print_node_status(NodeStatus node_status)6013 print_node_status(NodeStatus node_status)
6014 {
6015 switch (node_status)
6016 {
6017 case NODE_STATUS_UNKNOWN:
6018 return "UNKNOWN";
6019 case NODE_STATUS_UP:
6020 return "UP";
6021 case NODE_STATUS_SHUTTING_DOWN:
6022 return "SHUTTING_DOWN";
6023 case NODE_STATUS_DOWN:
6024 return "SHUTDOWN";
6025 case NODE_STATUS_UNCLEAN_SHUTDOWN:
6026 return "UNCLEAN_SHUTDOWN";
6027 case NODE_STATUS_REJECTED:
6028 return "REJECTED";
6029 }
6030
6031 return "UNIDENTIFIED_STATUS";
6032 }
6033
6034
6035 const char *
print_pqping_status(PGPing ping_status)6036 print_pqping_status(PGPing ping_status)
6037 {
6038 switch (ping_status)
6039 {
6040 case PQPING_OK:
6041 return "PQPING_OK";
6042 case PQPING_REJECT:
6043 return "PQPING_REJECT";
6044 case PQPING_NO_RESPONSE:
6045 return "PQPING_NO_RESPONSE";
6046 case PQPING_NO_ATTEMPT:
6047 return "PQPING_NO_ATTEMPT";
6048 }
6049
6050 return "PQPING_UNKNOWN_STATUS";
6051 }
6052