1 /*
2  * dbutils.c - Database connection/management functions
3  *
4  * Copyright (c) 2ndQuadrant, 2010-2020
5  *
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include <unistd.h>
22 #include <time.h>
23 #include <sys/time.h>
24 #include <sys/stat.h>
25 #include <dirent.h>
26 #include <arpa/inet.h>
27 
28 #include "repmgr.h"
29 #include "dbutils.h"
30 #include "controldata.h"
31 #include "dirutil.h"
32 
33 #define NODE_RECORD_PARAM_COUNT 11
34 
35 
36 static void log_db_error(PGconn *conn, const char *query_text, const char *fmt,...)
37 __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
38 
39 static bool _is_server_available(const char *conninfo, bool quiet);
40 
41 static PGconn *_establish_db_connection(const char *conninfo,
42 						 const bool exit_on_error,
43 						 const bool log_notice,
44 						 const bool verbose_only);
45 
46 static PGconn * _establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser);
47 
48 static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out, bool quiet);
49 
50 static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
51 static bool _get_pg_setting(PGconn *conn, const char *setting, char *str_output, bool *bool_output, int *int_output);
52 
53 static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults);
54 static void _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults);
55 
56 static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
57 
58 static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
59 
60 static ReplSlotStatus _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
61 
62 static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
63 
64 /*
65  * This provides a standardized way of logging database errors. Note
66  * that the provided PGconn can be a normal or a replication connection;
67  * no attempt is made to write to the database, only to report the output
68  * of PQerrorMessage().
69  */
70 void
log_db_error(PGconn * conn,const char * query_text,const char * fmt,...)71 log_db_error(PGconn *conn, const char *query_text, const char *fmt,...)
72 {
73 	va_list		ap;
74 	char		buf[MAXLEN];
75 	int			retval;
76 
77 	va_start(ap, fmt);
78 	retval = vsnprintf(buf, MAXLEN, fmt, ap);
79 	va_end(ap);
80 
81 	if (retval < MAXLEN)
82 		log_error("%s", buf);
83 
84 	if (conn != NULL)
85 	{
86 		log_detail("\n%s", PQerrorMessage(conn));
87 	}
88 
89 	if (query_text != NULL)
90 	{
91 		log_detail("query text is:\n%s", query_text);
92 	}
93 }
94 
95 /* ================= */
96 /* utility functions */
97 /* ================= */
98 
99 XLogRecPtr
parse_lsn(const char * str)100 parse_lsn(const char *str)
101 {
102 	XLogRecPtr	ptr = InvalidXLogRecPtr;
103 	uint32		high,
104 				low;
105 
106 	if (sscanf(str, "%x/%x", &high, &low) == 2)
107 		ptr = (((XLogRecPtr) high) << 32) + (XLogRecPtr) low;
108 
109 	return ptr;
110 }
111 
112 
113 /* ==================== */
114 /* Connection functions */
115 /* ==================== */
116 
117 /*
118  * _establish_db_connection()
119  *
120  * Connect to a database using a conninfo string.
121  *
122  * NOTE: *do not* use this for replication connections; instead use:
123  *	 establish_db_connection_by_params()
124  */
125 
126 static PGconn *
_establish_db_connection(const char * conninfo,const bool exit_on_error,const bool log_notice,const bool verbose_only)127 _establish_db_connection(const char *conninfo, const bool exit_on_error, const bool log_notice, const bool verbose_only)
128 {
129 	PGconn	   *conn = NULL;
130 	char	   *connection_string = NULL;
131 	char	   *errmsg = NULL;
132 
133 	t_conninfo_param_list conninfo_params = T_CONNINFO_PARAM_LIST_INITIALIZER;
134 	bool		is_replication_connection = false;
135 	bool		parse_success = false;
136 
137 	initialize_conninfo_params(&conninfo_params, false);
138 
139 	parse_success = parse_conninfo_string(conninfo, &conninfo_params, &errmsg, false);
140 
141 	if (parse_success == false)
142 	{
143 		log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);
144 		log_detail("%s", errmsg);
145 		free_conninfo_params(&conninfo_params);
146 		return NULL;
147 	}
148 
149 	/* set some default values if not explicitly provided */
150 	param_set_ine(&conninfo_params, "connect_timeout", "2");
151 	param_set_ine(&conninfo_params, "fallback_application_name", "repmgr");
152 
153 	if (param_get(&conninfo_params, "replication") != NULL)
154 		is_replication_connection = true;
155 
156 	/* use a secure search_path */
157 	param_set(&conninfo_params, "options", "-csearch_path=");
158 
159 	connection_string = param_list_to_string(&conninfo_params);
160 
161 	log_debug(_("connecting to: \"%s\""), connection_string);
162 
163 	conn = PQconnectdb(connection_string);
164 
165 	/* Check to see that the backend connection was successfully made */
166 	if ((PQstatus(conn) != CONNECTION_OK))
167 	{
168 		bool		emit_log = true;
169 
170 		if (verbose_only == true && verbose_logging == false)
171 			emit_log = false;
172 
173 		if (emit_log)
174 		{
175 			if (log_notice)
176 			{
177 				log_notice(_("connection to database failed"));
178 				log_detail("\n%s", PQerrorMessage(conn));
179 			}
180 			else
181 			{
182 				log_error(_("connection to database failed"));
183 				log_detail("\n%s", PQerrorMessage(conn));
184 			}
185 			log_detail(_("attempted to connect using:\n  %s"),
186 					   connection_string);
187 		}
188 
189 		if (exit_on_error)
190 		{
191 			PQfinish(conn);
192 			free_conninfo_params(&conninfo_params);
193 			exit(ERR_DB_CONN);
194 		}
195 	}
196 
197 	/*
198 	 * set "synchronous_commit" to "local" in case synchronous replication is
199 	 * in use
200 	 *
201 	 * XXX set this explicitly before any write operations
202 	 */
203 
204 	else if (is_replication_connection == false &&
205 			 set_config(conn, "synchronous_commit", "local") == false)
206 	{
207 		if (exit_on_error)
208 		{
209 			PQfinish(conn);
210 			free_conninfo_params(&conninfo_params);
211 			exit(ERR_DB_CONN);
212 		}
213 	}
214 
215 	pfree(connection_string);
216 	free_conninfo_params(&conninfo_params);
217 
218 	return conn;
219 }
220 
221 
222 /*
223  * Establish a database connection, optionally exit on error
224  */
225 PGconn *
establish_db_connection(const char * conninfo,const bool exit_on_error)226 establish_db_connection(const char *conninfo, const bool exit_on_error)
227 {
228 	return _establish_db_connection(conninfo, exit_on_error, false, false);
229 }
230 
231 /*
232  * Attempt to establish a database connection, never exit on error, only
233  * output error messages if --verbose option used
234  */
235 PGconn *
establish_db_connection_quiet(const char * conninfo)236 establish_db_connection_quiet(const char *conninfo)
237 {
238 	return _establish_db_connection(conninfo, false, false, true);
239 }
240 
241 
242 PGconn *
establish_db_connection_with_replacement_param(const char * conninfo,const char * param,const char * value,const bool exit_on_error)243 establish_db_connection_with_replacement_param(const char *conninfo,
244 											   const char *param,
245 											   const char *value,
246 											   const bool exit_on_error)
247 {
248 	t_conninfo_param_list node_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
249 	char	   *errmsg = NULL;
250 	bool		parse_success = false;
251 	PGconn	   *conn = NULL;
252 
253 	initialize_conninfo_params(&node_conninfo, false);
254 
255 	parse_success = parse_conninfo_string(conninfo,
256 										  &node_conninfo,
257 										  &errmsg, false);
258 
259 	if (parse_success == false)
260 	{
261 		log_error(_("unable to parse conninfo string \"%s\" for local node"),
262 				  conninfo);
263 		log_detail("%s", errmsg);
264 
265 		if (exit_on_error == true)
266 			exit(ERR_BAD_CONFIG);
267 
268 		return NULL;
269 	}
270 
271 	param_set(&node_conninfo,
272 			  param,
273 			  value);
274 
275 	conn = establish_db_connection_by_params(&node_conninfo, exit_on_error);
276 
277 	free_conninfo_params(&node_conninfo);
278 
279 	return conn;
280 }
281 
282 PGconn *
establish_primary_db_connection(PGconn * conn,const bool exit_on_error)283 establish_primary_db_connection(PGconn *conn,
284 								const bool exit_on_error)
285 {
286 	t_node_info primary_node_info = T_NODE_INFO_INITIALIZER;
287 	bool		primary_record_found = get_primary_node_record(conn, &primary_node_info);
288 
289 	if (primary_record_found == false)
290 	{
291 		return NULL;
292 	}
293 
294 	return establish_db_connection(primary_node_info.conninfo,
295 								   exit_on_error);
296 }
297 
298 
299 PGconn *
establish_db_connection_by_params(t_conninfo_param_list * param_list,const bool exit_on_error)300 establish_db_connection_by_params(t_conninfo_param_list *param_list,
301 								  const bool exit_on_error)
302 {
303 	PGconn	   *conn = NULL;
304 
305 	/* set some default values if not explicitly provided */
306 	param_set_ine(param_list, "connect_timeout", "2");
307 	param_set_ine(param_list, "fallback_application_name", "repmgr");
308 
309 	/* use a secure search_path */
310 	param_set(param_list, "options", "-csearch_path=");
311 
312 	/* Connect to the database using the provided parameters */
313 	conn = PQconnectdbParams((const char **) param_list->keywords, (const char **) param_list->values, true);
314 
315 	/* Check to see that the backend connection was successfully made */
316 	if ((PQstatus(conn) != CONNECTION_OK))
317 	{
318 		log_error(_("connection to database failed"));
319 		log_detail("\n%s", PQerrorMessage(conn));
320 
321 		if (exit_on_error)
322 		{
323 			PQfinish(conn);
324 			exit(ERR_DB_CONN);
325 		}
326 	}
327 	else
328 	{
329 		bool		is_replication_connection = false;
330 		int			i;
331 
332 		/*
333 		 * set "synchronous_commit" to "local" in case synchronous replication
334 		 * is in use (provided this is not a replication connection)
335 		 */
336 
337 		for (i = 0; param_list->keywords[i]; i++)
338 		{
339 			if (strcmp(param_list->keywords[i], "replication") == 0)
340 				is_replication_connection = true;
341 		}
342 
343 		if (is_replication_connection == false && set_config(conn, "synchronous_commit", "local") == false)
344 		{
345 			if (exit_on_error)
346 			{
347 				PQfinish(conn);
348 				exit(ERR_DB_CONN);
349 			}
350 		}
351 	}
352 
353 	return conn;
354 }
355 
356 
357 /*
358  * Given an existing active connection and the name of a replication
359  * user, extract the connection parameters from that connection and
360  * attempt to return a replication connection.
361  */
362 PGconn *
establish_replication_connection_from_conn(PGconn * conn,const char * repluser)363 establish_replication_connection_from_conn(PGconn *conn, const char *repluser)
364 {
365 	return _establish_replication_connection_from_params(conn, NULL, repluser);
366 }
367 
368 
369 PGconn *
establish_replication_connection_from_conninfo(const char * conninfo,const char * repluser)370 establish_replication_connection_from_conninfo(const char *conninfo, const char *repluser)
371 {
372 	return _establish_replication_connection_from_params(NULL, conninfo, repluser);
373 }
374 
375 
376 static PGconn *
_establish_replication_connection_from_params(PGconn * conn,const char * conninfo,const char * repluser)377 _establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser)
378 {
379 	t_conninfo_param_list repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
380 	PGconn *repl_conn = NULL;
381 
382 	initialize_conninfo_params(&repl_conninfo, false);
383 
384 	if (conn != NULL)
385 		conn_to_param_list(conn, &repl_conninfo);
386 	else if (conninfo != NULL)
387 		parse_conninfo_string(conninfo, &repl_conninfo, NULL, false);
388 
389 	/* Set the provided replication user */
390 	param_set(&repl_conninfo, "user", repluser);
391 	param_set(&repl_conninfo, "replication", "1");
392 	param_set(&repl_conninfo, "dbname", "replication");
393 
394 	repl_conn = establish_db_connection_by_params(&repl_conninfo, false);
395 	free_conninfo_params(&repl_conninfo);
396 
397 	return repl_conn;
398 }
399 
400 
401 PGconn *
get_primary_connection(PGconn * conn,int * primary_id,char * primary_conninfo_out)402 get_primary_connection(PGconn *conn,
403 					   int *primary_id, char *primary_conninfo_out)
404 {
405 	return _get_primary_connection(conn, primary_id, primary_conninfo_out, false);
406 }
407 
408 
409 PGconn *
get_primary_connection_quiet(PGconn * conn,int * primary_id,char * primary_conninfo_out)410 get_primary_connection_quiet(PGconn *conn,
411 							 int *primary_id, char *primary_conninfo_out)
412 {
413 	return _get_primary_connection(conn, primary_id, primary_conninfo_out, true);
414 }
415 
416 PGconn *
duplicate_connection(PGconn * conn,const char * user,bool replication)417 duplicate_connection(PGconn *conn, const char *user, bool replication)
418 {
419 	t_conninfo_param_list conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
420 	PGconn *duplicate_conn = NULL;
421 
422 	initialize_conninfo_params(&conninfo, false);
423 	conn_to_param_list(conn, &conninfo);
424 
425 	if (user != NULL)
426 		param_set(&conninfo, "user", user);
427 
428 	if (replication == true)
429 		param_set(&conninfo, "replication", "1");
430 
431 	duplicate_conn = establish_db_connection_by_params(&conninfo, false);
432 
433 	free_conninfo_params(&conninfo);
434 
435 	return duplicate_conn;
436 }
437 
438 
439 
440 void
close_connection(PGconn ** conn)441 close_connection(PGconn **conn)
442 {
443 	if (*conn == NULL)
444 		return;
445 
446 	PQfinish(*conn);
447 
448 	*conn = NULL;
449 }
450 
451 
452 /* =============================== */
453 /* conninfo manipulation functions */
454 /* =============================== */
455 
456 
457 /*
458  * get_conninfo_value()
459  *
460  * Extract the value represented by 'keyword' in 'conninfo' and copy
461  * it to the 'output' buffer.
462  *
463  * Returns true on success, or false on failure (conninfo string could
464  * not be parsed, or provided keyword not found).
465  */
466 
467 bool
get_conninfo_value(const char * conninfo,const char * keyword,char * output)468 get_conninfo_value(const char *conninfo, const char *keyword, char *output)
469 {
470 	PQconninfoOption *conninfo_options = NULL;
471 	PQconninfoOption *conninfo_option = NULL;
472 
473 	conninfo_options = PQconninfoParse(conninfo, NULL);
474 
475 	if (conninfo_options == NULL)
476 	{
477 		log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);
478 		return false;
479 	}
480 
481 	for (conninfo_option = conninfo_options; conninfo_option->keyword != NULL; conninfo_option++)
482 	{
483 		if (strcmp(conninfo_option->keyword, keyword) == 0)
484 		{
485 			if (conninfo_option->val != NULL && conninfo_option->val[0] != '\0')
486 			{
487 				strncpy(output, conninfo_option->val, MAXLEN);
488 				break;
489 			}
490 		}
491 	}
492 
493 	PQconninfoFree(conninfo_options);
494 
495 	return true;
496 }
497 
498 
499 /*
500  * Get a default conninfo value for the provided parameter, and copy
501  * it to the 'output' buffer.
502  *
503  * Returns true on success, or false on failure (provided keyword not found).
504  *
505  */
506 bool
get_conninfo_default_value(const char * param,char * output,int maxlen)507 get_conninfo_default_value(const char *param, char *output, int maxlen)
508 {
509 	PQconninfoOption *defs = NULL;
510 	PQconninfoOption *def = NULL;
511 	bool found = false;
512 
513 	defs = PQconndefaults();
514 
515 	for (def = defs; def->keyword; def++)
516 	{
517 		if (strncmp(def->keyword, param, maxlen) == 0)
518 		{
519 			strncpy(output, def->val, maxlen);
520 			found = true;
521 		}
522 	}
523 
524 	PQconninfoFree(defs);
525 
526 	return found;
527 }
528 
529 
530 void
initialize_conninfo_params(t_conninfo_param_list * param_list,bool set_defaults)531 initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults)
532 {
533 	PQconninfoOption *defs = NULL;
534 	PQconninfoOption *def = NULL;
535 	int			c;
536 
537 	defs = PQconndefaults();
538 	param_list->size = 0;
539 
540 	/* Count maximum number of parameters */
541 	for (def = defs; def->keyword; def++)
542 		param_list->size++;
543 
544 	/* Initialize our internal parameter list */
545 	param_list->keywords = pg_malloc0(sizeof(char *) * (param_list->size + 1));
546 	param_list->values = pg_malloc0(sizeof(char *) * (param_list->size + 1));
547 
548 	for (c = 0; c < param_list->size; c++)
549 	{
550 		param_list->keywords[c] = NULL;
551 		param_list->values[c] = NULL;
552 	}
553 
554 	if (set_defaults == true)
555 	{
556 		/* Pre-set any defaults */
557 
558 		for (def = defs; def->keyword; def++)
559 		{
560 			if (def->val != NULL && def->val[0] != '\0')
561 			{
562 				param_set(param_list, def->keyword, def->val);
563 			}
564 		}
565 	}
566 
567 	PQconninfoFree(defs);
568 }
569 
570 
571 void
free_conninfo_params(t_conninfo_param_list * param_list)572 free_conninfo_params(t_conninfo_param_list *param_list)
573 {
574 	int			c;
575 
576 	for (c = 0; c < param_list->size; c++)
577 	{
578 		if (param_list->keywords != NULL && param_list->keywords[c] != NULL)
579 			pfree(param_list->keywords[c]);
580 
581 		if (param_list->values != NULL && param_list->values[c] != NULL)
582 			pfree(param_list->values[c]);
583 	}
584 
585 	if (param_list->keywords != NULL)
586 		pfree(param_list->keywords);
587 
588 	if (param_list->values != NULL)
589 		pfree(param_list->values);
590 }
591 
592 
593 
594 void
copy_conninfo_params(t_conninfo_param_list * dest_list,t_conninfo_param_list * source_list)595 copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list)
596 {
597 	int			c;
598 
599 	for (c = 0; c < source_list->size && source_list->keywords[c] != NULL; c++)
600 	{
601 		if (source_list->values[c] != NULL && source_list->values[c][0] != '\0')
602 		{
603 			param_set(dest_list, source_list->keywords[c], source_list->values[c]);
604 		}
605 	}
606 }
607 
608 void
param_set(t_conninfo_param_list * param_list,const char * param,const char * value)609 param_set(t_conninfo_param_list *param_list, const char *param, const char *value)
610 {
611 	int			c;
612 	int			value_len = strlen(value) + 1;
613 	int			param_len;
614 
615 	/*
616 	 * Scan array to see if the parameter is already set - if not, replace it
617 	 */
618 	for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
619 	{
620 		if (strcmp(param_list->keywords[c], param) == 0)
621 		{
622 			if (param_list->values[c] != NULL)
623 				pfree(param_list->values[c]);
624 
625 			param_list->values[c] = pg_malloc0(value_len);
626 			strncpy(param_list->values[c], value, value_len);
627 
628 			return;
629 		}
630 	}
631 
632 	/*
633 	 * Sanity-check that the caller is not trying to overflow the array;
634 	 * in practice this is highly unlikely, and if it ever happens, this means
635 	 * something is highly wrong.
636 	 */
637 	Assert(c < param_list->size);
638 
639 	/*
640 	 * Parameter not in array - add it and its associated value
641 	 */
642 	param_len = strlen(param) + 1;
643 
644 	param_list->keywords[c] = pg_malloc0(param_len);
645 	param_list->values[c] = pg_malloc0(value_len);
646 
647 	strncpy(param_list->keywords[c], param, param_len);
648 	strncpy(param_list->values[c], value, value_len);
649 }
650 
651 
652 /*
653  * Like param_set(), but will only set the parameter if it doesn't exist
654  */
655 void
param_set_ine(t_conninfo_param_list * param_list,const char * param,const char * value)656 param_set_ine(t_conninfo_param_list *param_list, const char *param, const char *value)
657 {
658 	int			c;
659 	int			value_len = strlen(value) + 1;
660 	int			param_len;
661 
662 	/*
663 	 * Scan array to see if the parameter is already set - if so, do nothing
664 	 */
665 	for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
666 	{
667 		if (strcmp(param_list->keywords[c], param) == 0)
668 		{
669 			/* parameter exists, do nothing */
670 			return;
671 		}
672 	}
673 
674 	/*
675 	 * Sanity-check that the caller is not trying to overflow the array;
676 	 * in practice this is highly unlikely, and if it ever happens, this means
677 	 * something is highly wrong.
678 	 */
679 	Assert(c < param_list->size);
680 
681 	/*
682 	 * Parameter not in array - add it and its associated value
683 	 */
684 	param_len = strlen(param) + 1;
685 
686 	param_list->keywords[c] = pg_malloc0(param_len);
687 	param_list->values[c] = pg_malloc0(value_len);
688 
689 	strncpy(param_list->keywords[c], param, param_len);
690 	strncpy(param_list->values[c], value, value_len);
691 }
692 
693 
694 char *
param_get(t_conninfo_param_list * param_list,const char * param)695 param_get(t_conninfo_param_list *param_list, const char *param)
696 {
697 	int			c;
698 
699 	for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
700 	{
701 		if (strcmp(param_list->keywords[c], param) == 0)
702 		{
703 			if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
704 				return param_list->values[c];
705 			else
706 				return NULL;
707 		}
708 	}
709 
710 	return NULL;
711 }
712 
713 
714 /*
715  * Validate a conninfo string by attempting to parse it.
716  *
717  * "errmsg": passed to PQconninfoParse(), may be NULL
718  *
719  * NOTE: PQconninfoParse() verifies the string format and checks for
720  * valid options but does not sanity check values.
721  */
722 
723 bool
validate_conninfo_string(const char * conninfo_str,char ** errmsg)724 validate_conninfo_string(const char *conninfo_str, char **errmsg)
725 {
726 	PQconninfoOption *connOptions = NULL;
727 
728 	connOptions = PQconninfoParse(conninfo_str, errmsg);
729 
730 	if (connOptions == NULL)
731 		return false;
732 
733 	return true;
734 }
735 
736 
737 /*
738  * Parse a conninfo string into a t_conninfo_param_list
739  *
740  * See conn_to_param_list() to do the same for a PGconn.
741  *
742  * "errmsg": passed to PQconninfoParse(), may be NULL
743  *
744  * "ignore_local_params": ignores those parameters specific
745  * to a local installation, i.e. when parsing an upstream
746  * node's conninfo string for inclusion into "primary_conninfo",
747  * don't copy that node's values
748  */
749 bool
parse_conninfo_string(const char * conninfo_str,t_conninfo_param_list * param_list,char ** errmsg,bool ignore_local_params)750 parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char **errmsg, bool ignore_local_params)
751 {
752 	PQconninfoOption *connOptions = NULL;
753 	PQconninfoOption *option = NULL;
754 
755 	connOptions = PQconninfoParse(conninfo_str, errmsg);
756 
757 	if (connOptions == NULL)
758 		return false;
759 
760 	for (option = connOptions; option && option->keyword; option++)
761 	{
762 		/* Ignore non-set or blank parameter values */
763 		if (option->val == NULL || option->val[0] == '\0')
764 			continue;
765 
766 		/* Ignore settings specific to the upstream node */
767 		if (ignore_local_params == true)
768 		{
769 			if (strcmp(option->keyword, "application_name") == 0)
770 				continue;
771 			if (strcmp(option->keyword, "passfile") == 0)
772 				continue;
773 			if (strcmp(option->keyword, "servicefile") == 0)
774 				continue;
775 		}
776 		param_set(param_list, option->keyword, option->val);
777 	}
778 
779 	PQconninfoFree(connOptions);
780 
781 	return true;
782 }
783 
784 
785 /*
786  * Parse a PGconn into a t_conninfo_param_list
787  *
788  * See parse_conninfo_string() to do the same for a conninfo string
789  *
790  * NOTE: the current use case for this is to take an active connection,
791  * replace the existing username (typically replacing it with the superuser
792  * or replication user name), and make a new connection as that user.
793  * If the "password" field is set, it will cause any connection made with
794  * these parameters to fail (unless of course the password happens to be the
795  * same). Therefore we remove the password altogether, and rely on it being
796  * available via .pgpass.
797  */
798 void
conn_to_param_list(PGconn * conn,t_conninfo_param_list * param_list)799 conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
800 {
801 	PQconninfoOption *connOptions = NULL;
802 	PQconninfoOption *option = NULL;
803 
804 	connOptions = PQconninfo(conn);
805 	for (option = connOptions; option && option->keyword; option++)
806 	{
807 		/* Ignore non-set or blank parameter values */
808 		if (option->val == NULL || option->val[0] == '\0')
809 			continue;
810 
811 		/* Ignore "password" */
812 		if (strcmp(option->keyword, "password") == 0)
813 			continue;
814 
815 		param_set(param_list, option->keyword, option->val);
816 	}
817 
818 	PQconninfoFree(connOptions);
819 }
820 
821 
822 /*
823  * Converts param list to string; caller must free returned pointer
824  */
825 char *
param_list_to_string(t_conninfo_param_list * param_list)826 param_list_to_string(t_conninfo_param_list *param_list)
827 {
828 	int			c;
829 	PQExpBufferData conninfo_buf;
830 	char	   *conninfo_str = NULL;
831 	int			len = 0;
832 
833 	initPQExpBuffer(&conninfo_buf);
834 
835 	for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
836 	{
837 		if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
838 		{
839 			if (c > 0)
840 				appendPQExpBufferChar(&conninfo_buf, ' ');
841 
842 			/* XXX escape value */
843 			appendPQExpBuffer(&conninfo_buf,
844 							  "%s=%s",
845 							  param_list->keywords[c],
846 							  param_list->values[c]);
847 		}
848 	}
849 
850 	len = strlen(conninfo_buf.data) + 1;
851 	conninfo_str = pg_malloc0(len);
852 
853 	strncpy(conninfo_str, conninfo_buf.data, len);
854 
855 	termPQExpBuffer(&conninfo_buf);
856 
857 	return conninfo_str;
858 }
859 
860 
861 /*
862  * Run a conninfo string through the parser, and pass it back as a normal
863  * conninfo string. This is mainly intended for converting connection URIs
864  * to parameter/value conninfo strings.
865  *
866  * Caller must free returned pointer.
867  */
868 
869 char *
normalize_conninfo_string(const char * conninfo_str)870 normalize_conninfo_string(const char *conninfo_str)
871 {
872 	t_conninfo_param_list conninfo_params = T_CONNINFO_PARAM_LIST_INITIALIZER;
873 	bool		parse_success = false;
874 	char	   *normalized_string = NULL;
875 	char	   *errmsg = NULL;
876 
877 	initialize_conninfo_params(&conninfo_params, false);
878 
879 	parse_success = parse_conninfo_string(conninfo_str, &conninfo_params, &errmsg, false);
880 
881 	if (parse_success == false)
882 	{
883 		log_error(_("unable to parse provided conninfo string \"%s\""), conninfo_str);
884 		log_detail("%s", errmsg);
885 		free_conninfo_params(&conninfo_params);
886 		return NULL;
887 	}
888 
889 
890 	normalized_string = param_list_to_string(&conninfo_params);
891 	free_conninfo_params(&conninfo_params);
892 
893 	return normalized_string;
894 }
895 
896 /*
897  * check whether the libpq version in use recognizes the "passfile" parameter
898  * (should be 9.6 and later)
899  */
900 bool
has_passfile(void)901 has_passfile(void)
902 {
903 	PQconninfoOption *defs = PQconndefaults();
904 	PQconninfoOption *def = NULL;
905     bool has_passfile = false;
906 
907    	for (def = defs; def->keyword; def++)
908     {
909         if (strcmp(def->keyword, "passfile") == 0)
910         {
911             has_passfile = true;
912             break;
913         }
914     }
915 
916 	PQconninfoFree(defs);
917 
918 	return has_passfile;
919 }
920 
921 
922 
923 /* ===================== */
924 /* transaction functions */
925 /* ===================== */
926 
927 bool
begin_transaction(PGconn * conn)928 begin_transaction(PGconn *conn)
929 {
930 	PGresult   *res = NULL;
931 
932 	log_verbose(LOG_DEBUG, "begin_transaction()");
933 
934 	res = PQexec(conn, "BEGIN");
935 
936 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
937 	{
938 		log_error(_("unable to begin transaction"));
939 		log_detail("%s", PQerrorMessage(conn));
940 
941 		PQclear(res);
942 		return false;
943 	}
944 
945 	PQclear(res);
946 
947 	return true;
948 }
949 
950 
951 bool
commit_transaction(PGconn * conn)952 commit_transaction(PGconn *conn)
953 {
954 	PGresult   *res = NULL;
955 
956 	log_verbose(LOG_DEBUG, "commit_transaction()");
957 
958 	res = PQexec(conn, "COMMIT");
959 
960 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
961 	{
962 		log_error(_("unable to commit transaction"));
963 		log_detail("%s", PQerrorMessage(conn));
964 		PQclear(res);
965 
966 		return false;
967 	}
968 
969 	PQclear(res);
970 
971 	return true;
972 }
973 
974 
975 bool
rollback_transaction(PGconn * conn)976 rollback_transaction(PGconn *conn)
977 {
978 	PGresult   *res = NULL;
979 
980 	log_verbose(LOG_DEBUG, "rollback_transaction()");
981 
982 	res = PQexec(conn, "ROLLBACK");
983 
984 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
985 	{
986 		log_error(_("unable to rollback transaction"));
987 		log_detail("%s", PQerrorMessage(conn));
988 		PQclear(res);
989 
990 		return false;
991 	}
992 
993 	PQclear(res);
994 
995 	return true;
996 }
997 
998 
999 /* ========================== */
1000 /* GUC manipulation functions */
1001 /* ========================== */
1002 
1003 static bool
_set_config(PGconn * conn,const char * config_param,const char * sqlquery)1004 _set_config(PGconn *conn, const char *config_param, const char *sqlquery)
1005 {
1006 	bool		success = true;
1007 	PGresult   *res = PQexec(conn, sqlquery);
1008 
1009 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1010 	{
1011 		log_db_error(conn, sqlquery, "_set_config(): unable to set \"%s\"", config_param);
1012 		success = false;
1013 	}
1014 
1015 	PQclear(res);
1016 
1017 	return success;
1018 }
1019 
1020 
1021 bool
set_config(PGconn * conn,const char * config_param,const char * config_value)1022 set_config(PGconn *conn, const char *config_param, const char *config_value)
1023 {
1024 	PQExpBufferData query;
1025 	bool		result = false;
1026 
1027 	initPQExpBuffer(&query);
1028 	appendPQExpBuffer(&query,
1029 					  "SET %s TO '%s'",
1030 					  config_param,
1031 					  config_value);
1032 
1033 	log_verbose(LOG_DEBUG, "set_config():\n  %s", query.data);
1034 
1035 	result = _set_config(conn, config_param, query.data);
1036 
1037 	termPQExpBuffer(&query);
1038 
1039 	return result;
1040 }
1041 
1042 bool
set_config_bool(PGconn * conn,const char * config_param,bool state)1043 set_config_bool(PGconn *conn, const char *config_param, bool state)
1044 {
1045 	PQExpBufferData query;
1046 	bool		result = false;
1047 
1048 	initPQExpBuffer(&query);
1049 	appendPQExpBuffer(&query,
1050 					  "SET %s TO %s",
1051 					  config_param,
1052 					  state ? "TRUE" : "FALSE");
1053 
1054 	log_verbose(LOG_DEBUG, "set_config_bool():\n  %s", query.data);
1055 
1056 
1057 	result = _set_config(conn, config_param, query.data);
1058 
1059 	termPQExpBuffer(&query);
1060 
1061 	return result;
1062 }
1063 
1064 
1065 int
guc_set(PGconn * conn,const char * parameter,const char * op,const char * value)1066 guc_set(PGconn *conn, const char *parameter, const char *op,
1067 		const char *value)
1068 {
1069 	PQExpBufferData query;
1070 	PGresult   *res = NULL;
1071 	int			retval = 1;
1072 
1073 	char	   *escaped_parameter = escape_string(conn, parameter);
1074 	char	   *escaped_value = escape_string(conn, value);
1075 
1076 	initPQExpBuffer(&query);
1077 	appendPQExpBuffer(&query,
1078 					  "SELECT true FROM pg_catalog.pg_settings "
1079 					  " WHERE name = '%s' AND setting %s '%s'",
1080 					  escaped_parameter, op, escaped_value);
1081 
1082 	log_verbose(LOG_DEBUG, "guc_set():\n%s", query.data);
1083 
1084 	res = PQexec(conn, query.data);
1085 
1086 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1087 	{
1088 		log_db_error(conn, query.data, _("guc_set(): unable to execute query"));
1089 		retval = -1;
1090 	}
1091 	else if (PQntuples(res) == 0)
1092 	{
1093 		retval = 0;
1094 	}
1095 
1096 	pfree(escaped_parameter);
1097 	pfree(escaped_value);
1098 	termPQExpBuffer(&query);
1099 	PQclear(res);
1100 
1101 	return retval;
1102 }
1103 
1104 
1105 bool
get_pg_setting(PGconn * conn,const char * setting,char * output)1106 get_pg_setting(PGconn *conn, const char *setting, char *output)
1107 {
1108 	bool success = _get_pg_setting(conn, setting, output, NULL, NULL);
1109 
1110 	if (success == true)
1111 	{
1112 		log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""), output);
1113 	}
1114 
1115 	return success;
1116 }
1117 
1118 bool
get_pg_setting_bool(PGconn * conn,const char * setting,bool * output)1119 get_pg_setting_bool(PGconn *conn, const char *setting, bool *output)
1120 {
1121 	bool success = _get_pg_setting(conn, setting, NULL, output, NULL);
1122 
1123 	if (success == true)
1124 	{
1125 		log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""),
1126 					*output == true ? "TRUE" : "FALSE");
1127 	}
1128 
1129 	return success;
1130 }
1131 
1132 bool
get_pg_setting_int(PGconn * conn,const char * setting,int * output)1133 get_pg_setting_int(PGconn *conn, const char *setting, int *output)
1134 {
1135 	bool success = _get_pg_setting(conn, setting, NULL, NULL, output);
1136 
1137 	if (success == true)
1138 	{
1139 		log_verbose(LOG_DEBUG, _("get_pg_setting_int(): returned value is \"%i\""), *output);
1140 	}
1141 
1142 	return success;
1143 }
1144 
1145 
1146 bool
_get_pg_setting(PGconn * conn,const char * setting,char * str_output,bool * bool_output,int * int_output)1147 _get_pg_setting(PGconn *conn, const char *setting, char *str_output, bool *bool_output, int *int_output)
1148 {
1149 	PQExpBufferData query;
1150 	PGresult   *res = NULL;
1151 	int			i;
1152 	bool		success = false;
1153 
1154 	char	   *escaped_setting = escape_string(conn, setting);
1155 
1156 	if (escaped_setting == NULL)
1157 	{
1158 		log_error(_("unable to escape setting \"%s\""), setting);
1159 		return false;
1160 	}
1161 
1162 	initPQExpBuffer(&query);
1163 	appendPQExpBuffer(&query,
1164 					  "SELECT name, setting "
1165 					  "  FROM pg_catalog.pg_settings WHERE name = '%s'",
1166 					  escaped_setting);
1167 
1168 	log_verbose(LOG_DEBUG, "get_pg_setting():\n  %s", query.data);
1169 
1170 	res = PQexec(conn, query.data);
1171 
1172 	pfree(escaped_setting);
1173 
1174 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1175 	{
1176 		log_db_error(conn, query.data, _("get_pg_setting() - unable to execute query"));
1177 
1178 		termPQExpBuffer(&query);
1179 		PQclear(res);
1180 
1181 		return false;
1182 	}
1183 
1184 	for (i = 0; i < PQntuples(res); i++)
1185 	{
1186 		if (strcmp(PQgetvalue(res, i, 0), setting) == 0)
1187 		{
1188 			if (str_output != NULL)
1189 			{
1190 				snprintf(str_output, MAXLEN, "%s", PQgetvalue(res, i, 1));
1191 			}
1192 			else if (bool_output != NULL)
1193 			{
1194 				/*
1195 				 * Note we assume the caller is sure this is a boolean parameter
1196 				 */
1197 				if (strncmp(PQgetvalue(res, i, 1), "on", MAXLEN) == 0)
1198 					*bool_output = true;
1199 				else
1200 					*bool_output = false;
1201 			}
1202 			else if (int_output != NULL)
1203 			{
1204 				*int_output = atoi(PQgetvalue(res, i, 1));
1205 			}
1206 
1207 			success = true;
1208 			break;
1209 		}
1210 		else
1211 		{
1212 			/* highly unlikely this would ever happen */
1213 			log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0));
1214 		}
1215 	}
1216 
1217 
1218 	termPQExpBuffer(&query);
1219 	PQclear(res);
1220 
1221 	return success;
1222 }
1223 
1224 
1225 
1226 bool
alter_system_int(PGconn * conn,const char * name,int value)1227 alter_system_int(PGconn *conn, const char *name, int value)
1228 {
1229 	PQExpBufferData query;
1230 	PGresult   *res = NULL;
1231 	bool		success = true;
1232 
1233 	initPQExpBuffer(&query);
1234 	appendPQExpBuffer(&query,
1235 					  "ALTER SYSTEM SET %s = %i",
1236 					  name, value);
1237 
1238 	res = PQexec(conn, query.data);
1239 
1240 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1241 	{
1242 		log_db_error(conn, query.data, _("alter_system_int() - unable to execute query"));
1243 
1244 		success = false;
1245 	}
1246 
1247 	termPQExpBuffer(&query);
1248 	PQclear(res);
1249 
1250 	return success;
1251 }
1252 
1253 
1254 bool
pg_reload_conf(PGconn * conn)1255 pg_reload_conf(PGconn *conn)
1256 {
1257 	PGresult   *res = NULL;
1258 	bool		success = false;
1259 
1260 	res = PQexec(conn, "SELECT pg_catalog.pg_reload_conf()");
1261 
1262 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1263 	{
1264 		log_db_error(conn, NULL, _("pg_reload_conf() - unable to execute query"));
1265 
1266 		success = false;
1267 	}
1268 
1269 	PQclear(res);
1270 
1271 	return success;
1272 }
1273 
1274 
1275 /* ============================ */
1276 /* Server information functions */
1277 /* ============================ */
1278 
1279 
1280 bool
get_cluster_size(PGconn * conn,char * size)1281 get_cluster_size(PGconn *conn, char *size)
1282 {
1283 	PQExpBufferData query;
1284 	PGresult   *res = NULL;
1285 	bool		success = true;
1286 
1287 	initPQExpBuffer(&query);
1288 	appendPQExpBufferStr(&query,
1289 						 "SELECT pg_catalog.pg_size_pretty(pg_catalog.sum(pg_catalog.pg_database_size(oid))::bigint) "
1290 						 "	 FROM pg_catalog.pg_database ");
1291 
1292 	log_verbose(LOG_DEBUG, "get_cluster_size():\n%s", query.data);
1293 
1294 	res = PQexec(conn, query.data);
1295 
1296 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1297 	{
1298 		log_db_error(conn, query.data, _("get_cluster_size(): unable to execute query"));
1299 		success = false;
1300 	}
1301 	else
1302 	{
1303 		snprintf(size, MAXLEN, "%s", PQgetvalue(res, 0, 0));
1304 	}
1305 
1306 	termPQExpBuffer(&query);
1307 	PQclear(res);
1308 
1309 	return success;
1310 }
1311 
1312 
1313 /*
1314  * Return the server version number for the connection provided
1315  */
1316 int
get_server_version(PGconn * conn,char * server_version_buf)1317 get_server_version(PGconn *conn, char *server_version_buf)
1318 {
1319 	PGresult   *res = NULL;
1320 	int			_server_version_num = UNKNOWN_SERVER_VERSION_NUM;
1321 
1322 	const char	   *sqlquery =
1323 		"SELECT pg_catalog.current_setting('server_version_num'), "
1324 		"       pg_catalog.current_setting('server_version')";
1325 
1326 	res = PQexec(conn, sqlquery);
1327 
1328 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1329 	{
1330 		log_db_error(conn, sqlquery, _("unable to determine server version number"));
1331 		PQclear(res);
1332 
1333 		return UNKNOWN_SERVER_VERSION_NUM;
1334 	}
1335 
1336 	_server_version_num = atoi(PQgetvalue(res, 0, 0));
1337 
1338 	if (server_version_buf != NULL)
1339 	{
1340 		int			i;
1341 		char		_server_version_buf[MAXVERSIONSTR] = "";
1342 
1343 		memset(_server_version_buf, 0, MAXVERSIONSTR);
1344 
1345 		/*
1346 		 * Some distributions may add extra info after the actual version number,
1347 		 * e.g. "10.4 (Debian 10.4-2.pgdg90+1)", so copy everything up until the
1348 		 * first space.
1349 		 */
1350 
1351 		snprintf(_server_version_buf, MAXVERSIONSTR, "%s", PQgetvalue(res, 0, 1));
1352 
1353 		for (i = 0; i < MAXVERSIONSTR; i++)
1354 		{
1355 			if (_server_version_buf[i] == ' ')
1356 				break;
1357 
1358 			*server_version_buf++ = _server_version_buf[i];
1359 		}
1360 	}
1361 
1362 	PQclear(res);
1363 
1364 	return _server_version_num;
1365 }
1366 
1367 
1368 RecoveryType
get_recovery_type(PGconn * conn)1369 get_recovery_type(PGconn *conn)
1370 {
1371 	PGresult   *res = NULL;
1372 	RecoveryType recovery_type = RECTYPE_UNKNOWN;
1373 
1374 	const char	   *sqlquery = "SELECT pg_catalog.pg_is_in_recovery()";
1375 
1376 	log_verbose(LOG_DEBUG, "get_recovery_type(): %s", sqlquery);
1377 
1378 	res = PQexec(conn, sqlquery);
1379 
1380 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1381 	{
1382 		log_db_error(conn,
1383 					 sqlquery,
1384 					 _("unable to determine if server is in recovery"));
1385 
1386 		recovery_type = RECTYPE_UNKNOWN;
1387 	}
1388 	else if (PQntuples(res) == 1)
1389 	{
1390 		if (strcmp(PQgetvalue(res, 0, 0), "f") == 0)
1391 		{
1392 			recovery_type = RECTYPE_PRIMARY;
1393 		}
1394 		else
1395 		{
1396 			recovery_type = RECTYPE_STANDBY;
1397 		}
1398 	}
1399 
1400 	PQclear(res);
1401 
1402 	return recovery_type;
1403 }
1404 
1405 /*
1406  * Read the node list from the provided connection and attempt to connect to each node
1407  * in turn to definitely establish if it's the cluster primary.
1408  *
1409  * The node list is returned in the order which makes it likely that the
1410  * current primary will be returned first, reducing the number of speculative
1411  * connections which need to be made to other nodes.
1412  *
1413  * If primary_conninfo_out points to allocated memory of MAXCONNINFO in length,
1414  * the primary server's conninfo string will be copied there.
1415  */
1416 
1417 PGconn *
_get_primary_connection(PGconn * conn,int * primary_id,char * primary_conninfo_out,bool quiet)1418 _get_primary_connection(PGconn *conn,
1419 						int *primary_id, char *primary_conninfo_out, bool quiet)
1420 {
1421 	PQExpBufferData query;
1422 
1423 	PGconn	   *remote_conn = NULL;
1424 	PGresult   *res = NULL;
1425 
1426 	char		remote_conninfo_stack[MAXCONNINFO];
1427 	char	   *remote_conninfo = &*remote_conninfo_stack;
1428 
1429 	int			i,
1430 				node_id;
1431 
1432 	/*
1433 	 * If the caller wanted to get a copy of the connection info string, sub
1434 	 * out the local stack pointer for the pointer passed by the caller.
1435 	 */
1436 	if (primary_conninfo_out != NULL)
1437 		remote_conninfo = primary_conninfo_out;
1438 
1439 	if (primary_id != NULL)
1440 	{
1441 		*primary_id = NODE_NOT_FOUND;
1442 	}
1443 
1444 	/* find all registered nodes  */
1445 	log_verbose(LOG_INFO, _("searching for primary node"));
1446 
1447 	initPQExpBuffer(&query);
1448 	appendPQExpBufferStr(&query,
1449 						 "  SELECT node_id, conninfo, "
1450 						 "         CASE WHEN type = 'primary' THEN 1 ELSE 2 END AS type_priority"
1451 						 "	   FROM repmgr.nodes "
1452 						 "   WHERE active IS TRUE "
1453 						 "     AND type != 'witness' "
1454 						 "ORDER BY active DESC, type_priority, priority, node_id");
1455 
1456 	log_verbose(LOG_DEBUG, "get_primary_connection():\n%s", query.data);
1457 
1458 	res = PQexec(conn, query.data);
1459 
1460 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1461 	{
1462 		log_db_error(conn, query.data, _("_get_primary_connection(): unable to retrieve node records"));
1463 
1464 		termPQExpBuffer(&query);
1465 		PQclear(res);
1466 
1467 		return NULL;
1468 	}
1469 
1470 	termPQExpBuffer(&query);
1471 
1472 	for (i = 0; i < PQntuples(res); i++)
1473 	{
1474 		RecoveryType recovery_type;
1475 
1476 		/* initialize with the values of the current node being processed */
1477 		node_id = atoi(PQgetvalue(res, i, 0));
1478 		snprintf(remote_conninfo, MAXCONNINFO, "%s", PQgetvalue(res, i, 1));
1479 
1480 		log_verbose(LOG_INFO,
1481 					_("checking if node %i is primary"),
1482 					node_id);
1483 
1484 		if (quiet)
1485 		{
1486 			remote_conn = establish_db_connection_quiet(remote_conninfo);
1487 		}
1488 		else
1489 		{
1490 			remote_conn = establish_db_connection(remote_conninfo, false);
1491 		}
1492 
1493 		if (PQstatus(remote_conn) != CONNECTION_OK)
1494 		{
1495 			PQfinish(remote_conn);
1496 			remote_conn = NULL;
1497 			continue;
1498 		}
1499 
1500 		recovery_type = get_recovery_type(remote_conn);
1501 
1502 		if (recovery_type == RECTYPE_UNKNOWN)
1503 		{
1504 			log_warning(_("unable to retrieve recovery state from node %i"),
1505 						node_id);
1506 
1507 			PQfinish(remote_conn);
1508 			continue;
1509 		}
1510 
1511 		if (recovery_type == RECTYPE_PRIMARY)
1512 		{
1513 			PQclear(res);
1514 			log_verbose(LOG_INFO, _("current primary node is %i"), node_id);
1515 
1516 			if (primary_id != NULL)
1517 			{
1518 				*primary_id = node_id;
1519 			}
1520 
1521 			return remote_conn;
1522 		}
1523 
1524 		PQfinish(remote_conn);
1525 	}
1526 
1527 	PQclear(res);
1528 	return NULL;
1529 }
1530 
1531 
1532 
1533 /*
1534  * Return the id of the active primary node, or NODE_NOT_FOUND if no
1535  * record available.
1536  *
1537  * This reports the value stored in the database only and
1538  * does not verify whether the node is actually available
1539  */
1540 int
get_primary_node_id(PGconn * conn)1541 get_primary_node_id(PGconn *conn)
1542 {
1543 	PQExpBufferData query;
1544 	PGresult   *res = NULL;
1545 	int			retval = NODE_NOT_FOUND;
1546 
1547 	initPQExpBuffer(&query);
1548 	appendPQExpBufferStr(&query,
1549 						 "SELECT node_id		  "
1550 						 "	 FROM repmgr.nodes    "
1551 						 " WHERE type = 'primary' "
1552 						 "   AND active IS TRUE  ");
1553 
1554 	log_verbose(LOG_DEBUG, "get_primary_node_id():\n%s", query.data);
1555 
1556 	res = PQexec(conn, query.data);
1557 
1558 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1559 	{
1560 		log_db_error(conn, query.data, _("get_primary_node_id(): unable to execute query"));
1561 		retval = UNKNOWN_NODE_ID;
1562 	}
1563 	else if (PQntuples(res) == 0)
1564 	{
1565 		log_verbose(LOG_WARNING, _("get_primary_node_id(): no active primary found"));
1566 		retval = NODE_NOT_FOUND;
1567 	}
1568 	else
1569 	{
1570 		retval = atoi(PQgetvalue(res, 0, 0));
1571 	}
1572 
1573 	termPQExpBuffer(&query);
1574 	PQclear(res);
1575 
1576 	return retval;
1577 }
1578 
1579 
1580 
1581 
1582 int
get_ready_archive_files(PGconn * conn,const char * data_directory)1583 get_ready_archive_files(PGconn *conn, const char *data_directory)
1584 {
1585 	char		archive_status_dir[MAXPGPATH] = "";
1586 	struct stat statbuf;
1587 	struct dirent *arcdir_ent;
1588 	DIR		   *arcdir;
1589 
1590 	int			ready_count = 0;
1591 
1592 	if (PQserverVersion(conn) >= 100000)
1593 	{
1594 		snprintf(archive_status_dir, MAXPGPATH,
1595 				 "%s/pg_wal/archive_status",
1596 				 data_directory);
1597 	}
1598 	else
1599 	{
1600 		snprintf(archive_status_dir, MAXPGPATH,
1601 				 "%s/pg_xlog/archive_status",
1602 				 data_directory);
1603 	}
1604 
1605 	/* sanity-check directory path */
1606 	if (stat(archive_status_dir, &statbuf) == -1)
1607 	{
1608 		log_error(_("unable to access archive_status directory \"%s\""),
1609 				  archive_status_dir);
1610 		log_detail("%s", strerror(errno));
1611 
1612 		return ARCHIVE_STATUS_DIR_ERROR;
1613 	}
1614 
1615 	arcdir = opendir(archive_status_dir);
1616 
1617 	if (arcdir == NULL)
1618 	{
1619 		log_error(_("unable to open archive directory \"%s\""),
1620 				  archive_status_dir);
1621 		log_detail("%s", strerror(errno));
1622 
1623 		return ARCHIVE_STATUS_DIR_ERROR;
1624 	}
1625 
1626 	while ((arcdir_ent = readdir(arcdir)) != NULL)
1627 	{
1628 		struct stat statbuf;
1629 		char		file_path[MAXPGPATH + sizeof(arcdir_ent->d_name)];
1630 		int			basenamelen = 0;
1631 
1632 		snprintf(file_path, sizeof(file_path),
1633 				 "%s/%s",
1634 				 archive_status_dir,
1635 				 arcdir_ent->d_name);
1636 
1637 		/* skip non-files */
1638 		if (stat(file_path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1639 		{
1640 			continue;
1641 		}
1642 
1643 		basenamelen = (int) strlen(arcdir_ent->d_name) - 6;
1644 
1645 		/*
1646 		 * count anything ending in ".ready"; for a more precise
1647 		 * implementation see: src/backend/postmaster/pgarch.c
1648 		 */
1649 		if (strcmp(arcdir_ent->d_name + basenamelen, ".ready") == 0)
1650 			ready_count++;
1651 	}
1652 
1653 	closedir(arcdir);
1654 
1655 	return ready_count;
1656 }
1657 
1658 
1659 bool
identify_system(PGconn * repl_conn,t_system_identification * identification)1660 identify_system(PGconn *repl_conn, t_system_identification *identification)
1661 {
1662 	PGresult   *res = NULL;
1663 
1664 	/* semicolon required here */
1665 	res = PQexec(repl_conn, "IDENTIFY_SYSTEM;");
1666 
1667 	if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
1668 	{
1669 		log_db_error(repl_conn, NULL, _("unable to execute IDENTIFY_SYSTEM"));
1670 
1671 		PQclear(res);
1672 		return false;
1673 	}
1674 
1675 #if defined(__i386__) || defined(__i386)
1676 	identification->system_identifier = atoll(PQgetvalue(res, 0, 0));
1677 #else
1678 	identification->system_identifier = atol(PQgetvalue(res, 0, 0));
1679 #endif
1680 
1681 	identification->timeline = atoi(PQgetvalue(res, 0, 1));
1682 	identification->xlogpos = parse_lsn(PQgetvalue(res, 0, 2));
1683 
1684 	PQclear(res);
1685 	return true;
1686 }
1687 
1688 
1689 /*
1690  * Return the system identifier by querying pg_control_system().
1691  *
1692  * Note there is a similar function in controldata.c ("get_system_identifier()")
1693  * which reads the control file.
1694  */
1695 uint64
system_identifier(PGconn * conn)1696 system_identifier(PGconn *conn)
1697 {
1698 	uint64		system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
1699 	PGresult   *res = NULL;
1700 
1701 	/*
1702 	 * pg_control_system() was introduced in PostgreSQL 9.6
1703 	 */
1704 	if (PQserverVersion(conn) < 90600)
1705 	{
1706 		return UNKNOWN_SYSTEM_IDENTIFIER;
1707 	}
1708 
1709 	res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
1710 
1711 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1712 	{
1713 		log_db_error(conn, NULL, _("system_identifier(): unable to query pg_control_system()"));
1714 	}
1715 	else
1716 	{
1717 #if defined(__i386__) || defined(__i386)
1718 		system_identifier = atoll(PQgetvalue(res, 0, 0));
1719 #else
1720 		system_identifier = atol(PQgetvalue(res, 0, 0));
1721 #endif
1722 	}
1723 
1724 	PQclear(res);
1725 
1726 	return system_identifier;
1727 }
1728 
1729 
1730 TimeLineHistoryEntry *
get_timeline_history(PGconn * repl_conn,TimeLineID tli)1731 get_timeline_history(PGconn *repl_conn, TimeLineID tli)
1732 {
1733 	PQExpBufferData query;
1734 	PGresult   *res = NULL;
1735 
1736 	PQExpBufferData result;
1737 	char		*resptr;
1738 
1739 	TimeLineHistoryEntry *history;
1740 	TimeLineID	file_tli = UNKNOWN_TIMELINE_ID;
1741 	uint32		switchpoint_hi;
1742 	uint32		switchpoint_lo;
1743 
1744 	initPQExpBuffer(&query);
1745 
1746 	appendPQExpBuffer(&query,
1747 					  "TIMELINE_HISTORY %i",
1748 					  (int)tli);
1749 
1750 	res = PQexec(repl_conn, query.data);
1751 	log_verbose(LOG_DEBUG, "get_timeline_history():\n%s", query.data);
1752 
1753 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1754 	{
1755 		log_db_error(repl_conn, query.data, _("get_timeline_history(): unable to execute query"));
1756 		termPQExpBuffer(&query);
1757 		PQclear(res);
1758 		return NULL;
1759 	}
1760 
1761 	termPQExpBuffer(&query);
1762 
1763 	if (PQntuples(res) != 1 || PQnfields(res) != 2)
1764 	{
1765 		log_error(_("unexpected response to TIMELINE_HISTORY command"));
1766 		log_detail(_("got %i rows and %i fields, expected %i rows and %i fields"),
1767 				   PQntuples(res), PQnfields(res), 1, 2);
1768 		PQclear(res);
1769 		return NULL;
1770 	}
1771 
1772 	initPQExpBuffer(&result);
1773 	appendPQExpBufferStr(&result, PQgetvalue(res, 0, 1));
1774 	PQclear(res);
1775 
1776 	resptr = result.data;
1777 
1778 	while (*resptr)
1779 	{
1780 		char	buf[MAXLEN];
1781 		char   *bufptr = buf;
1782 
1783 		if (*resptr != '\n')
1784 		{
1785 			int		len  = 0;
1786 
1787 			memset(buf, 0, MAXLEN);
1788 
1789 			while (*resptr && *resptr != '\n' && len < MAXLEN)
1790 			{
1791 				*bufptr++ = *resptr++;
1792 				len++;
1793 			}
1794 
1795 			if (buf[0])
1796 			{
1797 				int nfields = sscanf(buf,
1798 									 "%u\t%X/%X",
1799 									 &file_tli, &switchpoint_hi, &switchpoint_lo);
1800 				if (nfields == 3 && file_tli == tli - 1)
1801 					break;
1802 			}
1803 		}
1804 
1805 		if (*resptr)
1806 			resptr++;
1807 	}
1808 
1809 	termPQExpBuffer(&result);
1810 
1811 	if (file_tli == UNKNOWN_TIMELINE_ID || file_tli != tli - 1)
1812 	{
1813 		log_error(_("timeline %i not found in timeline history file content"), tli);
1814 		log_detail(_("content is: \"%s\""), result.data);
1815 		return NULL;
1816 	}
1817 
1818 	history = (TimeLineHistoryEntry *) palloc(sizeof(TimeLineHistoryEntry));
1819 	history->tli = file_tli;
1820 	history->begin = InvalidXLogRecPtr; /* we don't care about this */
1821 	history->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo;
1822 
1823 	return history;
1824 }
1825 
1826 
1827 /* =============================== */
1828 /* user/role information functions */
1829 /* =============================== */
1830 
1831 
1832 bool
can_execute_pg_promote(PGconn * conn)1833 can_execute_pg_promote(PGconn *conn)
1834 {
1835 	PQExpBufferData query;
1836 	PGresult   *res;
1837 	bool		has_pg_promote= false;
1838 
1839 	/* pg_promote() available from PostgreSQL 12 */
1840 	if (PQserverVersion(conn) < 120000)
1841 		return false;
1842 
1843 	initPQExpBuffer(&query);
1844 	appendPQExpBufferStr(&query,
1845 						 " SELECT pg_catalog.has_function_privilege( "
1846 						 "    CURRENT_USER, "
1847 						 "    'pg_catalog.pg_promote(bool,int)', "
1848 						 "    'execute' "
1849 						 " )");
1850 
1851 	res = PQexec(conn, query.data);
1852 
1853 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1854 	{
1855 		log_db_error(conn, query.data,
1856 					 _("can_execute_pg_promote(): unable to query user function privilege"));
1857 	}
1858 	else
1859 	{
1860 		has_pg_promote = atobool(PQgetvalue(res, 0, 0));
1861 	}
1862 	termPQExpBuffer(&query);
1863 
1864 	return has_pg_promote;
1865 }
1866 
1867 
1868 /*
1869  * Determine if the user associated with the current connection is
1870  * a member of the "pg_monitor" default role, or optionally one
1871  * of its three constituent "subroles".
1872  */
1873 bool
connection_has_pg_monitor_role(PGconn * conn,const char * subrole)1874 connection_has_pg_monitor_role(PGconn *conn, const char *subrole)
1875 {
1876 	PQExpBufferData query;
1877 	PGresult   *res;
1878 	bool		has_pg_monitor_role = false;
1879 
1880 	/* superusers can read anything, no role check needed */
1881 	if (is_superuser_connection(conn, NULL) == true)
1882 		return true;
1883 
1884 	/* pg_monitor and associated "subroles" introduced in PostgreSQL 10 */
1885 	if (PQserverVersion(conn) < 100000)
1886 		return false;
1887 
1888 	initPQExpBuffer(&query);
1889 	appendPQExpBufferStr(&query,
1890 						 "  SELECT CASE "
1891 						 "           WHEN pg_catalog.pg_has_role('pg_monitor','MEMBER') "
1892 						 "             THEN TRUE ");
1893 
1894 	if (subrole != NULL)
1895 	{
1896 		appendPQExpBuffer(&query,
1897 						  "           WHEN pg_catalog.pg_has_role('%s','MEMBER') "
1898 						  "             THEN TRUE ",
1899 						  subrole);
1900 	}
1901 
1902 	appendPQExpBufferStr(&query,
1903 						 "           ELSE FALSE "
1904 						 "         END AS has_pg_monitor");
1905 
1906 	res = PQexec(conn, query.data);
1907 
1908 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1909 	{
1910 		log_db_error(conn, query.data,
1911 					 _("connection_has_pg_monitor_role(): unable to query user roles"));
1912 	}
1913 	else
1914 	{
1915 		has_pg_monitor_role = atobool(PQgetvalue(res, 0, 0));
1916 	}
1917 	termPQExpBuffer(&query);
1918 	PQclear(res);
1919 
1920 	return has_pg_monitor_role;
1921 }
1922 
1923 
1924 bool
is_replication_role(PGconn * conn,char * rolname)1925 is_replication_role(PGconn *conn, char *rolname)
1926 {
1927 	PQExpBufferData query;
1928 	PGresult   *res;
1929 	bool		is_replication_role = false;
1930 
1931 	initPQExpBuffer(&query);
1932 
1933 	appendPQExpBufferStr(&query,
1934 						 "  SELECT rolreplication "
1935 						 "    FROM pg_catalog.pg_roles "
1936 						 "   WHERE rolname = ");
1937 
1938 	if (rolname != NULL)
1939 	{
1940 		appendPQExpBuffer(&query,
1941 						  "'%s'",
1942 						  rolname);
1943 	}
1944 	else
1945 	{
1946 		appendPQExpBufferStr(&query,
1947 							 "CURRENT_USER");
1948 	}
1949 
1950 	res = PQexec(conn, query.data);
1951 
1952 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1953 	{
1954 		log_db_error(conn, query.data,
1955 					 _("is_replication_role(): unable to query user roles"));
1956 	}
1957 	else
1958 	{
1959 		is_replication_role = atobool(PQgetvalue(res, 0, 0));
1960 	}
1961 
1962 	termPQExpBuffer(&query);
1963 	PQclear(res);
1964 
1965 	return is_replication_role;
1966 }
1967 
1968 
1969 bool
is_superuser_connection(PGconn * conn,t_connection_user * userinfo)1970 is_superuser_connection(PGconn *conn, t_connection_user *userinfo)
1971 {
1972 	bool		is_superuser = false;
1973 	const char *current_user = PQuser(conn);
1974 	const char *superuser_status = PQparameterStatus(conn, "is_superuser");
1975 
1976 	is_superuser = (strcmp(superuser_status, "on") == 0) ? true : false;
1977 
1978 	if (userinfo != NULL)
1979 	{
1980 		snprintf(userinfo->username,
1981 				 sizeof(userinfo->username),
1982 				 "%s", current_user);
1983 		userinfo->is_superuser = is_superuser;
1984 	}
1985 
1986 	return is_superuser;
1987 }
1988 
1989 
1990 /* =============================== */
1991 /* repmgrd shared memory functions */
1992 /* =============================== */
1993 
1994 bool
repmgrd_set_local_node_id(PGconn * conn,int local_node_id)1995 repmgrd_set_local_node_id(PGconn *conn, int local_node_id)
1996 {
1997 	PQExpBufferData query;
1998 	PGresult   *res = NULL;
1999 	bool		success = true;
2000 
2001 	initPQExpBuffer(&query);
2002 
2003 	appendPQExpBuffer(&query,
2004 					  "SELECT repmgr.set_local_node_id(%i)",
2005 					  local_node_id);
2006 
2007 	res = PQexec(conn, query.data);
2008 
2009 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2010 	{
2011 		log_db_error(conn, query.data, _("repmgrd_set_local_node_id(): unable to execute query"));
2012 
2013 		success = false;
2014 	}
2015 
2016 	termPQExpBuffer(&query);
2017 	PQclear(res);
2018 
2019 	return success;
2020 }
2021 
2022 
2023 int
repmgrd_get_local_node_id(PGconn * conn)2024 repmgrd_get_local_node_id(PGconn *conn)
2025 {
2026 	PGresult   *res = NULL;
2027 	int			local_node_id = UNKNOWN_NODE_ID;
2028 
2029 	const char *sqlquery = "SELECT repmgr.get_local_node_id()";
2030 
2031 	res = PQexec(conn, sqlquery);
2032 
2033 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2034 	{
2035 		log_db_error(conn, sqlquery, _("repmgrd_get_local_node_id(): unable to execute query"));
2036 	}
2037 	else if (!PQgetisnull(res, 0, 0))
2038 	{
2039 		local_node_id = atoi(PQgetvalue(res, 0, 0));
2040 	}
2041 
2042 	PQclear(res);
2043 
2044 	return local_node_id;
2045 }
2046 
2047 
2048 bool
repmgrd_check_local_node_id(PGconn * conn)2049 repmgrd_check_local_node_id(PGconn *conn)
2050 {
2051 	PGresult   *res = NULL;
2052 	bool		node_id_settable = true;
2053 	const char *sqlquery = "SELECT repmgr.get_local_node_id()";
2054 
2055 	res = PQexec(conn, sqlquery);
2056 
2057 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2058 	{
2059 		log_db_error(conn, sqlquery, _("repmgrd_get_local_node_id(): unable to execute query"));
2060 	}
2061 
2062 	if (PQgetisnull(res, 0, 0))
2063 	{
2064 		node_id_settable = false;
2065 	}
2066 
2067 	PQclear(res);
2068 
2069 	return node_id_settable;
2070 }
2071 
2072 
2073 /*
2074  * Function that checks if the primary is in exclusive backup mode.
2075  * We'll use this when executing an action can conflict with an exclusive
2076  * backup.
2077  */
2078 BackupState
server_in_exclusive_backup_mode(PGconn * conn)2079 server_in_exclusive_backup_mode(PGconn *conn)
2080 {
2081 	BackupState backup_state = BACKUP_STATE_UNKNOWN;
2082 	const char *sqlquery = "SELECT pg_catalog.pg_is_in_backup()";
2083 	PGresult   *res = PQexec(conn, sqlquery);
2084 
2085 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2086 	{
2087 		log_db_error(conn, sqlquery, _("unable to retrieve information regarding backup mode of node"));
2088 
2089 		backup_state = BACKUP_STATE_UNKNOWN;
2090 	}
2091 	else if (atobool(PQgetvalue(res, 0, 0)) == true)
2092 	{
2093 		backup_state = BACKUP_STATE_IN_BACKUP;
2094 	}
2095 	else
2096 	{
2097 		backup_state = BACKUP_STATE_NO_BACKUP;
2098 	}
2099 
2100 	PQclear(res);
2101 
2102 	return backup_state;
2103 }
2104 
2105 
2106 void
repmgrd_set_pid(PGconn * conn,pid_t repmgrd_pid,const char * pidfile)2107 repmgrd_set_pid(PGconn *conn, pid_t repmgrd_pid, const char *pidfile)
2108 {
2109 	PQExpBufferData query;
2110 	PGresult   *res = NULL;
2111 
2112 	log_verbose(LOG_DEBUG, "repmgrd_set_pid(): pid is %i", (int) repmgrd_pid);
2113 
2114 	initPQExpBuffer(&query);
2115 
2116 	appendPQExpBuffer(&query,
2117 					  "SELECT repmgr.set_repmgrd_pid(%i, ",
2118 					  (int) repmgrd_pid);
2119 
2120 	if (pidfile != NULL)
2121 	{
2122 		appendPQExpBuffer(&query,
2123 						  " '%s')",
2124 						  pidfile);
2125 	}
2126 	else
2127 	{
2128 		appendPQExpBufferStr(&query,
2129 							 " NULL)");
2130 	}
2131 
2132 	res = PQexec(conn, query.data);
2133 	termPQExpBuffer(&query);
2134 
2135 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2136 	{
2137 		log_error(_("unable to execute \"SELECT repmgr.set_repmgrd_pid()\""));
2138 		log_detail("%s", PQerrorMessage(conn));
2139 	}
2140 
2141 	PQclear(res);
2142 
2143 	return;
2144 }
2145 
2146 
2147 pid_t
repmgrd_get_pid(PGconn * conn)2148 repmgrd_get_pid(PGconn *conn)
2149 {
2150 	PGresult   *res = NULL;
2151 	pid_t		repmgrd_pid = UNKNOWN_PID;
2152 
2153 	res = PQexec(conn, "SELECT repmgr.get_repmgrd_pid()");
2154 
2155 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2156 	{
2157 		log_error(_("unable to execute \"SELECT repmgr.get_repmgrd_pid()\""));
2158 		log_detail("%s", PQerrorMessage(conn));
2159 	}
2160 	else if (!PQgetisnull(res, 0, 0))
2161 	{
2162 		repmgrd_pid = atoi(PQgetvalue(res, 0, 0));
2163 	}
2164 
2165 	PQclear(res);
2166 
2167 	return repmgrd_pid;
2168 }
2169 
2170 
2171 bool
repmgrd_is_running(PGconn * conn)2172 repmgrd_is_running(PGconn *conn)
2173 {
2174 	PGresult   *res = NULL;
2175 	bool		is_running = false;
2176 
2177 	res = PQexec(conn, "SELECT repmgr.repmgrd_is_running()");
2178 
2179 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2180 	{
2181 		log_error(_("unable to execute \"SELECT repmgr.repmgrd_is_running()\""));
2182 		log_detail("%s", PQerrorMessage(conn));
2183 	}
2184 	else if (!PQgetisnull(res, 0, 0))
2185 	{
2186 		is_running = atobool(PQgetvalue(res, 0, 0));
2187 	}
2188 
2189 	PQclear(res);
2190 
2191 	return is_running;
2192 }
2193 
2194 
2195 bool
repmgrd_is_paused(PGconn * conn)2196 repmgrd_is_paused(PGconn *conn)
2197 {
2198 	PGresult   *res = NULL;
2199 	bool		is_paused = false;
2200 
2201 	res = PQexec(conn, "SELECT repmgr.repmgrd_is_paused()");
2202 
2203 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2204 	{
2205 		log_error(_("unable to execute \"SELECT repmgr.repmgrd_is_paused()\""));
2206 		log_detail("%s", PQerrorMessage(conn));
2207 	}
2208 	else if (!PQgetisnull(res, 0, 0))
2209 	{
2210 		is_paused = atobool(PQgetvalue(res, 0, 0));
2211 	}
2212 
2213 	PQclear(res);
2214 
2215 	return is_paused;
2216 }
2217 
2218 
2219 bool
repmgrd_pause(PGconn * conn,bool pause)2220 repmgrd_pause(PGconn *conn, bool pause)
2221 {
2222 	PQExpBufferData query;
2223 	PGresult   *res = NULL;
2224 	bool		success = true;
2225 
2226 	initPQExpBuffer(&query);
2227 
2228 	appendPQExpBuffer(&query,
2229 					  "SELECT repmgr.repmgrd_pause(%s)",
2230 					  pause == true ? "TRUE" : "FALSE");
2231 	res = PQexec(conn, query.data);
2232 	termPQExpBuffer(&query);
2233 
2234 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2235 	{
2236 		log_error(_("unable to execute \"SELECT repmgr.repmgrd_pause()\""));
2237 		log_detail("%s", PQerrorMessage(conn));
2238 
2239 		success = false;
2240 	}
2241 
2242 	PQclear(res);
2243 
2244 	return success;
2245 }
2246 
2247 pid_t
get_wal_receiver_pid(PGconn * conn)2248 get_wal_receiver_pid(PGconn *conn)
2249 {
2250 	PGresult   *res = NULL;
2251 	pid_t		wal_receiver_pid = UNKNOWN_PID;
2252 
2253 	res = PQexec(conn, "SELECT repmgr.get_wal_receiver_pid()");
2254 
2255 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2256 	{
2257 		log_error(_("unable to execute \"SELECT repmgr.get_wal_receiver_pid()\""));
2258 		log_detail("%s", PQerrorMessage(conn));
2259 	}
2260 	else if (!PQgetisnull(res, 0, 0))
2261 	{
2262 		wal_receiver_pid = atoi(PQgetvalue(res, 0, 0));
2263 	}
2264 
2265 	PQclear(res);
2266 
2267 	return wal_receiver_pid;
2268 }
2269 
2270 
2271 int
repmgrd_get_upstream_node_id(PGconn * conn)2272 repmgrd_get_upstream_node_id(PGconn *conn)
2273 {
2274 	PGresult   *res = NULL;
2275 	int upstream_node_id = UNKNOWN_NODE_ID;
2276 
2277 	const char *sqlquery = "SELECT repmgr.get_upstream_node_id()";
2278 
2279 	res = PQexec(conn, sqlquery);
2280 
2281 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2282 	{
2283 		log_db_error(conn, sqlquery, _("repmgrd_get_upstream_node_id(): unable to execute query"));
2284 	}
2285 	else if (!PQgetisnull(res, 0, 0))
2286 	{
2287 		upstream_node_id = atoi(PQgetvalue(res, 0, 0));
2288 	}
2289 
2290 	PQclear(res);
2291 
2292 	return upstream_node_id;
2293 }
2294 
2295 
2296 bool
repmgrd_set_upstream_node_id(PGconn * conn,int node_id)2297 repmgrd_set_upstream_node_id(PGconn *conn, int node_id)
2298 {
2299 	PQExpBufferData query;
2300 	PGresult   *res = NULL;
2301 	bool		success = true;
2302 
2303 	initPQExpBuffer(&query);
2304 	appendPQExpBuffer(&query,
2305 					  " SELECT repmgr.set_upstream_node_id(%i) ",
2306 					  node_id);
2307 
2308 	log_verbose(LOG_DEBUG, "repmgrd_set_upstream_node_id():\n  %s", query.data);
2309 
2310 	res = PQexec(conn, query.data);
2311 
2312 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2313 	{
2314 		log_db_error(conn, query.data,
2315 					 _("repmgrd_set_upstream_node_id(): unable to set upstream node ID (provided value: %i)"), node_id);
2316 		success = false;
2317 	}
2318 
2319 	termPQExpBuffer(&query);
2320 	PQclear(res);
2321 
2322 	return success;
2323 }
2324 
2325 /* ================ */
2326 /* result functions */
2327 /* ================ */
2328 
/*
 * Convert the text form of a boolean value to a C bool.
 *
 * Only the exact string "t" is treated as true; every other string,
 * including the empty string, yields false. A NULL pointer also yields
 * false rather than being passed to strcmp().
 */
bool
atobool(const char *value)
{
	if (value == NULL)
	{
		return false;
	}

	return strcmp(value, "t") == 0;
}
2336 
2337 
2338 /* =================== */
2339 /* extension functions */
2340 /* =================== */
2341 
2342 ExtensionStatus
get_repmgr_extension_status(PGconn * conn,t_extension_versions * extversions)2343 get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions)
2344 {
2345 	PQExpBufferData query;
2346 	PGresult   *res = NULL;
2347 	ExtensionStatus status = REPMGR_UNKNOWN;
2348 
2349 	/* TODO: check version */
2350 
2351 	initPQExpBuffer(&query);
2352 
2353 	appendPQExpBufferStr(&query,
2354 						 "	  SELECT ae.name, e.extname, "
2355 						 "           ae.default_version, "
2356 						 "           (((FLOOR(ae.default_version::NUMERIC)::INT) * 10000) + (ae.default_version::NUMERIC - FLOOR(ae.default_version::NUMERIC)::INT) * 1000)::INT AS available, "
2357 						 "           ae.installed_version, "
2358 						 "           (((FLOOR(ae.installed_version::NUMERIC)::INT) * 10000) + (ae.installed_version::NUMERIC - FLOOR(ae.installed_version::NUMERIC)::INT) * 1000)::INT AS installed "
2359 						 "     FROM pg_catalog.pg_available_extensions ae "
2360 						 "LEFT JOIN pg_catalog.pg_extension e "
2361 						 "       ON e.extname=ae.name "
2362 						 "	   WHERE ae.name='repmgr' ");
2363 
2364 	res = PQexec(conn, query.data);
2365 
2366 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2367 	{
2368 		log_db_error(conn, query.data, _("get_repmgr_extension_status(): unable to execute extension query"));
2369 		status = REPMGR_UNKNOWN;
2370 	}
2371 
2372 	/* 1. Check extension is actually available */
2373 	else if (PQntuples(res) == 0)
2374 	{
2375 		status = REPMGR_UNAVAILABLE;
2376 	}
2377 
2378 	/* 2. Check if extension installed */
2379 	else if (PQgetisnull(res, 0, 1) == 0)
2380 	{
2381 		int available_version = atoi(PQgetvalue(res, 0, 3));
2382 		int installed_version = atoi(PQgetvalue(res, 0, 5));
2383 
2384 		/* caller wants to know which versions are installed/available */
2385 		if (extversions != NULL)
2386 		{
2387 			snprintf(extversions->default_version,
2388 					 sizeof(extversions->default_version),
2389 					 "%s", PQgetvalue(res, 0, 2));
2390 			extversions->default_version_num = available_version;
2391 			snprintf(extversions->installed_version,
2392 					 sizeof(extversions->installed_version),
2393 					 "%s", PQgetvalue(res, 0, 4));
2394 			extversions->installed_version_num = installed_version;
2395 		}
2396 
2397 		if (available_version > installed_version)
2398 		{
2399 			status = REPMGR_OLD_VERSION_INSTALLED;
2400 		}
2401 		else
2402 		{
2403 			status = REPMGR_INSTALLED;
2404 		}
2405 	}
2406 	else
2407 	{
2408 		status = REPMGR_AVAILABLE;
2409 	}
2410 
2411 	termPQExpBuffer(&query);
2412 	PQclear(res);
2413 
2414 	return status;
2415 }
2416 
2417 /* ========================= */
2418 /* node management functions */
2419 /* ========================= */
2420 
2421 /* assumes superuser connection */
2422 void
checkpoint(PGconn * conn)2423 checkpoint(PGconn *conn)
2424 {
2425 	PGresult   *res = NULL;
2426 
2427 	res = PQexec(conn, "CHECKPOINT");
2428 
2429 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
2430 	{
2431 		log_db_error(conn, NULL, _("unable to execute CHECKPOINT"));
2432 	}
2433 
2434 	PQclear(res);
2435 	return;
2436 }
2437 
2438 
2439 bool
vacuum_table(PGconn * primary_conn,const char * table)2440 vacuum_table(PGconn *primary_conn, const char *table)
2441 {
2442 	PQExpBufferData query;
2443 	bool		success = true;
2444 	PGresult   *res = NULL;
2445 
2446 	initPQExpBuffer(&query);
2447 
2448 	appendPQExpBuffer(&query, "VACUUM %s", table);
2449 
2450 	res = PQexec(primary_conn, query.data);
2451 
2452 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
2453 	{
2454 		log_db_error(primary_conn, NULL,
2455 					 _("unable to vacuum table \"%s\""), table);
2456 		success = false;
2457 	}
2458 
2459 	termPQExpBuffer(&query);
2460 	PQclear(res);
2461 
2462 	return success;
2463 }
2464 
2465 /*
2466  * For use in PostgreSQL 12 and later
2467  */
2468 bool
promote_standby(PGconn * conn,bool wait,int wait_seconds)2469 promote_standby(PGconn *conn, bool wait, int wait_seconds)
2470 {
2471 	PQExpBufferData query;
2472 	bool		success = true;
2473 	PGresult   *res = NULL;
2474 
2475 	initPQExpBuffer(&query);
2476 
2477 	appendPQExpBuffer(&query,
2478 					  "SELECT pg_catalog.pg_promote(wait := %s",
2479 					  wait ? "TRUE" : "FALSE");
2480 
2481 	if (wait_seconds > 0)
2482 	{
2483 		appendPQExpBuffer(&query,
2484 						  ", wait_seconds := %i",
2485 						  wait_seconds);
2486 	}
2487 
2488 	appendPQExpBufferStr(&query, ")");
2489 
2490 	res = PQexec(conn, query.data);
2491 
2492 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2493 	{
2494 		log_db_error(conn, query.data, _("unable to execute pg_promote()"));
2495 		success = false;
2496 	}
2497 	else
2498 	{
2499 		/* NOTE: if "wait" is false, pg_promote() will always return true */
2500 		success = atobool(PQgetvalue(res, 0, 0));
2501 	}
2502 
2503 	termPQExpBuffer(&query);
2504 	PQclear(res);
2505 
2506 	return success;
2507 }
2508 
2509 
2510 bool
resume_wal_replay(PGconn * conn)2511 resume_wal_replay(PGconn *conn)
2512 {
2513 	PGresult   *res = NULL;
2514 	PQExpBufferData query;
2515 	bool		success = true;
2516 
2517 	initPQExpBuffer(&query);
2518 
2519 	if (PQserverVersion(conn) >= 100000)
2520 	{
2521 		appendPQExpBufferStr(&query,
2522 							 "SELECT pg_catalog.pg_wal_replay_resume()");
2523 	}
2524 	else
2525 	{
2526 		appendPQExpBufferStr(&query,
2527 							 "SELECT pg_catalog.pg_xlog_replay_resume()");
2528 	}
2529 
2530 	res = PQexec(conn, query.data);
2531 
2532 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2533 	{
2534 		log_db_error(conn, query.data, _("resume_wal_replay(): unable to resume WAL replay"));
2535 		success = false;
2536 	}
2537 
2538 	termPQExpBuffer(&query);
2539 	PQclear(res);
2540 
2541 	return success;
2542 }
2543 
2544 
2545 /* ===================== */
2546 /* Node record functions */
2547 /* ===================== */
2548 
2549 /*
2550  * Note: init_defaults may only be false when the caller is refreshing a previously
2551  * populated record.
2552  */
2553 static RecordStatus
_get_node_record(PGconn * conn,char * sqlquery,t_node_info * node_info,bool init_defaults)2554 _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults)
2555 {
2556 	int			ntuples = 0;
2557 	PGresult   *res = PQexec(conn, sqlquery);
2558 
2559 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2560 	{
2561 		log_db_error(conn, sqlquery, _("_get_node_record(): unable to execute query"));
2562 
2563 		PQclear(res);
2564 		return RECORD_ERROR;
2565 	}
2566 
2567 	ntuples = PQntuples(res);
2568 
2569 	if (ntuples == 0)
2570 	{
2571 		PQclear(res);
2572 		return RECORD_NOT_FOUND;
2573 	}
2574 
2575 	_populate_node_record(res, node_info, 0, init_defaults);
2576 
2577 	PQclear(res);
2578 
2579 	return RECORD_FOUND;
2580 }
2581 
2582 
2583 /*
2584  * Note: init_defaults may only be false when the caller is refreshing a previously
2585  * populated record.
2586  */
2587 static void
_populate_node_record(PGresult * res,t_node_info * node_info,int row,bool init_defaults)2588 _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults)
2589 {
2590 	node_info->node_id = atoi(PQgetvalue(res, row, 0));
2591 	node_info->type = parse_node_type(PQgetvalue(res, row, 1));
2592 
2593 	if (PQgetisnull(res, row, 2))
2594 	{
2595 		node_info->upstream_node_id = NO_UPSTREAM_NODE;
2596 	}
2597 	else
2598 	{
2599 		node_info->upstream_node_id = atoi(PQgetvalue(res, row, 2));
2600 	}
2601 
2602 	snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", PQgetvalue(res, row, 3));
2603 	snprintf(node_info->conninfo, sizeof(node_info->conninfo), "%s", PQgetvalue(res, row, 4));
2604 	snprintf(node_info->repluser, sizeof(node_info->repluser), "%s", PQgetvalue(res, row, 5));
2605 	snprintf(node_info->slot_name, sizeof(node_info->slot_name), "%s", PQgetvalue(res, row, 6));
2606 	snprintf(node_info->location, sizeof(node_info->location), "%s", PQgetvalue(res, row, 7));
2607 	node_info->priority = atoi(PQgetvalue(res, row, 8));
2608 	node_info->active = atobool(PQgetvalue(res, row, 9));
2609 	snprintf(node_info->config_file, sizeof(node_info->config_file), "%s", PQgetvalue(res, row, 10));
2610 
2611 	/* These are only set by certain queries */
2612 	snprintf(node_info->upstream_node_name, sizeof(node_info->upstream_node_name), "%s", PQgetvalue(res, row, 11));
2613 
2614 	if (PQgetisnull(res, row, 12))
2615 	{
2616 		node_info->attached = NODE_ATTACHED_UNKNOWN;
2617 	}
2618 	else
2619 	{
2620 		node_info->attached = atobool(PQgetvalue(res, row, 12)) ? NODE_ATTACHED : NODE_DETACHED;
2621 	}
2622 
2623 	/* Set remaining struct fields with default values */
2624 
2625 	if (init_defaults == true)
2626 	{
2627 		node_info->node_status = NODE_STATUS_UNKNOWN;
2628 		node_info->recovery_type = RECTYPE_UNKNOWN;
2629 		node_info->last_wal_receive_lsn = InvalidXLogRecPtr;
2630 		node_info->monitoring_state = MS_NORMAL;
2631 		node_info->conn = NULL;
2632 	}
2633 }
2634 
2635 
2636 t_server_type
parse_node_type(const char * type)2637 parse_node_type(const char *type)
2638 {
2639 	if (strcmp(type, "primary") == 0)
2640 	{
2641 		return PRIMARY;
2642 	}
2643 	else if (strcmp(type, "standby") == 0)
2644 	{
2645 		return STANDBY;
2646 	}
2647 	else if (strcmp(type, "witness") == 0)
2648 	{
2649 		return WITNESS;
2650 	}
2651 
2652 	return UNKNOWN;
2653 }
2654 
2655 
2656 const char *
get_node_type_string(t_server_type type)2657 get_node_type_string(t_server_type type)
2658 {
2659 	switch (type)
2660 	{
2661 		case PRIMARY:
2662 			return "primary";
2663 		case STANDBY:
2664 			return "standby";
2665 		case WITNESS:
2666 			return "witness";
2667 			/* this should never happen */
2668 		case UNKNOWN:
2669 		default:
2670 			log_error(_("unknown node type %i"), type);
2671 			return "unknown";
2672 	}
2673 }
2674 
2675 
2676 RecordStatus
get_node_record(PGconn * conn,int node_id,t_node_info * node_info)2677 get_node_record(PGconn *conn, int node_id, t_node_info *node_info)
2678 {
2679 	PQExpBufferData query;
2680 	RecordStatus result;
2681 
2682 	initPQExpBuffer(&query);
2683 	appendPQExpBuffer(&query,
2684 					  "SELECT " REPMGR_NODES_COLUMNS
2685 					  "  FROM repmgr.nodes n "
2686 					  " WHERE n.node_id = %i",
2687 					  node_id);
2688 
2689 	log_verbose(LOG_DEBUG, "get_node_record():\n  %s", query.data);
2690 
2691 	result = _get_node_record(conn, query.data, node_info, true);
2692 	termPQExpBuffer(&query);
2693 
2694 	if (result == RECORD_NOT_FOUND)
2695 	{
2696 		log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
2697 	}
2698 
2699 	return result;
2700 }
2701 
2702 
2703 RecordStatus
refresh_node_record(PGconn * conn,int node_id,t_node_info * node_info)2704 refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info)
2705 {
2706 	PQExpBufferData query;
2707 	RecordStatus result;
2708 
2709 	initPQExpBuffer(&query);
2710 	appendPQExpBuffer(&query,
2711 					  "SELECT " REPMGR_NODES_COLUMNS
2712 					  "  FROM repmgr.nodes n "
2713 					  " WHERE n.node_id = %i",
2714 					  node_id);
2715 
2716 	log_verbose(LOG_DEBUG, "get_node_record():\n  %s", query.data);
2717 
2718 	result = _get_node_record(conn, query.data, node_info, false);
2719 	termPQExpBuffer(&query);
2720 
2721 	if (result == RECORD_NOT_FOUND)
2722 	{
2723 		log_verbose(LOG_DEBUG, "refresh_node_record(): no record found for node %i", node_id);
2724 	}
2725 
2726 	return result;
2727 }
2728 
2729 
2730 RecordStatus
get_node_record_with_upstream(PGconn * conn,int node_id,t_node_info * node_info)2731 get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info)
2732 {
2733 	PQExpBufferData query;
2734 	RecordStatus result;
2735 
2736 	initPQExpBuffer(&query);
2737 	appendPQExpBuffer(&query,
2738 					  "    SELECT " REPMGR_NODES_COLUMNS_WITH_UPSTREAM
2739 					  "      FROM repmgr.nodes n "
2740 					  " LEFT JOIN repmgr.nodes un "
2741 					  "        ON un.node_id = n.upstream_node_id"
2742 					  " WHERE n.node_id = %i",
2743 					  node_id);
2744 
2745 	log_verbose(LOG_DEBUG, "get_node_record():\n  %s", query.data);
2746 
2747 	result = _get_node_record(conn, query.data, node_info, true);
2748 	termPQExpBuffer(&query);
2749 
2750 	if (result == RECORD_NOT_FOUND)
2751 	{
2752 		log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
2753 	}
2754 
2755 	return result;
2756 }
2757 
2758 
2759 RecordStatus
get_node_record_by_name(PGconn * conn,const char * node_name,t_node_info * node_info)2760 get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info)
2761 {
2762 	PQExpBufferData query;
2763 	RecordStatus record_status = RECORD_NOT_FOUND;
2764 
2765 	initPQExpBuffer(&query);
2766 
2767 	appendPQExpBuffer(&query,
2768 					  "SELECT " REPMGR_NODES_COLUMNS
2769 					  "  FROM repmgr.nodes n "
2770 					  " WHERE n.node_name = '%s' ",
2771 					  node_name);
2772 
2773 	log_verbose(LOG_DEBUG, "get_node_record_by_name():\n  %s", query.data);
2774 
2775 	record_status = _get_node_record(conn, query.data, node_info, true);
2776 
2777 	termPQExpBuffer(&query);
2778 
2779 	if (record_status == RECORD_NOT_FOUND)
2780 	{
2781 		log_verbose(LOG_DEBUG, "get_node_record_by_name(): no record found for node \"%s\"",
2782 					node_name);
2783 	}
2784 
2785 	return record_status;
2786 }
2787 
2788 
2789 t_node_info *
get_node_record_pointer(PGconn * conn,int node_id)2790 get_node_record_pointer(PGconn *conn, int node_id)
2791 {
2792 	t_node_info *node_info = pg_malloc0(sizeof(t_node_info));
2793 	RecordStatus record_status = RECORD_NOT_FOUND;
2794 
2795 	record_status = get_node_record(conn, node_id, node_info);
2796 
2797 	if (record_status != RECORD_FOUND)
2798 	{
2799 		pfree(node_info);
2800 		return NULL;
2801 	}
2802 
2803 	return node_info;
2804 }
2805 
2806 
2807 bool
get_primary_node_record(PGconn * conn,t_node_info * node_info)2808 get_primary_node_record(PGconn *conn, t_node_info *node_info)
2809 {
2810 	RecordStatus record_status = RECORD_NOT_FOUND;
2811 
2812 	int			primary_node_id = get_primary_node_id(conn);
2813 
2814 	if (primary_node_id == UNKNOWN_NODE_ID)
2815 	{
2816 		return false;
2817 	}
2818 
2819 	record_status = get_node_record(conn, primary_node_id, node_info);
2820 
2821 	return record_status == RECORD_FOUND ? true : false;
2822 }
2823 
2824 
2825 /*
2826  * Get the local node record; if this fails, exit. Many operations
2827  * depend on this being available, so we'll centralize the check
2828  * and failure messages here.
2829  */
2830 bool
get_local_node_record(PGconn * conn,int node_id,t_node_info * node_info)2831 get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info)
2832 {
2833 	RecordStatus record_status = get_node_record(conn, node_id, node_info);
2834 
2835 	if (record_status != RECORD_FOUND)
2836 	{
2837 		log_error(_("unable to retrieve record for local node"));
2838 		log_detail(_("local node id is  %i"), node_id);
2839 		log_hint(_("check this node was correctly registered"));
2840 
2841 		PQfinish(conn);
2842 		exit(ERR_BAD_CONFIG);
2843 	}
2844 
2845 	return true;
2846 }
2847 
2848 
2849 static
2850 void
_populate_node_records(PGresult * res,NodeInfoList * node_list)2851 _populate_node_records(PGresult *res, NodeInfoList *node_list)
2852 {
2853 	int			i;
2854 
2855 	clear_node_info_list(node_list);
2856 
2857 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2858 	{
2859 		return;
2860 	}
2861 
2862 	for (i = 0; i < PQntuples(res); i++)
2863 	{
2864 		NodeInfoListCell *cell;
2865 
2866 		cell = (NodeInfoListCell *) pg_malloc0(sizeof(NodeInfoListCell));
2867 
2868 		cell->node_info = pg_malloc0(sizeof(t_node_info));
2869 
2870 		_populate_node_record(res, cell->node_info, i, true);
2871 
2872 		if (node_list->tail)
2873 			node_list->tail->next = cell;
2874 		else
2875 			node_list->head = cell;
2876 
2877 		node_list->tail = cell;
2878 		node_list->node_count++;
2879 	}
2880 
2881 	return;
2882 }
2883 
2884 
2885 bool
get_all_node_records(PGconn * conn,NodeInfoList * node_list)2886 get_all_node_records(PGconn *conn, NodeInfoList *node_list)
2887 {
2888 	PQExpBufferData query;
2889 	PGresult   *res = NULL;
2890 	bool success = true;
2891 	initPQExpBuffer(&query);
2892 
2893 	appendPQExpBufferStr(&query,
2894 						 "  SELECT " REPMGR_NODES_COLUMNS
2895 						 "    FROM repmgr.nodes n "
2896 						 "ORDER BY n.node_id ");
2897 
2898 	log_verbose(LOG_DEBUG, "get_all_node_records():\n%s", query.data);
2899 
2900 	res = PQexec(conn, query.data);
2901 
2902 	/* this will return an empty list if there was an error executing the query */
2903 	_populate_node_records(res, node_list);
2904 
2905 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2906 	{
2907 		log_db_error(conn, query.data, _("get_all_node_records(): unable to execute query"));
2908 		success = false;
2909 	}
2910 
2911 	PQclear(res);
2912 	termPQExpBuffer(&query);
2913 
2914 	return success;
2915 }
2916 
2917 bool
get_all_nodes_count(PGconn * conn,int * count)2918 get_all_nodes_count(PGconn *conn, int *count)
2919 {
2920 	PQExpBufferData query;
2921 	PGresult   *res = NULL;
2922 	bool success = true;
2923 	initPQExpBuffer(&query);
2924 
2925 	appendPQExpBufferStr(&query,
2926 						 "  SELECT count(*) "
2927 						 "    FROM repmgr.nodes n ");
2928 
2929 	log_verbose(LOG_DEBUG, "get_all_nodes_count():\n%s", query.data);
2930 
2931 	res = PQexec(conn, query.data);
2932 
2933 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2934 	{
2935 		log_db_error(conn, query.data, _("get_all_nodes_count(): unable to execute query"));
2936 		success = false;
2937 	}
2938 	else
2939 	{
2940 		*count = atoi(PQgetvalue(res, 0, 0));
2941 	}
2942 
2943 	PQclear(res);
2944 	termPQExpBuffer(&query);
2945 
2946 	return success;
2947 }
2948 
2949 void
get_downstream_node_records(PGconn * conn,int node_id,NodeInfoList * node_list)2950 get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
2951 {
2952 	PQExpBufferData query;
2953 	PGresult   *res = NULL;
2954 
2955 	initPQExpBuffer(&query);
2956 
2957 	appendPQExpBuffer(&query,
2958 					  "  SELECT " REPMGR_NODES_COLUMNS
2959 					  "    FROM repmgr.nodes n "
2960 					  "   WHERE n.upstream_node_id = %i "
2961 					  "ORDER BY n.node_id ",
2962 					  node_id);
2963 
2964 	log_verbose(LOG_DEBUG, "get_downstream_node_records():\n%s", query.data);
2965 
2966 	res = PQexec(conn, query.data);
2967 
2968 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
2969 	{
2970 		log_db_error(conn, query.data, _("get_downstream_node_records(): unable to execute query"));
2971 	}
2972 
2973 	termPQExpBuffer(&query);
2974 
2975 	/* this will return an empty list if there was an error executing the query */
2976 	_populate_node_records(res, node_list);
2977 
2978 	PQclear(res);
2979 
2980 	return;
2981 }
2982 
2983 
2984 void
get_active_sibling_node_records(PGconn * conn,int node_id,int upstream_node_id,NodeInfoList * node_list)2985 get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list)
2986 {
2987 	PQExpBufferData query;
2988 	PGresult   *res = NULL;
2989 
2990 	initPQExpBuffer(&query);
2991 
2992 	appendPQExpBuffer(&query,
2993 					  "  SELECT " REPMGR_NODES_COLUMNS
2994 					  "    FROM repmgr.nodes n "
2995 					  "   WHERE n.upstream_node_id = %i "
2996 					  "     AND n.node_id != %i "
2997 					  "     AND n.active IS TRUE "
2998 					  "ORDER BY n.node_id ",
2999 					  upstream_node_id,
3000 					  node_id);
3001 
3002 	log_verbose(LOG_DEBUG, "get_active_sibling_node_records():\n%s", query.data);
3003 
3004 	res = PQexec(conn, query.data);
3005 
3006 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3007 	{
3008 		log_db_error(conn, query.data, _("get_active_sibling_records(): unable to execute query"));
3009 	}
3010 
3011 	termPQExpBuffer(&query);
3012 
3013 	/* this will return an empty list if there was an error executing the query */
3014 	_populate_node_records(res, node_list);
3015 
3016 	PQclear(res);
3017 
3018 	return;
3019 }
3020 
3021 bool
get_child_nodes(PGconn * conn,int node_id,NodeInfoList * node_list)3022 get_child_nodes(PGconn *conn, int node_id, NodeInfoList *node_list)
3023 {
3024 	PQExpBufferData query;
3025 	PGresult   *res = NULL;
3026 	bool		success = true;
3027 
3028 	initPQExpBuffer(&query);
3029 
3030 	appendPQExpBuffer(&query,
3031 					  "    SELECT n.node_id, n.type, n.upstream_node_id, n.node_name, n.conninfo, n.repluser, "
3032 					  "           n.slot_name, n.location, n.priority, n.active, n.config_file, "
3033 					  "           '' AS upstream_node_name, "
3034 					  "           CASE WHEN sr.application_name IS NULL THEN FALSE ELSE TRUE END AS attached "
3035 					  "      FROM repmgr.nodes n "
3036 					  " LEFT JOIN pg_catalog.pg_stat_replication sr "
3037 					  "        ON sr.application_name = n.node_name "
3038 					  "     WHERE n.upstream_node_id = %i ",
3039 					  node_id);
3040 
3041 	log_verbose(LOG_DEBUG, "get_child_nodes():\n%s", query.data);
3042 
3043 	res = PQexec(conn, query.data);
3044 
3045 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3046 	{
3047 		log_db_error(conn, query.data, _("get_active_sibling_records(): unable to execute query"));
3048 		success = false;
3049 	}
3050 
3051 	termPQExpBuffer(&query);
3052 
3053 	/* this will return an empty list if there was an error executing the query */
3054 	_populate_node_records(res, node_list);
3055 
3056 	PQclear(res);
3057 
3058 	return success;
3059 }
3060 
3061 
3062 void
get_node_records_by_priority(PGconn * conn,NodeInfoList * node_list)3063 get_node_records_by_priority(PGconn *conn, NodeInfoList *node_list)
3064 {
3065 	PQExpBufferData query;
3066 	PGresult   *res = NULL;
3067 
3068 	initPQExpBuffer(&query);
3069 
3070 	appendPQExpBufferStr(&query,
3071 						 "  SELECT " REPMGR_NODES_COLUMNS
3072 						 "    FROM repmgr.nodes n "
3073 						 "ORDER BY n.priority DESC, n.node_name ");
3074 
3075 	log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data);
3076 
3077 	res = PQexec(conn, query.data);
3078 
3079 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3080 	{
3081 		log_db_error(conn, query.data, _("get_node_records_by_priority(): unable to execute query"));
3082 	}
3083 
3084 	termPQExpBuffer(&query);
3085 
3086 	/* this will return an empty list if there was an error executing the query */
3087 	_populate_node_records(res, node_list);
3088 
3089 	PQclear(res);
3090 
3091 	return;
3092 }
3093 
3094 /*
3095  * return all node records together with their upstream's node name,
3096  * if available.
3097  */
3098 bool
get_all_node_records_with_upstream(PGconn * conn,NodeInfoList * node_list)3099 get_all_node_records_with_upstream(PGconn *conn, NodeInfoList *node_list)
3100 {
3101 	PQExpBufferData query;
3102 	PGresult   *res = NULL;
3103 	bool		success = true;
3104 
3105 	initPQExpBuffer(&query);
3106 
3107 	appendPQExpBufferStr(&query,
3108 						 "    SELECT " REPMGR_NODES_COLUMNS_WITH_UPSTREAM
3109 						 "      FROM repmgr.nodes n "
3110 						 " LEFT JOIN repmgr.nodes un "
3111 						 "        ON un.node_id = n.upstream_node_id"
3112 						 "  ORDER BY n.node_id ");
3113 
3114 	log_verbose(LOG_DEBUG, "get_all_node_records_with_upstream():\n%s", query.data);
3115 
3116 	res = PQexec(conn, query.data);
3117 
3118 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3119 	{
3120 		log_db_error(conn, query.data, _("get_all_node_records_with_upstream(): unable to retrieve node records"));
3121 		success = false;
3122 	}
3123 
3124 
3125 	/* this will return an empty list if there was an error executing the query */
3126 	_populate_node_records(res, node_list);
3127 
3128 	termPQExpBuffer(&query);
3129 	PQclear(res);
3130 
3131 	return success;
3132 }
3133 
3134 
3135 
3136 bool
get_downstream_nodes_with_missing_slot(PGconn * conn,int this_node_id,NodeInfoList * node_list)3137 get_downstream_nodes_with_missing_slot(PGconn *conn, int this_node_id, NodeInfoList *node_list)
3138 {
3139 	PQExpBufferData query;
3140 	PGresult   *res = NULL;
3141 	bool		success = true;
3142 
3143 	initPQExpBuffer(&query);
3144 
3145 	appendPQExpBuffer(&query,
3146 					  "   SELECT " REPMGR_NODES_COLUMNS
3147 					  "     FROM repmgr.nodes n "
3148 					  "LEFT JOIN pg_catalog.pg_replication_slots rs "
3149 					  "       ON rs.slot_name = n.slot_name "
3150 					  "    WHERE n.slot_name IS NOT NULL"
3151 					  "      AND rs.slot_name IS NULL "
3152 					  "      AND n.upstream_node_id = %i "
3153 					  "      AND n.type = 'standby'",
3154 					  this_node_id);
3155 
3156 	log_verbose(LOG_DEBUG, "get_all_node_records_with_missing_slot():\n%s", query.data);
3157 
3158 	res = PQexec(conn, query.data);
3159 
3160 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3161 	{
3162 		log_db_error(conn, query.data, _("get_downstream_nodes_with_missing_slot(): unable to retrieve node records"));
3163 		success = false;
3164 	}
3165 
3166 	/* this will return an empty list if there was an error executing the query */
3167 	_populate_node_records(res, node_list);
3168 
3169 	termPQExpBuffer(&query);
3170 	PQclear(res);
3171 
3172 	return success;
3173 }
3174 
3175 bool
create_node_record(PGconn * conn,char * repmgr_action,t_node_info * node_info)3176 create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
3177 {
3178 	if (repmgr_action != NULL)
3179 		log_verbose(LOG_DEBUG, "create_node_record(): action is \"%s\"", repmgr_action);
3180 
3181 	return _create_update_node_record(conn, "create", node_info);
3182 }
3183 
3184 
3185 bool
update_node_record(PGconn * conn,char * repmgr_action,t_node_info * node_info)3186 update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
3187 {
3188 	if (repmgr_action != NULL)
3189 		log_verbose(LOG_DEBUG, "update_node_record(): action is \"%s\"", repmgr_action);
3190 
3191 	return _create_update_node_record(conn, "update", node_info);
3192 }
3193 
3194 
3195 static bool
_create_update_node_record(PGconn * conn,char * action,t_node_info * node_info)3196 _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info)
3197 {
3198 	PQExpBufferData query;
3199 	char		node_id[MAXLEN] = "";
3200 	char		priority[MAXLEN] = "";
3201 
3202 	char		upstream_node_id[MAXLEN] = "";
3203 	char	   *upstream_node_id_ptr = NULL;
3204 
3205 	char	   *slot_name_ptr = NULL;
3206 
3207 	int			param_count = NODE_RECORD_PARAM_COUNT;
3208 	const char *param_values[NODE_RECORD_PARAM_COUNT];
3209 
3210 	PGresult   *res;
3211 	bool		success = true;
3212 
3213 	maxlen_snprintf(node_id, "%i", node_info->node_id);
3214 	maxlen_snprintf(priority, "%i", node_info->priority);
3215 
3216 	if (node_info->upstream_node_id == NO_UPSTREAM_NODE && node_info->type == STANDBY)
3217 	{
3218 		/*
3219 		 * No explicit upstream node id provided for standby - attempt to get
3220 		 * primary node id
3221 		 */
3222 		int			primary_node_id = get_primary_node_id(conn);
3223 
3224 		maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
3225 		upstream_node_id_ptr = upstream_node_id;
3226 	}
3227 	else if (node_info->upstream_node_id != NO_UPSTREAM_NODE)
3228 	{
3229 		maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id);
3230 		upstream_node_id_ptr = upstream_node_id;
3231 	}
3232 
3233 	if (node_info->slot_name[0] != '\0')
3234 	{
3235 		slot_name_ptr = node_info->slot_name;
3236 	}
3237 
3238 
3239 	param_values[0] = get_node_type_string(node_info->type);
3240 	param_values[1] = upstream_node_id_ptr;
3241 	param_values[2] = node_info->node_name;
3242 	param_values[3] = node_info->conninfo;
3243 	param_values[4] = node_info->repluser;
3244 	param_values[5] = slot_name_ptr;
3245 	param_values[6] = node_info->location;
3246 	param_values[7] = priority;
3247 	param_values[8] = node_info->active == true ? "TRUE" : "FALSE";
3248 	param_values[9] = node_info->config_file;
3249 	param_values[10] = node_id;
3250 
3251 	initPQExpBuffer(&query);
3252 
3253 	if (strcmp(action, "create") == 0)
3254 	{
3255 		appendPQExpBufferStr(&query,
3256 							 "INSERT INTO repmgr.nodes "
3257 							 "       (node_id, type, upstream_node_id, "
3258 							 "        node_name, conninfo, repluser, slot_name, "
3259 							 "        location, priority, active, config_file) "
3260 							 "VALUES ($11, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ");
3261 	}
3262 	else
3263 	{
3264 		appendPQExpBufferStr(&query,
3265 							 "UPDATE repmgr.nodes SET "
3266 							 "       type = $1, "
3267 							 "       upstream_node_id = $2, "
3268 							 "       node_name = $3, "
3269 							 "       conninfo = $4, "
3270 							 "       repluser = $5, "
3271 							 "       slot_name = $6, "
3272 							 "       location = $7, "
3273 							 "       priority = $8, "
3274 							 "       active = $9, "
3275 							 "       config_file = $10 "
3276 							 " WHERE node_id = $11 ");
3277 	}
3278 
3279 	res = PQexecParams(conn,
3280 					   query.data,
3281 					   param_count,
3282 					   NULL,
3283 					   param_values,
3284 					   NULL,
3285 					   NULL,
3286 					   0);
3287 
3288 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3289 	{
3290 		log_db_error(conn, query.data,
3291 					 _("_create_update_node_record(): unable to %s node record for node \"%s\" (ID: %i)"),
3292 					 action,
3293 					 node_info->node_name,
3294 					 node_info->node_id);
3295 
3296 		success = false;
3297 	}
3298 
3299 	termPQExpBuffer(&query);
3300 	PQclear(res);
3301 
3302 	return success;
3303 }
3304 
3305 
3306 bool
update_node_record_set_active(PGconn * conn,int this_node_id,bool active)3307 update_node_record_set_active(PGconn *conn, int this_node_id, bool active)
3308 {
3309 	PQExpBufferData query;
3310 	PGresult   *res = NULL;
3311 	bool		success = true;
3312 
3313 	initPQExpBuffer(&query);
3314 
3315 	appendPQExpBuffer(&query,
3316 					  "UPDATE repmgr.nodes SET active = %s "
3317 					  " WHERE node_id = %i",
3318 					  active == true ? "TRUE" : "FALSE",
3319 					  this_node_id);
3320 
3321 	log_verbose(LOG_DEBUG, "update_node_record_set_active():\n  %s", query.data);
3322 
3323 	res = PQexec(conn, query.data);
3324 
3325 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3326 	{
3327 		log_db_error(conn, query.data,
3328 					 _("update_node_record_set_active(): unable to update node record"));
3329 		success = false;
3330 	}
3331 
3332 	termPQExpBuffer(&query);
3333 	PQclear(res);
3334 
3335 	return success;
3336 }
3337 
3338 
3339 bool
update_node_record_set_active_standby(PGconn * conn,int this_node_id)3340 update_node_record_set_active_standby(PGconn *conn, int this_node_id)
3341 {
3342 	PQExpBufferData query;
3343 	PGresult   *res = NULL;
3344 	bool		success = true;
3345 
3346 	initPQExpBuffer(&query);
3347 
3348 	appendPQExpBuffer(&query,
3349 					  "UPDATE repmgr.nodes "
3350 					  "   SET type = 'standby', "
3351 					  "       active = TRUE "
3352 					  " WHERE node_id = %i",
3353 					  this_node_id);
3354 
3355 	log_verbose(LOG_DEBUG, "update_node_record_set_active_standby():\n  %s", query.data);
3356 
3357 	res = PQexec(conn, query.data);
3358 
3359 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3360 	{
3361 		log_db_error(conn, query.data, _("update_node_record_set_active_standby(): unable to update node record"));
3362 		success = false;
3363 	}
3364 
3365 	termPQExpBuffer(&query);
3366 	PQclear(res);
3367 
3368 	return success;
3369 }
3370 
3371 
3372 bool
update_node_record_set_primary(PGconn * conn,int this_node_id)3373 update_node_record_set_primary(PGconn *conn, int this_node_id)
3374 {
3375 	PQExpBufferData query;
3376 	PGresult   *res = NULL;
3377 
3378 	log_debug(_("setting node %i as primary and marking existing primary as failed"),
3379 			  this_node_id);
3380 
3381 	begin_transaction(conn);
3382 
3383 	initPQExpBuffer(&query);
3384 
3385 	appendPQExpBuffer(&query,
3386 					  "  UPDATE repmgr.nodes "
3387 					  "     SET active = FALSE "
3388 					  "   WHERE type = 'primary' "
3389 					  "     AND active IS TRUE "
3390 					  "     AND node_id != %i ",
3391 					  this_node_id);
3392 
3393 	res = PQexec(conn, query.data);
3394 
3395 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3396 	{
3397 		log_db_error(conn, query.data,
3398 					 _("update_node_record_set_primary(): unable to set old primary node as inactive"));
3399 
3400 		termPQExpBuffer(&query);
3401 		PQclear(res);
3402 
3403 		rollback_transaction(conn);
3404 
3405 		return false;
3406 	}
3407 
3408 	termPQExpBuffer(&query);
3409 	PQclear(res);
3410 
3411 	initPQExpBuffer(&query);
3412 
3413 	appendPQExpBuffer(&query,
3414 					  "  UPDATE repmgr.nodes"
3415 					  "     SET type = 'primary', "
3416 					  "         upstream_node_id = NULL, "
3417 					  "         active = TRUE "
3418 					  "   WHERE node_id = %i ",
3419 					  this_node_id);
3420 
3421 	res = PQexec(conn, query.data);
3422 
3423 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3424 	{
3425 		log_db_error(conn, query.data,
3426 					 _("unable to set current node %i as active primary"),
3427 					 this_node_id);
3428 
3429 		termPQExpBuffer(&query);
3430 		PQclear(res);
3431 
3432 		rollback_transaction(conn);
3433 
3434 		return false;
3435 	}
3436 
3437 	termPQExpBuffer(&query);
3438 	PQclear(res);
3439 
3440 	return commit_transaction(conn);
3441 }
3442 
3443 
3444 bool
update_node_record_set_upstream(PGconn * conn,int this_node_id,int new_upstream_node_id)3445 update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id)
3446 {
3447 	PQExpBufferData query;
3448 	PGresult   *res = NULL;
3449 	bool		success = true;
3450 
3451 	log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i"),
3452 			  this_node_id, new_upstream_node_id);
3453 
3454 	initPQExpBuffer(&query);
3455 
3456 	appendPQExpBuffer(&query,
3457 					  "  UPDATE repmgr.nodes "
3458 					  "     SET upstream_node_id = %i "
3459 					  "   WHERE node_id = %i ",
3460 					  new_upstream_node_id,
3461 					  this_node_id);
3462 
3463 	log_verbose(LOG_DEBUG, "update_node_record_set_upstream():\n%s", query.data);
3464 
3465 	res = PQexec(conn, query.data);
3466 
3467 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3468 	{
3469 		log_db_error(conn, query.data, _("update_node_record_set_upstream(): unable to set new upstream node id"));
3470 
3471 		success = false;
3472 	}
3473 
3474 	termPQExpBuffer(&query);
3475 	PQclear(res);
3476 
3477 	return success;
3478 }
3479 
3480 
3481 /*
3482  * Update node record following change of status
3483  * (e.g. inactive primary converted to standby)
3484  */
3485 bool
update_node_record_status(PGconn * conn,int this_node_id,char * type,int upstream_node_id,bool active)3486 update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active)
3487 {
3488 	PQExpBufferData query;
3489 	PGresult   *res = NULL;
3490 	bool		success = true;
3491 
3492 	initPQExpBuffer(&query);
3493 
3494 	appendPQExpBuffer(&query,
3495 					  "  UPDATE repmgr.nodes "
3496 					  "     SET type = '%s', "
3497 					  "         upstream_node_id = %i, "
3498 					  "         active = %s "
3499 					  "   WHERE node_id = %i ",
3500 					  type,
3501 					  upstream_node_id,
3502 					  active ? "TRUE" : "FALSE",
3503 					  this_node_id);
3504 
3505 	log_verbose(LOG_DEBUG, "update_node_record_status():\n  %s", query.data);
3506 
3507 	res = PQexec(conn, query.data);
3508 
3509 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3510 	{
3511 		log_db_error(conn, query.data,
3512 					 _("update_node_record_status(): unable to update node record status for node %i"),
3513 					 this_node_id);
3514 
3515 		success = false;
3516 	}
3517 
3518 	termPQExpBuffer(&query);
3519 	PQclear(res);
3520 
3521 	return success;
3522 }
3523 
3524 
3525 /*
3526  * Update node record's "conninfo" and "priority" fields. Called by repmgrd
3527  * following a configuration file reload.
3528  */
3529 bool
update_node_record_conn_priority(PGconn * conn,t_configuration_options * options)3530 update_node_record_conn_priority(PGconn *conn, t_configuration_options *options)
3531 {
3532 	PQExpBufferData query;
3533 	PGresult   *res = NULL;
3534 	bool		success = true;
3535 
3536 	initPQExpBuffer(&query);
3537 
3538 	appendPQExpBuffer(&query,
3539 					  "UPDATE repmgr.nodes "
3540 					  "   SET conninfo = '%s', "
3541 					  "       priority = %d "
3542 					  " WHERE node_id = %d ",
3543 					  options->conninfo,
3544 					  options->priority,
3545 					  options->node_id);
3546 
3547 	res = PQexec(conn, query.data);
3548 
3549 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3550 	{
3551 		log_db_error(conn, query.data, _("update_node_record_conn_priority(): unable to execute query"));
3552 		success = false;
3553 	}
3554 
3555 	termPQExpBuffer(&query);
3556 
3557 	PQclear(res);
3558 
3559 	return success;
3560 }
3561 
3562 
3563 /*
3564  * Copy node records from primary to witness servers.
3565  *
3566  * This is used when initially registering a witness server, and
3567  * by repmgrd to update the node records when required.
3568  */
3569 
3570 bool
witness_copy_node_records(PGconn * primary_conn,PGconn * witness_conn)3571 witness_copy_node_records(PGconn *primary_conn, PGconn *witness_conn)
3572 {
3573 	PGresult   *res = NULL;
3574 	NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER;
3575 	NodeInfoListCell *cell = NULL;
3576 
3577 	begin_transaction(witness_conn);
3578 
3579 	/* Defer constraints */
3580 
3581 	res = PQexec(witness_conn, "SET CONSTRAINTS ALL DEFERRED");
3582 
3583 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3584 	{
3585 		log_db_error(witness_conn, NULL, ("witness_copy_node_records(): unable to defer constraints"));
3586 
3587 		rollback_transaction(witness_conn);
3588 		PQclear(res);
3589 
3590 		return false;
3591 	}
3592 
3593 	PQclear(res);
3594 
3595 	/* truncate existing records */
3596 
3597 	if (truncate_node_records(witness_conn) == false)
3598 	{
3599 		rollback_transaction(witness_conn);
3600 
3601 		return false;
3602 	}
3603 
3604 	if (get_all_node_records(primary_conn, &nodes) == false)
3605 	{
3606 		rollback_transaction(witness_conn);
3607 
3608 		return false;
3609 	}
3610 
3611 	for (cell = nodes.head; cell; cell = cell->next)
3612 	{
3613 		if (create_node_record(witness_conn, NULL, cell->node_info) == false)
3614 		{
3615 			rollback_transaction(witness_conn);
3616 
3617 			return false;
3618 		}
3619 	}
3620 
3621 	/* and done */
3622 	commit_transaction(witness_conn);
3623 
3624 	clear_node_info_list(&nodes);
3625 
3626 	return true;
3627 }
3628 
3629 
3630 bool
delete_node_record(PGconn * conn,int node)3631 delete_node_record(PGconn *conn, int node)
3632 {
3633 	PQExpBufferData query;
3634 	PGresult   *res = NULL;
3635 	bool		success = true;
3636 
3637 	initPQExpBuffer(&query);
3638 
3639 	appendPQExpBuffer(&query,
3640 					  "DELETE FROM repmgr.nodes "
3641 					  " WHERE node_id = %i",
3642 					  node);
3643 
3644 	log_verbose(LOG_DEBUG, "delete_node_record():\n  %s", query.data);
3645 
3646 	res = PQexec(conn, query.data);
3647 
3648 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3649 	{
3650 		log_db_error(conn, query.data, _("delete_node_record(): unable to delete node record"));
3651 
3652 		success = false;
3653 	}
3654 
3655 	termPQExpBuffer(&query);
3656 	PQclear(res);
3657 
3658 	return success;
3659 }
3660 
3661 bool
truncate_node_records(PGconn * conn)3662 truncate_node_records(PGconn *conn)
3663 {
3664 	PGresult   *res = NULL;
3665 	bool		success = true;
3666 
3667 	res = PQexec(conn, "TRUNCATE TABLE repmgr.nodes");
3668 
3669 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3670 	{
3671 		log_db_error(conn, NULL, _("truncate_node_records(): unable to truncate table \"repmgr.nodes\""));
3672 
3673 		success = false;
3674 	}
3675 
3676 	PQclear(res);
3677 
3678 	return success;
3679 }
3680 
3681 
3682 bool
update_node_record_slot_name(PGconn * primary_conn,int node_id,char * slot_name)3683 update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name)
3684 {
3685 	PQExpBufferData query;
3686 	PGresult   *res = NULL;
3687 	bool		success = true;
3688 
3689 	initPQExpBuffer(&query);
3690 
3691 	appendPQExpBuffer(&query,
3692 					  " UPDATE repmgr.nodes "
3693 					  "    SET slot_name = '%s' "
3694 					  "  WHERE node_id = %i ",
3695 					  slot_name,
3696 					  node_id);
3697 
3698 	res = PQexec(primary_conn, query.data);
3699 
3700 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
3701 	{
3702 		log_db_error(primary_conn, query.data, _("unable to set node record slot name"));
3703 
3704 		success = false;
3705 	}
3706 
3707 	termPQExpBuffer(&query);
3708 	PQclear(res);
3709 
3710 	return success;
3711 }
3712 
3713 
3714 
3715 
3716 void
clear_node_info_list(NodeInfoList * nodes)3717 clear_node_info_list(NodeInfoList *nodes)
3718 {
3719 	NodeInfoListCell *cell = NULL;
3720 	NodeInfoListCell *next_cell = NULL;
3721 
3722 	log_verbose(LOG_DEBUG, "clear_node_info_list() - closing open connections");
3723 
3724 	/* close any open connections */
3725 	for (cell = nodes->head; cell; cell = cell->next)
3726 	{
3727 
3728 		if (PQstatus(cell->node_info->conn) == CONNECTION_OK)
3729 		{
3730 			PQfinish(cell->node_info->conn);
3731 			cell->node_info->conn = NULL;
3732 		}
3733 	}
3734 
3735 	log_verbose(LOG_DEBUG, "clear_node_info_list() - unlinking");
3736 
3737 	cell = nodes->head;
3738 
3739 	while (cell != NULL)
3740 	{
3741 		next_cell = cell->next;
3742 
3743 		if (cell->node_info->replication_info != NULL)
3744 			pfree(cell->node_info->replication_info);
3745 
3746 		pfree(cell->node_info);
3747 		pfree(cell);
3748 		cell = next_cell;
3749 	}
3750 
3751 	nodes->head = NULL;
3752 	nodes->tail = NULL;
3753 	nodes->node_count = 0;
3754 }
3755 
3756 
3757 /* ================================================ */
3758 /* PostgreSQL configuration file location functions */
3759 /* ================================================ */
3760 
3761 bool
get_datadir_configuration_files(PGconn * conn,KeyValueList * list)3762 get_datadir_configuration_files(PGconn *conn, KeyValueList *list)
3763 {
3764 	PQExpBufferData query;
3765 	PGresult   *res = NULL;
3766 	int			i;
3767 	bool		success = true;
3768 
3769 	initPQExpBuffer(&query);
3770 
3771 	appendPQExpBufferStr(&query,
3772 						 "WITH files AS ( "
3773 						 "  WITH dd AS ( "
3774 						 "   SELECT setting "
3775 						 "     FROM pg_catalog.pg_settings "
3776 						 "    WHERE name = 'data_directory') "
3777 						 "   SELECT distinct(sourcefile) AS config_file"
3778 						 "     FROM dd, pg_catalog.pg_settings ps "
3779 						 "    WHERE ps.sourcefile IS NOT NULL "
3780 						 "      AND ps.sourcefile ~ ('^' || dd.setting) "
3781 						 "       UNION "
3782 						 "   SELECT ps.setting  AS config_file"
3783 						 "     FROM dd, pg_catalog.pg_settings ps "
3784 						 "    WHERE ps.name IN ('config_file', 'hba_file', 'ident_file') "
3785 						 "      AND ps.setting ~ ('^' || dd.setting) "
3786 						 ") "
3787 						 "  SELECT config_file, "
3788 						 "         pg_catalog.regexp_replace(config_file, '^.*\\/','') AS filename "
3789 						 "    FROM files "
3790 						 "ORDER BY config_file");
3791 
3792 	res = PQexec(conn, query.data);
3793 
3794 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3795 	{
3796 		log_db_error(conn, query.data,
3797 					 _("get_datadir_configuration_files(): unable to retrieve configuration file information"));
3798 
3799 		success = false;
3800 	}
3801 	else
3802 	{
3803 		for (i = 0; i < PQntuples(res); i++)
3804 		{
3805 			key_value_list_set(list,
3806 							   PQgetvalue(res, i, 1),
3807 							   PQgetvalue(res, i, 0));
3808 		}
3809 	}
3810 
3811 	termPQExpBuffer(&query);
3812 	PQclear(res);
3813 
3814 	return success;
3815 }
3816 
3817 
3818 bool
get_configuration_file_locations(PGconn * conn,t_configfile_list * list)3819 get_configuration_file_locations(PGconn *conn, t_configfile_list *list)
3820 {
3821 	PQExpBufferData query;
3822 	PGresult   *res = NULL;
3823 	int			i;
3824 
3825 	initPQExpBuffer(&query);
3826 
3827 	appendPQExpBufferStr(&query,
3828 						 "  WITH dd AS ( "
3829 						 "    SELECT setting AS data_directory"
3830 						 "      FROM pg_catalog.pg_settings "
3831 						 "     WHERE name = 'data_directory' "
3832 						 "  ) "
3833 						 "    SELECT DISTINCT(sourcefile), "
3834 						 "           pg_catalog.regexp_replace(sourcefile, '^.*\\/', '') AS filename, "
3835 						 "           sourcefile ~ ('^' || dd.data_directory) AS in_data_dir "
3836 						 "      FROM dd, pg_catalog.pg_settings ps "
3837 						 "     WHERE sourcefile IS NOT NULL "
3838 						 "  ORDER BY 1 ");
3839 
3840 	log_verbose(LOG_DEBUG, "get_configuration_file_locations():\n  %s",
3841 				query.data);
3842 
3843 	res = PQexec(conn, query.data);
3844 
3845 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3846 	{
3847 		log_db_error(conn, query.data,
3848 					 _("get_configuration_file_locations(): unable to retrieve configuration file locations"));
3849 
3850 		termPQExpBuffer(&query);
3851 		PQclear(res);
3852 
3853 		return false;
3854 	}
3855 
3856 	/*
3857 	 * allocate memory for config file array - number of rows returned from
3858 	 * above query + 2 for pg_hba.conf, pg_ident.conf
3859 	 */
3860 
3861 	config_file_list_init(list, PQntuples(res) + 2);
3862 
3863 	for (i = 0; i < PQntuples(res); i++)
3864 	{
3865 		config_file_list_add(list,
3866 							 PQgetvalue(res, i, 0),
3867 							 PQgetvalue(res, i, 1),
3868 							 atobool(PQgetvalue(res, i, 2)));
3869 	}
3870 
3871 	termPQExpBuffer(&query);
3872 	PQclear(res);
3873 
3874 	/* Fetch locations of pg_hba.conf and pg_ident.conf */
3875 	initPQExpBuffer(&query);
3876 
3877 	appendPQExpBufferStr(&query,
3878 						 "  WITH dd AS ( "
3879 						 "    SELECT setting AS data_directory"
3880 						 "      FROM pg_catalog.pg_settings "
3881 						 "     WHERE name = 'data_directory' "
3882 						 "  ) "
3883 						 "    SELECT ps.setting, "
3884 						 "           pg_catalog.regexp_replace(setting, '^.*\\/', '') AS filename, "
3885 						 "           ps.setting ~ ('^' || dd.data_directory) AS in_data_dir "
3886 						 "      FROM dd, pg_catalog.pg_settings ps "
3887 						 "     WHERE ps.name IN ('hba_file', 'ident_file') "
3888 						 "  ORDER BY 1 ");
3889 
3890 	log_verbose(LOG_DEBUG, "get_configuration_file_locations():\n  %s",
3891 				query.data);
3892 
3893 	res = PQexec(conn, query.data);
3894 
3895 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
3896 	{
3897 		log_db_error(conn, query.data,
3898 					 _("get_configuration_file_locations(): unable to retrieve configuration file locations"));
3899 
3900 		termPQExpBuffer(&query);
3901 		PQclear(res);
3902 
3903 		return false;
3904 	}
3905 
3906 	for (i = 0; i < PQntuples(res); i++)
3907 	{
3908 		config_file_list_add(list,
3909 							 PQgetvalue(res, i, 0),
3910 							 PQgetvalue(res, i, 1),
3911 							 atobool(PQgetvalue(res, i, 2)));
3912 	}
3913 
3914 	termPQExpBuffer(&query);
3915 	PQclear(res);
3916 
3917 	return true;
3918 }
3919 
3920 
3921 void
config_file_list_init(t_configfile_list * list,int max_size)3922 config_file_list_init(t_configfile_list *list, int max_size)
3923 {
3924 	list->size = max_size;
3925 	list->entries = 0;
3926 	list->files = pg_malloc0(sizeof(t_configfile_info *) * max_size);
3927 
3928 	if (list->files == NULL)
3929 	{
3930 		log_error(_("config_file_list_init(): unable to allocate memory; terminating"));
3931 		exit(ERR_OUT_OF_MEMORY);
3932 	}
3933 }
3934 
3935 
3936 void
config_file_list_add(t_configfile_list * list,const char * file,const char * filename,bool in_data_dir)3937 config_file_list_add(t_configfile_list *list, const char *file, const char *filename, bool in_data_dir)
3938 {
3939 	/* Failsafe to prevent entries being added beyond the end */
3940 	if (list->entries == list->size)
3941 		return;
3942 
3943 	list->files[list->entries] = pg_malloc0(sizeof(t_configfile_info));
3944 
3945 	if (list->files[list->entries] == NULL)
3946 	{
3947 		log_error(_("config_file_list_add(): unable to allocate memory; terminating"));
3948 		exit(ERR_OUT_OF_MEMORY);
3949 	}
3950 
3951 
3952 	snprintf(list->files[list->entries]->filepath,
3953 			 sizeof(list->files[list->entries]->filepath),
3954 			 "%s", file);
3955 	canonicalize_path(list->files[list->entries]->filepath);
3956 
3957 	snprintf(list->files[list->entries]->filename,
3958 			 sizeof(list->files[list->entries]->filename),
3959 			 "%s", filename);
3960 
3961 	list->files[list->entries]->in_data_directory = in_data_dir;
3962 
3963 	list->entries++;
3964 }
3965 
3966 
3967 /* ====================== */
3968 /* event record functions */
3969 /* ====================== */
3970 
3971 
3972 /*
3973  * create_event_record()
3974  *
3975  * Create a record in the "events" table, but don't execute the
3976  * "event_notification_command".
3977  */
3978 
3979 bool
create_event_record(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details)3980 create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details)
3981 {
3982 	/* create dummy t_event_info */
3983 	t_event_info event_info = T_EVENT_INFO_INITIALIZER;
3984 
3985 	return _create_event(conn, options, node_id, event, successful, details, &event_info, false);
3986 }
3987 
3988 
3989 /*
3990  * create_event_notification()
3991  *
3992  * If `conn` is not NULL, insert a record into the events table.
3993  *
3994  * If configuration parameter "event_notification_command" is set, also
3995  * attempt to execute that command.
3996  *
3997  * Returns true if all operations succeeded, false if one or more failed.
3998  *
3999  * Note this function may be called with "conn" set to NULL in cases where
4000  * the primary node is not available and it's therefore not possible to write
4001  * an event record. In this case, if `event_notification_command` is set, a
4002  * user-defined notification to be generated; if not, this function will have
4003  * no effect.
4004  */
4005 bool
create_event_notification(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details)4006 create_event_notification(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details)
4007 {
4008 	/* create dummy t_event_info */
4009 	t_event_info event_info = T_EVENT_INFO_INITIALIZER;
4010 
4011 	return _create_event(conn, options, node_id, event, successful, details, &event_info, true);
4012 }
4013 
4014 
4015 /*
4016  * create_event_notification_extended()
4017  *
4018  * The caller may need to pass additional parameters to the event notification
4019  * command (currently only the conninfo string of another node)
4020 
4021  */
4022 bool
create_event_notification_extended(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details,t_event_info * event_info)4023 create_event_notification_extended(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info)
4024 {
4025 	return _create_event(conn, options, node_id, event, successful, details, event_info, true);
4026 }
4027 
4028 
4029 static bool
_create_event(PGconn * conn,t_configuration_options * options,int node_id,char * event,bool successful,char * details,t_event_info * event_info,bool send_notification)4030 _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification)
4031 {
4032 	PQExpBufferData query;
4033 	PGresult   *res = NULL;
4034 	char		event_timestamp[MAXLEN] = "";
4035 	bool		success = true;
4036 
4037 	log_verbose(LOG_DEBUG, "_create_event(): event is \"%s\" for node %i", event, node_id);
4038 
4039 	/*
4040 	 * Only attempt to write a record if a connection handle was provided,
4041 	 * and the connection handle points to a node which is not in recovery.
4042 	 */
4043 	if (conn != NULL && PQstatus(conn) == CONNECTION_OK && get_recovery_type(conn) == RECTYPE_PRIMARY)
4044 	{
4045 		int			n_node_id = htonl(node_id);
4046 		char	   *t_successful = successful ? "TRUE" : "FALSE";
4047 
4048 		const char *values[4] = {(char *) &n_node_id,
4049 			event,
4050 			t_successful,
4051 			details
4052 		};
4053 
4054 		int			lengths[4] = {sizeof(n_node_id),
4055 			0,
4056 			0,
4057 			0
4058 		};
4059 
4060 		int			binary[4] = {1, 0, 0, 0};
4061 
4062 		initPQExpBuffer(&query);
4063 		appendPQExpBufferStr(&query,
4064 							 " INSERT INTO repmgr.events ( "
4065 							 "             node_id, "
4066 							 "             event, "
4067 							 "             successful, "
4068 							 "             details "
4069 							 "            ) "
4070 							 "      VALUES ($1, $2, $3, $4) "
4071 							 "   RETURNING event_timestamp ");
4072 
4073 		log_verbose(LOG_DEBUG, "_create_event():\n  %s", query.data);
4074 
4075 		res = PQexecParams(conn,
4076 						   query.data,
4077 						   4,
4078 						   NULL,
4079 						   values,
4080 						   lengths,
4081 						   binary,
4082 						   0);
4083 
4084 
4085 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
4086 		{
4087 			/* we don't treat this as a fatal error */
4088 			log_warning(_("unable to create event record"));
4089 			log_detail("%s", PQerrorMessage(conn));
4090 			log_detail("%s", query.data);
4091 
4092 			success = false;
4093 		}
4094 		else
4095 		{
4096 			/* Store timestamp to send to the notification command */
4097 			snprintf(event_timestamp, MAXLEN, "%s", PQgetvalue(res, 0, 0));
4098 		}
4099 
4100 		termPQExpBuffer(&query);
4101 		PQclear(res);
4102 	}
4103 
4104 	/*
4105 	 * If no database connection provided, or the query failed, generate a
4106 	 * current timestamp ourselves. This isn't quite the same format as
4107 	 * PostgreSQL, but is close enough for diagnostic use.
4108 	 */
4109 	if (!strlen(event_timestamp))
4110 	{
4111 		time_t		now;
4112 		struct tm	ts;
4113 
4114 		time(&now);
4115 		ts = *localtime(&now);
4116 		strftime(event_timestamp, MAXLEN, "%Y-%m-%d %H:%M:%S%z", &ts);
4117 	}
4118 
4119 	log_verbose(LOG_DEBUG, "_create_event(): Event timestamp is \"%s\"", event_timestamp);
4120 
4121 	/* an event notification command was provided - parse and execute it */
4122 	if (send_notification == true && strlen(options->event_notification_command))
4123 	{
4124 		char		parsed_command[MAXPGPATH] = "";
4125 		const char *src_ptr = NULL;
4126 		char	   *dst_ptr = NULL;
4127 		char	   *end_ptr = NULL;
4128 		int			r = 0;
4129 
4130 		log_verbose(LOG_DEBUG, "_create_event(): command is '%s'", options->event_notification_command);
4131 		/*
4132 		 * If configuration option 'event_notifications' was provided, check
4133 		 * if this event is one of the ones listed; if not listed, don't
4134 		 * execute the notification script.
4135 		 *
4136 		 * (If 'event_notifications' was not provided, we assume the script
4137 		 * should be executed for all events).
4138 		 */
4139 		if (options->event_notifications.head != NULL)
4140 		{
4141 			EventNotificationListCell *cell = NULL;
4142 			bool		notify_ok = false;
4143 
4144 			for (cell = options->event_notifications.head; cell; cell = cell->next)
4145 			{
4146 				if (strcmp(event, cell->event_type) == 0)
4147 				{
4148 					notify_ok = true;
4149 					break;
4150 				}
4151 			}
4152 
4153 			/*
4154 			 * Event type not found in the 'event_notifications' list - return
4155 			 * early
4156 			 */
4157 			if (notify_ok == false)
4158 			{
4159 				log_debug(_("not executing notification script for event type \"%s\""), event);
4160 				return success;
4161 			}
4162 		}
4163 
4164 		dst_ptr = parsed_command;
4165 		end_ptr = parsed_command + MAXPGPATH - 1;
4166 		*end_ptr = '\0';
4167 
4168 		for (src_ptr = options->event_notification_command; *src_ptr; src_ptr++)
4169 		{
4170 			if (*src_ptr == '%')
4171 			{
4172 				switch (src_ptr[1])
4173 				{
4174 					case '%':
4175 						/* %%: replace with % */
4176 						if (dst_ptr < end_ptr)
4177 						{
4178 							src_ptr++;
4179 							*dst_ptr++ = *src_ptr;
4180 						}
4181 						break;
4182 					case 'n':
4183 						/* %n: node id */
4184 						src_ptr++;
4185 						snprintf(dst_ptr, end_ptr - dst_ptr, "%i", node_id);
4186 						dst_ptr += strlen(dst_ptr);
4187 						break;
4188 					case 'a':
4189 						/* %a: node name */
4190 						src_ptr++;
4191 						if (event_info->node_name != NULL)
4192 						{
4193 							log_verbose(LOG_DEBUG, "node_name: %s", event_info->node_name);
4194 							strlcpy(dst_ptr, event_info->node_name, end_ptr - dst_ptr);
4195 							dst_ptr += strlen(dst_ptr);
4196 						}
4197 						break;
4198 					case 'e':
4199 						/* %e: event type */
4200 						src_ptr++;
4201 						strlcpy(dst_ptr, event, end_ptr - dst_ptr);
4202 						dst_ptr += strlen(dst_ptr);
4203 						break;
4204 					case 'd':
4205 						/* %d: details */
4206 						src_ptr++;
4207 						if (details != NULL)
4208 						{
4209 							PQExpBufferData details_escaped;
4210 							initPQExpBuffer(&details_escaped);
4211 
4212 							escape_double_quotes(details, &details_escaped);
4213 
4214 							strlcpy(dst_ptr, details_escaped.data, end_ptr - dst_ptr);
4215 							dst_ptr += strlen(dst_ptr);
4216 							termPQExpBuffer(&details_escaped);
4217 						}
4218 						break;
4219 					case 's':
4220 						/* %s: successful */
4221 						src_ptr++;
4222 						strlcpy(dst_ptr, successful ? "1" : "0", end_ptr - dst_ptr);
4223 						dst_ptr += strlen(dst_ptr);
4224 						break;
4225 					case 't':
4226 						/* %t: timestamp */
4227 						src_ptr++;
4228 						strlcpy(dst_ptr, event_timestamp, end_ptr - dst_ptr);
4229 						dst_ptr += strlen(dst_ptr);
4230 						break;
4231 					case 'c':
4232 						/* %c: conninfo for next available node */
4233 						src_ptr++;
4234 						if (event_info->conninfo_str != NULL)
4235 						{
4236 							log_debug("conninfo: %s", event_info->conninfo_str);
4237 
4238 							strlcpy(dst_ptr, event_info->conninfo_str, end_ptr - dst_ptr);
4239 							dst_ptr += strlen(dst_ptr);
4240 						}
4241 						break;
4242 					case 'p':
4243 						/* %p: primary id ("standby_switchover": former primary id) */
4244 						src_ptr++;
4245 						if (event_info->node_id != UNKNOWN_NODE_ID)
4246 						{
4247 							PQExpBufferData node_id;
4248 							initPQExpBuffer(&node_id);
4249 							appendPQExpBuffer(&node_id,
4250 											  "%i", event_info->node_id);
4251 							strlcpy(dst_ptr, node_id.data, end_ptr - dst_ptr);
4252 							dst_ptr += strlen(dst_ptr);
4253 							termPQExpBuffer(&node_id);
4254 						}
4255 						break;
4256 					default:
4257 						/* otherwise treat the % as not special */
4258 						if (dst_ptr < end_ptr)
4259 							*dst_ptr++ = *src_ptr;
4260 						break;
4261 				}
4262 			}
4263 			else
4264 			{
4265 				if (dst_ptr < end_ptr)
4266 					*dst_ptr++ = *src_ptr;
4267 			}
4268 		}
4269 
4270 		*dst_ptr = '\0';
4271 
4272 		log_info(_("executing notification command for event \"%s\""),
4273 				 event);
4274 
4275 		log_detail(_("command is:\n  %s"), parsed_command);
4276 		r = system(parsed_command);
4277 		if (r != 0)
4278 		{
4279 			log_warning(_("unable to execute event notification command"));
4280 			log_detail(_("parsed event notification command was:\n  %s"), parsed_command);
4281 			success = false;
4282 		}
4283 	}
4284 
4285 	return success;
4286 }
4287 
4288 
4289 PGresult *
get_event_records(PGconn * conn,int node_id,const char * node_name,const char * event,bool all,int limit)4290 get_event_records(PGconn *conn, int node_id, const char *node_name, const char *event, bool all, int limit)
4291 {
4292 	PGresult   *res;
4293 
4294 	PQExpBufferData query;
4295 	PQExpBufferData where_clause;
4296 
4297 
4298 	initPQExpBuffer(&query);
4299 	initPQExpBuffer(&where_clause);
4300 
4301 	/* LEFT JOIN used here as a node record may have been removed */
4302 	appendPQExpBufferStr(&query,
4303 						 "   SELECT e.node_id, n.node_name, e.event, e.successful, "
4304 						 "          pg_catalog.to_char(e.event_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS timestamp, "
4305 						 "          e.details "
4306 						 "     FROM repmgr.events e "
4307 						 "LEFT JOIN repmgr.nodes n ON e.node_id = n.node_id ");
4308 
4309 	if (node_id != UNKNOWN_NODE_ID)
4310 	{
4311 		append_where_clause(&where_clause,
4312 							"n.node_id=%i", node_id);
4313 	}
4314 	else if (node_name[0] != '\0')
4315 	{
4316 		char	   *escaped = escape_string(conn, node_name);
4317 
4318 		if (escaped == NULL)
4319 		{
4320 			log_error(_("unable to escape value provided for node name"));
4321 			log_detail(_("node name is: \"%s\""), node_name);
4322 		}
4323 		else
4324 		{
4325 			append_where_clause(&where_clause,
4326 								"n.node_name='%s'",
4327 								escaped);
4328 			pfree(escaped);
4329 		}
4330 	}
4331 
4332 	if (event[0] != '\0')
4333 	{
4334 		char	   *escaped = escape_string(conn, event);
4335 
4336 		if (escaped == NULL)
4337 		{
4338 			log_error(_("unable to escape value provided for event"));
4339 			log_detail(_("event is: \"%s\""), event);
4340 		}
4341 		else
4342 		{
4343 			append_where_clause(&where_clause,
4344 								"e.event='%s'",
4345 								escaped);
4346 			pfree(escaped);
4347 		}
4348 	}
4349 
4350 	appendPQExpBuffer(&query, "\n%s\n",
4351 					  where_clause.data);
4352 
4353 	appendPQExpBufferStr(&query,
4354 						 " ORDER BY e.event_timestamp DESC");
4355 
4356 	if (all == false && limit > 0)
4357 	{
4358 		appendPQExpBuffer(&query, " LIMIT %i",
4359 						  limit);
4360 	}
4361 
4362 	log_debug("do_cluster_event():\n%s", query.data);
4363 	res = PQexec(conn, query.data);
4364 
4365 	termPQExpBuffer(&query);
4366 	termPQExpBuffer(&where_clause);
4367 
4368 	return res;
4369 }
4370 
4371 
4372 /* ========================== */
4373 /* replication slot functions */
4374 /* ========================== */
4375 
4376 
/*
 * Write the replication slot name for node "node_id" into "slot_name",
 * using the fixed pattern "repmgr_slot_<node_id>".
 *
 * The string is written with maxlen_snprintf().
 * NOTE(review): the destination is presumably expected to be a MAXLEN-sized
 * buffer - confirm against the macro definition.
 */
