1 /*
2  * $Header$
3  *
4  * Handles watchdog connection, and protocol communication with pgpool-II
5  *
6  * pgpool: a language independent connection pool server for PostgreSQL
7  * written by Tatsuo Ishii
8  *
9  * Copyright (c) 2003-2016	PgPool Global Development Group
10  *
11  * Permission to use, copy, modify, and distribute this software and
12  * its documentation for any purpose and without fee is hereby
13  * granted, provided that the above copyright notice appear in all
14  * copies and that both that copyright notice and this permission
15  * notice appear in supporting documentation, and that the name of the
16  * author not be used in advertising or publicity pertaining to
17  * distribution of the software without specific, written prior
18  * permission. The author makes no representations about the
19  * suitability of this software for any purpose.  It is provided "as
20  * is" without express or implied warranty.
21  *
22  */
23 #include <stdio.h>
24 #include <errno.h>
25 #include <ctype.h>
26 #include <time.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <signal.h>
30 #include <sys/stat.h>
31 #include <sys/un.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <netdb.h>
37 #include <arpa/inet.h>
38 #include <unistd.h>
39 #include <fcntl.h>
40 #include <sys/wait.h>
41 
42 #include "pool.h"
43 #include "utils/elog.h"
44 #include "utils/json_writer.h"
45 #include "utils/json.h"
46 #include "utils/pool_stream.h"
47 #include "pool_config.h"
48 #include "watchdog/wd_json_data.h"
49 #include "watchdog/wd_ipc_commands.h"
50 #include "watchdog/wd_ipc_defines.h"
51 
/* Default number of seconds to wait for watchdog to answer an IPC command */
#define WD_DEFAULT_IPC_COMMAND_TIMEOUT	8 /* default number of seconds to wait for IPC command results*/
/* Interval (milliseconds) slept between two failover lock status polls */
#define WD_INTERLOCK_WAIT_MSEC		500
/* Approximate time (seconds) to keep polling for a failover lock to be released */
#define WD_INTERLOCK_TIMEOUT_SEC	10
/* Number of WD_INTERLOCK_WAIT_MSEC polls that fit into WD_INTERLOCK_TIMEOUT_SEC */
#define WD_INTERLOCK_WAIT_COUNT ((int) ((WD_INTERLOCK_TIMEOUT_SEC * 1000)/WD_INTERLOCK_WAIT_MSEC))

/* file-local helpers, defined further down in this file */
static void sleep_in_waiting(void);
static void FreeCmdResult(WDIPCCmdResult* res);

static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
static WDFailoverCMDResults wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);

static int open_wd_command_sock(bool throw_error);
static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id);

/* shared memory variables */
/*
 * Path of the watchdog IPC command socket
 * ("<wd_ipc_socket_dir>/.s.PGPOOLWD_CMD.<wd_port>"); allocated in shared
 * memory by wd_ipc_initialize_data(), NULL until then.
 */
char *watchdog_ipc_address = NULL;
bool *watchdog_require_cleanup = NULL;	/* shared memory variable set to true
										 * when watchdog process terminates abnormally
										 */
bool *watchdog_node_escalated = NULL;	/* shared memory variable set to true
										 * when watchdog process has performed escalation
										 */
unsigned int *ipc_shared_key = NULL;   /* key lives in shared memory
										* used to identify the ipc internal
										* clients
										*/
