1 /*
2  * $Header$
3  *
4  * Handles watchdog connection, and protocol communication with pgpool-II
5  *
6  * pgpool: a language independent connection pool server for PostgreSQL
7  * written by Tatsuo Ishii
8  *
9  * Copyright (c) 2003-2020	PgPool Global Development Group
10  *
11  * Permission to use, copy, modify, and distribute this software and
12  * its documentation for any purpose and without fee is hereby
13  * granted, provided that the above copyright notice appear in all
14  * copies and that both that copyright notice and this permission
15  * notice appear in supporting documentation, and that the name of the
16  * author not be used in advertising or publicity pertaining to
17  * distribution of the software without specific, written prior
18  * permission. The author makes no representations about the
19  * suitability of this software for any purpose.  It is provided "as
20  * is" without express or implied warranty.
21  *
22  */
23 #include <stdio.h>
24 #include <errno.h>
25 #include <ctype.h>
26 #include <time.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <signal.h>
30 #include <sys/stat.h>
31 #include <sys/un.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <netdb.h>
37 #include <arpa/inet.h>
38 #include <unistd.h>
39 #include <fcntl.h>
40 #include <sys/wait.h>
41 
42 #include "pool.h"
43 #include "pool_config.h"
44 #include "watchdog/wd_json_data.h"
45 #include "watchdog/wd_internal_commands.h"
46 #include "utils/elog.h"
47 #include "utils/json_writer.h"
48 #include "utils/pool_signal.h"
49 #include "utils/json.h"
50 #include "auth/pool_auth.h"
51 
52 #define WD_DEFAULT_IPC_COMMAND_TIMEOUT	8	/* default number of seconds to
53 											 * wait for IPC command results */
54 #define WD_INTERLOCK_WAIT_MSEC		500
55 #define WD_INTERLOCK_TIMEOUT_SEC	10
56 #define WD_INTERLOCK_WAIT_COUNT ((int) ((WD_INTERLOCK_TIMEOUT_SEC * 1000)/WD_INTERLOCK_WAIT_MSEC))
57 
/*
 * Variables living in the pool shared memory segment.  Each pointer starts
 * out NULL and is assigned by wd_ipc_initialize_data().
 */

/* set to true when the watchdog process terminates abnormally */
bool	   *watchdog_require_cleanup = NULL;

/* set to true when the watchdog process has performed escalation */
bool	   *watchdog_node_escalated = NULL;

/* key kept in shared memory, used to identify the ipc internal clients */
unsigned int *ipc_shared_key = NULL;
68 
69 static char *get_wd_failover_state_json(bool start);
70 static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult * result,
71 															 unsigned int *wd_failover_id);
72 static WDFailoverCMDResults wd_issue_failover_command(char *func_name, int *node_id_set,
73 													  		int count, unsigned char flags);
74 
75 
76 void
wd_ipc_initialize_data(void)77 wd_ipc_initialize_data(void)
78 {
79 	wd_ipc_conn_initialize();
80 
81 	if (ipc_shared_key == NULL)
82 	{
83 		ipc_shared_key = pool_shared_memory_segment_get_chunk(sizeof(unsigned int));
84 		*ipc_shared_key = 0;
85 		while (*ipc_shared_key == 0)
86 		{
87 			pool_random_salt((char *) ipc_shared_key);
88 		}
89 	}
90 
91 	if (watchdog_require_cleanup == NULL)
92 	{
93 		watchdog_require_cleanup = pool_shared_memory_segment_get_chunk(sizeof(bool));
94 		*watchdog_require_cleanup = false;
95 	}
96 
97 	if (watchdog_node_escalated == NULL)
98 	{
99 		watchdog_node_escalated = pool_shared_memory_segment_get_chunk(sizeof(bool));
100 		*watchdog_node_escalated = false;
101 	}
102 }
103 
/*
 * Return the number of bytes of shared memory this module asks for: the
 * MAXALIGN'ed sizes of the three shared variables plus the value reported
 * by estimate_ipc_socket_addr_len().
 */