void
create_slot_name(char *slot_name, int node_id)
{
	maxlen_snprintf(slot_name, "repmgr_slot_%i", node_id);
}
4382 
4383 
4384 static ReplSlotStatus
_verify_replication_slot(PGconn * conn,char * slot_name,PQExpBufferData * error_msg)4385 _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
4386 {
4387 	RecordStatus record_status = RECORD_NOT_FOUND;
4388 	t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
4389 
4390 	/*
4391 	 * Check whether slot exists already; if it exists and is active, that
4392 	 * means another active standby is using it, which creates an error
4393 	 * situation; if not we can reuse it as-is.
4394 	 */
4395 	record_status = get_slot_record(conn, slot_name, &slot_info);
4396 
4397 	if (record_status == RECORD_FOUND)
4398 	{
4399 		if (strcmp(slot_info.slot_type, "physical") != 0)
4400 		{
4401 			if (error_msg)
4402 				appendPQExpBuffer(error_msg,
4403 								  _("slot \"%s\" exists and is not a physical slot\n"),
4404 								  slot_name);
4405 			return SLOT_NOT_PHYSICAL;
4406 		}
4407 
4408 		if (slot_info.active == false)
4409 		{
4410 			log_debug("replication slot \"%s\" exists but is inactive; reusing",
4411 					  slot_name);
4412 
4413 			return SLOT_INACTIVE;
4414 		}
4415 
4416 		if (error_msg)
4417 			appendPQExpBuffer(error_msg,
4418 							  _("slot \"%s\" already exists as an active slot\n"),
4419 							  slot_name);
4420 		return SLOT_ACTIVE;
4421 	}
4422 
4423 	return SLOT_NOT_FOUND;
4424 }
4425 
4426 
4427 bool
create_replication_slot_replprot(PGconn * conn,PGconn * repl_conn,char * slot_name,PQExpBufferData * error_msg)4428 create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg)
4429 {
4430 	PQExpBufferData query;
4431 	PGresult   *res = NULL;
4432 	bool		success = true;
4433 
4434 	ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
4435 
4436 	/* Replication slot is unusable */
4437 	if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
4438 		return false;
4439 
4440 	/* Replication slot can be reused */
4441 	if (slot_status == SLOT_INACTIVE)
4442 		return true;
4443 
4444 	initPQExpBuffer(&query);
4445 
4446 	appendPQExpBuffer(&query,
4447 					  "CREATE_REPLICATION_SLOT %s PHYSICAL",
4448 					  slot_name);
4449 
4450 	/* In 9.6 and later, reserve the LSN straight away */
4451 	if (PQserverVersion(conn) >= 90600)
4452 	{
4453 		appendPQExpBufferStr(&query,
4454 							 " RESERVE_WAL");
4455 	}
4456 
4457 	appendPQExpBufferChar(&query, ';');
4458 
4459 	res = PQexec(repl_conn, query.data);
4460 
4461 
4462 	if ((PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) && error_msg != NULL)
4463 	{
4464 		appendPQExpBuffer(error_msg,
4465 						  _("unable to create replication slot \"%s\" on the upstream node: %s\n"),
4466 						  slot_name,
4467 						  PQerrorMessage(conn));
4468 		success = false;
4469 	}
4470 
4471 	PQclear(res);
4472 	return success;
4473 }
4474 
4475 
4476 bool
create_replication_slot_sql(PGconn * conn,char * slot_name,PQExpBufferData * error_msg)4477 create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
4478 {
4479 	PQExpBufferData query;
4480 	PGresult   *res = NULL;
4481 	bool		success = true;
4482 
4483 	ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
4484 
4485 	/* Replication slot is unusable */
4486 	if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
4487 		return false;
4488 
4489 	/* Replication slot can be reused */
4490 	if (slot_status == SLOT_INACTIVE)
4491 		return true;
4492 
4493 	initPQExpBuffer(&query);
4494 
4495 	/* In 9.6 and later, reserve the LSN straight away */
4496 	if (PQserverVersion(conn) >= 90600)
4497 	{
4498 		appendPQExpBuffer(&query,
4499 						  "SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s', TRUE)",
4500 						  slot_name);
4501 	}
4502 	else
4503 	{
4504 		appendPQExpBuffer(&query,
4505 						  "SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s')",
4506 						  slot_name);
4507 	}
4508 
4509 	log_debug(_("create_replication_slot_sql(): creating slot \"%s\" on upstream"), slot_name);
4510 	log_verbose(LOG_DEBUG, "create_replication_slot_sql():\n%s", query.data);
4511 
4512 	res = PQexec(conn, query.data);
4513 	termPQExpBuffer(&query);
4514 
4515 	if (PQresultStatus(res) != PGRES_TUPLES_OK && error_msg != NULL)
4516 	{
4517 		appendPQExpBuffer(error_msg,
4518 						  _("unable to create replication slot \"%s\" on the upstream node: %s\n"),
4519 						  slot_name,
4520 						  PQerrorMessage(conn));
4521 		success = false;
4522 	}
4523 
4524 	PQclear(res);
4525 	return success;
4526 }
4527 
4528 
4529 bool
drop_replication_slot_sql(PGconn * conn,char * slot_name)4530 drop_replication_slot_sql(PGconn *conn, char *slot_name)
4531 {
4532 	PQExpBufferData query;
4533 	PGresult   *res = NULL;
4534 	bool		success = true;
4535 
4536 	initPQExpBuffer(&query);
4537 
4538 	appendPQExpBuffer(&query,
4539 					  "SELECT pg_catalog.pg_drop_replication_slot('%s')",
4540 					  slot_name);
4541 
4542 	log_verbose(LOG_DEBUG, "drop_replication_slot_sql():\n  %s", query.data);
4543 
4544 	res = PQexec(conn, query.data);
4545 
4546 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
4547 	{
4548 		log_db_error(conn, query.data,
4549 					 _("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
4550 					 slot_name);
4551 
4552 		success = false;
4553 	}
4554 	else
4555 	{
4556 		log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
4557 					slot_name);
4558 	}
4559 
4560 	termPQExpBuffer(&query);
4561 	PQclear(res);
4562 
4563 	return success;
4564 }
4565 
4566 
4567 bool
drop_replication_slot_replprot(PGconn * repl_conn,char * slot_name)4568 drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name)
4569 {
4570 	PQExpBufferData query;
4571 	PGresult   *res = NULL;
4572 	bool		success = true;
4573 
4574 	initPQExpBuffer(&query);
4575 
4576 	appendPQExpBuffer(&query,
4577 					  "DROP_REPLICATION_SLOT %s",
4578 					  slot_name);
4579 
4580 	log_verbose(LOG_DEBUG, "drop_replication_slot_replprot():\n  %s", query.data);
4581 
4582 	res = PQexec(repl_conn, query.data);
4583 
4584 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
4585 	{
4586 		log_db_error(repl_conn, query.data,
4587 					 _("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
4588 					 slot_name);
4589 
4590 		success = false;
4591 	}
4592 	else
4593 	{
4594 		log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
4595 					slot_name);
4596 	}
4597 
4598 	termPQExpBuffer(&query);
4599 	PQclear(res);
4600 
4601 	return success;
4602 }
4603 
4604 
4605 RecordStatus
get_slot_record(PGconn * conn,char * slot_name,t_replication_slot * record)4606 get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
4607 {
4608 	PQExpBufferData query;
4609 	PGresult   *res = NULL;
4610 	RecordStatus record_status = RECORD_FOUND;
4611 
4612 	initPQExpBuffer(&query);
4613 
4614 	appendPQExpBuffer(&query,
4615 					  "SELECT slot_name, slot_type, active "
4616 					  "  FROM pg_catalog.pg_replication_slots "
4617 					  " WHERE slot_name = '%s' ",
4618 					  slot_name);
4619 
4620 	log_verbose(LOG_DEBUG, "get_slot_record():\n%s", query.data);
4621 
4622 	res = PQexec(conn, query.data);
4623 
4624 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
4625 	{
4626 		log_db_error(conn, query.data,
4627 					 _("get_slot_record(): unable to query pg_replication_slots"));
4628 
4629 		record_status = RECORD_ERROR;
4630 	}
4631 	else if (!PQntuples(res))
4632 	{
4633 		record_status = RECORD_NOT_FOUND;
4634 	}
4635 	else
4636 	{
4637 		snprintf(record->slot_name,
4638 				 sizeof(record->slot_name),
4639 				 "%s", PQgetvalue(res, 0, 0));
4640 		snprintf(record->slot_type,
4641 				 sizeof(record->slot_type),
4642 				 "%s", PQgetvalue(res, 0, 1));
4643 		record->active = atobool(PQgetvalue(res, 0, 2));
4644 	}
4645 
4646 	termPQExpBuffer(&query);
4647 	PQclear(res);
4648 
4649 	return record_status;
4650 }
4651 
4652 
4653 int
get_free_replication_slot_count(PGconn * conn,int * max_replication_slots)4654 get_free_replication_slot_count(PGconn *conn, int *max_replication_slots)
4655 {
4656 	PQExpBufferData query;
4657 	PGresult   *res = NULL;
4658 	int			free_slots = 0;
4659 
4660 	initPQExpBuffer(&query);
4661 
4662 	appendPQExpBufferStr(&query,
4663 						 " SELECT pg_catalog.current_setting('max_replication_slots')::INT - "
4664 						 "          pg_catalog.count(*) "
4665 						 "          AS free_slots, "
4666 						 "        pg_catalog.current_setting('max_replication_slots')::INT "
4667 						 "          AS max_replication_slots "
4668 						 "   FROM pg_catalog.pg_replication_slots s"
4669 						 "  WHERE s.slot_type = 'physical'");
4670 
4671 	res = PQexec(conn, query.data);
4672 
4673 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
4674 	{
4675 		log_db_error(conn, query.data,
4676 					 _("get_free_replication_slot_count(): unable to execute replication slot query"));
4677 
4678 		free_slots = UNKNOWN_VALUE;
4679 	}
4680 	else if (PQntuples(res) == 0)
4681 	{
4682 		free_slots = UNKNOWN_VALUE;
4683 	}
4684 	else
4685 	{
4686 		free_slots = atoi(PQgetvalue(res, 0, 0));
4687 		if (max_replication_slots != NULL)
4688 			*max_replication_slots = atoi(PQgetvalue(res, 0, 1));
4689 	}
4690 
4691 	termPQExpBuffer(&query);
4692 	PQclear(res);
4693 
4694 	return free_slots;
4695 }
4696 
4697 
4698 int
get_inactive_replication_slots(PGconn * conn,KeyValueList * list)4699 get_inactive_replication_slots(PGconn *conn, KeyValueList *list)
4700 {
4701 	PQExpBufferData query;
4702 	PGresult   *res = NULL;
4703 	int			i, inactive_slots = 0;
4704 
4705 	initPQExpBuffer(&query);
4706 
4707 	appendPQExpBufferStr(&query,
4708 						 "   SELECT slot_name, slot_type "
4709 						 "     FROM pg_catalog.pg_replication_slots "
4710 						 "    WHERE active IS FALSE "
4711 						 "      AND slot_type = 'physical' "
4712 						 " ORDER BY slot_name ");
4713 
4714 	res = PQexec(conn, query.data);
4715 
4716 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
4717 	{
4718 		log_db_error(conn, query.data,
4719 					 _("get_inactive_replication_slots(): unable to execute replication slot query"));
4720 
4721 		inactive_slots = -1;
4722 	}
4723 	else
4724 	{
4725 		inactive_slots = PQntuples(res);
4726 
4727 		for (i = 0; i < inactive_slots; i++)
4728 		{
4729 			key_value_list_set(list,
4730 							   PQgetvalue(res, i, 0),
4731 							   PQgetvalue(res, i, 1));
4732 		}
4733 	}
4734 
4735 	termPQExpBuffer(&query);
4736 	PQclear(res);
4737 
4738 	return inactive_slots;
4739 }
4740 
4741 
4742 
4743 /* ==================== */
4744 /* tablespace functions */
4745 /* ==================== */
4746 
4747 bool
get_tablespace_name_by_location(PGconn * conn,const char * location,char * name)4748 get_tablespace_name_by_location(PGconn *conn, const char *location, char *name)
4749 {
4750 	PQExpBufferData query;
4751 	PGresult   *res = NULL;
4752 	bool	    success = true;
4753 
4754 	initPQExpBuffer(&query);
4755 
4756 	appendPQExpBuffer(&query,
4757 					  "SELECT spcname "
4758 					  "  FROM pg_catalog.pg_tablespace "
4759 					  " WHERE pg_catalog.pg_tablespace_location(oid) = '%s'",
4760 					  location);
4761 
4762 	log_verbose(LOG_DEBUG, "get_tablespace_name_by_location():\n%s", query.data);
4763 
4764 	res = PQexec(conn, query.data);
4765 
4766 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
4767 	{
4768 		log_db_error(conn, query.data,
4769 					 _("get_tablespace_name_by_location(): unable to execute tablespace query"));
4770 		success = false;
4771 	}
4772 	else if (PQntuples(res) == 0)
4773 	{
4774 		success = false;
4775 	}
4776 	else
4777 	{
4778 		snprintf(name, MAXLEN,
4779 				 "%s", PQgetvalue(res, 0, 0));
4780 	}
4781 
4782 	termPQExpBuffer(&query);
4783 	PQclear(res);
4784 
4785 	return success;
4786 }
4787 
4788 /* ============================ */
4789 /* asynchronous query functions */
4790 /* ============================ */
4791 
4792 bool
cancel_query(PGconn * conn,int timeout)4793 cancel_query(PGconn *conn, int timeout)
4794 {
4795 	char		errbuf[ERRBUFF_SIZE] = "";
4796 	PGcancel   *pgcancel = NULL;
4797 
4798 	if (wait_connection_availability(conn, timeout) != 1)
4799 		return false;
4800 
4801 	pgcancel = PQgetCancel(conn);
4802 
4803 	if (pgcancel == NULL)
4804 		return false;
4805 
4806 	/*
4807 	 * PQcancel can only return 0 if socket()/connect()/send() fails, in any
4808 	 * of those cases we can assume something bad happened to the connection
4809 	 */
4810 	if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
4811 	{
4812 		log_warning(_("unable to cancel current query"));
4813 		log_detail("\n%s", errbuf);
4814 		PQfreeCancel(pgcancel);
4815 		return false;
4816 	}
4817 
4818 	PQfreeCancel(pgcancel);
4819 
4820 	return true;
4821 }
4822 
4823 
4824 /*
4825  * Wait until current query finishes, ignoring any results.
4826  * Usually this will be an async query or query cancellation.
4827  *
4828  * Returns 1 for success; 0 if any error ocurred; -1 if timeout reached.
4829  */
4830 int
wait_connection_availability(PGconn * conn,int timeout)4831 wait_connection_availability(PGconn *conn, int timeout)
4832 {
4833 	PGresult   *res = NULL;
4834 	fd_set		read_set;
4835 	int			sock = PQsocket(conn);
4836 	struct timeval tmout,
4837 				before,
4838 				after;
4839 	struct timezone tz;
4840 	long long	timeout_ms;
4841 
4842 	/* calculate timeout in microseconds */
4843 	timeout_ms = (long long) timeout * 1000000;
4844 
4845 	while (timeout_ms > 0)
4846 	{
4847 		if (PQconsumeInput(conn) == 0)
4848 		{
4849 			log_warning(_("wait_connection_availability(): unable to receive data from connection"));
4850 			log_detail("%s", PQerrorMessage(conn));
4851 			return 0;
4852 		}
4853 
4854 		if (PQisBusy(conn) == 0)
4855 		{
4856 			do
4857 			{
4858 				res = PQgetResult(conn);
4859 				PQclear(res);
4860 			} while (res != NULL);
4861 
4862 			break;
4863 		}
4864 
4865 		tmout.tv_sec = 0;
4866 		tmout.tv_usec = 250000;
4867 
4868 		FD_ZERO(&read_set);
4869 		FD_SET(sock, &read_set);
4870 
4871 		gettimeofday(&before, &tz);
4872 		if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
4873 		{
4874 			log_warning(_("wait_connection_availability(): select() returned with error"));
4875 			log_detail("%s", strerror(errno));
4876 			return -1;
4877 		}
4878 
4879 		gettimeofday(&after, &tz);
4880 
4881 		timeout_ms -= (after.tv_sec * 1000000 + after.tv_usec) -
4882 			(before.tv_sec * 1000000 + before.tv_usec);
4883 	}
4884 
4885 
4886 	if (timeout_ms >= 0)
4887 	{
4888 		return 1;
4889 	}
4890 
4891 	log_warning(_("wait_connection_availability(): timeout (%i secs) reached"), timeout);
4892 	return -1;
4893 }
4894 
4895 
4896 /* =========================== */
4897 /* node availability functions */
4898 /* =========================== */
4899 
/*
 * Ping the server described by "conninfo"; returns true if it responded
 * with PQPING_OK. A warning is logged if the ping fails.
 */