wd_ipc_initialize_data(void)79 void wd_ipc_initialize_data(void)
80 {
81 	if (watchdog_ipc_address == NULL)
82 	{
83 		char wd_ipc_sock_addr[255];
84 		snprintf(wd_ipc_sock_addr, sizeof(wd_ipc_sock_addr), "%s/.s.PGPOOLWD_CMD.%d",
85 				 pool_config->wd_ipc_socket_dir,
86 				 pool_config->wd_port);
87 
88 		watchdog_ipc_address = pool_shared_memory_create(strlen(wd_ipc_sock_addr) +1);
89 		strcpy(watchdog_ipc_address, wd_ipc_sock_addr);
90 	}
91 
92 	if (ipc_shared_key == NULL)
93 	{
94 		ipc_shared_key = pool_shared_memory_create(sizeof(unsigned int));
95 		*ipc_shared_key = 0;
96 		while (*ipc_shared_key == 0) {
97 			pool_random_salt((char*)ipc_shared_key);
98 		}
99 	}
100 
101 	if (watchdog_require_cleanup == NULL)
102 	{
103 		watchdog_require_cleanup = pool_shared_memory_create(sizeof(bool));
104 		*watchdog_require_cleanup = false;
105 	}
106 
107 	if (watchdog_node_escalated == NULL)
108 	{
109 		watchdog_node_escalated = pool_shared_memory_create(sizeof(bool));
110 		*watchdog_node_escalated = false;
111 	}
112 }
113 
get_watchdog_local_node_state(void)114 WD_STATES get_watchdog_local_node_state(void)
115 {
116 	WD_STATES ret = WD_DEAD;
117 	WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_WD_STATE);
118 	if (state == NULL)
119 	{
120 		ereport(LOG,
121 				(errmsg("failed to get current state of local watchdog node"),
122 				 errdetail("get runtime variable value from watchdog returned no data")));
123 		return WD_DEAD;
124 	}
125 	if (state->valueType != VALUE_DATA_TYPE_INT)
126 	{
127 		ereport(LOG,
128 				(errmsg("failed to get current state of local watchdog node"),
129 				 errdetail("get runtime variable value from watchdog returned invalid value type")));
130 		pfree(state);
131 		return WD_DEAD;
132 	}
133 	ret = (WD_STATES)state->data.intVal;
134 	pfree(state);
135 	return ret;
136 }
137 
get_watchdog_ipc_address(void)138 char* get_watchdog_ipc_address(void)
139 {
140 	return watchdog_ipc_address;
141 }
142 
get_ipc_shared_key(void)143 unsigned int* get_ipc_shared_key(void)
144 {
145 	return ipc_shared_key;
146 }
147 
set_watchdog_process_needs_cleanup(void)148 void set_watchdog_process_needs_cleanup(void)
149 {
150 	*watchdog_require_cleanup = true;
151 }
152 
reset_watchdog_process_needs_cleanup(void)153 void reset_watchdog_process_needs_cleanup(void)
154 {
155 	*watchdog_require_cleanup = false;
156 }
157 
get_watchdog_process_needs_cleanup(void)158 bool get_watchdog_process_needs_cleanup(void)
159 {
160 	return *watchdog_require_cleanup;
161 }
162 
163 
set_watchdog_node_escalated(void)164 void set_watchdog_node_escalated(void)
165 {
166 	*watchdog_node_escalated = true;
167 }
168 
reset_watchdog_node_escalated(void)169 void reset_watchdog_node_escalated(void)
170 {
171 	*watchdog_node_escalated = false;
172 }
173 
get_watchdog_node_escalation_state(void)174 bool get_watchdog_node_escalation_state(void)
175 {
176 	return *watchdog_node_escalated;
177 }
178 
179 /*
180  * function issues the command to watchdog process over the watchdog
181  * IPC command socket.
182  * type:            command type to send. valid command
183  *                  types are defined in wd_ipc_defines.h
184  * timeout_sec:     number of seconds to wait for the command response
185  *                  from watchdog
186  * data:            command data
187  * data_len:        length of data
188  * blocking:        send true if caller wants to wait for the results
189  *                  when blocking is false the timeout_sec is ignored
190  */
191 WDIPCCmdResult*
issue_command_to_watchdog(char type,int timeout_sec,char * data,int data_len,bool blocking)192 issue_command_to_watchdog(char type, int timeout_sec, char* data, int data_len, bool blocking)
193 {
194 	struct timeval start_time,tv;
195 	int sock;
196 	WDIPCCmdResult* result = NULL;
197 	char res_type = 'P';
198 	int res_length, len;
199 	gettimeofday(&start_time, NULL);
200 
201 	/* open the watchdog command socket for IPC */
202 	sock = open_wd_command_sock(false);
203 	if (sock < 0)
204 		return NULL;
205 
206 	len = htonl(data_len);
207 
208 	if (socket_write(sock, &type, sizeof(char)) <= 0)
209 	{
210 		close(sock);
211 		return NULL;
212 	}
213 
214 	if (socket_write(sock, &len, sizeof(int)) <= 0)
215 	{
216 		close(sock);
217 		return NULL;
218 	}
219 	if (data && data_len > 0)
220 	{
221 		if (socket_write(sock, data, data_len) <= 0)
222 		{
223 			close(sock);
224 			return NULL;
225 		}
226 	}
227 
228 	if (blocking)
229 	{
230 		/* if we are asked to wait for results */
231 		fd_set fds;
232 		struct timeval *timeout_st = NULL;
233 		if (timeout_sec > 0)
234 		{
235 			tv.tv_sec = timeout_sec;
236 			tv.tv_usec = 0;
237 			timeout_st = &tv;
238 		}
239 		FD_ZERO(&fds);
240 		FD_SET(sock,&fds);
241 		for (;;)
242 		{
243 			int select_res;
244 			select_res = select(sock+1,&fds,NULL,NULL,timeout_st);
245 			if (select_res == 0)
246 			{
247 				close(sock);
248 				result = palloc(sizeof(WDIPCCmdResult));
249 				result->type = WD_IPC_CMD_TIMEOUT;
250 				result->length = 0;
251 				result->data = NULL;
252 				return result;
253 			}
254 			if (select_res < 0)
255 			{
256 				if (errno == EAGAIN || errno == EINTR)
257 					continue;
258 				ereport(WARNING,
259 					(errmsg("error reading from IPC command socket for ipc command %c",type),
260 						 errdetail("select system call failed with error \"%s\"",strerror(errno))));
261 				close(sock);
262 				return NULL;
263 			}
264 			if (select_res > 0)
265 			{
266 				/* read the result type char */
267 				if (socket_read(sock, &res_type, 1 ,0) <=0)
268 				{
269 					ereport(WARNING,
270 						(errmsg("error reading from IPC command socket for ipc command %c",type),
271 							 errdetail("read from socket failed with error \"%s\"",strerror(errno))));
272 					close(sock);
273 					return result;
274 				}
275 				/* read the result data length */
276 				if (socket_read(sock, &res_length, sizeof(int), 0) <= 0)
277 				{
278 					ereport(WARNING,
279 						(errmsg("error reading from IPC command socket for ipc command %c",type),
280 							 errdetail("read from socket failed with error \"%s\"",strerror(errno))));
281 					close(sock);
282 					return result;
283 				}
284 
285 				result = palloc(sizeof(WDIPCCmdResult));
286 				result->type = res_type;
287 				result->length = ntohl(res_length);
288 				result->data = NULL;
289 
290 				if (result->length > 0)
291 				{
292 					result->data = palloc(result->length);
293 					if (socket_read(sock, result->data, result->length, 0) <= 0)
294 					{
295 						pfree(result->data);
296 						pfree(result);
297 						ereport(DEBUG1,
298 							(errmsg("error reading from IPC command socket for ipc command %c",type),
299 								 errdetail("read from socket failed with error \"%s\"",strerror(errno))));
300 						close(sock);
301 						return NULL;
302 					}
303 				}
304 				break;
305 			}
306 		}
307 	}
308 	else
309 	{
310 		/* For non blocking mode if we are sucessful in sending the command
311 		 * that means the command is success
312 		 */
313 		result = palloc0(sizeof(WDIPCCmdResult));
314 		result->type = WD_IPC_CMD_RESULT_OK;
315 	}
316 	close(sock);
317 	return result;
318 }
319 
320 /*
321  * Function gets the runtime value of watchdog varibale using the
322  * watchdog IPC
323  */
get_wd_runtime_variable_value(char * varName)324 WDGenericData *get_wd_runtime_variable_value(char *varName)
325 {
326 	unsigned int *shared_key = get_ipc_shared_key();
327 	char *data = get_simple_request_json(WD_JSON_KEY_VARIABLE_NAME,varName,
328 									   shared_key?*shared_key:0,pool_config->wd_authkey);
329 
330 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_RUNTIME_VARIABLE_VALUE,
331 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
332 													   data, strlen(data), true);
333 	pfree(data);
334 
335 	if (result == NULL)
336 	{
337 		ereport(WARNING,
338 			(errmsg("get runtime variable value from watchdog failed"),
339 				 errdetail("issue command to watchdog returned NULL")));
340 		return NULL;
341 	}
342 	if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
343 	{
344 		ereport(WARNING,
345 				(errmsg("get runtime variable value from watchdog failed"),
346 				 errdetail("watchdog cluster is not in stable state"),
347 					errhint("try again when the cluster is fully initialized")));
348 		FreeCmdResult(result);
349 		return NULL;
350 	}
351 	else if (result->type == WD_IPC_CMD_TIMEOUT)
352 	{
353 		ereport(WARNING,
354 				(errmsg("get runtime variable value from watchdog failed"),
355 				 errdetail("ipc command timeout")));
356 		FreeCmdResult(result);
357 		return NULL;
358 	}
359 	else if (result->type == WD_IPC_CMD_RESULT_OK)
360 	{
361 		json_value *root = NULL;
362 		WDGenericData *genData = NULL;
363 		WDValueDataType dayaType;
364 
365 		root = json_parse(result->data, result->length);
366 		/* The root node must be object */
367 		if (root == NULL || root->type != json_object)
368 		{
369 			FreeCmdResult(result);
370 			return NULL;
371 		}
372 
373 		if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA_TYPE, (int*)&dayaType))
374 		{
375 			FreeCmdResult(result);
376 			json_value_free(root);
377 			return NULL;
378 		}
379 
380 		switch (dayaType) {
381 			case VALUE_DATA_TYPE_INT:
382 			{
383 				int intVal;
384 				if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &intVal))
385 				{
386 					ereport(WARNING,
387 						(errmsg("get runtime variable value from watchdog failed"),
388 							 errdetail("unable to get INT value from JSON data returned by watchdog")));
389 				}
390 				else
391 				{
392 					genData = palloc(sizeof(WDGenericData));
393 					genData->valueType = dayaType;
394 					genData->data.intVal = intVal;
395 				}
396 			}
397 				break;
398 
399 			case VALUE_DATA_TYPE_LONG:
400 			{
401 				long longVal;
402 				if (json_get_long_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &longVal))
403 				{
404 					ereport(WARNING,
405 						(errmsg("get runtime variable value from watchdog failed"),
406 							 errdetail("unable to get LONG value from JSON data returned by watchdog")));
407 				}
408 				else
409 				{
410 					genData = palloc(sizeof(WDGenericData));
411 					genData->valueType = dayaType;
412 					genData->data.longVal = longVal;
413 				}
414 			}
415 				break;
416 
417 			case VALUE_DATA_TYPE_BOOL:
418 			{
419 				bool boolVal;
420 				if (json_get_bool_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &boolVal))
421 				{
422 					ereport(WARNING,
423 						(errmsg("get runtime variable value from watchdog failed"),
424 							 errdetail("unable to get BOOL value from JSON data returned by watchdog")));
425 				}
426 				else
427 				{
428 					genData = palloc(sizeof(WDGenericData));
429 					genData->valueType = dayaType;
430 					genData->data.boolVal = boolVal;
431 				}
432 			}
433 				break;
434 
435 			case VALUE_DATA_TYPE_STRING:
436 			{
437 				char *ptr = json_get_string_value_for_key(root, WD_JSON_KEY_VALUE_DATA);
438 				if (ptr == NULL)
439 				{
440 					ereport(WARNING,
441 						(errmsg("get runtime variable value from watchdog failed"),
442 							 errdetail("unable to get STRING value from JSON data returned by watchdog")));
443 				}
444 				else
445 				{
446 					genData = palloc(sizeof(WDGenericData));
447 					genData->valueType = dayaType;
448 					genData->data.stringVal = pstrdup(ptr);
449 				}
450 			}
451 				break;
452 
453 			default:
454 				ereport(WARNING,
455 						(errmsg("get runtime variable value from watchdog failed, unknown value data type")));
456 				break;
457 		}
458 
459 		json_value_free(root);
460 		FreeCmdResult(result);
461 		return genData;
462 	}
463 
464 	ereport(WARNING,
465 			(errmsg("get runtime variable value from watchdog failed")));
466 	FreeCmdResult(result);
467 	return NULL;
468 
469 }
470 
471 /*
472  * function gets the PG backend status of all attached nodes from
473  * the master watchdog node.
474  */
get_pg_backend_status_from_master_wd_node(void)475 WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void)
476 {
477 	unsigned int *shared_key = get_ipc_shared_key();
478 	char *data = get_data_request_json(WD_DATE_REQ_PG_BACKEND_DATA,
479 									   shared_key?*shared_key:0,pool_config->wd_authkey);
480 
481 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_MASTER_DATA_REQUEST,
482 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
483 													   data, strlen(data), true);
484 	pfree(data);
485 
486 	if (result == NULL)
487 	{
488 		ereport(WARNING,
489 			(errmsg("get backend node status from master watchdog failed"),
490 				 errdetail("issue command to watchdog returned NULL")));
491 		return NULL;
492 	}
493 	if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
494 	{
495 		ereport(WARNING,
496 			(errmsg("get backend node status from master watchdog failed"),
497 				 errdetail("watchdog cluster is not in stable state"),
498 					errhint("try again when the cluster is fully initialized")));
499 		FreeCmdResult(result);
500 		return NULL;
501 	}
502 	else if (result->type == WD_IPC_CMD_TIMEOUT)
503 	{
504 		ereport(WARNING,
505 				(errmsg("get backend node status from master watchdog failed"),
506 				 errdetail("ipc command timeout")));
507 		FreeCmdResult(result);
508 		return NULL;
509 	}
510 	else if (result->type == WD_IPC_CMD_RESULT_OK)
511 	{
512 		WDPGBackendStatus* backendStatus =  get_pg_backend_node_status_from_json(result->data, result->length);
513 		/*
514 		 * Watchdog returns the zero length data when the node itself is a master
515 		 * watchdog node
516 		 */
517 		if (result->length <= 0)
518 		{
519 			backendStatus = palloc0(sizeof(WDPGBackendStatus));
520 			backendStatus->node_count = -1;
521 		}
522 		else
523 		{
524 			backendStatus =  get_pg_backend_node_status_from_json(result->data, result->length);
525 		}
526 		FreeCmdResult(result);
527 		return backendStatus;
528 	}
529 
530 	ereport(WARNING,
531 		(errmsg("get backend node status from master watchdog failed")));
532 	FreeCmdResult(result);
533 	return NULL;
534 }
535 
536 WdCommandResult
wd_start_recovery(void)537 wd_start_recovery(void)
538 {
539 	char type;
540 	unsigned int *shared_key = get_ipc_shared_key();
541 
542 	char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0,
543 										   shared_key?*shared_key:0,pool_config->wd_authkey);
544 
545 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
546 													   pool_config->recovery_timeout + WD_DEFAULT_IPC_COMMAND_TIMEOUT,
547 													   func, strlen(func), true);
548 	pfree(func);
549 
550 	if (result == NULL)
551 	{
552 		ereport(WARNING,
553 			(errmsg("start recovery command lock failed"),
554 				 errdetail("issue command to watchdog returned NULL")));
555 		return COMMAND_FAILED;
556 	}
557 
558 	type = result->type;
559 	FreeCmdResult(result);
560 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
561 	{
562 		ereport(WARNING,
563 			(errmsg("start recovery command lock failed"),
564 				 errdetail("watchdog cluster is not in stable state"),
565 					errhint("try again when the cluster is fully initialized")));
566 		return CLUSTER_IN_TRANSATIONING;
567 	}
568 	else if (type == WD_IPC_CMD_TIMEOUT)
569 	{
570 		ereport(WARNING,
571 			(errmsg("start recovery command lock failed"),
572 				 errdetail("ipc command timeout")));
573 		return COMMAND_TIMEOUT;
574 	}
575 	else if (type == WD_IPC_CMD_RESULT_OK)
576 	{
577 		return COMMAND_OK;
578 	}
579 	return COMMAND_FAILED;
580 }
581 
582 WdCommandResult
wd_end_recovery(void)583 wd_end_recovery(void)
584 {
585 	char type;
586 	unsigned int *shared_key = get_ipc_shared_key();
587 
588 	char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0,
589 										   shared_key?*shared_key:0,pool_config->wd_authkey);
590 
591 
592 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
593 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
594 													   func, strlen(func), true);
595 	pfree(func);
596 
597 	if (result == NULL)
598 	{
599 		ereport(WARNING,
600 			(errmsg("end recovery command lock failed"),
601 				 errdetail("issue command to watchdog returned NULL")));
602 		return COMMAND_FAILED;
603 	}
604 
605 	type = result->type;
606 	FreeCmdResult(result);
607 
608 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
609 	{
610 		ereport(WARNING,
611 				(errmsg("end recovery command lock failed"),
612 				 errdetail("watchdog cluster is not in stable state"),
613 					errhint("try again when the cluster is fully initialized")));
614 		return CLUSTER_IN_TRANSATIONING;
615 	}
616 	else if (type == WD_IPC_CMD_TIMEOUT)
617 	{
618 		ereport(WARNING,
619 			(errmsg("end recovery command lock failed"),
620 				 errdetail("ipc command timeout")));
621 		return COMMAND_TIMEOUT;
622 	}
623 	else if (type == WD_IPC_CMD_RESULT_OK)
624 	{
625 		return COMMAND_OK;
626 	}
627 	return COMMAND_FAILED;
628 }
629 
630 
631 WDFailoverCMDResults
wd_send_failback_request(int node_id,unsigned int * wd_failover_id)632 wd_send_failback_request(int node_id, unsigned int *wd_failover_id)
633 {
634 	int n = node_id;
635 	char* func;
636 	unsigned int *shared_key = get_ipc_shared_key();
637 	WDFailoverCMDResults res;
638 
639 	func = get_wd_node_function_json(WD_FUNCTION_FAILBACK_REQUEST,&n, 1,
640 									 shared_key?*shared_key:0,pool_config->wd_authkey);
641 
642 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
643 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
644 													   func, strlen(func), true);
645 	pfree(func);
646 
647 	res = wd_get_failover_result_from_data(result, wd_failover_id);
648 	FreeCmdResult(result);
649 	return res;
650 }
651 
get_wd_failover_cmd_type_json(char * reqType,enum WDFailoverLocks lockID,unsigned int wd_failover_id)652 static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
653 {
654 	char* json_str;
655 	JsonNode* jNode = jw_create_with_object(true);
656 	unsigned int *shared_key = get_ipc_shared_key();
657 
658 	jw_put_int(jNode, WD_IPC_SHARED_KEY, shared_key?*shared_key:0); /* put the shared key*/
659 	if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
660 		jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /*  put the auth key*/
661 
662 	jw_put_string(jNode, "SyncRequestType", reqType);
663 	jw_put_int(jNode, "FailoverLockID", lockID);
664 	jw_put_int(jNode, "WDFailoverID", wd_failover_id);
665 	jw_finish_document(jNode);
666 	json_str = pstrdup(jw_get_json_string(jNode));
667 	jw_destroy(jNode);
668 	return json_str;
669 }
670 
671 static WDFailoverCMDResults
wd_send_failover_sync_command(char * syncReqType,enum WDFailoverLocks lockID,unsigned int wd_failover_id)672 wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
673 {
674 	WDFailoverCMDResults res;
675 	unsigned int failover_id;
676 
677 	char* json_data = get_wd_failover_cmd_type_json(syncReqType, lockID, wd_failover_id);
678 
679 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_LOCKING_REQUEST
680 													   ,WD_DEFAULT_IPC_COMMAND_TIMEOUT,
681 													   json_data, strlen(json_data), true);
682 
683 	pfree(json_data);
684 
685 	res = wd_get_failover_result_from_data(result, &failover_id);
686 
687 	FreeCmdResult(result);
688 	return res;
689 }
690 
wd_get_failover_result_from_data(WDIPCCmdResult * result,unsigned int * wd_failover_id)691 static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id)
692 {
693 	if (result == NULL)
694 		return FAILOVER_RES_ERROR;
695 
696 	if (result == NULL)
697 	{
698 		ereport(WARNING,
699 			(errmsg("failover command on watchdog failed"),
700 				 errdetail("issue command to watchdog returned NULL")));
701 		return FAILOVER_RES_ERROR;
702 	}
703 	if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
704 	{
705 		ereport(WARNING,
706 				(errmsg("failover command on watchdog failed"),
707 				 errdetail("watchdog cluster is not in stable state"),
708 					errhint("try again when the cluster is fully initialized")));
709 		return FAILOVER_RES_TRANSITION;
710 	}
711 	else if (result->type == WD_IPC_CMD_TIMEOUT)
712 	{
713 		ereport(WARNING,
714 				(errmsg("failover command on watchdog failed"),
715 				 errdetail("ipc command timeout")));
716 		return FAILOVER_RES_TIMEOUT;
717 	}
718 	else if (result->type == WD_IPC_CMD_RESULT_OK)
719 	{
720 		WDFailoverCMDResults res = FAILOVER_RES_ERROR;
721 		json_value *root;
722 
723 		root = json_parse(result->data,result->length);
724 		/* The root node must be object */
725 		if (root == NULL || root->type != json_object)
726 		{
727 			ereport(NOTICE,
728 					(errmsg("unable to parse json data from failover command result")));
729 			return res;
730 		}
731 		if (root && json_get_int_value_for_key(root, WD_FAILOVER_RESULT_KEY, (int*)&res))
732 		{
733 			json_value_free(root);
734 			return FAILOVER_RES_ERROR;
735 		}
736 		if (root && json_get_int_value_for_key(root, WD_FAILOVER_ID_KEY, (int*)wd_failover_id))
737 		{
738 			json_value_free(root);
739 			return FAILOVER_RES_ERROR;
740 		}
741 		return res;
742 	}
743 	return FAILOVER_RES_ERROR;
744 }
745 
746 WDFailoverCMDResults
wd_degenerate_backend_set(int * node_id_set,int count,unsigned int * wd_failover_id)747 wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id)
748 {
749 	WDFailoverCMDResults res;
750 	char* func;
751 	unsigned int *shared_key = get_ipc_shared_key();
752 
753 	func = get_wd_node_function_json(WD_FUNCTION_DEGENERATE_REQUEST,node_id_set, count,
754 									 shared_key?*shared_key:0,pool_config->wd_authkey);
755 
756 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND ,
757 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
758 													   func, strlen(func), true);
759 	pfree(func);
760 	res = wd_get_failover_result_from_data(result, wd_failover_id);
761 	FreeCmdResult(result);
762 	return res;
763 }
764 
765 WDFailoverCMDResults
wd_promote_backend(int node_id,unsigned int * wd_failover_id)766 wd_promote_backend(int node_id, unsigned int *wd_failover_id)
767 {
768 	WDFailoverCMDResults res;
769 	int n = node_id;
770 	char* func;
771 	WDIPCCmdResult *result;
772 	unsigned int *shared_key = get_ipc_shared_key();
773 
774 	func = get_wd_node_function_json(WD_FUNCTION_PROMOTE_REQUEST,&n, 1,
775 									 shared_key?*shared_key:0,pool_config->wd_authkey);
776 	result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
777 									   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
778 									   func, strlen(func), true);
779 	pfree(func);
780 	res = wd_get_failover_result_from_data(result, wd_failover_id);
781 	FreeCmdResult(result);
782 	return res;
783 }
784 
785 /*
786  * Function returns the JSON of watchdog nodes
787  * pass nodeID = -1 to get list of all nodes
788  */
wd_get_watchdog_nodes(int nodeID)789 char* wd_get_watchdog_nodes(int nodeID)
790 {
791 	WDIPCCmdResult *result;
792 	char* json_str;
793 	unsigned int *shared_key = get_ipc_shared_key();
794 
795 	JsonNode* jNode = jw_create_with_object(true);
796 	jw_put_int(jNode, "NodeID", nodeID);
797 
798 	jw_put_int(jNode, WD_IPC_SHARED_KEY, shared_key?*shared_key:0); /* put the shared key*/
799 
800 	if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
801 		jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /*  put the auth key*/
802 
803 	jw_finish_document(jNode);
804 
805 	json_str = jw_get_json_string(jNode);
806 
807 	result = issue_command_to_watchdog(WD_GET_NODES_LIST_COMMAND
808 									   ,WD_DEFAULT_IPC_COMMAND_TIMEOUT,
809 									   json_str, strlen(json_str), true);
810 
811 	jw_destroy(jNode);
812 
813 	if (result == NULL)
814 	{
815 		ereport(WARNING,
816 			(errmsg("get watchdog nodes command failed"),
817 				 errdetail("issue command to watchdog returned NULL")));
818 		return NULL;
819 	}
820 	else if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
821 	{
822 		ereport(WARNING,
823 			(errmsg("get watchdog nodes command failed"),
824 				 errdetail("watchdog cluster is not in stable state"),
825 					errhint("try again when the cluster is fully initialized")));
826 		FreeCmdResult(result);
827 		return NULL;
828 	}
829 	else if (result->type == WD_IPC_CMD_TIMEOUT)
830 	{
831 		ereport(WARNING,
832 			(errmsg("get watchdog nodes command failed"),
833 				 errdetail("ipc command timeout")));
834 		FreeCmdResult(result);
835 		return NULL;
836 	}
837 	else if (result->type == WD_IPC_CMD_RESULT_OK)
838 	{
839 		char* data = result->data;
840 		/* do not free the result->data, Save the data copy */
841 		pfree(result);
842 		return data;
843 	}
844 	FreeCmdResult(result);
845 	return NULL;
846 }
847 
848 static int
open_wd_command_sock(bool throw_error)849 open_wd_command_sock(bool throw_error)
850 {
851 	size_t	len;
852 	struct sockaddr_un addr;
853 	int sock = -1;
854 
855 	/* We use unix domain stream sockets for the purpose */
856 	if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
857 	{
858 		/* socket create failed */
859 		ereport(throw_error? ERROR:LOG,
860 				(errmsg("failed to connect to watchdog command server socket"),
861 				 errdetail("connect on \"%s\" failed with reason: \"%s\"", addr.sun_path, strerror(errno))));
862 		return -1;
863 	}
864 
865 	memset(&addr, 0, sizeof(addr));
866 	addr.sun_family = AF_UNIX;
867 	snprintf(addr.sun_path, sizeof(addr.sun_path),"%s",watchdog_ipc_address);
868 	len = sizeof(struct sockaddr_un);
869 
870 	if (connect(sock, (struct sockaddr *) &addr, len) == -1)
871 	{
872 		close(sock);
873 		ereport(throw_error? ERROR:LOG,
874 				(errmsg("failed to connect to watchdog command server socket"),
875 				 errdetail("connect on \"%s\" failed with reason: \"%s\"", addr.sun_path, strerror(errno))));
876 		return -1;
877 	}
878 	return sock;
879 }
880 
wd_start_failover_interlocking(unsigned int wd_failover_id)881 WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id)
882 {
883 	if (pool_config->use_watchdog)
884 		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_START, 0, wd_failover_id);
885 	return FAILOVER_RES_I_AM_LOCK_HOLDER;
886 }
887 
wd_end_failover_interlocking(unsigned int wd_failover_id)888 WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id)
889 {
890 	if (pool_config->use_watchdog)
891 		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_END, 0, wd_failover_id);
892 	return FAILOVER_RES_SUCCESS;
893 }
894 
wd_failover_lock_release(enum WDFailoverLocks lock,unsigned int wd_failover_id)895 WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id)
896 {
897 	if (pool_config->use_watchdog)
898 		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_RELEASE_LOCK, lock, wd_failover_id);
899 	return FAILOVER_RES_SUCCESS;
900 }
901 
wd_failover_lock_status(enum WDFailoverLocks lock,unsigned int wd_failover_id)902 WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id)
903 {
904 	if (pool_config->use_watchdog)
905 		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_LOCK_STATUS, lock, wd_failover_id);
906 	return FAILOVER_RES_UNLOCKED;
907 }
908 
wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock,unsigned int wd_failover_id)909 void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id)
910 {
911 	WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
912 	int	count = WD_INTERLOCK_WAIT_COUNT;
913 
914 	while (pool_config->use_watchdog)
915 	{
916 		res = wd_failover_lock_status(lock, wd_failover_id);
917 		if (res == FAILOVER_RES_UNLOCKED ||
918 			res == FAILOVER_RES_NO_LOCKHOLDER)
919 		{
920 			/* we have the permision */
921 			return;
922 		}
923 		sleep_in_waiting();
924 		if (--count < 0)
925 		{
926 			ereport(WARNING,
927 					(errmsg("timeout wating for unlock")));
928 			break;
929 		}
930 	}
931 }
932 
933 /*
934  * This is just a wrapper over wd_send_failover_sync_command()
935  * but try to wait for WD_INTERLOCK_TIMEOUT_SEC amount of time
936  * if watchdog is in transition state
937  */
938 
wd_issue_failover_lock_command(char * syncReqType,enum WDFailoverLocks lockID,unsigned int wd_failover_id)939 static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
940 {
941 	WDFailoverCMDResults res;
942 	int x;
943 	for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION/2; x++)
944 	{
945 		res = wd_send_failover_sync_command(syncReqType, lockID, wd_failover_id);
946 		if (res != FAILOVER_RES_TRANSITION)
947 			break;
948 		sleep(2);
949 	}
950 	return res;
951 }
952 
953 static void
sleep_in_waiting(void)954 sleep_in_waiting(void)
955 {
956 	struct timeval t = {0, WD_INTERLOCK_WAIT_MSEC * 1000};
957 	select(0, NULL, NULL, NULL, &t);
958 }
959 
960 
961 
/*
 * Release a WDIPCCmdResult and its data buffer (if any). NULL is accepted
 * and ignored.
 */
static void FreeCmdResult(WDIPCCmdResult* res)
{
	if (res != NULL)
	{
		if (res->data != NULL)
			pfree(res->data);
		pfree(res);
	}
}
971