1 /*
2 * $Header$
3 *
4 * Handles watchdog connection, and protocol communication with pgpool-II
5 *
6 * pgpool: a language independent connection pool server for PostgreSQL
7 * written by Tatsuo Ishii
8 *
9 * Copyright (c) 2003-2016 PgPool Global Development Group
10 *
11 * Permission to use, copy, modify, and distribute this software and
12 * its documentation for any purpose and without fee is hereby
13 * granted, provided that the above copyright notice appear in all
14 * copies and that both that copyright notice and this permission
15 * notice appear in supporting documentation, and that the name of the
16 * author not be used in advertising or publicity pertaining to
17 * distribution of the software without specific, written prior
18 * permission. The author makes no representations about the
19 * suitability of this software for any purpose. It is provided "as
20 * is" without express or implied warranty.
21 *
22 */
23 #include <stdio.h>
24 #include <errno.h>
25 #include <ctype.h>
26 #include <time.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <signal.h>
30 #include <sys/stat.h>
31 #include <sys/un.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <netdb.h>
37 #include <arpa/inet.h>
38 #include <unistd.h>
39 #include <fcntl.h>
40 #include <sys/wait.h>
41
42 #include "pool.h"
43 #include "utils/elog.h"
44 #include "utils/json_writer.h"
45 #include "utils/json.h"
46 #include "utils/pool_stream.h"
47 #include "pool_config.h"
48 #include "watchdog/wd_json_data.h"
49 #include "watchdog/wd_ipc_commands.h"
50 #include "watchdog/wd_ipc_defines.h"
51
#define WD_DEFAULT_IPC_COMMAND_TIMEOUT 8 /* default number of seconds to wait for IPC command results*/

/*
 * Failover interlock polling parameters: the lock status is polled every
 * WD_INTERLOCK_WAIT_MSEC milliseconds (see sleep_in_waiting()) for roughly
 * WD_INTERLOCK_TIMEOUT_SEC seconds, i.e. about WD_INTERLOCK_WAIT_COUNT polls.
 */
#define WD_INTERLOCK_WAIT_MSEC 500
#define WD_INTERLOCK_TIMEOUT_SEC 10
#define WD_INTERLOCK_WAIT_COUNT ((int) ((WD_INTERLOCK_TIMEOUT_SEC * 1000)/WD_INTERLOCK_WAIT_MSEC))

static void sleep_in_waiting(void);
static void FreeCmdResult(WDIPCCmdResult* res);

/* failover interlocking helpers */
static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
static WDFailoverCMDResults wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);

/* IPC socket and reply decoding helpers */
static int open_wd_command_sock(bool throw_error);
static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id);

/*
 * Shared memory variables.  Each pointer is allocated with
 * pool_shared_memory_create() by wd_ipc_initialize_data() and stays NULL
 * until then.
 */
char *watchdog_ipc_address = NULL;		/* path of the watchdog IPC command
										 * socket */
bool *watchdog_require_cleanup = NULL; /* shared memory variable set to true
										 * when watchdog process terminates abnormally
										 */
bool *watchdog_node_escalated = NULL; /* shared memory variable set to true
										 * when watchdog process has performed escalation
										 */
unsigned int *ipc_shared_key = NULL; /* key lives in shared memory
										 * used to identify the ipc internal
										 * clients
										 */
