1 /*
2  * $Header$
3  *
4  * Handles watchdog connection, and protocol communication with pgpool-II
5  *
6  * pgpool: a language independent connection pool server for PostgreSQL
7  * written by Tatsuo Ishii
8  *
9  * Copyright (c) 2003-2020	PgPool Global Development Group
10  *
11  * Permission to use, copy, modify, and distribute this software and
12  * its documentation for any purpose and without fee is hereby
13  * granted, provided that the above copyright notice appear in all
14  * copies and that both that copyright notice and this permission
15  * notice appear in supporting documentation, and that the name of the
16  * author not be used in advertising or publicity pertaining to
17  * distribution of the software without specific, written prior
18  * permission. The author makes no representations about the
19  * suitability of this software for any purpose.  It is provided "as
20  * is" without express or implied warranty.
21  *
22  */
23 
24 #include <pthread.h>
25 #include <stdio.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <netdb.h>
31 #include <sys/wait.h>
32 #include <sys/stat.h>
33 #include <ctype.h>
34 #include <errno.h>
35 
36 #include <sys/socket.h>
37 #ifdef __linux__
38 #include <linux/netlink.h>
39 #include <linux/rtnetlink.h>
40 #include <linux/if.h>
41 #else							/* __linux__ */
42 #include <net/route.h>
43 #include <net/if.h>
44 
45 #ifdef AF_LINK
46 #include <net/if_dl.h>
47 #endif
48 #endif							/* __linux__ */
49 #include <arpa/inet.h>
50 #include <ifaddrs.h>
51 
52 #ifdef __FreeBSD__
53 #include <netinet/in.h>
54 #endif
55 
56 #include "pool.h"
57 
58 #include "utils/elog.h"
59 #include "pool_config.h"
60 #include "watchdog/watchdog.h"
61 #include "watchdog/wd_utils.h"
62 
#ifndef __linux__
/*
 * IFF_LOWER_UP is a Linux interface flag.  Provide a value on other
 * platforms so that is_interface_up(), which tests it, still compiles.
 * NOTE(review): 0x10000 may already be assigned to a different interface
 * flag on some BSDs, in which case the IFF_LOWER_UP test checks the wrong
 * bit -- verify against the platform's <net/if.h>.
 */
#define IFF_LOWER_UP	0x10000
#endif