bool
is_server_available(const char *conninfo)
{
	const bool	quiet = false;

	return _is_server_available(conninfo, quiet);
}
4905 
4906 
/*
 * Ping the server described by "conninfo"; returns true if it responded
 * with PQPING_OK. Unlike is_server_available(), no warning is logged if
 * the ping fails.
 */
bool
is_server_available_quiet(const char *conninfo)
{
	const bool	quiet = true;

	return _is_server_available(conninfo, quiet);
}
4912 
4913 
4914 static bool
_is_server_available(const char * conninfo,bool quiet)4915 _is_server_available(const char *conninfo, bool quiet)
4916 {
4917 	PGPing		status = PQping(conninfo);
4918 
4919 	log_verbose(LOG_DEBUG, "is_server_available(): ping status for \"%s\" is %s", conninfo, print_pqping_status(status));
4920 	if (status == PQPING_OK)
4921 		return true;
4922 
4923 	if (quiet == false)
4924 	{
4925 		log_warning(_("unable to ping \"%s\""), conninfo);
4926 		log_detail(_("PQping() returned \"%s\""), print_pqping_status(status));
4927 	}
4928 
4929 	return false;
4930 }
4931 
4932 
4933 bool
is_server_available_params(t_conninfo_param_list * param_list)4934 is_server_available_params(t_conninfo_param_list *param_list)
4935 {
4936 	PGPing		status = PQpingParams((const char **) param_list->keywords,
4937 									  (const char **) param_list->values,
4938 									  false);
4939 
4940 	/* deparsing the param_list adds overhead, so only do it if needed  */
4941 	if (log_level == LOG_DEBUG || status != PQPING_OK)
4942 	{
4943 		char *conninfo_str = param_list_to_string(param_list);
4944 		log_verbose(LOG_DEBUG, "is_server_available_params(): ping status for \"%s\" is %s", conninfo_str, print_pqping_status(status));
4945 
4946 		if (status != PQPING_OK)
4947 		{
4948 			log_warning(_("unable to ping \"%s\""), conninfo_str);
4949 			log_detail(_("PQping() returned \"%s\""), print_pqping_status(status));
4950 		}
4951 
4952 		pfree(conninfo_str);
4953 	}
4954 
4955 	if (status == PQPING_OK)
4956 		return true;
4957 
4958 	return false;
4959 }
4960 
4961 
4962 
4963 /*
4964  * Simple throw-away query to stop a connection handle going stale.
4965  */
4966 ExecStatusType
connection_ping(PGconn * conn)4967 connection_ping(PGconn *conn)
4968 {
4969 	PGresult   *res = PQexec(conn, "SELECT TRUE");
4970 	ExecStatusType ping_result;
4971 
4972 	log_verbose(LOG_DEBUG, "connection_ping(): result is %s", PQresStatus(PQresultStatus(res)));
4973 
4974 	ping_result = PQresultStatus(res);
4975 	PQclear(res);
4976 
4977 	return ping_result;
4978 }
4979 
4980 
4981 ExecStatusType
connection_ping_reconnect(PGconn * conn)4982 connection_ping_reconnect(PGconn *conn)
4983 {
4984 	ExecStatusType ping_result = connection_ping(conn);
4985 
4986 	if (PQstatus(conn) != CONNECTION_OK)
4987 	{
4988 		log_warning(_("connection error, attempting to reset"));
4989 		log_detail("\n%s", PQerrorMessage(conn));
4990 		PQreset(conn);
4991 		ping_result = connection_ping(conn);
4992 	}
4993 
4994 	log_verbose(LOG_DEBUG, "connection_ping_reconnect(): result is %s", PQresStatus(ping_result));
4995 
4996 	return ping_result;
4997 }
4998 
4999 
5000 
5001 /* ==================== */
5002 /* monitoring functions */
5003 /* ==================== */
5004 
5005 void
add_monitoring_record(PGconn * primary_conn,PGconn * local_conn,int primary_node_id,int local_node_id,char * monitor_standby_timestamp,XLogRecPtr primary_last_wal_location,XLogRecPtr last_wal_receive_lsn,char * last_xact_replay_timestamp,long long unsigned int replication_lag_bytes,long long unsigned int apply_lag_bytes)5006 add_monitoring_record(PGconn *primary_conn,
5007 					  PGconn *local_conn,
5008 					  int primary_node_id,
5009 					  int local_node_id,
5010 					  char *monitor_standby_timestamp,
5011 					  XLogRecPtr primary_last_wal_location,
5012 					  XLogRecPtr last_wal_receive_lsn,
5013 					  char *last_xact_replay_timestamp,
5014 					  long long unsigned int replication_lag_bytes,
5015 					  long long unsigned int apply_lag_bytes
5016 )
5017 {
5018 	PQExpBufferData query;
5019 
5020 	initPQExpBuffer(&query);
5021 
5022 	appendPQExpBuffer(&query,
5023 					  "INSERT INTO repmgr.monitoring_history "
5024 					  "           (primary_node_id, "
5025 					  "            standby_node_id, "
5026 					  "            last_monitor_time, "
5027 					  "            last_apply_time, "
5028 					  "            last_wal_primary_location, "
5029 					  "            last_wal_standby_location, "
5030 					  "            replication_lag, "
5031 					  "            apply_lag ) "
5032 					  "     VALUES(%i, "
5033 					  "            %i, "
5034 					  "            '%s'::TIMESTAMP WITH TIME ZONE, "
5035 					  "            '%s'::TIMESTAMP WITH TIME ZONE, "
5036 					  "            '%X/%X', "
5037 					  "            '%X/%X', "
5038 					  "            %llu, "
5039 					  "            %llu) ",
5040 					  primary_node_id,
5041 					  local_node_id,
5042 					  monitor_standby_timestamp,
5043 					  last_xact_replay_timestamp,
5044 					  format_lsn(primary_last_wal_location),
5045 					  format_lsn(last_wal_receive_lsn),
5046 					  replication_lag_bytes,
5047 					  apply_lag_bytes);
5048 
5049 	log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data);
5050 
5051 	if (PQsendQuery(primary_conn, query.data) == 0)
5052 	{
5053 		log_warning(_("query could not be sent to primary:\n  %s"),
5054 					PQerrorMessage(primary_conn));
5055 	}
5056 	else
5057 	{
5058 		PGresult   *res = PQexec(local_conn, "SELECT repmgr.standby_set_last_updated()");
5059 
5060 		/* not critical if the above query fails */
5061 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
5062 			log_warning(_("add_monitoring_record(): unable to set last_updated:\n  %s"),
5063 						PQerrorMessage(local_conn));
5064 
5065 		PQclear(res);
5066 	}
5067 
5068 	termPQExpBuffer(&query);
5069 
5070 	return;
5071 }
5072 
5073 
5074 int
get_number_of_monitoring_records_to_delete(PGconn * primary_conn,int keep_history,int node_id)5075 get_number_of_monitoring_records_to_delete(PGconn *primary_conn, int keep_history, int node_id)
5076 {
5077 	PQExpBufferData query;
5078 	int				record_count = -1;
5079 	PGresult	   *res = NULL;
5080 
5081 	initPQExpBuffer(&query);
5082 
5083 	appendPQExpBuffer(&query,
5084 					  "SELECT pg_catalog.count(*) "
5085 					  "  FROM repmgr.monitoring_history "
5086 					  " WHERE pg_catalog.age(pg_catalog.now(), last_monitor_time) >= '%d days'::interval",
5087 					  keep_history);
5088 
5089 	if (node_id != UNKNOWN_NODE_ID)
5090 	{
5091 		appendPQExpBuffer(&query,
5092 						  "  AND standby_node_id = %i", node_id);
5093 	}
5094 
5095 	log_verbose(LOG_DEBUG, "get_number_of_monitoring_records_to_delete():\n  %s", query.data);
5096 
5097 	res = PQexec(primary_conn, query.data);
5098 
5099 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5100 	{
5101 		log_db_error(primary_conn, query.data,
5102 					 _("get_number_of_monitoring_records_to_delete(): unable to query number of monitoring records to clean up"));
5103 	}
5104 	else
5105 	{
5106 		record_count = atoi(PQgetvalue(res, 0, 0));
5107 	}
5108 
5109 	termPQExpBuffer(&query);
5110 	PQclear(res);
5111 
5112 	return record_count;
5113 }
5114 
5115 
5116 bool
delete_monitoring_records(PGconn * primary_conn,int keep_history,int node_id)5117 delete_monitoring_records(PGconn *primary_conn, int keep_history, int node_id)
5118 {
5119 	PQExpBufferData query;
5120 	bool			success = true;
5121 	PGresult	   *res = NULL;
5122 
5123 	initPQExpBuffer(&query);
5124 
5125 	if (keep_history > 0 || node_id != UNKNOWN_NODE_ID)
5126 	{
5127 		appendPQExpBuffer(&query,
5128 						  "DELETE FROM repmgr.monitoring_history "
5129 						  " WHERE pg_catalog.age(pg_catalog.now(), last_monitor_time) >= '%d days'::INTERVAL ",
5130 						  keep_history);
5131 
5132 		if (node_id != UNKNOWN_NODE_ID)
5133 		{
5134 			appendPQExpBuffer(&query,
5135 							  "  AND standby_node_id = %i", node_id);
5136 		}
5137 	}
5138 	else
5139 	{
5140 		appendPQExpBufferStr(&query,
5141 							 "TRUNCATE TABLE repmgr.monitoring_history");
5142 	}
5143 
5144 	res = PQexec(primary_conn, query.data);
5145 
5146 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
5147 	{
5148 		log_db_error(primary_conn, query.data,
5149 					 _("delete_monitoring_records(): unable to delete monitoring records"));
5150 		success = false;
5151 	}
5152 
5153 	termPQExpBuffer(&query);
5154 	PQclear(res);
5155 
5156 	return success;
5157 }
5158 
5159 /*
5160  * node voting functions
5161  *
5162  * These are intended to run under repmgrd and mainly rely on shared memory
5163  */
5164 
5165 int
get_current_term(PGconn * conn)5166 get_current_term(PGconn *conn)
5167 {
5168 	PGresult   *res = NULL;
5169 	int term = VOTING_TERM_NOT_SET;
5170 
5171 	res = PQexec(conn, "SELECT term FROM repmgr.voting_term");
5172 
5173 	/* it doesn't matter if for whatever reason the table has no rows */
5174 
5175 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5176 	{
5177 		log_db_error(conn, NULL,
5178 					 _("get_current_term(): unable to query \"repmgr.voting_term\""));
5179 	}
5180 	else if (PQntuples(res) > 0)
5181 	{
5182 		term = atoi(PQgetvalue(res, 0, 0));
5183 	}
5184 
5185 	PQclear(res);
5186 	return term;
5187 }
5188 
5189 
5190 void
initialize_voting_term(PGconn * conn)5191 initialize_voting_term(PGconn *conn)
5192 {
5193 	PGresult   *res = NULL;
5194 
5195 	int current_term = get_current_term(conn);
5196 
5197 	if (current_term == VOTING_TERM_NOT_SET)
5198 	{
5199 		res = PQexec(conn, "INSERT INTO repmgr.voting_term (term) VALUES (1)");
5200 	}
5201 	else
5202 	{
5203 		res = PQexec(conn, "UPDATE repmgr.voting_term SET term = 1");
5204 	}
5205 
5206 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
5207 	{
5208 		log_db_error(conn, NULL, _("unable to initialize repmgr.voting_term"));
5209 	}
5210 
5211 	PQclear(res);
5212 	return;
5213 }
5214 
5215 
5216 void
increment_current_term(PGconn * conn)5217 increment_current_term(PGconn *conn)
5218 {
5219 	PGresult   *res = NULL;
5220 
5221 	res = PQexec(conn, "UPDATE repmgr.voting_term SET term = term + 1");
5222 
5223 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
5224 	{
5225 		log_db_error(conn, NULL, _("unable to increment repmgr.voting_term"));
5226 	}
5227 
5228 	PQclear(res);
5229 	return;
5230 }
5231 
5232 
5233 bool
announce_candidature(PGconn * conn,t_node_info * this_node,t_node_info * other_node,int electoral_term)5234 announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term)
5235 {
5236 	PQExpBufferData query;
5237 	PGresult   *res = NULL;
5238 
5239 	bool		retval = false;
5240 
5241 	initPQExpBuffer(&query);
5242 
5243 	appendPQExpBuffer(&query,
5244 					  "SELECT repmgr.other_node_is_candidate(%i, %i)",
5245 					  this_node->node_id,
5246 					  electoral_term);
5247 
5248 	res = PQexec(conn, query.data);
5249 
5250 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
5251 	{
5252 		log_db_error(conn, query.data, _("announce_candidature(): unable to execute repmgr.other_node_is_candidate()"));
5253 	}
5254 	else
5255 	{
5256 		retval = atobool(PQgetvalue(res, 0, 0));
5257 	}
5258 
5259 	termPQExpBuffer(&query);
5260 	PQclear(res);
5261 
5262 	return retval;
5263 }
5264 
5265 
5266 void
notify_follow_primary(PGconn * conn,int primary_node_id)5267 notify_follow_primary(PGconn *conn, int primary_node_id)
5268 {
5269 	PQExpBufferData query;
5270 	PGresult   *res = NULL;
5271 
5272 	initPQExpBuffer(&query);
5273 
5274 	appendPQExpBuffer(&query,
5275 					  "SELECT repmgr.notify_follow_primary(%i)",
5276 					  primary_node_id);
5277 
5278 	log_verbose(LOG_DEBUG, "notify_follow_primary():\n  %s", query.data);
5279 
5280 	res = PQexec(conn, query.data);
5281 
5282 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5283 	{
5284 		log_db_error(conn, query.data, _("unable to execute repmgr.notify_follow_primary()"));
5285 	}
5286 
5287 	termPQExpBuffer(&query);
5288 	PQclear(res);
5289 
5290 	return;
5291 }
5292 
5293 
5294 bool
get_new_primary(PGconn * conn,int * primary_node_id)5295 get_new_primary(PGconn *conn, int *primary_node_id)
5296 {
5297 	PGresult   *res = NULL;
5298 	int			new_primary_node_id = UNKNOWN_NODE_ID;
5299 	bool		success = true;
5300 
5301 	const char *sqlquery = "SELECT repmgr.get_new_primary()";
5302 
5303 	res = PQexec(conn, sqlquery);
5304 
5305 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5306 	{
5307 		log_db_error(conn, sqlquery, _("unable to execute repmgr.get_new_primary()"));
5308 		success = false;
5309 	}
5310 	else if (PQgetisnull(res, 0, 0))
5311 	{
5312 		success = false;
5313 	}
5314 	else
5315 	{
5316 		new_primary_node_id = atoi(PQgetvalue(res, 0, 0));
5317 	}
5318 
5319 	PQclear(res);
5320 
5321 	/*
5322 	 * repmgr.get_new_primary() will return UNKNOWN_NODE_ID if
5323 	 * "follow_new_primary" is false
5324 	 */
5325 	if (new_primary_node_id == UNKNOWN_NODE_ID)
5326 		success = false;
5327 
5328 	*primary_node_id = new_primary_node_id;
5329 
5330 	return success;
5331 }
5332 
5333 
5334 void
reset_voting_status(PGconn * conn)5335 reset_voting_status(PGconn *conn)
5336 {
5337 	PGresult   *res = NULL;
5338 
5339 	const char *sqlquery = "SELECT repmgr.reset_voting_status()";
5340 
5341 	res = PQexec(conn, sqlquery);
5342 
5343 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5344 	{
5345 		log_db_error(conn, sqlquery, _("unable to execute repmgr.reset_voting_status()"));
5346 	}
5347 
5348 	PQclear(res);
5349 	return;
5350 }
5351 
5352 
5353 /* ============================ */
5354 /* replication status functions */
5355 /* ============================ */
5356 
5357 /*
5358  * Returns the current LSN on the primary.
5359  *
5360  * This just executes "pg_current_wal_lsn()".
5361  *
5362  * Function "get_node_current_lsn()" below will return the latest
5363  * LSN regardless of recovery state.
5364  */
5365 XLogRecPtr
get_primary_current_lsn(PGconn * conn)5366 get_primary_current_lsn(PGconn *conn)
5367 {
5368 	PGresult   *res = NULL;
5369 	XLogRecPtr	ptr = InvalidXLogRecPtr;
5370 
5371 	if (PQserverVersion(conn) >= 100000)
5372 	{
5373 		res = PQexec(conn, "SELECT pg_catalog.pg_current_wal_lsn()");
5374 	}
5375 	else
5376 	{
5377 		res = PQexec(conn, "SELECT pg_catalog.pg_current_xlog_location()");
5378 	}
5379 
5380 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
5381 	{
5382 		ptr = parse_lsn(PQgetvalue(res, 0, 0));
5383 	}
5384 	else
5385 	{
5386 		log_db_error(conn, NULL, _("unable to execute get_primary_current_lsn()"));
5387 	}
5388 
5389 
5390 	PQclear(res);
5391 
5392 	return ptr;
5393 }
5394 
5395 
5396 XLogRecPtr
get_last_wal_receive_location(PGconn * conn)5397 get_last_wal_receive_location(PGconn *conn)
5398 {
5399 	PGresult   *res = NULL;
5400 	XLogRecPtr	ptr = InvalidXLogRecPtr;
5401 
5402 	if (PQserverVersion(conn) >= 100000)
5403 	{
5404 		res = PQexec(conn, "SELECT pg_catalog.pg_last_wal_receive_lsn()");
5405 	}
5406 	else
5407 	{
5408 		res = PQexec(conn, "SELECT pg_catalog.pg_last_xlog_receive_location()");
5409 	}
5410 
5411 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
5412 	{
5413 		ptr = parse_lsn(PQgetvalue(res, 0, 0));
5414 	}
5415 	else
5416 	{
5417 		log_db_error(conn, NULL, _("unable to execute get_last_wal_receive_location()"));
5418 	}
5419 
5420 	PQclear(res);
5421 
5422 	return ptr;
5423 }
5424 
5425 /*
5426  * Returns the latest LSN for the node regardless of recovery state.
5427  */
5428 XLogRecPtr
get_node_current_lsn(PGconn * conn)5429 get_node_current_lsn(PGconn *conn)
5430 {
5431 	PQExpBufferData query;
5432 	PGresult   *res = NULL;
5433 	XLogRecPtr	ptr = InvalidXLogRecPtr;
5434 
5435 	initPQExpBuffer(&query);
5436 
5437 	if (PQserverVersion(conn) >= 100000)
5438 	{
5439 		appendPQExpBufferStr(&query,
5440 							 " WITH lsn_states AS ( "
5441 							 "  SELECT "
5442 							 "    CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5443 							 "      THEN pg_catalog.pg_current_wal_lsn() "
5444 							 "      ELSE NULL "
5445 							 "    END "
5446 							 "      AS current_wal_lsn, "
5447 							 "    CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5448 							 "      THEN pg_catalog.pg_last_wal_receive_lsn() "
5449 							 "      ELSE NULL "
5450 							 "    END "
5451 							 "      AS last_wal_receive_lsn, "
5452 							 "    CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5453 							 "      THEN pg_catalog.pg_last_wal_replay_lsn() "
5454 							 "      ELSE NULL "
5455 							 "     END "
5456 							 "       AS last_wal_replay_lsn "
5457 							 " ) ");
5458 	}
5459 	else
5460 	{
5461 		appendPQExpBufferStr(&query,
5462 							 " WITH lsn_states AS ( "
5463 							 "  SELECT "
5464 							 "    CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5465 							 "      THEN pg_catalog.pg_current_xlog_location() "
5466 							 "      ELSE NULL "
5467 							 "    END "
5468 							 "      AS current_wal_lsn, "
5469 							 "    CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5470 							 "      THEN pg_catalog.pg_last_xlog_receive_location() "
5471 							 "      ELSE NULL "
5472 							 "    END "
5473 							 "      AS last_wal_receive_lsn, "
5474 							 "    CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
5475 							 "      THEN pg_catalog.pg_last_xlog_replay_location() "
5476 							 "      ELSE NULL "
5477 							 "     END "
5478 							 "       AS last_wal_replay_lsn "
5479 							 " ) ");
5480 	}
5481 
5482 	appendPQExpBufferStr(&query,
5483 						 " SELECT "
5484 						 "   CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5485 						 "     THEN current_wal_lsn "
5486 						 "     ELSE "
5487 						 "       CASE WHEN last_wal_receive_lsn IS NULL "
5488 						 "       THEN last_wal_replay_lsn "
5489 						 "         ELSE "
5490 						 "           CASE WHEN last_wal_replay_lsn > last_wal_receive_lsn "
5491 						 "             THEN last_wal_replay_lsn "
5492 						 "             ELSE last_wal_receive_lsn "
5493 						 "           END "
5494 						 "       END "
5495 						 "   END "
5496 						 "     AS current_lsn "
5497 						 "   FROM lsn_states ");
5498 
5499 	res = PQexec(conn, query.data);
5500 
5501 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5502 	{
5503 		log_db_error(conn, query.data, _("unable to execute get_node_current_lsn()"));
5504 	}
5505 	else if (!PQgetisnull(res, 0, 0))
5506 	{
5507 		ptr = parse_lsn(PQgetvalue(res, 0, 0));
5508 	}
5509 
5510 	termPQExpBuffer(&query);
5511 	PQclear(res);
5512 
5513 	return ptr;
5514 }
5515 
5516 
5517 void
init_replication_info(ReplInfo * replication_info)5518 init_replication_info(ReplInfo *replication_info)
5519 {
5520 	memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp));
5521 	replication_info->in_recovery = false;
5522 	replication_info->timeline_id = UNKNOWN_TIMELINE_ID;
5523 	replication_info->last_wal_receive_lsn = InvalidXLogRecPtr;
5524 	replication_info->last_wal_replay_lsn = InvalidXLogRecPtr;
5525 	memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp));
5526 	replication_info->replication_lag_time = 0;
5527 	replication_info->receiving_streamed_wal = true;
5528 	replication_info->wal_replay_paused = false;
5529 	replication_info->upstream_last_seen = -1;
5530 	replication_info->upstream_node_id = UNKNOWN_NODE_ID;
5531 }
5532 
5533 
5534 bool
get_replication_info(PGconn * conn,t_server_type node_type,ReplInfo * replication_info)5535 get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *replication_info)
5536 {
5537 	PQExpBufferData query;
5538 	PGresult   *res = NULL;
5539 	bool		success = true;
5540 
5541 	initPQExpBuffer(&query);
5542 	appendPQExpBufferStr(&query,
5543 						 " SELECT ts, "
5544 						 "        in_recovery, "
5545 						 "        last_wal_receive_lsn, "
5546 						 "        last_wal_replay_lsn, "
5547 						 "        last_xact_replay_timestamp, "
5548 						 "        CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
5549 						 "          THEN 0::INT "
5550 						 "        ELSE "
5551 						 "          CASE WHEN last_xact_replay_timestamp IS NULL "
5552 						 "            THEN 0::INT "
5553 						 "          ELSE "
5554 						 "            EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT "
5555 						 "          END "
5556 						 "        END AS replication_lag_time, "
5557 						 "        last_wal_receive_lsn >= last_wal_replay_lsn AS receiving_streamed_wal, "
5558 						 "        wal_replay_paused, "
5559 						 "        upstream_last_seen, "
5560 						 "        upstream_node_id "
5561 						 "   FROM ( "
5562 						 " SELECT CURRENT_TIMESTAMP AS ts, "
5563 						 "        pg_catalog.pg_is_in_recovery() AS in_recovery, "
5564 						 "        pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp, ");
5565 
5566 
5567 	if (PQserverVersion(conn) >= 100000)
5568 	{
5569 		appendPQExpBufferStr(&query,
5570 							 "        COALESCE(pg_catalog.pg_last_wal_receive_lsn(), '0/0'::PG_LSN) AS last_wal_receive_lsn, "
5571 							 "        COALESCE(pg_catalog.pg_last_wal_replay_lsn(),  '0/0'::PG_LSN) AS last_wal_replay_lsn, "
5572 							 "        CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5573 							 "          THEN FALSE "
5574 							 "          ELSE pg_catalog.pg_is_wal_replay_paused() "
5575 							 "        END AS wal_replay_paused, ");
5576 	}
5577 	else
5578 	{
5579 		appendPQExpBufferStr(&query,
5580 							 "        COALESCE(pg_catalog.pg_last_xlog_receive_location(), '0/0'::PG_LSN) AS last_wal_receive_lsn, "
5581 							 "        COALESCE(pg_catalog.pg_last_xlog_replay_location(),  '0/0'::PG_LSN) AS last_wal_replay_lsn, "
5582 							 "        CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5583 							 "          THEN FALSE "
5584 							 "          ELSE pg_catalog.pg_is_xlog_replay_paused() "
5585 							 "        END AS wal_replay_paused, ");
5586 	}
5587 
5588 	/* Add information about upstream node from shared memory */
5589 	if (node_type == WITNESS)
5590 	{
5591 		appendPQExpBufferStr(&query,
5592 							 "        repmgr.get_upstream_last_seen() AS upstream_last_seen, "
5593 							 "        repmgr.get_upstream_node_id() AS upstream_node_id ");
5594 	}
5595 	else
5596 	{
5597 		appendPQExpBufferStr(&query,
5598 							 "        CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5599 							 "          THEN -1 "
5600 							 "          ELSE repmgr.get_upstream_last_seen() "
5601 							 "        END AS upstream_last_seen, ");
5602 		appendPQExpBufferStr(&query,
5603 							 "        CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5604 							 "          THEN -1 "
5605 							 "          ELSE repmgr.get_upstream_node_id() "
5606 							 "        END AS upstream_node_id ");
5607 	}
5608 
5609 	appendPQExpBufferStr(&query,
5610 						 "          ) q ");
5611 
5612 	log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data);
5613 
5614 	res = PQexec(conn, query.data);
5615 
5616 	if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
5617 	{
5618 		log_db_error(conn, query.data, _("get_replication_info(): unable to execute query"));
5619 
5620 		success = false;
5621 	}
5622 	else
5623 	{
5624 		snprintf(replication_info->current_timestamp,
5625 				 sizeof(replication_info->current_timestamp),
5626 				 "%s", PQgetvalue(res, 0, 0));
5627 		replication_info->in_recovery = atobool(PQgetvalue(res, 0, 1));
5628 		replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 2));
5629 		replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 3));
5630 		snprintf(replication_info->last_xact_replay_timestamp,
5631 				 sizeof(replication_info->last_xact_replay_timestamp),
5632 				 "%s", PQgetvalue(res, 0, 4));
5633 		replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 5));
5634 		replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 6));
5635 		replication_info->wal_replay_paused = atobool(PQgetvalue(res, 0, 7));
5636 		replication_info->upstream_last_seen = atoi(PQgetvalue(res, 0, 8));
5637 		replication_info->upstream_node_id = atoi(PQgetvalue(res, 0, 9));
5638 	}
5639 
5640 	termPQExpBuffer(&query);
5641 	PQclear(res);
5642 
5643 	return success;
5644 }
5645 
5646 
5647 int
get_replication_lag_seconds(PGconn * conn)5648 get_replication_lag_seconds(PGconn *conn)
5649 {
5650 	PQExpBufferData query;
5651 	PGresult   *res = NULL;
5652 	int			lag_seconds = 0;
5653 
5654 	initPQExpBuffer(&query);
5655 
5656 	if (PQserverVersion(conn) >= 100000)
5657 	{
5658 		appendPQExpBufferStr(&query,
5659 							 " SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) ");
5660 
5661 	}
5662 	else
5663 	{
5664 		appendPQExpBufferStr(&query,
5665 							 " SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) ");
5666 	}
5667 
5668 	appendPQExpBufferStr(&query,
5669 						 "          THEN 0 "
5670 						 "        ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT "
5671 						 "          END "
5672 						 "        AS lag_seconds");
5673 
5674 	res = PQexec(conn, query.data);
5675 	log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data);
5676 	termPQExpBuffer(&query);
5677 
5678 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5679 	{
5680 		log_warning("%s", PQerrorMessage(conn));
5681 		PQclear(res);
5682 
5683 		return UNKNOWN_REPLICATION_LAG;
5684 	}
5685 
5686 	if (!PQntuples(res))
5687 	{
5688 		return UNKNOWN_REPLICATION_LAG;
5689 	}
5690 
5691 	lag_seconds = atoi(PQgetvalue(res, 0, 0));
5692 
5693 	PQclear(res);
5694 	return lag_seconds;
5695 }
5696 
5697 
5698 
5699 TimeLineID
get_node_timeline(PGconn * conn,char * timeline_id_str)5700 get_node_timeline(PGconn *conn, char *timeline_id_str)
5701 {
5702 	TimeLineID timeline_id  = UNKNOWN_TIMELINE_ID;
5703 
5704 	/*
5705 	 * PG_control_checkpoint() was introduced in PostgreSQL 9.6
5706 	 */
5707 	if (PQserverVersion(conn) >= 90600)
5708 	{
5709 		PGresult   *res = NULL;
5710 
5711 		res = PQexec(conn, "SELECT timeline_id FROM pg_catalog.pg_control_checkpoint()");
5712 
5713 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
5714 		{
5715 			log_db_error(conn, NULL, _("get_node_timeline(): unable to query pg_control_system()"));
5716 		}
5717 		else
5718 		{
5719 			timeline_id = atoi(PQgetvalue(res, 0, 0));
5720 		}
5721 
5722 		PQclear(res);
5723 	}
5724 
5725 	/* If requested, format the timeline ID as a string */
5726 	if (timeline_id_str != NULL)
5727 	{
5728 		if (timeline_id == UNKNOWN_TIMELINE_ID)
5729 		{
5730 			strncpy(timeline_id_str, "?", MAXLEN);
5731 		}
5732 		else
5733 		{
5734 			snprintf(timeline_id_str, MAXLEN, "%i", timeline_id);
5735 		}
5736 	}
5737 
5738 	return timeline_id;
5739 }
5740 
5741 
5742 void
get_node_replication_stats(PGconn * conn,t_node_info * node_info)5743 get_node_replication_stats(PGconn *conn, t_node_info *node_info)
5744 {
5745 	PQExpBufferData query;
5746 	PGresult   *res = NULL;
5747 
5748 	initPQExpBuffer(&query);
5749 
5750 	appendPQExpBufferStr(&query,
5751 						 " SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, "
5752 						 "        (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, "
5753 						 "        current_setting('max_replication_slots')::INT AS max_replication_slots, "
5754 						 "        (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE slot_type='physical') AS total_replication_slots, "
5755 						 "        (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE AND slot_type='physical')  AS active_replication_slots, "
5756 						 "        (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE AND slot_type='physical') AS inactive_replication_slots, "
5757 						 "        pg_catalog.pg_is_in_recovery() AS in_recovery");
5758 
5759 	log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data);
5760 
5761 	res = PQexec(conn, query.data);
5762 
5763 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5764 	{
5765 		log_warning(_("unable to retrieve node replication statistics"));
5766 		log_detail("%s", PQerrorMessage(conn));
5767 		log_detail("%s", query.data);
5768 
5769 		termPQExpBuffer(&query);
5770 		PQclear(res);
5771 
5772 		return;
5773 	}
5774 
5775 	node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0));
5776 	node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1));
5777 	node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2));
5778 	node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3));
5779 	node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4));
5780 	node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5));
5781 	node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY;
5782 
5783 	termPQExpBuffer(&query);
5784 	PQclear(res);
5785 
5786 	return;
5787 }
5788 
5789 
5790 NodeAttached
is_downstream_node_attached(PGconn * conn,char * node_name,char ** node_state)5791 is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state)
5792 {
5793 	PQExpBufferData query;
5794 	PGresult   *res = NULL;
5795 
5796 	initPQExpBuffer(&query);
5797 
5798 	appendPQExpBuffer(&query,
5799 					  " SELECT pid, state "
5800 					  "   FROM pg_catalog.pg_stat_replication "
5801 					  "  WHERE application_name = '%s'",
5802 					  node_name);
5803 
5804 	res = PQexec(conn, query.data);
5805 
5806 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5807 	{
5808 		log_verbose(LOG_WARNING, _("unable to query pg_stat_replication"));
5809 		log_detail("%s", PQerrorMessage(conn));
5810 		log_detail("%s", query.data);
5811 
5812 		termPQExpBuffer(&query);
5813 		PQclear(res);
5814 
5815 		return NODE_ATTACHED_UNKNOWN;
5816 	}
5817 
5818 	termPQExpBuffer(&query);
5819 
5820 	/*
5821 	 * If there's more than one entry in pg_stat_application, there's no
5822 	 * way we can reliably determine which one belongs to the node we're
5823 	 * checking, so there's nothing more we can do.
5824 	 */
5825 	if (PQntuples(res) > 1)
5826 	{
5827 		log_error(_("multiple entries with \"application_name\" set to  \"%s\" found in \"pg_stat_replication\""),
5828 				  node_name);
5829 		log_hint(_("verify that a unique node name is configured for each node"));
5830 
5831 		PQclear(res);
5832 
5833 		return NODE_ATTACHED_UNKNOWN;
5834 	}
5835 
5836 	if (PQntuples(res) == 0)
5837 	{
5838 		log_warning(_("node \"%s\" not found in \"pg_stat_replication\""), node_name);
5839 
5840 		PQclear(res);
5841 
5842 		return NODE_DETACHED;
5843 	}
5844 
5845 	/*
5846 	 * If the connection is not a superuser or member of pg_read_all_stats, we
5847 	 * won't be able to retrieve the "state" column, so we'll assume
5848 	 * the node is attached.
5849 	 */
5850 
5851 	if (connection_has_pg_monitor_role(conn, "pg_read_all_stats"))
5852 	{
5853 		const char *state = PQgetvalue(res, 0, 1);
5854 
5855 		if (node_state != NULL)
5856 		{
5857 			int		state_len = strlen(state);
5858 			*node_state = palloc0(state_len + 1);
5859 			strncpy(*node_state, state, state_len);
5860 		}
5861 
5862 		if (strcmp(state, "streaming") != 0)
5863 		{
5864 			log_warning(_("node \"%s\" attached in state \"%s\""),
5865 						node_name,
5866 						state);
5867 
5868 			PQclear(res);
5869 
5870 			return NODE_NOT_ATTACHED;
5871 		}
5872 	}
5873 	else if (node_state != NULL)
5874 	{
5875 		*node_state = palloc0(1);
5876 		*node_state[0] = '\0';
5877 	}
5878 
5879 	PQclear(res);
5880 
5881 	return NODE_ATTACHED;
5882 }
5883 
5884 
5885 void
set_upstream_last_seen(PGconn * conn,int upstream_node_id)5886 set_upstream_last_seen(PGconn *conn, int upstream_node_id)
5887 {
5888 	PQExpBufferData query;
5889 	PGresult   *res = NULL;
5890 
5891 	initPQExpBuffer(&query);
5892 
5893 	appendPQExpBuffer(&query,
5894 					  "SELECT repmgr.set_upstream_last_seen(%i)",
5895 					  upstream_node_id);
5896 
5897 	res = PQexec(conn, query.data);
5898 
5899 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5900 	{
5901 		log_db_error(conn, query.data, _("unable to execute repmgr.set_upstream_last_seen()"));
5902 	}
5903 
5904 	termPQExpBuffer(&query);
5905 	PQclear(res);
5906 }
5907 
5908 
5909 int
get_upstream_last_seen(PGconn * conn,t_server_type node_type)5910 get_upstream_last_seen(PGconn *conn, t_server_type node_type)
5911 {
5912 	PQExpBufferData query;
5913 	PGresult   *res = NULL;
5914 	int upstream_last_seen = -1;
5915 
5916 	initPQExpBuffer(&query);
5917 
5918 	if (node_type == WITNESS)
5919 	{
5920 		appendPQExpBufferStr(&query,
5921 							 "SELECT repmgr.get_upstream_last_seen()");
5922 	}
5923 	else
5924 	{
5925 		appendPQExpBufferStr(&query,
5926 							 "SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5927 							 "   THEN -1 "
5928 							 "   ELSE repmgr.get_upstream_last_seen() "
5929 							 " END AS upstream_last_seen ");
5930 	}
5931 
5932 	res = PQexec(conn, query.data);
5933 
5934 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5935 	{
5936 		log_db_error(conn, query.data, _("unable to execute repmgr.get_upstream_last_seen()"));
5937 	}
5938 	else
5939 	{
5940 		upstream_last_seen = atoi(PQgetvalue(res, 0, 0));
5941 	}
5942 
5943 	termPQExpBuffer(&query);
5944 	PQclear(res);
5945 
5946 	return upstream_last_seen;
5947 }
5948 
5949 
5950 bool
is_wal_replay_paused(PGconn * conn,bool check_pending_wal)5951 is_wal_replay_paused(PGconn *conn, bool check_pending_wal)
5952 {
5953 	PQExpBufferData query;
5954 	PGresult   *res = NULL;
5955 	bool		is_paused = false;
5956 
5957 	initPQExpBuffer(&query);
5958 
5959 	appendPQExpBufferStr(&query,
5960 						 "SELECT paused.wal_replay_paused ");
5961 
5962 	if (PQserverVersion(conn) >= 100000)
5963 	{
5964 		if (check_pending_wal == true)
5965 		{
5966 			appendPQExpBufferStr(&query,
5967 								 " AND pg_catalog.pg_last_wal_replay_lsn() < pg_catalog.pg_last_wal_receive_lsn() ");
5968 		}
5969 
5970 		appendPQExpBufferStr(&query,
5971 							 " FROM (SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5972 							 "                THEN FALSE "
5973 							 "                ELSE pg_catalog.pg_is_wal_replay_paused() "
5974 							 "              END AS wal_replay_paused) paused ");
5975 	}
5976 	else
5977 	{
5978 		if (check_pending_wal == true)
5979 		{
5980 			appendPQExpBufferStr(&query,
5981 								 " AND pg_catalog.pg_last_xlog_replay_location() < pg_catalog.pg_last_xlog_receive_location() ");
5982 		}
5983 
5984 		appendPQExpBufferStr(&query,
5985 							 " FROM (SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
5986 							 "                THEN FALSE "
5987 							 "                ELSE pg_catalog.pg_is_xlog_replay_paused() "
5988 							 "              END AS wal_replay_paused) paused ");
5989 
5990 	}
5991 
5992 	res = PQexec(conn, query.data);
5993 
5994 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
5995 	{
5996 		log_db_error(conn, query.data, _("unable to execute WAL replay pause query"));
5997 	}
5998 	else
5999 	{
6000 		is_paused = atobool(PQgetvalue(res, 0, 0));
6001 	}
6002 
6003 	termPQExpBuffer(&query);
6004 	PQclear(res);
6005 
6006 	return is_paused;
6007 }
6008 
6009 
6010 /* miscellaneous debugging functions */
6011 
6012 const char *
print_node_status(NodeStatus node_status)6013 print_node_status(NodeStatus node_status)
6014 {
6015 	switch (node_status)
6016 	{
6017 		case NODE_STATUS_UNKNOWN:
6018 			return "UNKNOWN";
6019 		case NODE_STATUS_UP:
6020 			return "UP";
6021 		case NODE_STATUS_SHUTTING_DOWN:
6022 			return "SHUTTING_DOWN";
6023 		case NODE_STATUS_DOWN:
6024 			return "SHUTDOWN";
6025 		case NODE_STATUS_UNCLEAN_SHUTDOWN:
6026 			return "UNCLEAN_SHUTDOWN";
6027 		case NODE_STATUS_REJECTED:
6028 			return "REJECTED";
6029 	}
6030 
6031 	return "UNIDENTIFIED_STATUS";
6032 }
6033 
6034 
6035 const char *
print_pqping_status(PGPing ping_status)6036 print_pqping_status(PGPing ping_status)
6037 {
6038 	switch (ping_status)
6039 	{
6040 		case PQPING_OK:
6041 			return "PQPING_OK";
6042 		case PQPING_REJECT:
6043 			return "PQPING_REJECT";
6044 		case PQPING_NO_RESPONSE:
6045 			return "PQPING_NO_RESPONSE";
6046 		case PQPING_NO_ATTEMPT:
6047 			return "PQPING_NO_ATTEMPT";
6048 	}
6049 
6050 	return "PQPING_UNKNOWN_STATUS";
6051 }
6052