size_t
wd_ipc_get_shared_mem_size(void)
{
	size_t		key_bytes = MAXALIGN(sizeof(unsigned int));	/* ipc_shared_key */
	size_t		cleanup_flag_bytes = MAXALIGN(sizeof(bool));	/* watchdog_require_cleanup */
	size_t		escalated_flag_bytes = MAXALIGN(sizeof(bool));	/* watchdog_node_escalated */
	size_t		sock_addr_bytes = estimate_ipc_socket_addr_len();

	return key_bytes + cleanup_flag_bytes + escalated_flag_bytes + sock_addr_bytes;
}
113 
114 /*
115  * function gets the PG backend status of all attached nodes from
116  * the leader watchdog node.
117  */
118 WDPGBackendStatus *
get_pg_backend_status_from_leader_wd_node(void)119 get_pg_backend_status_from_leader_wd_node(void)
120 {
121 	unsigned int *shared_key = get_ipc_shared_key();
122 	char	   *data = get_data_request_json(WD_DATE_REQ_PG_BACKEND_DATA,
123 											 shared_key ? *shared_key : 0, pool_config->wd_authkey);
124 
125 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_LEADER_DATA_REQUEST,
126 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
127 													   data, strlen(data), true);
128 
129 	pfree(data);
130 
131 	if (result == NULL)
132 	{
133 		ereport(WARNING,
134 				(errmsg("get backend node status from leader watchdog failed"),
135 				 errdetail("issue command to watchdog returned NULL")));
136 		return NULL;
137 	}
138 	if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
139 	{
140 		ereport(WARNING,
141 				(errmsg("get backend node status from leader watchdog failed"),
142 				 errdetail("watchdog cluster is not in stable state"),
143 				 errhint("try again when the cluster is fully initialized")));
144 		FreeCmdResult(result);
145 		return NULL;
146 	}
147 	else if (result->type == WD_IPC_CMD_TIMEOUT)
148 	{
149 		ereport(WARNING,
150 				(errmsg("get backend node status from leader watchdog failed"),
151 				 errdetail("ipc command timeout")));
152 		FreeCmdResult(result);
153 		return NULL;
154 	}
155 	else if (result->type == WD_IPC_CMD_RESULT_OK)
156 	{
157 		WDPGBackendStatus *backendStatus = get_pg_backend_node_status_from_json(result->data, result->length);
158 
159 		/*
160 		 * Watchdog returns the zero length data when the node itself is a
161 		 * leader watchdog node
162 		 */
163 		if (result->length <= 0)
164 		{
165 			backendStatus = palloc0(sizeof(WDPGBackendStatus));
166 			backendStatus->node_count = -1;
167 		}
168 		else
169 		{
170 			backendStatus = get_pg_backend_node_status_from_json(result->data, result->length);
171 		}
172 		FreeCmdResult(result);
173 		return backendStatus;
174 	}
175 
176 	ereport(WARNING,
177 			(errmsg("get backend node status from leader watchdog failed")));
178 	FreeCmdResult(result);
179 	return NULL;
180 }
181 
182 WdCommandResult
wd_start_recovery(void)183 wd_start_recovery(void)
184 {
185 	char		type;
186 	unsigned int *shared_key = get_ipc_shared_key();
187 
188 	char	   *func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL, 0, 0,
189 												 shared_key ? *shared_key : 0, pool_config->wd_authkey);
190 
191 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
192 													   pool_config->recovery_timeout + WD_DEFAULT_IPC_COMMAND_TIMEOUT,
193 													   func, strlen(func), true);
194 
195 	pfree(func);
196 
197 	if (result == NULL)
198 	{
199 		ereport(WARNING,
200 				(errmsg("start recovery command lock failed"),
201 				 errdetail("issue command to watchdog returned NULL")));
202 		return COMMAND_FAILED;
203 	}
204 
205 	type = result->type;
206 	FreeCmdResult(result);
207 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
208 	{
209 		ereport(WARNING,
210 				(errmsg("start recovery command lock failed"),
211 				 errdetail("watchdog cluster is not in stable state"),
212 				 errhint("try again when the cluster is fully initialized")));
213 		return CLUSTER_IN_TRANSATIONING;
214 	}
215 	else if (type == WD_IPC_CMD_TIMEOUT)
216 	{
217 		ereport(WARNING,
218 				(errmsg("start recovery command lock failed"),
219 				 errdetail("ipc command timeout")));
220 		return COMMAND_TIMEOUT;
221 	}
222 	else if (type == WD_IPC_CMD_RESULT_OK)
223 	{
224 		return COMMAND_OK;
225 	}
226 	return COMMAND_FAILED;
227 }
228 
229 WdCommandResult
wd_end_recovery(void)230 wd_end_recovery(void)
231 {
232 	char		type;
233 	unsigned int *shared_key = get_ipc_shared_key();
234 
235 	char	   *func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0, 0,
236 												 shared_key ? *shared_key : 0, pool_config->wd_authkey);
237 
238 
239 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
240 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
241 													   func, strlen(func), true);
242 
243 	pfree(func);
244 
245 	if (result == NULL)
246 	{
247 		ereport(WARNING,
248 				(errmsg("end recovery command lock failed"),
249 				 errdetail("issue command to watchdog returned NULL")));
250 		return COMMAND_FAILED;
251 	}
252 
253 	type = result->type;
254 	FreeCmdResult(result);
255 
256 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
257 	{
258 		ereport(WARNING,
259 				(errmsg("end recovery command lock failed"),
260 				 errdetail("watchdog cluster is not in stable state"),
261 				 errhint("try again when the cluster is fully initialized")));
262 		return CLUSTER_IN_TRANSATIONING;
263 	}
264 	else if (type == WD_IPC_CMD_TIMEOUT)
265 	{
266 		ereport(WARNING,
267 				(errmsg("end recovery command lock failed"),
268 				 errdetail("ipc command timeout")));
269 		return COMMAND_TIMEOUT;
270 	}
271 	else if (type == WD_IPC_CMD_RESULT_OK)
272 	{
273 		return COMMAND_OK;
274 	}
275 	return COMMAND_FAILED;
276 }
277 
278 WdCommandResult
wd_execute_cluster_command(char * clusterCommand,int nArgs,WDExecCommandArg * wdExecCommandArg)279 wd_execute_cluster_command(char* clusterCommand,
280 						   int nArgs, WDExecCommandArg *wdExecCommandArg)
281 {
282 	char		type;
283 	unsigned int *shared_key = get_ipc_shared_key();
284 
285 	char	   *func = get_wd_exec_cluster_command_json(clusterCommand, nArgs, wdExecCommandArg,
286 												 shared_key ? *shared_key : 0, pool_config->wd_authkey);
287 
288 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_EXECUTE_CLUSTER_COMMAND,
289 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
290 													   func, strlen(func), true);
291 
292 	pfree(func);
293 
294 	if (result == NULL)
295 	{
296 		ereport(WARNING,
297 				(errmsg("execute cluster command failed"),
298 				 errdetail("issue command to watchdog returned NULL")));
299 		return COMMAND_FAILED;
300 	}
301 
302 	type = result->type;
303 	FreeCmdResult(result);
304 
305 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
306 	{
307 		ereport(WARNING,
308 				(errmsg("execute cluster command failed"),
309 				 errdetail("watchdog cluster is not in stable state"),
310 				 errhint("try again when the cluster is fully initialized")));
311 		return CLUSTER_IN_TRANSATIONING;
312 	}
313 	else if (type == WD_IPC_CMD_TIMEOUT)
314 	{
315 		ereport(WARNING,
316 				(errmsg("execute cluster command failed"),
317 				 errdetail("ipc command timeout")));
318 		return COMMAND_TIMEOUT;
319 	}
320 	else if (type == WD_IPC_CMD_RESULT_OK)
321 	{
322 		return COMMAND_OK;
323 	}
324 	return COMMAND_FAILED;
325 }
326 
327 
328 static char *
get_wd_failover_state_json(bool start)329 get_wd_failover_state_json(bool start)
330 {
331 	char	   *json_str;
332 	JsonNode   *jNode = jw_create_with_object(true);
333 	unsigned int *shared_key = get_ipc_shared_key();
334 
335 	jw_put_int(jNode, WD_IPC_SHARED_KEY, shared_key ? *shared_key : 0); /* put the shared key */
336 	if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
337 		jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /* put the auth key */
338 
339 	jw_put_int(jNode, "FailoverFuncState", start ? 0 : 1);
340 	jw_finish_document(jNode);
341 	json_str = pstrdup(jw_get_json_string(jNode));
342 	jw_destroy(jNode);
343 	return json_str;
344 }
345 
346 static WDFailoverCMDResults
wd_send_failover_func_status_command(bool start)347 wd_send_failover_func_status_command(bool start)
348 {
349 	WDFailoverCMDResults res;
350 	unsigned int failover_id;
351 
352 	char	   *json_data = get_wd_failover_state_json(start);
353 
354 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_INDICATION
355 													   ,WD_DEFAULT_IPC_COMMAND_TIMEOUT,
356 													   json_data, strlen(json_data), true);
357 
358 	pfree(json_data);
359 
360 	res = wd_get_failover_result_from_data(result, &failover_id);
361 
362 	FreeCmdResult(result);
363 	return res;
364 }
365 
wd_get_failover_result_from_data(WDIPCCmdResult * result,unsigned int * wd_failover_id)366 static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult * result, unsigned int *wd_failover_id)
367 {
368 	if (result == NULL)
369 	{
370 		ereport(WARNING,
371 				(errmsg("failover command on watchdog failed"),
372 				 errdetail("issue command to watchdog returned NULL")));
373 		return FAILOVER_RES_ERROR;
374 	}
375 	if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
376 	{
377 		ereport(WARNING,
378 				(errmsg("failover command on watchdog failed"),
379 				 errdetail("watchdog cluster is not in stable state"),
380 				 errhint("try again when the cluster is fully initialized")));
381 		return FAILOVER_RES_TRANSITION;
382 	}
383 	else if (result->type == WD_IPC_CMD_TIMEOUT)
384 	{
385 		ereport(WARNING,
386 				(errmsg("failover command on watchdog failed"),
387 				 errdetail("ipc command timeout")));
388 		return FAILOVER_RES_TIMEOUT;
389 	}
390 	else if (result->type == WD_IPC_CMD_RESULT_OK)
391 	{
392 		WDFailoverCMDResults res = FAILOVER_RES_ERROR;
393 		json_value *root;
394 
395 		root = json_parse(result->data, result->length);
396 		/* The root node must be object */
397 		if (root == NULL || root->type != json_object)
398 		{
399 			ereport(NOTICE,
400 					(errmsg("unable to parse json data from failover command result")));
401 			return res;
402 		}
403 		if (root && json_get_int_value_for_key(root, WD_FAILOVER_RESULT_KEY, (int *) &res))
404 		{
405 			json_value_free(root);
406 			return FAILOVER_RES_ERROR;
407 		}
408 		if (root && json_get_int_value_for_key(root, WD_FAILOVER_ID_KEY, (int *) wd_failover_id))
409 		{
410 			json_value_free(root);
411 			return FAILOVER_RES_ERROR;
412 		}
413 		return res;
414 	}
415 	return FAILOVER_RES_ERROR;
416 }
417 
418 static WDFailoverCMDResults
wd_issue_failover_command(char * func_name,int * node_id_set,int count,unsigned char flags)419 wd_issue_failover_command(char *func_name, int *node_id_set, int count, unsigned char flags)
420 {
421 	WDFailoverCMDResults res;
422 	char	   *func;
423 	unsigned int *shared_key = get_ipc_shared_key();
424 	unsigned int wd_failover_id;
425 
426 	func = get_wd_node_function_json(func_name, node_id_set, count, flags,
427 									 shared_key ? *shared_key : 0, pool_config->wd_authkey);
428 
429 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
430 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
431 													   func, strlen(func), true);
432 
433 	pfree(func);
434 	res = wd_get_failover_result_from_data(result, &wd_failover_id);
435 	FreeCmdResult(result);
436 	return res;
437 }
438 
439 /*
440  * send the degenerate backend request to watchdog.
441  * now watchdog can respond to the request in following ways.
442  *
443  * 1 - It can tell the caller to procees with failover. This
444  * happens when the current node is the leader watchdog node.
445  *
446  * 2 - It can tell the caller to failover not allowed
447  * this happens when either cluster does not have the quorum
448  *
449  */
450 WDFailoverCMDResults
wd_degenerate_backend_set(int * node_id_set,int count,unsigned char flags)451 wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
452 {
453 	if (pool_config->use_watchdog)
454 		return wd_issue_failover_command(WD_FUNCTION_DEGENERATE_REQUEST, node_id_set, count, flags);
455 	return FAILOVER_RES_PROCEED;
456 }
457 
458 WDFailoverCMDResults
wd_promote_backend(int node_id,unsigned char flags)459 wd_promote_backend(int node_id, unsigned char flags)
460 {
461 	if (pool_config->use_watchdog)
462 		return wd_issue_failover_command(WD_FUNCTION_PROMOTE_REQUEST, &node_id, 1, flags);
463 	return FAILOVER_RES_PROCEED;
464 }
465 
466 WDFailoverCMDResults
wd_send_failback_request(int node_id,unsigned char flags)467 wd_send_failback_request(int node_id, unsigned char flags)
468 {
469 	if (pool_config->use_watchdog)
470 		return wd_issue_failover_command(WD_FUNCTION_FAILBACK_REQUEST, &node_id, 1, flags);
471 	return FAILOVER_RES_PROCEED;
472 }
473 
474 /*
475  * Function returns the JSON of watchdog nodes
476  * pass nodeID = -1 to get list of all nodes
477  */
478 char *
wd_internal_get_watchdog_nodes_json(int nodeID)479 wd_internal_get_watchdog_nodes_json(int nodeID)
480 {
481 	return wd_get_watchdog_nodes_json(pool_config->wd_authkey, nodeID);
482 }
483 
484 WDFailoverCMDResults
wd_failover_start(void)485 wd_failover_start(void)
486 {
487 	if (pool_config->use_watchdog)
488 		return wd_send_failover_func_status_command(true);
489 	return FAILOVER_RES_PROCEED;
490 }
491 
492 WDFailoverCMDResults
wd_failover_end(void)493 wd_failover_end(void)
494 {
495 	if (pool_config->use_watchdog)
496 		return wd_send_failover_func_status_command(false);
497 	return FAILOVER_RES_PROCEED;
498 }
499 
500 /* These functions are not available for frontend utilities */
501 unsigned int *
get_ipc_shared_key(void)502 get_ipc_shared_key(void)
503 {
504 	return ipc_shared_key;
505 }
506 
507 void
set_watchdog_process_needs_cleanup(void)508 set_watchdog_process_needs_cleanup(void)
509 {
510 	*watchdog_require_cleanup = true;
511 }
512 
513 void
reset_watchdog_process_needs_cleanup(void)514 reset_watchdog_process_needs_cleanup(void)
515 {
516 	*watchdog_require_cleanup = false;
517 }
518 
519 bool
get_watchdog_process_needs_cleanup(void)520 get_watchdog_process_needs_cleanup(void)
521 {
522 	return *watchdog_require_cleanup;
523 }
524 
525 
526 void
set_watchdog_node_escalated(void)527 set_watchdog_node_escalated(void)
528 {
529 	*watchdog_node_escalated = true;
530 }
531 
532 void
reset_watchdog_node_escalated(void)533 reset_watchdog_node_escalated(void)
534 {
535 	*watchdog_node_escalated = false;
536 }
537 
538 bool
get_watchdog_node_escalation_state(void)539 get_watchdog_node_escalation_state(void)
540 {
541 	return *watchdog_node_escalated;
542 }
543 
544 int
wd_internal_get_watchdog_quorum_state(void)545 wd_internal_get_watchdog_quorum_state(void)
546 {
547 	return get_watchdog_quorum_state(pool_config->wd_authkey);
548 }
549 
550 WD_STATES
wd_internal_get_watchdog_local_node_state(void)551 wd_internal_get_watchdog_local_node_state(void)
552 {
553 	return get_watchdog_local_node_state(pool_config->wd_authkey);
554 }
555