wd_ipc_initialize_data(void)79 void wd_ipc_initialize_data(void)
80 {
81 if (watchdog_ipc_address == NULL)
82 {
83 char wd_ipc_sock_addr[255];
84 snprintf(wd_ipc_sock_addr, sizeof(wd_ipc_sock_addr), "%s/.s.PGPOOLWD_CMD.%d",
85 pool_config->wd_ipc_socket_dir,
86 pool_config->wd_port);
87
88 watchdog_ipc_address = pool_shared_memory_create(strlen(wd_ipc_sock_addr) +1);
89 strcpy(watchdog_ipc_address, wd_ipc_sock_addr);
90 }
91
92 if (ipc_shared_key == NULL)
93 {
94 ipc_shared_key = pool_shared_memory_create(sizeof(unsigned int));
95 *ipc_shared_key = 0;
96 while (*ipc_shared_key == 0) {
97 pool_random_salt((char*)ipc_shared_key);
98 }
99 }
100
101 if (watchdog_require_cleanup == NULL)
102 {
103 watchdog_require_cleanup = pool_shared_memory_create(sizeof(bool));
104 *watchdog_require_cleanup = false;
105 }
106
107 if (watchdog_node_escalated == NULL)
108 {
109 watchdog_node_escalated = pool_shared_memory_create(sizeof(bool));
110 *watchdog_node_escalated = false;
111 }
112 }
113
get_watchdog_local_node_state(void)114 WD_STATES get_watchdog_local_node_state(void)
115 {
116 WD_STATES ret = WD_DEAD;
117 WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_WD_STATE);
118 if (state == NULL)
119 {
120 ereport(LOG,
121 (errmsg("failed to get current state of local watchdog node"),
122 errdetail("get runtime variable value from watchdog returned no data")));
123 return WD_DEAD;
124 }
125 if (state->valueType != VALUE_DATA_TYPE_INT)
126 {
127 ereport(LOG,
128 (errmsg("failed to get current state of local watchdog node"),
129 errdetail("get runtime variable value from watchdog returned invalid value type")));
130 pfree(state);
131 return WD_DEAD;
132 }
133 ret = (WD_STATES)state->data.intVal;
134 pfree(state);
135 return ret;
136 }
137
get_watchdog_ipc_address(void)138 char* get_watchdog_ipc_address(void)
139 {
140 return watchdog_ipc_address;
141 }
142
get_ipc_shared_key(void)143 unsigned int* get_ipc_shared_key(void)
144 {
145 return ipc_shared_key;
146 }
147
set_watchdog_process_needs_cleanup(void)148 void set_watchdog_process_needs_cleanup(void)
149 {
150 *watchdog_require_cleanup = true;
151 }
152
reset_watchdog_process_needs_cleanup(void)153 void reset_watchdog_process_needs_cleanup(void)
154 {
155 *watchdog_require_cleanup = false;
156 }
157
get_watchdog_process_needs_cleanup(void)158 bool get_watchdog_process_needs_cleanup(void)
159 {
160 return *watchdog_require_cleanup;
161 }
162
163
set_watchdog_node_escalated(void)164 void set_watchdog_node_escalated(void)
165 {
166 *watchdog_node_escalated = true;
167 }
168
reset_watchdog_node_escalated(void)169 void reset_watchdog_node_escalated(void)
170 {
171 *watchdog_node_escalated = false;
172 }
173
get_watchdog_node_escalation_state(void)174 bool get_watchdog_node_escalation_state(void)
175 {
176 return *watchdog_node_escalated;
177 }
178
179 /*
180 * function issues the command to watchdog process over the watchdog
181 * IPC command socket.
182 * type: command type to send. valid command
183 * types are defined in wd_ipc_defines.h
184 * timeout_sec: number of seconds to wait for the command response
185 * from watchdog
186 * data: command data
187 * data_len: length of data
188 * blocking: send true if caller wants to wait for the results
189 * when blocking is false the timeout_sec is ignored
190 */
191 WDIPCCmdResult*
issue_command_to_watchdog(char type,int timeout_sec,char * data,int data_len,bool blocking)192 issue_command_to_watchdog(char type, int timeout_sec, char* data, int data_len, bool blocking)
193 {
194 struct timeval start_time,tv;
195 int sock;
196 WDIPCCmdResult* result = NULL;
197 char res_type = 'P';
198 int res_length, len;
199 gettimeofday(&start_time, NULL);
200
201 /* open the watchdog command socket for IPC */
202 sock = open_wd_command_sock(false);
203 if (sock < 0)
204 return NULL;
205
206 len = htonl(data_len);
207
208 if (socket_write(sock, &type, sizeof(char)) <= 0)
209 {
210 close(sock);
211 return NULL;
212 }
213
214 if (socket_write(sock, &len, sizeof(int)) <= 0)
215 {
216 close(sock);
217 return NULL;
218 }
219 if (data && data_len > 0)
220 {
221 if (socket_write(sock, data, data_len) <= 0)
222 {
223 close(sock);
224 return NULL;
225 }
226 }
227
228 if (blocking)
229 {
230 /* if we are asked to wait for results */
231 fd_set fds;
232 struct timeval *timeout_st = NULL;
233 if (timeout_sec > 0)
234 {
235 tv.tv_sec = timeout_sec;
236 tv.tv_usec = 0;
237 timeout_st = &tv;
238 }
239 FD_ZERO(&fds);
240 FD_SET(sock,&fds);
241 for (;;)
242 {
243 int select_res;
244 select_res = select(sock+1,&fds,NULL,NULL,timeout_st);
245 if (select_res == 0)
246 {
247 close(sock);
248 result = palloc(sizeof(WDIPCCmdResult));
249 result->type = WD_IPC_CMD_TIMEOUT;
250 result->length = 0;
251 result->data = NULL;
252 return result;
253 }
254 if (select_res < 0)
255 {
256 if (errno == EAGAIN || errno == EINTR)
257 continue;
258 ereport(WARNING,
259 (errmsg("error reading from IPC command socket for ipc command %c",type),
260 errdetail("select system call failed with error \"%s\"",strerror(errno))));
261 close(sock);
262 return NULL;
263 }
264 if (select_res > 0)
265 {
266 /* read the result type char */
267 if (socket_read(sock, &res_type, 1 ,0) <=0)
268 {
269 ereport(WARNING,
270 (errmsg("error reading from IPC command socket for ipc command %c",type),
271 errdetail("read from socket failed with error \"%s\"",strerror(errno))));
272 close(sock);
273 return result;
274 }
275 /* read the result data length */
276 if (socket_read(sock, &res_length, sizeof(int), 0) <= 0)
277 {
278 ereport(WARNING,
279 (errmsg("error reading from IPC command socket for ipc command %c",type),
280 errdetail("read from socket failed with error \"%s\"",strerror(errno))));
281 close(sock);
282 return result;
283 }
284
285 result = palloc(sizeof(WDIPCCmdResult));
286 result->type = res_type;
287 result->length = ntohl(res_length);
288 result->data = NULL;
289
290 if (result->length > 0)
291 {
292 result->data = palloc(result->length);
293 if (socket_read(sock, result->data, result->length, 0) <= 0)
294 {
295 pfree(result->data);
296 pfree(result);
297 ereport(DEBUG1,
298 (errmsg("error reading from IPC command socket for ipc command %c",type),
299 errdetail("read from socket failed with error \"%s\"",strerror(errno))));
300 close(sock);
301 return NULL;
302 }
303 }
304 break;
305 }
306 }
307 }
308 else
309 {
310 /* For non blocking mode if we are sucessful in sending the command
311 * that means the command is success
312 */
313 result = palloc0(sizeof(WDIPCCmdResult));
314 result->type = WD_IPC_CMD_RESULT_OK;
315 }
316 close(sock);
317 return result;
318 }
319
320 /*
321 * Function gets the runtime value of watchdog varibale using the
322 * watchdog IPC
323 */
get_wd_runtime_variable_value(char * varName)324 WDGenericData *get_wd_runtime_variable_value(char *varName)
325 {
326 unsigned int *shared_key = get_ipc_shared_key();
327 char *data = get_simple_request_json(WD_JSON_KEY_VARIABLE_NAME,varName,
328 shared_key?*shared_key:0,pool_config->wd_authkey);
329
330 WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_RUNTIME_VARIABLE_VALUE,
331 WD_DEFAULT_IPC_COMMAND_TIMEOUT,
332 data, strlen(data), true);
333 pfree(data);
334
335 if (result == NULL)
336 {
337 ereport(WARNING,
338 (errmsg("get runtime variable value from watchdog failed"),
339 errdetail("issue command to watchdog returned NULL")));
340 return NULL;
341 }
342 if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
343 {
344 ereport(WARNING,
345 (errmsg("get runtime variable value from watchdog failed"),
346 errdetail("watchdog cluster is not in stable state"),
347 errhint("try again when the cluster is fully initialized")));
348 FreeCmdResult(result);
349 return NULL;
350 }
351 else if (result->type == WD_IPC_CMD_TIMEOUT)
352 {
353 ereport(WARNING,
354 (errmsg("get runtime variable value from watchdog failed"),
355 errdetail("ipc command timeout")));
356 FreeCmdResult(result);
357 return NULL;
358 }
359 else if (result->type == WD_IPC_CMD_RESULT_OK)
360 {
361 json_value *root = NULL;
362 WDGenericData *genData = NULL;
363 WDValueDataType dayaType;
364
365 root = json_parse(result->data, result->length);
366 /* The root node must be object */
367 if (root == NULL || root->type != json_object)
368 {
369 FreeCmdResult(result);
370 return NULL;
371 }
372
373 if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA_TYPE, (int*)&dayaType))
374 {
375 FreeCmdResult(result);
376 json_value_free(root);
377 return NULL;
378 }
379
380 switch (dayaType) {
381 case VALUE_DATA_TYPE_INT:
382 {
383 int intVal;
384 if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &intVal))
385 {
386 ereport(WARNING,
387 (errmsg("get runtime variable value from watchdog failed"),
388 errdetail("unable to get INT value from JSON data returned by watchdog")));
389 }
390 else
391 {
392 genData = palloc(sizeof(WDGenericData));
393 genData->valueType = dayaType;
394 genData->data.intVal = intVal;
395 }
396 }
397 break;
398
399 case VALUE_DATA_TYPE_LONG:
400 {
401 long longVal;
402 if (json_get_long_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &longVal))
403 {
404 ereport(WARNING,
405 (errmsg("get runtime variable value from watchdog failed"),
406 errdetail("unable to get LONG value from JSON data returned by watchdog")));
407 }
408 else
409 {
410 genData = palloc(sizeof(WDGenericData));
411 genData->valueType = dayaType;
412 genData->data.longVal = longVal;
413 }
414 }
415 break;
416
417 case VALUE_DATA_TYPE_BOOL:
418 {
419 bool boolVal;
420 if (json_get_bool_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &boolVal))
421 {
422 ereport(WARNING,
423 (errmsg("get runtime variable value from watchdog failed"),
424 errdetail("unable to get BOOL value from JSON data returned by watchdog")));
425 }
426 else
427 {
428 genData = palloc(sizeof(WDGenericData));
429 genData->valueType = dayaType;
430 genData->data.boolVal = boolVal;
431 }
432 }
433 break;
434
435 case VALUE_DATA_TYPE_STRING:
436 {
437 char *ptr = json_get_string_value_for_key(root, WD_JSON_KEY_VALUE_DATA);
438 if (ptr == NULL)
439 {
440 ereport(WARNING,
441 (errmsg("get runtime variable value from watchdog failed"),
442 errdetail("unable to get STRING value from JSON data returned by watchdog")));
443 }
444 else
445 {
446 genData = palloc(sizeof(WDGenericData));
447 genData->valueType = dayaType;
448 genData->data.stringVal = pstrdup(ptr);
449 }
450 }
451 break;
452
453 default:
454 ereport(WARNING,
455 (errmsg("get runtime variable value from watchdog failed, unknown value data type")));
456 break;
457 }
458
459 json_value_free(root);
460 FreeCmdResult(result);
461 return genData;
462 }
463
464 ereport(WARNING,
465 (errmsg("get runtime variable value from watchdog failed")));
466 FreeCmdResult(result);
467 return NULL;
468
469 }
470
471 /*
472 * function gets the PG backend status of all attached nodes from
473 * the master watchdog node.
474 */
get_pg_backend_status_from_master_wd_node(void)475 WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void)
476 {
477 unsigned int *shared_key = get_ipc_shared_key();
478 char *data = get_data_request_json(WD_DATE_REQ_PG_BACKEND_DATA,
479 shared_key?*shared_key:0,pool_config->wd_authkey);
480
481 WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_MASTER_DATA_REQUEST,
482 WD_DEFAULT_IPC_COMMAND_TIMEOUT,
483 data, strlen(data), true);
484 pfree(data);
485
486 if (result == NULL)
487 {
488 ereport(WARNING,
489 (errmsg("get backend node status from master watchdog failed"),
490 errdetail("issue command to watchdog returned NULL")));
491 return NULL;
492 }
493 if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
494 {
495 ereport(WARNING,
496 (errmsg("get backend node status from master watchdog failed"),
497 errdetail("watchdog cluster is not in stable state"),
498 errhint("try again when the cluster is fully initialized")));
499 FreeCmdResult(result);
500 return NULL;
501 }
502 else if (result->type == WD_IPC_CMD_TIMEOUT)
503 {
504 ereport(WARNING,
505 (errmsg("get backend node status from master watchdog failed"),
506 errdetail("ipc command timeout")));
507 FreeCmdResult(result);
508 return NULL;
509 }
510 else if (result->type == WD_IPC_CMD_RESULT_OK)
511 {
512 WDPGBackendStatus* backendStatus = get_pg_backend_node_status_from_json(result->data, result->length);
513 /*
514 * Watchdog returns the zero length data when the node itself is a master
515 * watchdog node
516 */
517 if (result->length <= 0)
518 {
519 backendStatus = palloc0(sizeof(WDPGBackendStatus));
520 backendStatus->node_count = -1;
521 }
522 else
523 {
524 backendStatus = get_pg_backend_node_status_from_json(result->data, result->length);
525 }
526 FreeCmdResult(result);
527 return backendStatus;
528 }
529
530 ereport(WARNING,
531 (errmsg("get backend node status from master watchdog failed")));
532 FreeCmdResult(result);
533 return NULL;
534 }
535
536 WdCommandResult
wd_start_recovery(void)537 wd_start_recovery(void)
538 {
539 char type;
540 unsigned int *shared_key = get_ipc_shared_key();
541
542 char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0,
543 shared_key?*shared_key:0,pool_config->wd_authkey);
544
545 WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
546 pool_config->recovery_timeout + WD_DEFAULT_IPC_COMMAND_TIMEOUT,
547 func, strlen(func), true);
548 pfree(func);
549
550 if (result == NULL)
551 {
552 ereport(WARNING,
553 (errmsg("start recovery command lock failed"),
554 errdetail("issue command to watchdog returned NULL")));
555 return COMMAND_FAILED;
556 }
557
558 type = result->type;
559 FreeCmdResult(result);
560 if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
561 {
562 ereport(WARNING,
563 (errmsg("start recovery command lock failed"),
564 errdetail("watchdog cluster is not in stable state"),
565 errhint("try again when the cluster is fully initialized")));
566 return CLUSTER_IN_TRANSATIONING;
567 }
568 else if (type == WD_IPC_CMD_TIMEOUT)
569 {
570 ereport(WARNING,
571 (errmsg("start recovery command lock failed"),
572 errdetail("ipc command timeout")));
573 return COMMAND_TIMEOUT;
574 }
575 else if (type == WD_IPC_CMD_RESULT_OK)
576 {
577 return COMMAND_OK;
578 }
579 return COMMAND_FAILED;
580 }
581
582 WdCommandResult
wd_end_recovery(void)583 wd_end_recovery(void)
584 {
585 char type;
586 unsigned int *shared_key = get_ipc_shared_key();
587
588 char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0,
589 shared_key?*shared_key:0,pool_config->wd_authkey);
590
591
592 WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
593 WD_DEFAULT_IPC_COMMAND_TIMEOUT,
594 func, strlen(func), true);
595 pfree(func);
596
597 if (result == NULL)
598 {
599 ereport(WARNING,
600 (errmsg("end recovery command lock failed"),
601 errdetail("issue command to watchdog returned NULL")));
602 return COMMAND_FAILED;
603 }
604
605 type = result->type;
606 FreeCmdResult(result);
607
608 if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
609 {
610 ereport(WARNING,
611 (errmsg("end recovery command lock failed"),
612 errdetail("watchdog cluster is not in stable state"),
613 errhint("try again when the cluster is fully initialized")));
614 return CLUSTER_IN_TRANSATIONING;
615 }
616 else if (type == WD_IPC_CMD_TIMEOUT)
617 {
618 ereport(WARNING,
619 (errmsg("end recovery command lock failed"),
620 errdetail("ipc command timeout")));
621 return COMMAND_TIMEOUT;
622 }
623 else if (type == WD_IPC_CMD_RESULT_OK)
624 {
625 return COMMAND_OK;
626 }
627 return COMMAND_FAILED;
628 }
629
630
631 WDFailoverCMDResults
wd_send_failback_request(int node_id,unsigned int * wd_failover_id)632 wd_send_failback_request(int node_id, unsigned int *wd_failover_id)
633 {
634 int n = node_id;
635 char* func;
636 unsigned int *shared_key = get_ipc_shared_key();
637 WDFailoverCMDResults res;
638
639 func = get_wd_node_function_json(WD_FUNCTION_FAILBACK_REQUEST,&n, 1,
640 shared_key?*shared_key:0,pool_config->wd_authkey);
641
642 WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
643 WD_DEFAULT_IPC_COMMAND_TIMEOUT,
644 func, strlen(func), true);
645 pfree(func);
646
647 res = wd_get_failover_result_from_data(result, wd_failover_id);
648 FreeCmdResult(result);
649 return res;
650 }
651
get_wd_failover_cmd_type_json(char * reqType,enum WDFailoverLocks lockID,unsigned int wd_failover_id)652 static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
653 {
654 char* json_str;
655 JsonNode* jNode = jw_create_with_object(true);
656 unsigned int *shared_key = get_ipc_shared_key();
657
658 jw_put_int(jNode, WD_IPC_SHARED_KEY, shared_key?*shared_key:0); /* put the shared key*/
659 if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
660 jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /* put the auth key*/
661
662 jw_put_string(jNode, "SyncRequestType", reqType);
663 jw_put_int(jNode, "FailoverLockID", lockID);
664 jw_put_int(jNode, "WDFailoverID", wd_failover_id);
665 jw_finish_document(jNode);
666 json_str = pstrdup(jw_get_json_string(jNode));
667 jw_destroy(jNode);
668 return json_str;
669 }
670
671 static WDFailoverCMDResults
wd_send_failover_sync_command(char * syncReqType,enum WDFailoverLocks lockID,unsigned int wd_failover_id)672 wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
673 {
674 WDFailoverCMDResults res;
675 unsigned int failover_id;
676
677 char* json_data = get_wd_failover_cmd_type_json(syncReqType, lockID, wd_failover_id);
678
679 WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_LOCKING_REQUEST
680 ,WD_DEFAULT_IPC_COMMAND_TIMEOUT,
681 json_data, strlen(json_data), true);
682
683 pfree(json_data);
684
685 res = wd_get_failover_result_from_data(result, &failover_id);
686
687 FreeCmdResult(result);
688 return res;
689 }
690
wd_get_failover_result_from_data(WDIPCCmdResult * result,unsigned int * wd_failover_id)691 static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id)
692 {
693 if (result == NULL)
694 return FAILOVER_RES_ERROR;
695
696 if (result == NULL)
697 {
698 ereport(WARNING,
699 (errmsg("failover command on watchdog failed"),
700 errdetail("issue command to watchdog returned NULL")));
701 return FAILOVER_RES_ERROR;
702 }
703 if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
704 {
705 ereport(WARNING,
706 (errmsg("failover command on watchdog failed"),
707 errdetail("watchdog cluster is not in stable state"),
708 errhint("try again when the cluster is fully initialized")));
709 return FAILOVER_RES_TRANSITION;
710 }
711 else if (result->type == WD_IPC_CMD_TIMEOUT)
712 {
713 ereport(WARNING,
714 (errmsg("failover command on watchdog failed"),
715 errdetail("ipc command timeout")));
716 return FAILOVER_RES_TIMEOUT;
717 }
718 else if (result->type == WD_IPC_CMD_RESULT_OK)
719 {
720 WDFailoverCMDResults res = FAILOVER_RES_ERROR;
721 json_value *root;
722
723 root = json_parse(result->data,result->length);
724 /* The root node must be object */
725 if (root == NULL || root->type != json_object)
726 {
727 ereport(NOTICE,
728 (errmsg("unable to parse json data from failover command result")));
729 return res;
730 }
731 if (root && json_get_int_value_for_key(root, WD_FAILOVER_RESULT_KEY, (int*)&res))
732 {
733 json_value_free(root);
734 return FAILOVER_RES_ERROR;
735 }
736 if (root && json_get_int_value_for_key(root, WD_FAILOVER_ID_KEY, (int*)wd_failover_id))
737 {
738 json_value_free(root);
739 return FAILOVER_RES_ERROR;
740 }
741 return res;
742 }
743 return FAILOVER_RES_ERROR;
744 }
745
746 WDFailoverCMDResults
wd_degenerate_backend_set(int * node_id_set,int count,unsigned int * wd_failover_id)747 wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id)
748 {
749 WDFailoverCMDResults res;
750 char* func;
751 unsigned int *shared_key = get_ipc_shared_key();
752
753 func = get_wd_node_function_json(WD_FUNCTION_DEGENERATE_REQUEST,node_id_set, count,
754 shared_key?*shared_key:0,pool_config->wd_authkey);
755
756 WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND ,
757 WD_DEFAULT_IPC_COMMAND_TIMEOUT,
758 func, strlen(func), true);
759 pfree(func);
760 res = wd_get_failover_result_from_data(result, wd_failover_id);
761 FreeCmdResult(result);
762 return res;
763 }
764
765 WDFailoverCMDResults
wd_promote_backend(int node_id,unsigned int * wd_failover_id)766 wd_promote_backend(int node_id, unsigned int *wd_failover_id)
767 {
768 WDFailoverCMDResults res;
769 int n = node_id;
770 char* func;
771 WDIPCCmdResult *result;
772 unsigned int *shared_key = get_ipc_shared_key();
773
774 func = get_wd_node_function_json(WD_FUNCTION_PROMOTE_REQUEST,&n, 1,
775 shared_key?*shared_key:0,pool_config->wd_authkey);
776 result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
777 WD_DEFAULT_IPC_COMMAND_TIMEOUT,
778 func, strlen(func), true);
779 pfree(func);
780 res = wd_get_failover_result_from_data(result, wd_failover_id);
781 FreeCmdResult(result);
782 return res;
783 }
784
785 /*
786 * Function returns the JSON of watchdog nodes
787 * pass nodeID = -1 to get list of all nodes
788 */
wd_get_watchdog_nodes(int nodeID)789 char* wd_get_watchdog_nodes(int nodeID)
790 {
791 WDIPCCmdResult *result;
792 char* json_str;
793 unsigned int *shared_key = get_ipc_shared_key();
794
795 JsonNode* jNode = jw_create_with_object(true);
796 jw_put_int(jNode, "NodeID", nodeID);
797
798 jw_put_int(jNode, WD_IPC_SHARED_KEY, shared_key?*shared_key:0); /* put the shared key*/
799
800 if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
801 jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /* put the auth key*/
802
803 jw_finish_document(jNode);
804
805 json_str = jw_get_json_string(jNode);
806
807 result = issue_command_to_watchdog(WD_GET_NODES_LIST_COMMAND
808 ,WD_DEFAULT_IPC_COMMAND_TIMEOUT,
809 json_str, strlen(json_str), true);
810
811 jw_destroy(jNode);
812
813 if (result == NULL)
814 {
815 ereport(WARNING,
816 (errmsg("get watchdog nodes command failed"),
817 errdetail("issue command to watchdog returned NULL")));
818 return NULL;
819 }
820 else if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
821 {
822 ereport(WARNING,
823 (errmsg("get watchdog nodes command failed"),
824 errdetail("watchdog cluster is not in stable state"),
825 errhint("try again when the cluster is fully initialized")));
826 FreeCmdResult(result);
827 return NULL;
828 }
829 else if (result->type == WD_IPC_CMD_TIMEOUT)
830 {
831 ereport(WARNING,
832 (errmsg("get watchdog nodes command failed"),
833 errdetail("ipc command timeout")));
834 FreeCmdResult(result);
835 return NULL;
836 }
837 else if (result->type == WD_IPC_CMD_RESULT_OK)
838 {
839 char* data = result->data;
840 /* do not free the result->data, Save the data copy */
841 pfree(result);
842 return data;
843 }
844 FreeCmdResult(result);
845 return NULL;
846 }
847
848 static int
open_wd_command_sock(bool throw_error)849 open_wd_command_sock(bool throw_error)
850 {
851 size_t len;
852 struct sockaddr_un addr;
853 int sock = -1;
854
855 /* We use unix domain stream sockets for the purpose */
856 if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
857 {
858 /* socket create failed */
859 ereport(throw_error? ERROR:LOG,
860 (errmsg("failed to connect to watchdog command server socket"),
861 errdetail("connect on \"%s\" failed with reason: \"%s\"", addr.sun_path, strerror(errno))));
862 return -1;
863 }
864
865 memset(&addr, 0, sizeof(addr));
866 addr.sun_family = AF_UNIX;
867 snprintf(addr.sun_path, sizeof(addr.sun_path),"%s",watchdog_ipc_address);
868 len = sizeof(struct sockaddr_un);
869
870 if (connect(sock, (struct sockaddr *) &addr, len) == -1)
871 {
872 close(sock);
873 ereport(throw_error? ERROR:LOG,
874 (errmsg("failed to connect to watchdog command server socket"),
875 errdetail("connect on \"%s\" failed with reason: \"%s\"", addr.sun_path, strerror(errno))));
876 return -1;
877 }
878 return sock;
879 }
880
wd_start_failover_interlocking(unsigned int wd_failover_id)881 WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id)
882 {
883 if (pool_config->use_watchdog)
884 return wd_issue_failover_lock_command(WD_REQ_FAILOVER_START, 0, wd_failover_id);
885 return FAILOVER_RES_I_AM_LOCK_HOLDER;
886 }
887
wd_end_failover_interlocking(unsigned int wd_failover_id)888 WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id)
889 {
890 if (pool_config->use_watchdog)
891 return wd_issue_failover_lock_command(WD_REQ_FAILOVER_END, 0, wd_failover_id);
892 return FAILOVER_RES_SUCCESS;
893 }
894
wd_failover_lock_release(enum WDFailoverLocks lock,unsigned int wd_failover_id)895 WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id)
896 {
897 if (pool_config->use_watchdog)
898 return wd_issue_failover_lock_command(WD_REQ_FAILOVER_RELEASE_LOCK, lock, wd_failover_id);
899 return FAILOVER_RES_SUCCESS;
900 }
901
wd_failover_lock_status(enum WDFailoverLocks lock,unsigned int wd_failover_id)902 WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id)
903 {
904 if (pool_config->use_watchdog)
905 return wd_issue_failover_lock_command(WD_REQ_FAILOVER_LOCK_STATUS, lock, wd_failover_id);
906 return FAILOVER_RES_UNLOCKED;
907 }
908
wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock,unsigned int wd_failover_id)909 void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id)
910 {
911 WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
912 int count = WD_INTERLOCK_WAIT_COUNT;
913
914 while (pool_config->use_watchdog)
915 {
916 res = wd_failover_lock_status(lock, wd_failover_id);
917 if (res == FAILOVER_RES_UNLOCKED ||
918 res == FAILOVER_RES_NO_LOCKHOLDER)
919 {
920 /* we have the permision */
921 return;
922 }
923 sleep_in_waiting();
924 if (--count < 0)
925 {
926 ereport(WARNING,
927 (errmsg("timeout wating for unlock")));
928 break;
929 }
930 }
931 }
932
933 /*
934 * This is just a wrapper over wd_send_failover_sync_command()
935 * but try to wait for WD_INTERLOCK_TIMEOUT_SEC amount of time
936 * if watchdog is in transition state
937 */
938
wd_issue_failover_lock_command(char * syncReqType,enum WDFailoverLocks lockID,unsigned int wd_failover_id)939 static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
940 {
941 WDFailoverCMDResults res;
942 int x;
943 for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION/2; x++)
944 {
945 res = wd_send_failover_sync_command(syncReqType, lockID, wd_failover_id);
946 if (res != FAILOVER_RES_TRANSITION)
947 break;
948 sleep(2);
949 }
950 return res;
951 }
952
953 static void
sleep_in_waiting(void)954 sleep_in_waiting(void)
955 {
956 struct timeval t = {0, WD_INTERLOCK_WAIT_MSEC * 1000};
957 select(0, NULL, NULL, NULL, &t);
958 }
959
960
961
/*
 * Release a WDIPCCmdResult together with its data buffer, if any.
 * Passing NULL is a no-op.
 */
static void
FreeCmdResult(WDIPCCmdResult *res)
{
	if (res != NULL)
	{
		if (res->data != NULL)
			pfree(res->data);
		pfree(res);
	}
}
971