/* Forward declaration; runs an interface up/down or arping command. */
static int	exec_if_cmd(char *path, char *command);
68 
69 
70 List *
get_all_local_ips(void)71 get_all_local_ips(void)
72 {
73 	struct ifaddrs *ifAddrStruct = NULL;
74 	struct ifaddrs *ifa = NULL;
75 	void	   *tmpAddrPtr = NULL;
76 	List	   *local_addresses = NULL;
77 
78 	getifaddrs(&ifAddrStruct);
79 	for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next)
80 	{
81 		if (!ifa->ifa_addr)
82 			continue;
83 
84 		if (ifa->ifa_addr->sa_family == AF_INET)
85 		{
86 			char	   *addressBuffer;
87 
88 			if (!strncasecmp("lo", ifa->ifa_name, 2))
89 				continue;		/* We do not need loop back addresses */
90 
91 			tmpAddrPtr = &((struct sockaddr_in *) ifa->ifa_addr)->sin_addr;
92 			addressBuffer = palloc(INET_ADDRSTRLEN);
93 			inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
94 			local_addresses = lappend(local_addresses, addressBuffer);
95 		}
96 	}
97 	if (ifAddrStruct != NULL)
98 		freeifaddrs(ifAddrStruct);
99 	return local_addresses;
100 }
101 
102 #define WD_TRY_PING_AT_IPUP 3
103 int
wd_IP_up(void)104 wd_IP_up(void)
105 {
106 	int			rtn = WD_OK;
107 	char		path[WD_MAX_PATH_LEN];
108 	char	   *command;
109 	int			i;
110 
111 	if (strlen(pool_config->delegate_IP) == 0)
112 	{
113 		ereport(LOG,
114 				(errmsg("trying to acquire the delegate IP address, but delegate IP is not configured")));
115 		return WD_OK;
116 	}
117 
118 	command = wd_get_cmd(pool_config->if_up_cmd);
119 	if (command)
120 	{
121 
122 		/* If if_up_cmd starts with "/", the setting specified in "if_cmd_path" will be ignored */
123 		if (command[0] == '/')
124 			snprintf(path, sizeof(path), "%s", command);
125 		else
126 			snprintf(path, sizeof(path), "%s/%s", pool_config->if_cmd_path, command);
127 
128 		rtn = exec_if_cmd(path, pool_config->if_up_cmd);
129 		pfree(command);
130 	}
131 	else
132 	{
133 		ereport(LOG,
134 				(errmsg("failed to acquire the delegate IP address"),
135 				 errdetail("unable to parse the if_up_cmd:\"%s\"", pool_config->if_up_cmd)));
136 		return WD_NG;
137 	}
138 
139 	if (rtn == WD_OK)
140 	{
141 		command = wd_get_cmd(pool_config->arping_cmd);
142 		if (command)
143 		{
144 			/* If arping_cmd starts with "/", the setting specified in "arping_path" will be ignored */
145 			if (command[0] == '/')
146 				snprintf(path, sizeof(path), "%s", command);
147 			else
148 				snprintf(path, sizeof(path), "%s/%s", pool_config->arping_path, command);
149 
150 			rtn = exec_if_cmd(path, pool_config->arping_cmd);
151 			pfree(command);
152 		}
153 		else
154 		{
155 			rtn = WD_NG;
156 			ereport(LOG,
157 					(errmsg("failed to acquire the delegate IP address"),
158 					 errdetail("unable to parse the arping_cmd:\"%s\"", pool_config->arping_cmd)));
159 		}
160 	}
161 
162 	if (rtn == WD_OK)
163 	{
164 		for (i = 0; i < WD_TRY_PING_AT_IPUP; i++)
165 		{
166 			if (wd_is_ip_exists(pool_config->delegate_IP) == true)
167 				break;
168 			ereport(LOG,
169 					(errmsg("waiting for the delegate IP address to become active"),
170 					 errdetail("waiting... count: %d", i + 1)));
171 		}
172 
173 		if (i >= WD_TRY_PING_AT_IPUP)
174 			rtn = WD_NG;
175 	}
176 
177 	if (rtn == WD_OK)
178 		ereport(LOG,
179 				(errmsg("successfully acquired the delegate IP:\"%s\"", pool_config->delegate_IP),
180 				 errdetail("'if_up_cmd' returned with success")));
181 	else
182 		ereport(LOG,
183 				(errmsg("failed to acquire the delegate IP address"),
184 				 errdetail("'if_up_cmd' failed")));
185 	return rtn;
186 }
187 
188 int
wd_IP_down(void)189 wd_IP_down(void)
190 {
191 	int			rtn = WD_OK;
192 	char		path[WD_MAX_PATH_LEN];
193 	char	   *command;
194 
195 	if (strlen(pool_config->delegate_IP) == 0)
196 	{
197 		ereport(LOG,
198 				(errmsg("trying to release the delegate IP address, but delegate IP is not configured")));
199 		return WD_OK;
200 	}
201 
202 	command = wd_get_cmd(pool_config->if_down_cmd);
203 	if (command)
204 	{
205 		/* If if_down_cmd starts with "/", the setting specified in "if_cmd_path" will be ignored */
206 		if (command[0] == '/')
207 			snprintf(path, sizeof(path), "%s", command);
208 		else
209 			snprintf(path, sizeof(path), "%s/%s", pool_config->if_cmd_path, command);
210 
211 		rtn = exec_if_cmd(path, pool_config->if_down_cmd);
212 		pfree(command);
213 	}
214 	else
215 	{
216 		ereport(LOG,
217 				(errmsg("failed to release the delegate IP address"),
218 				 errdetail("unable to parse the if_down_cmd:\"%s\"", pool_config->if_down_cmd)));
219 		return WD_NG;
220 	}
221 
222 	if (rtn == WD_OK)
223 	{
224 		ereport(LOG,
225 				(errmsg("successfully released the delegate IP:\"%s\"", pool_config->delegate_IP),
226 				 errdetail("'if_down_cmd' returned with success")));
227 	}
228 	else
229 	{
230 		ereport(LOG,
231 				(errmsg("failed to release the delegate IP:\"%s\"", pool_config->delegate_IP),
232 				 errdetail("'if_down_cmd' failed")));
233 	}
234 	return rtn;
235 }
236 
237 
/*
 * wd_get_cmd
 *
 * Return a palloc'd copy of the first space-delimited token of cmd (the
 * program part of a command line such as if_up_cmd), or NULL when cmd is
 * NULL, empty, or consists only of spaces.  A non-NULL result must be
 * released with pfree() by the caller.
 *
 * Only ' ' is treated as a separator.
 */
char *
wd_get_cmd(char *cmd)
{
	size_t		start;
	size_t		len;
	char	   *command;

	if (cmd == NULL)
		return NULL;

	/*
	 * Find the first token without strtok(): strtok() keeps hidden static
	 * state, so calling it here would clobber any tokenization a caller has
	 * in progress and is not thread-safe.  This also avoids the temporary
	 * copy of the whole command line.
	 */
	start = strspn(cmd, " ");
	len = strcspn(cmd + start, " ");
	if (len == 0)
		return NULL;

	command = palloc(len + 1);
	memcpy(command, cmd + start, len);
	command[len] = '\0';
	return command;
}
254 
255 static int
exec_if_cmd(char * path,char * command)256 exec_if_cmd(char *path, char *command)
257 {
258 	int			pfd[2];
259 	int			status;
260 	char	   *args[24];
261 	int			pid,
262 				i = 0;
263 	char	   *buf;
264 	char	   *bp,
265 			   *ep;
266 
267 	if (pipe(pfd) == -1)
268 	{
269 		ereport(WARNING,
270 				(errmsg("while executing interface up/down command, pipe open failed"),
271 				 errdetail("%m")));
272 		return WD_NG;
273 	}
274 
275 	buf = string_replace(command, "$_IP_$", pool_config->delegate_IP);
276 
277 	bp = buf;
278 	while (*bp == ' ')
279 	{
280 		bp++;
281 	}
282 	while (*bp != '\0')
283 	{
284 		ep = strchr(bp, ' ');
285 		if (ep != NULL)
286 		{
287 			*ep = '\0';
288 		}
289 		args[i++] = bp;
290 		if (ep != NULL)
291 		{
292 			bp = ep + 1;
293 			while (*bp == ' ')
294 			{
295 				bp++;
296 			}
297 		}
298 		else
299 		{
300 			break;
301 		}
302 	}
303 	args[i++] = NULL;
304 
305 	pid = fork();
306 	if (pid == -1)
307 	{
308 		ereport(FATAL,
309 				(errmsg("failed to execute interface up/down command"),
310 				 errdetail("fork() failed with reason: \"%m\"")));
311 	}
312 	if (pid == 0)
313 	{
314 		on_exit_reset();
315 		SetProcessGlobalVariables(PT_WATCHDOG_UTILITY);
316 		close(STDOUT_FILENO);
317 		dup2(pfd[1], STDOUT_FILENO);
318 		close(pfd[0]);
319 		status = execv(path, args);
320 		exit(0);
321 	}
322 	else
323 	{
324 		pfree(buf);
325 		close(pfd[1]);
326 		for (;;)
327 		{
328 			int			result;
329 
330 			result = waitpid(pid, &status, 0);
331 			if (result < 0)
332 			{
333 				if (errno == EINTR)
334 					continue;
335 
336 				ereport(DEBUG1,
337 						(errmsg("watchdog exec waitpid()failed"),
338 						 errdetail("waitpid() system call failed with reason \"%m\"")));
339 
340 				return WD_NG;
341 			}
342 
343 			if (WIFEXITED(status) == 0 || WEXITSTATUS(status) != 0)
344 			{
345 				ereport(DEBUG1,
346 						(errmsg("watchdog exec interface up/down command failed"),
347 						 errdetail("'%s' failed. exit status: %d", command, WEXITSTATUS(status))));
348 
349 				return WD_NG;
350 			}
351 			else
352 				break;
353 		}
354 		close(pfd[0]);
355 	}
356 	ereport(DEBUG1,
357 			(errmsg("watchdog exec interface up/down command: '%s' succeeded", command)));
358 
359 	return WD_OK;
360 }
361 
362 
363 int
create_monitoring_socket(void)364 create_monitoring_socket(void)
365 {
366 	int			sock = -1;
367 #ifdef __linux__
368 	sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
369 
370 #else
371 	sock = socket(PF_ROUTE, SOCK_RAW, AF_UNSPEC);
372 #endif
373 	if (sock < 0)
374 		ereport(ERROR,
375 				(errmsg("watchdog: VIP monitoring failed to create socket"),
376 				 errdetail("socket() failed with error \"%m\"")));
377 
378 #ifdef __linux__
379 	struct sockaddr_nl addr;
380 
381 	memset(&addr, 0x00, sizeof(addr));
382 	addr.nl_family = AF_NETLINK;
383 	addr.nl_groups = RTMGRP_IPV4_IFADDR | RTMGRP_LINK;
384 
385 	if (bind(sock, (struct sockaddr *) &addr, sizeof(addr)) < 0)
386 	{
387 		close(sock);
388 		ereport(ERROR,
389 				(errmsg("watchdog: VIP monitoring failed to bind socket"),
390 				 errdetail("bind() failed with error \"%m\"")));
391 	}
392 #endif
393 
394 	return sock;
395 }
396 
397 #ifdef __linux__
398 bool
read_interface_change_event(int sock,bool * link_event,bool * deleted)399 read_interface_change_event(int sock, bool *link_event, bool *deleted)
400 {
401 	char		buffer[4096];
402 	int			len;
403 	struct iovec iov;
404 	struct msghdr hdr;
405 	struct nlmsghdr *nlhdr;
406 	struct ifinfomsg *ifimsg;
407 
408 	*deleted = false;
409 	*link_event = false;
410 
411 	iov.iov_base = buffer;
412 	iov.iov_len = sizeof(buffer);
413 
414 	memset(&hdr, 0, sizeof(hdr));
415 	hdr.msg_iov = &iov;
416 	hdr.msg_iovlen = 1;
417 
418 	len = recvmsg(sock, &hdr, 0);
419 	if (len < 0)
420 	{
421 		ereport(DEBUG1,
422 				(errmsg("VIP monitoring failed to receive from socket"),
423 				 errdetail("recvmsg() failed with error \"%m\"")));
424 		return false;
425 	}
426 
427 	nlhdr = (struct nlmsghdr *) buffer;
428 
429 	for (; NLMSG_OK(nlhdr, len); nlhdr = NLMSG_NEXT(nlhdr, len))
430 	{
431 		if (nlhdr->nlmsg_type == NLMSG_DONE)
432 			break;
433 
434 		ifimsg = NLMSG_DATA(nlhdr);
435 
436 		switch (nlhdr->nlmsg_type)
437 		{
438 			case RTM_DELLINK:
439 				*deleted = true;	/* fallthrough */
440 			case RTM_NEWLINK:
441 				if (!(ifimsg->ifi_flags & IFF_LOWER_UP) || !(ifimsg->ifi_flags & IFF_RUNNING))
442 					*deleted = true;
443 				else
444 					*link_event = true;
445 				return true;
446 				break;
447 
448 			case RTM_DELADDR:
449 				*deleted = true;	/* fallthrough */
450 			case RTM_NEWADDR:
451 				*link_event = false;
452 				return true;
453 				break;
454 			default:
455 				ereport(DEBUG2,
456 						(errmsg("unknown nlmsg_type=%d", nlhdr->nlmsg_type)));
457 		}
458 	}
459 	return false;
460 }
461 
462 #else							/* For non linux chaps */
463 #if defined(__OpenBSD__) || defined(__FreeBSD__)
464 #define SALIGN (sizeof(long) - 1)
465 #else
466 #define SALIGN (sizeof(int32_t) - 1)
467 #endif
468 
469 #define SA_RLEN(sa) ((sa)->sa_len ? (((sa)->sa_len + SALIGN) & ~SALIGN) : (SALIGN + 1))
470 /* With the help from https://github.com/miniupnp/miniupnp/blob/master/minissdpd/ifacewatch.c */
471 
472 bool
read_interface_change_event(int sock,bool * link_event,bool * deleted)473 read_interface_change_event(int sock, bool *link_event, bool *deleted)
474 {
475 	char		buffer[1024];
476 	int			len;
477 	struct rt_msghdr *nlhdr;
478 
479 	*deleted = false;
480 	*link_event = false;
481 
482 	len = recv(sock, buffer, sizeof(buffer), 0);
483 	if (len < 0)
484 	{
485 		ereport(DEBUG1,
486 				(errmsg("VIP monitoring failed to receive from socket"),
487 				 errdetail("recv() failed with error \"%m\"")));
488 		return false;
489 	}
490 
491 	nlhdr = (struct rt_msghdr *) buffer;
492 	switch (nlhdr->rtm_type)
493 	{
494 		case RTM_DELETE:
495 			*deleted = true;	/* fallthrough */
496 		case RTM_ADD:
497 			*link_event = true;
498 			return true;
499 			break;
500 
501 		case RTM_DELADDR:
502 			*deleted = true;	/* fallthrough */
503 		case RTM_NEWADDR:
504 			*link_event = false;
505 			return true;
506 			break;
507 		default:
508 			ereport(DEBUG2,
509 					(errmsg("unknown nlmsg_type=%d", nlhdr->rtm_type)));
510 	}
511 	return false;
512 }
513 #endif
514 
515 bool
is_interface_up(struct ifaddrs * ifa)516 is_interface_up(struct ifaddrs *ifa)
517 {
518 	bool		result = false;
519 
520 	if (ifa->ifa_flags & IFF_RUNNING)
521 	{
522 		ereport(DEBUG1,
523 				(errmsg("network interface \"%s\" link is active", ifa->ifa_name)));
524 
525 		if (ifa->ifa_flags & IFF_LOWER_UP)
526 		{
527 			ereport(DEBUG1,
528 					(errmsg("network interface \"%s\" link is up", ifa->ifa_name)));
529 			result = true;
530 		}
531 		else
532 			ereport(NOTICE,
533 					(errmsg("network interface \"%s\" link is down", ifa->ifa_name)));
534 	}
535 	else
536 		ereport(NOTICE,
537 				(errmsg("network interface \"%s\" link is inactive", ifa->ifa_name)));
538 
539 	return result;
540 }
541