1 #include "common.h"
2 
3 #ifdef HAVE_IPCSERVICE
4 
5 #ifdef HAVE_LIBEVENT
6 #	include <event.h>
7 #endif
8 
9 #include "zbxtypes.h"
10 #include "zbxalgo.h"
11 #include "log.h"
12 #include "zbxipcservice.h"
13 
/* maximum length of a unix domain socket path (the size of sockaddr_un.sun_path) */
#define ZBX_IPC_PATH_MAX	sizeof(((struct sockaddr_un *)0)->sun_path)

/* NOTE(review): presumably limits how many message bytes are dumped to the log - */
/* not referenced in the visible part of the file, confirm against its users     */
#define ZBX_IPC_DATA_DUMP_SIZE		128

/* shared path buffer: the root part followed by the socket file name written by ipc_make_path() */
static char	ipc_path[ZBX_IPC_PATH_MAX] = {0};
/* length of the root part stored at the start of ipc_path */
static size_t	ipc_path_root_len = 0;

/* values of the zbx_ipc_client 'state' field */
#define ZBX_IPC_CLIENT_STATE_NONE	0
#define ZBX_IPC_CLIENT_STATE_QUEUED	1

/* NOTE(review): asynchronous socket states - not referenced in the visible part of the file */
#define ZBX_IPC_ASYNC_SOCKET_STATE_NONE		0
#define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT	1
#define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR	2

/* program type (ZBX_PROGRAM_TYPE_*), defined elsewhere; selects the socket name class prefix in ipc_make_path() */
extern unsigned char	program_type;
29 
/* IPC client, providing nonblocking connections through socket */
struct zbx_ipc_client
{
	/* the socket used to exchange data with the client */
	zbx_ipc_socket_t	csocket;
	/* NOTE(review): presumably the service this client is connected to - not used in the visible code */
	zbx_ipc_service_t	*service;

	/* header (code, size) of the message currently being received */
	zbx_uint32_t		rx_header[2];
	/* payload buffer of the message currently being received */
	unsigned char		*rx_data;
	/* number of bytes received so far for the current message (including header) */
	zbx_uint32_t		rx_bytes;
	/* completely received messages */
	zbx_queue_ptr_t		rx_queue;
	/* libevent event, released by ipc_client_free_events() */
	struct event		*rx_event;

	/* header (code, size) of the message currently being sent */
	zbx_uint32_t		tx_header[2];
	/* payload of the message currently being sent */
	unsigned char		*tx_data;
	/* number of bytes of the current message (including header) still to be written */
	zbx_uint32_t		tx_bytes;
	/* messages waiting to be sent */
	zbx_queue_ptr_t		tx_queue;
	/* libevent event, released by ipc_client_free_events() */
	struct event		*tx_event;

	/* NOTE(review): presumably a unique client identifier - not used in the visible code */
	zbx_uint64_t		id;
	/* one of ZBX_IPC_CLIENT_STATE_* values */
	unsigned char		state;

	/* NOTE(review): presumably opaque caller data attached to the client - confirm against users */
	void			*userdata;

	/* NOTE(review): presumably a reference counter - the code managing it is not visible here */
	zbx_uint32_t		refcount;
};
55 
56 /*
57  * Private API
58  */
59 
/* size in bytes of the IPC message header - two zbx_uint32_t fields (code and data size); */
/* the replacement list is fully parenthesized so that it always expands to one operand    */
#define ZBX_IPC_HEADER_SIZE	((int)(sizeof(zbx_uint32_t) * 2))

/* indexes of the message code and message data size fields in the header array */
#define ZBX_IPC_MESSAGE_CODE	0
#define ZBX_IPC_MESSAGE_SIZE	1
64 
65 #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
66 typedef int evutil_socket_t;
67 
/* replacement of event_new() for libevent versions older than 2.0: allocates an event, */
/* initializes it with the descriptor, event mask and callback and assigns it to the    */
/* specified event base; the returned event must be released with event_free()          */
static struct event	*event_new(struct event_base *ev, evutil_socket_t fd, short what,
		void(*cb_func)(int, short, void *), void *cb_arg)
{
	struct event	*new_event = (struct event *)zbx_malloc(NULL, sizeof(*new_event));

	event_set(new_event, fd, what, cb_func, cb_arg);
	event_base_set(ev, new_event);

	return new_event;
}
79 
/* replacement of event_free() for libevent versions older than 2.0: */
/* calls event_del() on the event and then frees its memory          */
static void	event_free(struct event *event)
{
	event_del(event);
	zbx_free(event);
}
85 
86 #endif
87 
88 static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
89 static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
90 
ipc_get_path(void)91 static const char	*ipc_get_path(void)
92 {
93 	ipc_path[ipc_path_root_len] = '\0';
94 
95 	return ipc_path;
96 }
97 
/* fixed parts of the socket path: <root>/zabbix_<class prefix><service name>.sock */
#define ZBX_IPC_SOCKET_PREFIX	"/zabbix_"
#define ZBX_IPC_SOCKET_SUFFIX	".sock"

/* class prefix put before the service name; ipc_make_path() selects it by program_type */
#define ZBX_IPC_CLASS_PREFIX_NONE	""
#define ZBX_IPC_CLASS_PREFIX_SERVER	"server_"
#define ZBX_IPC_CLASS_PREFIX_PROXY	"proxy_"
#define ZBX_IPC_CLASS_PREFIX_AGENT	"agent_"
105 
106 /******************************************************************************
107  *                                                                            *
108  * Function: ipc_make_path                                                    *
109  *                                                                            *
110  * Purpose: makes socket path from the service name                           *
111  *                                                                            *
112  * Parameters: service_name - [IN] the service name                           *
113  *             error        - [OUT] the error message                         *
114  *                                                                            *
115  * Return value: The created path or NULL if the path exceeds unix domain     *
116  *               socket path maximum length                                   *
117  *                                                                            *
118  ******************************************************************************/
ipc_make_path(const char * service_name,char ** error)119 static const char	*ipc_make_path(const char *service_name, char **error)
120 {
121 	const char	*prefix;
122 	size_t		path_len, offset, prefix_len;
123 
124 	path_len = strlen(service_name);
125 
126 	switch (program_type)
127 	{
128 		case ZBX_PROGRAM_TYPE_SERVER:
129 			prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
130 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
131 			break;
132 		case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
133 		case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
134 			prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
135 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
136 			break;
137 		case ZBX_PROGRAM_TYPE_AGENTD:
138 			prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
139 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
140 			break;
141 		default:
142 			prefix = ZBX_IPC_CLASS_PREFIX_NONE;
143 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
144 			break;
145 	}
146 
147 	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
148 			ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + prefix_len)
149 	{
150 		*error = zbx_dsprintf(*error,
151 				"Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
152 				ipc_path, ZBX_IPC_SOCKET_PREFIX, prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
153 		return NULL;
154 	}
155 
156 	offset = ipc_path_root_len;
157 	memcpy(ipc_path + offset , ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
158 	offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
159 	memcpy(ipc_path + offset, prefix, prefix_len);
160 	offset += prefix_len;
161 	memcpy(ipc_path + offset, service_name, path_len);
162 	offset += path_len;
163 	memcpy(ipc_path + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
164 
165 	return ipc_path;
166 }
167 
168 /******************************************************************************
169  *                                                                            *
170  * Function: ipc_write_data                                                   *
171  *                                                                            *
172  * Purpose: writes data to a socket                                           *
173  *                                                                            *
174  * Parameters: fd        - [IN] the socket file descriptor                    *
175  *             data      - [IN] the data                                      *
176  *             size      - [IN] the data size                                 *
177  *             size_sent - [IN] the actual size written to socket             *
178  *                                                                            *
179  * Return value: SUCCEED - no socket errors were detected. Either the data or *
180  *                         a part of it was written to socket or a write to   *
181  *                         non-blocking socket would block                    *
182  *               FAIL    - otherwise                                          *
183  *                                                                            *
184  ******************************************************************************/
ipc_write_data(int fd,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * size_sent)185 static int	ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
186 {
187 	zbx_uint32_t	offset = 0;
188 	int		n, ret = SUCCEED;
189 
190 	while (offset != size)
191 	{
192 		n = write(fd, data + offset, size - offset);
193 
194 		if (-1 == n)
195 		{
196 			if (EINTR == errno)
197 				continue;
198 
199 			if (EWOULDBLOCK == errno || EAGAIN == errno)
200 				break;
201 
202 			zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
203 			ret = FAIL;
204 			break;
205 		}
206 		offset += n;
207 	}
208 
209 	*size_sent = offset;
210 
211 	return ret;
212 }
213 
214 /******************************************************************************
215  *                                                                            *
216  * Function: ipc_read_data                                                    *
217  *                                                                            *
218  * Purpose: reads data from a socket                                          *
219  *                                                                            *
220  * Parameters: fd        - [IN] the socket file descriptor                    *
221  *             data      - [IN] the data                                      *
222  *             size      - [IN] the data size                                 *
223  *             size_sent - [IN] the actual size read from socket              *
224  *                                                                            *
225  * Return value: SUCCEED - the data was successfully read                     *
226  *               FAIL    - otherwise                                          *
227  *                                                                            *
228  * Comments: When reading data from non-blocking sockets SUCCEED will be      *
229  *           returned also if there were no more data to read.                *
230  *                                                                            *
231  ******************************************************************************/
ipc_read_data(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)232 static int	ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
233 {
234 	int	n;
235 
236 	*read_size = 0;
237 
238 	while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
239 	{
240 		if (EINTR == errno)
241 			continue;
242 
243 		if (EWOULDBLOCK == errno || EAGAIN == errno)
244 			return SUCCEED;
245 
246 		return FAIL;
247 	}
248 
249 	if (0 == n)
250 		return FAIL;
251 
252 	*read_size += n;
253 
254 	return SUCCEED;
255 }
256 
257 /******************************************************************************
258  *                                                                            *
259  * Function: ipc_read_data_full                                               *
260  *                                                                            *
261  * Purpose: reads data from a socket until the requested data has been read   *
262  *                                                                            *
263  * Parameters: fd        - [IN] the socket file descriptor                    *
264  *             buffer    - [IN] the data                                      *
265  *             size      - [IN] the data size                                 *
266  *             read_size - [IN] the actual size read from socket              *
267  *                                                                            *
268  * Return value: SUCCEED - the data was successfully read                     *
269  *               FAIL    - otherwise                                          *
270  *                                                                            *
271  * Comments: When reading data from non-blocking sockets this function will   *
272  *           return SUCCEED if there are no data to read, even if not all of  *
273  *           the requested data has been read.                                *
274  *                                                                            *
275  ******************************************************************************/
ipc_read_data_full(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)276 static int	ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
277 {
278 	int		ret = FAIL;
279 	zbx_uint32_t	offset = 0, chunk_size;
280 
281 	*read_size = 0;
282 
283 	while (offset < size)
284 	{
285 		if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
286 			goto out;
287 
288 		if (0 == chunk_size)
289 			break;
290 
291 		offset += chunk_size;
292 	}
293 
294 	ret = SUCCEED;
295 out:
296 	*read_size = offset;
297 
298 	return ret;
299 }
300 
301 /******************************************************************************
302  *                                                                            *
303  * Function: ipc_socket_write_message                                         *
304  *                                                                            *
305  * Purpose: writes IPC message to socket                                      *
306  *                                                                            *
307  * Parameters: csocket - [IN] the IPC socket                                  *
308  *             code    - [IN] the message code                                *
309  *             data    - [IN] the data                                        *
310  *             size    - [IN] the data size                                   *
311  *             tx_size - [IN] the actual size written to socket               *
312  *                                                                            *
313  * Return value: SUCCEED - no socket errors were detected. Either the data or *
314  *                         a part of it was written to socket or a write to   *
315  *                         non-blocking socket would block                    *
316  *               FAIL    - otherwise                                          *
317  *                                                                            *
318  * Comments: When using non-blocking sockets the tx_size parameter must be    *
319  *           checked in addition to return value to tell if the message was   *
320  *           sent successfully.                                               *
321  *                                                                            *
322  ******************************************************************************/
ipc_socket_write_message(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * tx_size)323 static int	ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
324 		zbx_uint32_t size, zbx_uint32_t *tx_size)
325 {
326 	int		ret;
327 	zbx_uint32_t	size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
328 
329 	buffer[0] = code;
330 	buffer[1] = size;
331 
332 	if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
333 	{
334 		if (0 != size)
335 			memcpy(buffer + 2, data, size);
336 
337 		return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
338 	}
339 
340 	if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
341 		return FAIL;
342 
343 	/* in the case of non-blocking sockets only a part of the header might be sent */
344 	if (ZBX_IPC_HEADER_SIZE != *tx_size)
345 		return SUCCEED;
346 
347 	ret = ipc_write_data(csocket->fd, data, size, &size_data);
348 	*tx_size += size_data;
349 
350 	return ret;
351 }
352 
353 /******************************************************************************
354  *                                                                            *
355  * Function: ipc_read_buffer                                                  *
356  *                                                                            *
357  * Purpose: reads message header and data from buffer                         *
358  *                                                                            *
359  * Parameters: header      - [IN/OUT] the message header                      *
360  *             data        - [OUT] the message data                           *
361  *             rx_bytes    - [IN] the number of bytes stored in message       *
362  *                                (including header)                          *
363  *             buffer      - [IN] the buffer to parse                         *
364  *             size        - [IN] the number of bytes to parse                *
365  *             read_size   - [OUT] the number of bytes read                   *
366  *                                                                            *
367  * Return value: SUCCEED - message was successfully parsed                    *
368  *               FAIL - not enough data                                       *
369  *                                                                            *
370  ******************************************************************************/
ipc_read_buffer(zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t rx_bytes,const unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)371 static int	ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
372 		const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
373 {
374 	zbx_uint32_t	copy_size, data_size, data_offset;
375 
376 	*read_size = 0;
377 
378 	if (ZBX_IPC_HEADER_SIZE > rx_bytes)
379 	{
380 		copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
381 		memcpy((char *)header + rx_bytes, buffer, copy_size);
382 		*read_size += copy_size;
383 
384 		if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
385 			return FAIL;
386 
387 		data_size = header[ZBX_IPC_MESSAGE_SIZE];
388 
389 		if (0 == data_size)
390 		{
391 			*data = NULL;
392 			return SUCCEED;
393 		}
394 
395 		*data = (unsigned char *)zbx_malloc(NULL, data_size);
396 		data_offset = 0;
397 	}
398 	else
399 	{
400 		data_size = header[ZBX_IPC_MESSAGE_SIZE];
401 		data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
402 	}
403 
404 	copy_size = MIN(data_size - data_offset, size - *read_size);
405 	memcpy(*data + data_offset, buffer + *read_size, copy_size);
406 	*read_size += copy_size;
407 
408 	return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
409 }
410 
411 /******************************************************************************
412  *                                                                            *
413  * Function: ipc_message_is_completed                                         *
414  *                                                                            *
415  * Purpose: checks if IPC message has been completed                          *
416  *                                                                            *
417  * Parameters: header   - [IN] the message header                             *
418  *             rx_bytes - [IN] the number of bytes set in message             *
419  *                             (including header)                             *
420  *                                                                            *
421  * Return value:  SUCCEED - message has been completed                        *
422  *                FAIL - otherwise                                            *
423  *                                                                            *
424  ******************************************************************************/
ipc_message_is_completed(const zbx_uint32_t * header,zbx_uint32_t rx_bytes)425 static int	ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
426 {
427 	if (ZBX_IPC_HEADER_SIZE > rx_bytes)
428 		return FAIL;
429 
430 	if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
431 		return FAIL;
432 
433 	return SUCCEED;
434 }
435 
436 /******************************************************************************
437  *                                                                            *
438  * Function: ipc_socket_read_message                                          *
439  *                                                                            *
440  * Purpose: reads IPC message from buffered client socket                     *
441  *                                                                            *
442  * Parameters: csocket  - [IN] the source socket                              *
443  *             header   - [OUT] the header of the message                     *
444  *             data     - [OUT] the data of the message                       *
445  *             rx_bytes - [IN/OUT] the total message size read (including     *
446  *                                 header                                     *
447  *                                                                            *
448  * Return value:  SUCCEED - data was read successfully, check rx_bytes to     *
449  *                          determine if the message was completed.           *
450  *                FAIL - failed to read message (socket error or connection   *
451  *                       was closed).                                         *
452  *                                                                            *
453  ******************************************************************************/
ipc_socket_read_message(zbx_ipc_socket_t * csocket,zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t * rx_bytes)454 static int	ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
455 		zbx_uint32_t *rx_bytes)
456 {
457 	zbx_uint32_t	data_size, offset, read_size = 0;
458 	int		ret = FAIL;
459 
460 	/* try to read message from socket buffer */
461 	if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
462 	{
463 		ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
464 				csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
465 
466 		csocket->rx_buffer_offset += read_size;
467 		*rx_bytes += read_size;
468 
469 		if (SUCCEED == ret)
470 			goto out;
471 	}
472 
473 	/* not enough data in socket buffer, try to read more until message is completed or no data to read */
474 	while (SUCCEED != ret)
475 	{
476 		csocket->rx_buffer_offset = 0;
477 		csocket->rx_buffer_bytes = 0;
478 
479 		if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
480 		{
481 			offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
482 			data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
483 
484 			/* long messages will be read directly into message buffer */
485 			if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
486 			{
487 				ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
488 				*rx_bytes += read_size;
489 				goto out;
490 			}
491 		}
492 
493 		if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
494 			goto out;
495 
496 		/* it's possible that nothing will be read on non-blocking sockets, return success */
497 		if (0 == read_size)
498 		{
499 			ret = SUCCEED;
500 			goto out;
501 		}
502 
503 		csocket->rx_buffer_bytes = read_size;
504 
505 		ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
506 				&read_size);
507 
508 		csocket->rx_buffer_offset += read_size;
509 		*rx_bytes += read_size;
510 	}
511 out:
512 	return ret;
513 }
514 
515 /******************************************************************************
516  *                                                                            *
517  * Function: ipc_client_free_event                                            *
518  *                                                                            *
519  * Purpose: frees client's libevent event                                     *
520  *                                                                            *
521  * Parameters: client - [IN] the client                                       *
522  *                                                                            *
523  ******************************************************************************/
ipc_client_free_events(zbx_ipc_client_t * client)524 static void	ipc_client_free_events(zbx_ipc_client_t *client)
525 {
526 	if (NULL != client->rx_event)
527 	{
528 		event_free(client->rx_event);
529 		client->rx_event = NULL;
530 	}
531 
532 	if (NULL != client->tx_event)
533 	{
534 		event_free(client->tx_event);
535 		client->tx_event = NULL;
536 	}
537 }
538 
539 /******************************************************************************
540  *                                                                            *
541  * Function: ipc_client_free                                                  *
542  *                                                                            *
543  * Purpose: frees IPC service client                                          *
544  *                                                                            *
545  * Parameters: client - [IN] the client to free                               *
546  *                                                                            *
547  ******************************************************************************/
ipc_client_free(zbx_ipc_client_t * client)548 static void	ipc_client_free(zbx_ipc_client_t *client)
549 {
550 	zbx_ipc_message_t	*message;
551 
552 	ipc_client_free_events(client);
553 	zbx_ipc_socket_close(&client->csocket);
554 
555 	while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
556 		zbx_ipc_message_free(message);
557 
558 	zbx_queue_ptr_destroy(&client->rx_queue);
559 	zbx_free(client->rx_data);
560 
561 	while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
562 		zbx_ipc_message_free(message);
563 
564 	zbx_queue_ptr_destroy(&client->tx_queue);
565 	zbx_free(client->tx_data);
566 
567 	ipc_client_free_events(client);
568 
569 	zbx_free(client);
570 }
571 
572 /******************************************************************************
573  *                                                                            *
574  * Function: ipc_client_push_rx_message                                       *
575  *                                                                            *
576  * Purpose: adds message to received messages queue                           *
577  *                                                                            *
578  * Parameters: client - [IN] the client to read                               *
579  *                                                                            *
580  ******************************************************************************/
ipc_client_push_rx_message(zbx_ipc_client_t * client)581 static void	ipc_client_push_rx_message(zbx_ipc_client_t *client)
582 {
583 	zbx_ipc_message_t	*message;
584 
585 	message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
586 	message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
587 	message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
588 	message->data = client->rx_data;
589 	zbx_queue_ptr_push(&client->rx_queue, message);
590 
591 	client->rx_data = NULL;
592 	client->rx_bytes = 0;
593 }
594 
595 /******************************************************************************
596  *                                                                            *
597  * Function: ipc_client_pop_tx_message                                        *
598  *                                                                            *
599  * Purpose: prepares to send the next message in send queue                   *
600  *                                                                            *
601  * Parameters: client - [IN] the client                                       *
602  *                                                                            *
603  ******************************************************************************/
ipc_client_pop_tx_message(zbx_ipc_client_t * client)604 static void	ipc_client_pop_tx_message(zbx_ipc_client_t *client)
605 {
606 	zbx_ipc_message_t	*message;
607 
608 	zbx_free(client->tx_data);
609 	client->tx_bytes = 0;
610 
611 	if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
612 		return;
613 
614 	client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
615 	client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
616 	client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
617 	client->tx_data = message->data;
618 	zbx_free(message);
619 }
620 
621 /******************************************************************************
622  *                                                                            *
623  * Function: ipc_client_read                                                  *
624  *                                                                            *
625  * Purpose: reads data from IPC service client                                *
626  *                                                                            *
627  * Parameters: client - [IN] the client to read                               *
628  *                                                                            *
629  * Return value:  FAIL - read error/connection was closed                     *
630  *                                                                            *
631  * Comments: This function reads data from socket, parses it and adds         *
632  *           parsed messages to received messages queue.                      *
633  *                                                                            *
634  ******************************************************************************/
ipc_client_read(zbx_ipc_client_t * client)635 static int	ipc_client_read(zbx_ipc_client_t *client)
636 {
637 	int	rc;
638 
639 	do
640 	{
641 		if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
642 				&client->rx_bytes))
643 		{
644 			zbx_free(client->rx_data);
645 			client->rx_bytes = 0;
646 			return FAIL;
647 		}
648 
649 		if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
650 			ipc_client_push_rx_message(client);
651 	}
652 
653 	while (SUCCEED == rc);
654 
655 	return SUCCEED;
656 }
657 
658 /******************************************************************************
659  *                                                                            *
660  * Function: ipc_client_write                                                 *
661  *                                                                            *
662  * Purpose: writes queued data to IPC service client                          *
663  *                                                                            *
664  * Parameters: client - [IN] the client                                       *
665  *                                                                            *
666  * Return value: SUCCEED - the data was sent successfully                     *
667  *               FAIL    - otherwise                                          *
668  *                                                                            *
669  ******************************************************************************/
ipc_client_write(zbx_ipc_client_t * client)670 static int	ipc_client_write(zbx_ipc_client_t *client)
671 {
672 	zbx_uint32_t	data_size, write_size;
673 
674 	data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
675 
676 	if (data_size < client->tx_bytes)
677 	{
678 		zbx_uint32_t	size, offset;
679 
680 		size = client->tx_bytes - data_size;
681 		offset = ZBX_IPC_HEADER_SIZE - size;
682 
683 		if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
684 				&write_size))
685 		{
686 			return FAIL;
687 		}
688 
689 		client->tx_bytes -= write_size;
690 
691 		if (data_size < client->tx_bytes)
692 			return SUCCEED;
693 	}
694 
695 	while (0 < client->tx_bytes)
696 	{
697 		if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
698 				client->tx_bytes, &write_size))
699 		{
700 			return FAIL;
701 		}
702 
703 		if (0 == write_size)
704 			return SUCCEED;
705 
706 		client->tx_bytes -= write_size;
707 	}
708 
709 	if (0 == client->tx_bytes)
710 		ipc_client_pop_tx_message(client);
711 
712 	return SUCCEED;
713 }
714 
715 /******************************************************************************
716  *                                                                            *
717  * Function: ipc_service_pop_client                                           *
718  *                                                                            *
719  * Purpose: gets the next client with messages/closed socket from recv queue  *
720  *                                                                            *
721  * Parameters: service - [IN] the IPC service                                 *
722  *                                                                            *
723  * Return value: The client with messages/closed socket                       *
724  *                                                                            *
725  ******************************************************************************/
ipc_service_pop_client(zbx_ipc_service_t * service)726 static zbx_ipc_client_t	*ipc_service_pop_client(zbx_ipc_service_t *service)
727 {
728 	zbx_ipc_client_t	*client;
729 
730 	if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
731 		client->state = ZBX_IPC_CLIENT_STATE_NONE;
732 
733 	return client;
734 }
735 
736 /******************************************************************************
737  *                                                                            *
738  * Function: ipc_service_push_client                                          *
739  *                                                                            *
740  * Purpose: pushes client to the recv queue if needed                         *
741  *                                                                            *
742  * Parameters: service - [IN] the IPC service                                 *
743  *             client  - [IN] the IPC client                                  *
744  *                                                                            *
745  * Comments: The client is pushed to the recv queue if it isn't already there *
746  *           and there is messages to return or the client connection was     *
747  *           closed.                                                          *
748  *                                                                            *
749  ******************************************************************************/
ipc_service_push_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)750 static void	ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
751 {
752 	if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
753 		return;
754 
755 	if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
756 		return;
757 
758 	client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
759 	zbx_queue_ptr_push(&service->clients_recv, client);
760 }
761 
762 /******************************************************************************
763  *                                                                            *
764  * Function: ipc_service_add_client                                           *
765  *                                                                            *
766  * Purpose: adds a new IPC service client                                     *
767  *                                                                            *
768  * Parameters: service - [IN] the IPC service                                 *
769  *             fd      - [IN] the client socket descriptor                    *
770  *                                                                            *
771  ******************************************************************************/
ipc_service_add_client(zbx_ipc_service_t * service,int fd)772 static void	ipc_service_add_client(zbx_ipc_service_t *service, int fd)
773 {
774 	static zbx_uint64_t	next_clientid = 1;
775 	zbx_ipc_client_t	*client;
776 	int			flags;
777 
778 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
779 
780 	client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
781 	memset(client, 0, sizeof(zbx_ipc_client_t));
782 
783 	if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
784 	{
785 		zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
786 		exit(EXIT_FAILURE);
787 	}
788 
789 	if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
790 	{
791 		zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
792 		exit(EXIT_FAILURE);
793 	}
794 
795 	client->csocket.fd = fd;
796 	client->csocket.rx_buffer_bytes = 0;
797 	client->csocket.rx_buffer_offset = 0;
798 	client->id = next_clientid++;
799 	client->state = ZBX_IPC_CLIENT_STATE_NONE;
800 	client->refcount = 1;
801 
802 	zbx_queue_ptr_create(&client->rx_queue);
803 	zbx_queue_ptr_create(&client->tx_queue);
804 
805 	client->service = service;
806 	client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
807 	client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
808 	event_add(client->rx_event, NULL);
809 
810 	zbx_vector_ptr_append(&service->clients, client);
811 
812 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __func__, client->id);
813 }
814 
815 /******************************************************************************
816  *                                                                            *
817  * Function: ipc_service_remove_client                                        *
818  *                                                                            *
819  * Purpose: removes IPC service client                                        *
820  *                                                                            *
821  * Parameters: service - [IN] the IPC service                                 *
822  *             client  - [IN] the client to remove                            *
823  *                                                                            *
824  ******************************************************************************/
ipc_service_remove_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)825 static void	ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
826 {
827 	int		i;
828 
829 	for (i = 0; i < service->clients.values_num; i++)
830 	{
831 		if (service->clients.values[i] == client)
832 			zbx_vector_ptr_remove_noorder(&service->clients, i);
833 	}
834 }
835 
836 /******************************************************************************
837  *                                                                            *
838  * Function: zbx_ipc_client_by_id                                             *
839  *                                                                            *
840  * Purpose: to find connected client when only it's ID is known               *
841  *                                                                            *
842  * Parameters: service - [IN] the IPC service                                 *
843  *             id      - [IN] ID of client                                    *
844  *                                                                            *
845  * Return value: address of client or NULL if client has already disconnected *
846  *                                                                            *
847  ******************************************************************************/
zbx_ipc_client_by_id(const zbx_ipc_service_t * service,zbx_uint64_t id)848 zbx_ipc_client_t	*zbx_ipc_client_by_id(const zbx_ipc_service_t *service, zbx_uint64_t id)
849 {
850 	int			i;
851 	zbx_ipc_client_t	*client;
852 
853 	for (i = 0; i < service->clients.values_num; i++)
854 	{
855 		client = (zbx_ipc_client_t *) service->clients.values[i];
856 
857 		if (id == client->id)
858 			return client;
859 	}
860 
861 	return NULL;
862 }
863 
864 /******************************************************************************
865  *                                                                            *
866  * Function: ipc_client_read_event_cb                                         *
867  *                                                                            *
868  * Purpose: service client read event libevent callback                       *
869  *                                                                            *
870  ******************************************************************************/
ipc_client_read_event_cb(evutil_socket_t fd,short what,void * arg)871 static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
872 {
873 	zbx_ipc_client_t	*client = (zbx_ipc_client_t *)arg;
874 
875 	ZBX_UNUSED(fd);
876 	ZBX_UNUSED(what);
877 
878 	if (SUCCEED != ipc_client_read(client))
879 	{
880 		ipc_client_free_events(client);
881 		ipc_service_remove_client(client->service, client);
882 	}
883 
884 	ipc_service_push_client(client->service, client);
885 }
886 
887 /******************************************************************************
888  *                                                                            *
889  * Function: ipc_client_write_event_cb                                        *
890  *                                                                            *
891  * Purpose: service client write event libevent callback                      *
892  *                                                                            *
893  ******************************************************************************/
ipc_client_write_event_cb(evutil_socket_t fd,short what,void * arg)894 static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
895 {
896 	zbx_ipc_client_t	*client = (zbx_ipc_client_t *)arg;
897 
898 	ZBX_UNUSED(fd);
899 	ZBX_UNUSED(what);
900 
901 	if (SUCCEED != ipc_client_write(client))
902 	{
903 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
904 		zbx_ipc_client_close(client);
905 		return;
906 	}
907 
908 	if (0 == client->tx_bytes)
909 		event_del(client->tx_event);
910 }
911 
912 /******************************************************************************
913  *                                                                            *
914  * Function: ipc_async_socket_write_event_cb                                  *
915  *                                                                            *
916  * Purpose: asynchronous socket write event libevent callback                 *
917  *                                                                            *
918  ******************************************************************************/
ipc_async_socket_write_event_cb(evutil_socket_t fd,short what,void * arg)919 static void	ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
920 {
921 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
922 
923 	ZBX_UNUSED(fd);
924 	ZBX_UNUSED(what);
925 
926 	if (SUCCEED != ipc_client_write(asocket->client))
927 	{
928 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
929 		ipc_client_free_events(asocket->client);
930 		zbx_ipc_socket_close(&asocket->client->csocket);
931 		asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
932 		return;
933 	}
934 
935 	if (0 == asocket->client->tx_bytes)
936 		event_del(asocket->client->tx_event);
937 }
938 
939 /******************************************************************************
940  *                                                                            *
941  * Function: ipc_async_socket_read_event_cb                                   *
942  *                                                                            *
943  * Purpose: asynchronous socket read event libevent callback                  *
944  *                                                                            *
945  ******************************************************************************/
ipc_async_socket_read_event_cb(evutil_socket_t fd,short what,void * arg)946 static void	ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
947 {
948 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
949 
950 	ZBX_UNUSED(fd);
951 	ZBX_UNUSED(what);
952 
953 	if (SUCCEED != ipc_client_read(asocket->client))
954 	{
955 		ipc_client_free_events(asocket->client);
956 		asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
957 	}
958 }
959 
960 /******************************************************************************
961  *                                                                            *
962  * Function: ipc_async_socket_timer_cb                                        *
963  *                                                                            *
964  * Purpose: timer callback                                                    *
965  *                                                                            *
966  ******************************************************************************/
ipc_async_socket_timer_cb(evutil_socket_t fd,short what,void * arg)967 static void	ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
968 {
969 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
970 
971 	ZBX_UNUSED(fd);
972 	ZBX_UNUSED(what);
973 
974 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
975 }
976 
977 /******************************************************************************
978  *                                                                            *
979  * Function: ipc_service_accept                                               *
980  *                                                                            *
981  * Purpose: accepts a new client connection                                   *
982  *                                                                            *
983  * Parameters: service - [IN] the IPC service                                 *
984  *                                                                            *
985  ******************************************************************************/
ipc_service_accept(zbx_ipc_service_t * service)986 static void	ipc_service_accept(zbx_ipc_service_t *service)
987 {
988 	int	fd;
989 
990 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
991 
992 	while (-1 == (fd = accept(service->fd, NULL, NULL)))
993 	{
994 		if (EINTR != errno)
995 		{
996 			/* If there is unaccepted connection libevent will call registered callback function over and */
997 			/* over again. It is better to exit straight away and cause all other processes to stop. */
998 			zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
999 			exit(EXIT_FAILURE);
1000 		}
1001 	}
1002 
1003 	ipc_service_add_client(service, fd);
1004 
1005 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1006 }
1007 
1008 /******************************************************************************
1009  *                                                                            *
1010  * Function: ipc_message_create                                               *
1011  *                                                                            *
1012  * Purpose: creates IPC message                                               *
1013  *                                                                            *
1014  * Parameters: code    - [IN] the message code                                *
1015  *             data    - [IN] the data                                        *
1016  *             size    - [IN] the data size                                   *
1017  *                                                                            *
1018  * Return value: The created message.                                         *
1019  *                                                                            *
1020  ******************************************************************************/
ipc_message_create(zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1021 static zbx_ipc_message_t	*ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1022 {
1023 	zbx_ipc_message_t	*message;
1024 
1025 	message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
1026 
1027 	message->code = code;
1028 	message->size = size;
1029 
1030 	if (0 != size)
1031 	{
1032 		message->data = (unsigned char *)zbx_malloc(NULL, size);
1033 		memcpy(message->data, data, size);
1034 	}
1035 	else
1036 		message->data = NULL;
1037 
1038 	return message;
1039 }
1040 
1041 /******************************************************************************
1042  *                                                                            *
1043  * Function: ipc_service_event_log                                            *
1044  *                                                                            *
1045  * Purpose: libevent logging callback                                         *
1046  *                                                                            *
1047  ******************************************************************************/
ipc_service_event_log_cb(int severity,const char * msg)1048 static void ipc_service_event_log_cb(int severity, const char *msg)
1049 {
1050 	int	loglevel;
1051 
1052 	switch (severity)
1053 	{
1054 		case _EVENT_LOG_DEBUG:
1055 			loglevel = LOG_LEVEL_TRACE;
1056 			break;
1057 		case _EVENT_LOG_MSG:
1058 			loglevel = LOG_LEVEL_DEBUG;
1059 			break;
1060 		case _EVENT_LOG_WARN:
1061 			loglevel = LOG_LEVEL_WARNING;
1062 			break;
1063 		case _EVENT_LOG_ERR:
1064 			loglevel = LOG_LEVEL_DEBUG;
1065 			break;
1066 		default:
1067 			loglevel = LOG_LEVEL_DEBUG;
1068 			break;
1069 	}
1070 
1071 	zabbix_log(loglevel, "IPC service: %s", msg);
1072 }
1073 
/******************************************************************************
 *                                                                            *
 * Function: ipc_service_init_libevent                                        *
 *                                                                            *
 * Purpose: initialize libevent library                                       *
 *                                                                            *
 * Comments: Installs ipc_service_event_log_cb() as libevent log callback.    *
 *                                                                            *
 ******************************************************************************/
static void	ipc_service_init_libevent(void)
{
	event_set_log_callback(ipc_service_event_log_cb);
}
1085 
/******************************************************************************
 *                                                                            *
 * Function: ipc_service_free_libevent                                        *
 *                                                                            *
 * Purpose: uninitialize libevent library                                     *
 *                                                                            *
 * Comments: Currently there is nothing to release, the body is empty.        *
 *                                                                            *
 ******************************************************************************/
static void	ipc_service_free_libevent(void)
{
}
1096 
1097 /******************************************************************************
1098  *                                                                            *
1099  * Function: ipc_service_client_connected_cb                                  *
1100  *                                                                            *
1101  * Purpose: libevent listener callback                                        *
1102  *                                                                            *
1103  ******************************************************************************/
ipc_service_client_connected_cb(evutil_socket_t fd,short what,void * arg)1104 static void	ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
1105 {
1106 	zbx_ipc_service_t	*service = (zbx_ipc_service_t *)arg;
1107 
1108 	ZBX_UNUSED(fd);
1109 	ZBX_UNUSED(what);
1110 
1111 	ipc_service_accept(service);
1112 }
1113 
1114 /******************************************************************************
1115  *                                                                            *
1116  * Function: ipc_service_timer_cb                                             *
1117  *                                                                            *
1118  * Purpose: timer callback                                                    *
1119  *                                                                            *
1120  ******************************************************************************/
ipc_service_timer_cb(evutil_socket_t fd,short what,void * arg)1121 static void	ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
1122 {
1123 	ZBX_UNUSED(fd);
1124 	ZBX_UNUSED(what);
1125 	ZBX_UNUSED(arg);
1126 }
1127 
1128 /******************************************************************************
1129  *                                                                            *
1130  * Function: ipc_check_running_service                                        *
1131  *                                                                            *
1132  * Purpose: checks if an IPC service is already running                       *
1133  *                                                                            *
1134  * Parameters: service_name - [IN]                                            *
1135  *                                                                            *
1136  ******************************************************************************/
ipc_check_running_service(const char * service_name)1137 static int	ipc_check_running_service(const char *service_name)
1138 {
1139 	zbx_ipc_socket_t	csocket;
1140 	int			ret;
1141 	char			*error = NULL;
1142 
1143 	if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
1144 		zbx_ipc_socket_close(&csocket);
1145 	else
1146 		zbx_free(error);
1147 
1148 	return ret;
1149 }
1150 
1151 /*
1152  * Public client API
1153  */
1154 
1155 /******************************************************************************
1156  *                                                                            *
1157  * Function: zbx_ipc_socket_open                                              *
1158  *                                                                            *
1159  * Purpose: opens socket to an IPC service listening on the specified path    *
1160  *                                                                            *
1161  * Parameters: csocket      - [OUT] the IPC socket to the service             *
1162  *             service_name - [IN] the IPC service name                       *
1163  *             timeout      - [IN] the connection timeout                     *
1164  *             error        - [OUT] the error message                         *
1165  *                                                                            *
1166  * Return value: SUCCEED - the socket was successfully opened                 *
1167  *               FAIL    - otherwise                                          *
1168  *                                                                            *
1169  ******************************************************************************/
zbx_ipc_socket_open(zbx_ipc_socket_t * csocket,const char * service_name,int timeout,char ** error)1170 int	zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
1171 {
1172 	struct sockaddr_un	addr;
1173 	time_t			start;
1174 	struct timespec		ts = {0, 100000000};
1175 	const char		*socket_path;
1176 	int			ret = FAIL;
1177 
1178 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1179 
1180 	if (NULL == (socket_path = ipc_make_path(service_name, error)))
1181 		goto out;
1182 
1183 	if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1184 	{
1185 		*error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
1186 		goto out;
1187 	}
1188 
1189 	memset(&addr, 0, sizeof(addr));
1190 	addr.sun_family = AF_UNIX;
1191 	memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1192 
1193 	start = time(NULL);
1194 
1195 	while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
1196 	{
1197 		if (0 == timeout || time(NULL) - start > timeout)
1198 		{
1199 			*error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
1200 					zbx_strerror(errno));
1201 			close(csocket->fd);
1202 			goto out;
1203 		}
1204 
1205 		nanosleep(&ts, NULL);
1206 	}
1207 
1208 	csocket->rx_buffer_bytes = 0;
1209 	csocket->rx_buffer_offset = 0;
1210 
1211 	ret = SUCCEED;
1212 out:
1213 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1214 	return ret;
1215 }
1216 
1217 /******************************************************************************
1218  *                                                                            *
1219  * Function: zbx_ipc_socket_close                                             *
1220  *                                                                            *
1221  * Purpose: closes socket to an IPC service                                   *
1222  *                                                                            *
1223  * Parameters: csocket - [IN/OUT] the IPC socket to close                     *
1224  *                                                                            *
1225  ******************************************************************************/
zbx_ipc_socket_close(zbx_ipc_socket_t * csocket)1226 void	zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
1227 {
1228 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1229 
1230 	if (-1 != csocket->fd)
1231 	{
1232 		close(csocket->fd);
1233 		csocket->fd = -1;
1234 	}
1235 
1236 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1237 }
1238 
1239 /******************************************************************************
1240  *                                                                            *
1241  * Function: zbx_ipc_socket_write                                             *
1242  *                                                                            *
1243  * Purpose: writes a message to IPC service                                   *
1244  *                                                                            *
1245  * Parameters: csocket - [IN] an opened IPC socket to the service             *
1246  *             code    - [IN] the message code                                *
1247  *             data    - [IN] the data                                        *
1248  *             size    - [IN] the data size                                   *
1249  *                                                                            *
1250  * Return value: SUCCEED - the message was successfully written               *
1251  *               FAIL    - otherwise                                          *
1252  *                                                                            *
1253  ******************************************************************************/
zbx_ipc_socket_write(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1254 int	zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1255 {
1256 	int		ret;
1257 	zbx_uint32_t	size_sent;
1258 
1259 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1260 
1261 	if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) &&
1262 			size_sent == size + ZBX_IPC_HEADER_SIZE)
1263 	{
1264 		ret = SUCCEED;
1265 	}
1266 	else
1267 		ret = FAIL;
1268 
1269 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1270 
1271 	return ret;
1272 }
1273 
1274 /******************************************************************************
1275  *                                                                            *
1276  * Function: zbx_ipc_socket_read                                              *
1277  *                                                                            *
1278  * Purpose: reads a message from IPC service                                  *
1279  *                                                                            *
1280  * Parameters: csocket - [IN] an opened IPC socket to the service             *
1281  *             message - [OUT] the received message                           *
1282  *                                                                            *
1283  * Return value: SUCCEED - the message was successfully received              *
1284  *               FAIL    - otherwise                                          *
1285  *                                                                            *
1286  * Comments: If this function succeeds the message must be cleaned/freed by   *
1287  *           the caller.                                                      *
1288  *                                                                            *
1289  ******************************************************************************/
zbx_ipc_socket_read(zbx_ipc_socket_t * csocket,zbx_ipc_message_t * message)1290 int	zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
1291 {
1292 	int		ret = FAIL;
1293 	zbx_uint32_t	rx_bytes = 0, header[2];
1294 	unsigned char	*data = NULL;
1295 
1296 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1297 
1298 	if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes))
1299 		goto out;
1300 
1301 	if (SUCCEED != ipc_message_is_completed(header, rx_bytes))
1302 	{
1303 		zbx_free(data);
1304 		goto out;
1305 	}
1306 
1307 	message->code = header[ZBX_IPC_MESSAGE_CODE];
1308 	message->size = header[ZBX_IPC_MESSAGE_SIZE];
1309 	message->data = data;
1310 
1311 	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1312 	{
1313 		char	*msg = NULL;
1314 
1315 		zbx_ipc_message_format(message, &msg);
1316 
1317 		zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, msg);
1318 
1319 		zbx_free(msg);
1320 	}
1321 
1322 	ret = SUCCEED;
1323 out:
1324 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1325 
1326 	return ret;
1327 }
1328 
1329 /******************************************************************************
1330  *                                                                            *
1331  * Function: zbx_ipc_message_free                                             *
1332  *                                                                            *
1333  * Purpose: frees the resources allocated to store IPC message data           *
1334  *                                                                            *
1335  * Parameters: message - [IN] the message to free                             *
1336  *                                                                            *
1337  ******************************************************************************/
void	zbx_ipc_message_free(zbx_ipc_message_t *message)
{
	/* freeing a NULL message is a no-op */
	if (NULL == message)
		return;

	/* release the payload first, then the message structure itself */
	zbx_free(message->data);
	zbx_free(message);
}
1346 
1347 /******************************************************************************
1348  *                                                                            *
1349  * Function: zbx_ipc_message_clean                                            *
1350  *                                                                            *
1351  * Purpose: frees the resources allocated to store IPC message data           *
1352  *                                                                            *
1353  * Parameters: message - [IN] the message to clean                            *
1354  *                                                                            *
1355  ******************************************************************************/
void	zbx_ipc_message_clean(zbx_ipc_message_t *message)
{
	/* only the payload is released - the message structure itself is left to the caller */
	zbx_free(message->data);
}
1360 
1361 /******************************************************************************
1362  *                                                                            *
1363  * Function: zbx_ipc_message_init                                             *
1364  *                                                                            *
1365  * Purpose: initializes IPC message                                           *
1366  *                                                                            *
1367  * Parameters: message - [IN] the message to initialize                       *
1368  *                                                                            *
1369  ******************************************************************************/
void	zbx_ipc_message_init(zbx_ipc_message_t *message)
{
	/* zero-fill every field of the message */
	memset(message, 0, sizeof(*message));
}
1374 
1375 /******************************************************************************
1376  *                                                                            *
1377  * Function: zbx_ipc_message_format                                           *
1378  *                                                                            *
1379  * Purpose: formats message to readable format for debug messages             *
1380  *                                                                            *
1381  * Parameters: message - [IN] the message                                     *
1382  *             data    - [OUT] the formatted message                          *
1383  *                                                                            *
1384  ******************************************************************************/
zbx_ipc_message_format(const zbx_ipc_message_t * message,char ** data)1385 void	zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
1386 {
1387 	size_t		data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
1388 	zbx_uint32_t	i, data_num;
1389 
1390 	if (NULL == message)
1391 		return;
1392 
1393 	data_num = message->size;
1394 
1395 	if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
1396 		data_num = ZBX_IPC_DATA_DUMP_SIZE;
1397 
1398 	*data = (char *)zbx_malloc(*data, data_alloc);
1399 	zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
1400 
1401 	for (i = 0; i < data_num; i++)
1402 	{
1403 		if (0 != i)
1404 			zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
1405 
1406 		zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
1407 	}
1408 
1409 	(*data)[data_offset] = '\0';
1410 }
1411 
1412 /******************************************************************************
1413  *                                                                            *
1414  * Function: zbx_ipc_message_copy                                             *
1415  *                                                                            *
1416  * Purpose: copies ipc message                                                *
1417  *                                                                            *
1418  *             dst - [OUT] the destination message                            *
1419  *             src - [IN] the source message                                  *
1420  *                                                                            *
1421  ******************************************************************************/
void	zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
{
	dst->code = src->code;
	dst->size = src->size;

	/* An empty message (for example one prepared by zbx_ipc_message_init()) has no */
	/* payload buffer. Copy it as such instead of making a zero byte allocation and */
	/* calling memcpy() with a NULL source pointer.                                 */
	if (0 == src->size || NULL == src->data)
	{
		dst->data = NULL;
		return;
	}

	/* the destination gets its own copy of the payload, the caller is responsible for freeing it */
	dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
	memcpy(dst->data, src->data, src->size);
}
1429 
1430 /*
1431  * Public service API
1432  */
1433 
1434 /******************************************************************************
1435  *                                                                            *
1436  * Function: zbx_ipc_service_init_env                                         *
1437  *                                                                            *
1438  * Purpose: initializes IPC service environment                               *
1439  *                                                                            *
1440  * Parameters: path    - [IN] the service root path                           *
1441  *             error   - [OUT] the error message                              *
1442  *                                                                            *
1443  * Return value: SUCCEED - the environment was initialized successfully.      *
1444  *               FAIL    - otherwise                                          *
1445  *                                                                            *
1446  ******************************************************************************/
zbx_ipc_service_init_env(const char * path,char ** error)1447 int	zbx_ipc_service_init_env(const char *path, char **error)
1448 {
1449 	struct stat	fs;
1450 	int		ret = FAIL;
1451 
1452 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, path);
1453 
1454 	if (0 != ipc_path_root_len)
1455 	{
1456 		*error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
1457 				" root directory at \"%s\".", ipc_get_path());
1458 		goto out;
1459 	}
1460 
1461 	if (0 != stat(path, &fs))
1462 	{
1463 		*error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
1464 				zbx_strerror(errno));
1465 		goto out;
1466 	}
1467 
1468 	if (0 == S_ISDIR(fs.st_mode))
1469 	{
1470 		*error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
1471 		goto out;
1472 	}
1473 
1474 	if (0 != access(path, W_OK | R_OK))
1475 	{
1476 		*error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
1477 		goto out;
1478 	}
1479 
1480 	ipc_path_root_len = strlen(path);
1481 	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
1482 	{
1483 		*error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
1484 		goto out;
1485 	}
1486 
1487 	memcpy(ipc_path, path, ipc_path_root_len + 1);
1488 
1489 	while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
1490 		ipc_path[--ipc_path_root_len] = '\0';
1491 
1492 	ipc_service_init_libevent();
1493 
1494 	ret = SUCCEED;
1495 out:
1496 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1497 
1498 	return ret;
1499 }
1500 
1501 /******************************************************************************
1502  *                                                                            *
1503  * Function: zbx_ipc_service_free_env                                         *
1504  *                                                                            *
1505  * Purpose: frees IPC service environment                                     *
1506  *                                                                            *
1507  ******************************************************************************/
void	zbx_ipc_service_free_env(void)
{
	/* the only teardown performed here is delegated to the libevent helper */
	ipc_service_free_libevent();
}
1512 
1513 
1514 /******************************************************************************
1515  *                                                                            *
1516  * Function: zbx_ipc_service_start                                            *
1517  *                                                                            *
1518  * Purpose: starts IPC service on the specified path                          *
1519  *                                                                            *
1520  * Parameters: service      - [IN/OUT] the IPC service                        *
1521  *             service_name - [IN] the IPC service name                       *
1522  *             error        - [OUT] the error message                         *
1523  *                                                                            *
1524  * Return value: SUCCEED - the service was initialized successfully.          *
1525  *               FAIL    - otherwise                                          *
1526  *                                                                            *
1527  ******************************************************************************/
zbx_ipc_service_start(zbx_ipc_service_t * service,const char * service_name,char ** error)1528 int	zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
1529 {
1530 	struct sockaddr_un	addr;
1531 	const char		*socket_path;
1532 	int			ret = FAIL;
1533 	mode_t			mode;
1534 
1535 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __func__, service_name);
1536 
1537 	mode = umask(077);
1538 
1539 	if (NULL == (socket_path = ipc_make_path(service_name, error)))
1540 		goto out;
1541 
1542 	if (0 == access(socket_path, F_OK))
1543 	{
1544 		if (0 != access(socket_path, W_OK))
1545 		{
1546 			*error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
1547 			goto out;
1548 		}
1549 
1550 		if (SUCCEED == ipc_check_running_service(service_name))
1551 		{
1552 			*error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
1553 			goto out;
1554 		}
1555 
1556 		unlink(socket_path);
1557 	}
1558 
1559 	if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1560 	{
1561 		*error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
1562 		goto out;
1563 	}
1564 
1565 	memset(&addr, 0, sizeof(addr));
1566 	addr.sun_family = AF_UNIX;
1567 	memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1568 
1569 	if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
1570 	{
1571 		*error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
1572 		goto out;
1573 	}
1574 
1575 	if (0 != listen(service->fd, SOMAXCONN))
1576 	{
1577 		*error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
1578 		goto out;
1579 	}
1580 
1581 	service->path = zbx_strdup(NULL, service_name);
1582 	zbx_vector_ptr_create(&service->clients);
1583 	zbx_queue_ptr_create(&service->clients_recv);
1584 
1585 	service->ev = event_base_new();
1586 	service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
1587 			ipc_service_client_connected_cb, service);
1588 	event_add(service->ev_listener, NULL);
1589 
1590 	service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
1591 
1592 	ret = SUCCEED;
1593 out:
1594 	umask(mode);
1595 
1596 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1597 
1598 	return ret;
1599 }
1600 
1601 /******************************************************************************
1602  *                                                                            *
1603  * Function: zbx_ipc_service_close                                            *
1604  *                                                                            *
1605  * Purpose: closes IPC service and frees the resources allocated by it        *
1606  *                                                                            *
1607  * Parameters: service - [IN/OUT] the IPC service                             *
1608  *                                                                            *
1609  ******************************************************************************/
void	zbx_ipc_service_close(zbx_ipc_service_t *service)
{
	int	idx = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, service->path);

	close(service->fd);

	/* free every client that is still registered with the service */
	while (idx < service->clients.values_num)
	{
		ipc_client_free((zbx_ipc_client_t *)service->clients.values[idx]);
		idx++;
	}

	zbx_free(service->path);

	/* the containers themselves are destroyed after their contents */
	zbx_vector_ptr_destroy(&service->clients);
	zbx_queue_ptr_destroy(&service->clients_recv);

	/* both events are freed before the event base they were created for */
	event_free(service->ev_timer);
	event_free(service->ev_listener);
	event_base_free(service->ev);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1632 
1633 /******************************************************************************
1634  *                                                                            *
1635  * Function: zbx_ipc_service_recv                                             *
1636  *                                                                            *
1637  * Purpose: receives ipc message from a connected client                      *
1638  *                                                                            *
1639  * Parameters: service - [IN] the IPC service                                 *
1640  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1641  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1642  *                            used for blocking call without timeout          *
1643  *             client  - [OUT] the client that sent the message or            *
1644  *                             NULL if there are no messages and the          *
1645  *                             specified timeout passed.                      *
1646  *                             The client must be released by caller with     *
1647  *                             zbx_ipc_client_release() function.             *
1648  *             message - [OUT] the received message or NULL if the client     *
1649  *                             connection was closed.                         *
1650  *                             The message must be freed by caller with       *
1651  *                             zbx_ipc_message_free() function.               *
1652  *                                                                            *
1653  * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without        *
1654  *                                        waiting for socket events           *
1655  *                                        (pending events are processed)      *
1656  *               ZBX_IPC_RECV_WAIT      - returned after receiving socket     *
1657  *                                        event                               *
1658  *               ZBX_IPC_RECV_TIMEOUT   - returned after timeout expired      *
1659  *                                                                            *
1660  ******************************************************************************/
int	zbx_ipc_service_recv(zbx_ipc_service_t *service, int timeout, zbx_ipc_client_t **client,
		zbx_ipc_message_t **message)
{
	int	loop_flags = EVLOOP_NONBLOCK, result;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);

	/* wait for socket events only when the caller allows it and no client is queued yet */
	if (0 != timeout && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
	{
		loop_flags = EVLOOP_ONCE;

		/* a finite timeout is enforced with the service timer event */
		if (ZBX_IPC_WAIT_FOREVER != timeout)
		{
			struct timeval	wait = {timeout, 0};

			evtimer_add(service->ev_timer, &wait);
		}
	}

	event_base_loop(service->ev, loop_flags);

	*client = ipc_service_pop_client(service);

	if (NULL == *client)
	{
		/* no client has anything to report */
		*message = NULL;
		result = ZBX_IPC_RECV_TIMEOUT;
	}
	else
	{
		*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue);

		/* a client that delivered a message is queued again and gets an extra reference; */
		/* a client without a message is returned as is, with a NULL message              */
		if (NULL != *message)
		{
			if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
			{
				char	*dump = NULL;

				zbx_ipc_message_format(*message, &dump);
				zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, dump);

				zbx_free(dump);
			}

			ipc_service_push_client(service, *client);
			zbx_ipc_client_addref(*client);
		}

		result = (EVLOOP_ONCE == loop_flags ? ZBX_IPC_RECV_WAIT : ZBX_IPC_RECV_IMMEDIATE);
	}

	/* the timer is removed on every path, whether it was added above or not */
	evtimer_del(service->ev_timer);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, result);

	return result;
}
1714 
1715 /******************************************************************************
1716  *                                                                            *
1717  * Function: zbx_ipc_client_send                                              *
1718  *                                                                            *
1719  * Purpose: Sends IPC message to client                                       *
1720  *                                                                            *
1721  * Parameters: client - [IN] the IPC client                                   *
1722  *             code   - [IN] the message code                                 *
1723  *             data   - [IN] the data                                         *
1724  *             size   - [IN] the data size                                    *
1725  *                                                                            *
1726  * Comments: If data can't be written directly to socket (buffer full) then   *
1727  *           the message is queued and sent during zbx_ipc_service_recv()     *
1728  *           messaging loop whenever socket becomes ready.                    *
1729  *                                                                            *
1730  ******************************************************************************/
int	zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
{
	zbx_uint32_t	sent = 0;
	int		result = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, client->id);

	if (0 != client->tx_bytes)
	{
		/* an earlier message has not been fully written yet - queue this one behind it */
		zbx_ipc_message_t	*queued = ipc_message_create(code, data, size);

		zbx_queue_ptr_push(&client->tx_queue, queued);
		result = SUCCEED;
	}
	else if (FAIL != ipc_socket_write_message(&client->csocket, code, data, size, &sent))
	{
		if (sent != ZBX_IPC_HEADER_SIZE + size)
		{
			/* partial write - keep a private copy of the message, remember how many */
			/* bytes are still pending and arm the write event                       */
			client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
			client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
			client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
			memcpy(client->tx_data, data, size);
			client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - sent;
			event_add(client->tx_event, NULL);
		}

		result = SUCCEED;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(result));

	return result;
}
1766 
1767 /******************************************************************************
1768  *                                                                            *
1769  * Function: zbx_ipc_client_close                                             *
1770  *                                                                            *
1771  * Purpose: closes client socket and frees resources allocated for client     *
1772  *                                                                            *
1773  * Parameters: client - [IN] the IPC client                                   *
1774  *                                                                            *
1775  ******************************************************************************/
void	zbx_ipc_client_close(zbx_ipc_client_t *client)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the client events are released before its socket is closed */
	ipc_client_free_events(client);
	zbx_ipc_socket_close(&client->csocket);

	/* take the client out of the service client list and out of the receive queue */
	ipc_service_remove_client(client->service, client);
	zbx_queue_ptr_remove_value(&client->service->clients_recv, client);

	/* give up one reference to the client */
	zbx_ipc_client_release(client);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1789 
void	zbx_ipc_client_addref(zbx_ipc_client_t *client)
{
	/* register one more holder of the client */
	client->refcount += 1;
}
1794 
void	zbx_ipc_client_release(zbx_ipc_client_t *client)
{
	client->refcount--;

	/* the last reference is gone - free the client */
	if (0 == client->refcount)
		ipc_client_free(client);
}
1800 
int	zbx_ipc_client_connected(zbx_ipc_client_t *client)
{
	/* the connection status is derived from the presence of the read event */
	if (NULL != client->rx_event)
		return SUCCEED;

	return FAIL;
}
1805 
zbx_ipc_client_id(const zbx_ipc_client_t * client)1806 zbx_uint64_t	zbx_ipc_client_id(const zbx_ipc_client_t *client)
1807 {
1808 	return client->id;
1809 }
1810 
void	zbx_ipc_client_set_userdata(zbx_ipc_client_t *client, void *userdata)
{
	/* the pointer is stored as is, the client does not interpret it here */
	client->userdata = userdata;
}
1815 
void	*zbx_ipc_client_get_userdata(zbx_ipc_client_t *client)
{
	/* returns the pointer kept in the userdata field */
	return client->userdata;
}
1820 
1821 /******************************************************************************
1822  *                                                                            *
1823  * Function: zbx_ipc_async_socket_open                                        *
1824  *                                                                            *
1825  * Purpose: opens asynchronous socket to IPC service client                   *
1826  *                                                                            *
1827  * Parameters: asocket      - [OUT] the asynchronous IPC socket               *
1828  *             service_name - [IN] the IPC service name                       *
1829  *             timeout      - [IN] the connection timeout                     *
1830  *             error        - [OUT] the error message                         *
1831  *                                                                            *
1832  * Return value: SUCCEED - the socket was successfully opened                 *
1833  *               FAIL    - otherwise                                          *
1834  *                                                                            *
1835  ******************************************************************************/
zbx_ipc_async_socket_open(zbx_ipc_async_socket_t * asocket,const char * service_name,int timeout,char ** error)1836 int	zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
1837 {
1838 	int	ret = FAIL, flags;
1839 
1840 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1841 
1842 	memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
1843 	asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
1844 	memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
1845 
1846 	if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
1847 	{
1848 		zbx_free(asocket->client);
1849 		goto out;
1850 	}
1851 
1852 	if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
1853 	{
1854 		zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
1855 		exit(EXIT_FAILURE);
1856 	}
1857 
1858 	if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
1859 	{
1860 		zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
1861 		exit(EXIT_FAILURE);
1862 	}
1863 
1864 	asocket->ev = event_base_new();
1865 	asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
1866 	asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
1867 			ipc_async_socket_read_event_cb, (void *)asocket);
1868 	asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
1869 			ipc_async_socket_write_event_cb, (void *)asocket);
1870 	event_add(asocket->client->rx_event, NULL);
1871 
1872 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1873 
1874 	ret = SUCCEED;
1875 out:
1876 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1877 	return ret;
1878 }
1879 
1880 /******************************************************************************
1881  *                                                                            *
1882  * Function: zbx_ipc_async_socket_close                                       *
1883  *                                                                            *
1884  * Purpose: closes asynchronous IPC socket and frees allocated resources      *
1885  *                                                                            *
1886  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1887  *                                                                            *
1888  ******************************************************************************/
void	zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the client goes first and its pointer is reset */
	ipc_client_free(asocket->client);
	asocket->client = NULL;

	/* then the timer event, and finally the event base itself */
	event_free(asocket->ev_timer);
	event_base_free(asocket->ev);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1901 
1902 /******************************************************************************
1903  *                                                                            *
1904  * Function: zbx_ipc_async_socket_send                                        *
1905  *                                                                            *
1906  * Purpose: Sends message through asynchronous IPC socket                     *
1907  *                                                                            *
1908  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1909  *             code    - [IN] the message code                                *
1910  *             data    - [IN] the data                                        *
1911  *             size    - [IN] the data size                                   *
1912  *                                                                            *
1913  * Comments: If data can't be written directly to socket (buffer full) then   *
1914  *           the message is queued and sent during zbx_ipc_async_socket_recv()*
1915  *           or zbx_ipc_async_socket_flush() functions whenever socket becomes*
1916  *           ready.                                                           *
1917  *                                                                            *
1918  ******************************************************************************/
int	zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
		zbx_uint32_t size)
{
	int	result;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* sending is delegated to the client attached to the asynchronous socket */
	result = zbx_ipc_client_send(asocket->client, code, data, size);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(result));

	return result;
}
1932 
1933 /******************************************************************************
1934  *                                                                            *
1935  * Function: zbx_ipc_async_socket_recv                                        *
1936  *                                                                            *
1937  * Purpose: receives message through asynchronous IPC socket                  *
1938  *                                                                            *
1939  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1940  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1941  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1942  *                            used for blocking call without timeout          *
1943  *             message - [OUT] the received message or NULL if the client     *
1944  *                             connection was closed.                         *
1945  *                             The message must be freed by caller with       *
1946  *                             zbx_ipc_message_free() function.               *
1947  *                                                                            *
1948  * Return value: SUCCEED - the message was read successfully or timeout       *
1949  *                         occurred                                           *
1950  *               FAIL    - otherwise                                          *
1951  *                                                                            *
1952  * Comments: After socket has been closed (or connection error has occurred)  *
1953  *           calls to zbx_ipc_async_socket_recv() will return success with    *
1954  *           buffered messages, until all buffered messages are retrieved.    *
1955  *                                                                            *
1956  ******************************************************************************/
int	zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
{
	int	ret, flags;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);

	/* wait for socket events only when the caller allows it and no message is buffered yet */
	if (0 != timeout && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
	{
		/* a finite timeout is enforced with the socket timer event */
		if (ZBX_IPC_WAIT_FOREVER != timeout)
		{
			struct timeval	tv = {timeout, 0};

			evtimer_add(asocket->ev_timer, &tv);
		}

		flags = EVLOOP_ONCE;
	}
	else
		flags = EVLOOP_NONBLOCK;

	/* NOTE(review): the state is presumably changed by the timer/read/write callbacks, */
	/* which are not visible from this function - confirm                               */
	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;

	/* Keep looping only in waiting mode. In nonblocking mode (zero timeout or a     */
	/* message already buffered) a single pass over the pending events is made, so   */
	/* that a zero timeout call returns right away instead of spinning until a       */
	/* message arrives.                                                              */
	do
	{
		event_base_loop(asocket->ev, flags);
		*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue);
	}
	while (NULL == *message && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state && EVLOOP_ONCE == flags);

	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && NULL != *message)
	{
		char	*data = NULL;

		zbx_ipc_message_format(*message, &data);
		zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);

		zbx_free(data);
	}

	/* only an error state without a message to return is reported as failure */
	if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
		ret = SUCCEED;
	else
		ret = FAIL;

	/* the timer is removed on every path, whether it was added above or not */
	evtimer_del(asocket->ev_timer);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);

	return ret;
}
2005 
2006 /******************************************************************************
2007  *                                                                            *
2008  * Function: zbx_ipc_async_socket_flush                                       *
2009  *                                                                            *
2010  * Purpose: flushes unsent through asynchronous IPC socket                    *
2011  *                                                                            *
2012  * Parameters: asocket - [IN] the asynchronous IPC service socket             *
2013  *             timeout - [IN] the timeout in seconds, 0 is used for           *
2014  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
2015  *                            used for blocking call without timeout          *
2016  *                                                                            *
2017  * Return value: SUCCEED - the data was flushed successfully or timeout       *
2018  *                         occurred. Use zbx_ipc_client_unsent_data() to      *
2019  *                         check if all data was sent.                        *
2020  *               FAIL    - failed to send data (connection was closed or an   *
2021  *                         error occurred).                                   *
2022  *                                                                            *
2023  ******************************************************************************/
zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t * asocket,int timeout)2024 int	zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
2025 {
2026 	int	ret = FAIL, flags;
2027 
2028 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
2029 
2030 	if (0 == asocket->client->tx_bytes)
2031 	{
2032 		ret = SUCCEED;
2033 		goto out;
2034 	}
2035 
2036 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
2037 		goto out;
2038 
2039 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
2040 
2041 	if (0 != timeout)
2042 	{
2043 		if (ZBX_IPC_WAIT_FOREVER != timeout)
2044 		{
2045 			struct timeval	tv = {timeout, 0};
2046 			evtimer_add(asocket->ev_timer, &tv);
2047 		}
2048 		flags = EVLOOP_ONCE;
2049 	}
2050 	else
2051 		flags = EVLOOP_NONBLOCK;
2052 
2053 	do
2054 	{
2055 		event_base_loop(asocket->ev, flags);
2056 
2057 		if (SUCCEED != zbx_ipc_client_connected(asocket->client))
2058 			goto out;
2059 	}
2060 	while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
2061 
2062 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
2063 	{
2064 		ret = SUCCEED;
2065 		asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
2066 	}
2067 out:
2068 	evtimer_del(asocket->ev_timer);
2069 
2070 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
2071 
2072 	return ret;
2073 }
2074 
2075 /******************************************************************************
2076  *                                                                            *
2077  * Function: zbx_ipc_async_socket_check_unsent                                *
2078  *                                                                            *
2079  * Purpose: check if there are data to be sent                                *
2080  *                                                                            *
2081  * Parameters: asocket - [IN] the asynchronous IPC service socket             *
2082  *                                                                            *
2083  * Return value: SUCCEED - there are messages queued to be sent               *
2084  *               FAIL    - all data has been sent                             *
2085  *                                                                            *
2086  ******************************************************************************/
zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t * asocket)2087 int	zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
2088 {
2089 	return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
2090 }
2091 
2092 /******************************************************************************
2093  *                                                                            *
2094  * Function: zbx_ipc_async_socket_connected                                   *
2095  *                                                                            *
2096  * Purpose: check if socket is connected                                      *
2097  *                                                                            *
2098  * Parameters: asocket - [IN] the asynchronous IPC service socket             *
2099  *                                                                            *
2100  * Return value: SUCCEED - socket is connected                                *
2101  *               FAIL    - otherwise                                          *
2102  *                                                                            *
2103  ******************************************************************************/
zbx_ipc_async_socket_connected(zbx_ipc_async_socket_t * asocket)2104 int	zbx_ipc_async_socket_connected(zbx_ipc_async_socket_t *asocket)
2105 {
2106 	if (NULL == asocket->client)
2107 		return FAIL;
2108 
2109 	return zbx_ipc_client_connected(asocket->client);
2110 }
2111 
2112 /******************************************************************************
2113  *                                                                            *
2114  * Function: zbx_ipc_async_exchange                                           *
2115  *                                                                            *
2116  * Purpose: connect, send message and receive response in a given timeout     *
2117  *                                                                            *
2118  * Parameters: service_name - [IN] the IPC service name                       *
2119  *             code         - [IN] the message code                           *
2120  *             timeout      - [IN] time allowed to be spent on receive, note  *
2121  *                                 that this does not include open, send and  *
2122  *                                 flush that have their own timeouts         *
2123  *             data         - [IN] the data                                   *
2124  *             size         - [IN] the data size                              *
2125  *             out          - [OUT] the received message or NULL on error     *
2126  *                                  The message must be freed by zbx_free()   *
2127  *             error        - [OUT] the error message                         *
2128  *                                                                            *
2129  * Return value: SUCCEED - successfully sent message and received response    *
2130  *               FAIL    - error occurred                                     *
2131  *                                                                            *
2132  ******************************************************************************/
zbx_ipc_async_exchange(const char * service_name,zbx_uint32_t code,int timeout,const unsigned char * data,zbx_uint32_t size,unsigned char ** out,char ** error)2133 int	zbx_ipc_async_exchange(const char *service_name, zbx_uint32_t code, int timeout, const unsigned char *data,
2134 		zbx_uint32_t size, unsigned char **out, char **error)
2135 {
2136 	zbx_ipc_message_t	*message;
2137 	zbx_ipc_async_socket_t	asocket;
2138 	int			ret = FAIL;
2139 
2140 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:'%s' code:%u timeout:%d", __func__, service_name, code, timeout);
2141 
2142 	if (FAIL == zbx_ipc_async_socket_open(&asocket, service_name, timeout, error))
2143 		goto out;
2144 
2145 	if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size))
2146 	{
2147 		*error = zbx_strdup(NULL, "Cannot send request");
2148 		goto fail;
2149 	}
2150 
2151 	if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout))
2152 	{
2153 		*error = zbx_strdup(NULL, "Cannot flush request");
2154 		goto fail;
2155 	}
2156 
2157 	if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message))
2158 	{
2159 		*error = zbx_strdup(NULL, "Cannot receive response");
2160 		goto fail;
2161 	}
2162 
2163 	if (NULL == message)
2164 	{
2165 		*error = zbx_strdup(NULL, "Timeout while waiting for response");
2166 		goto fail;
2167 	}
2168 
2169 	*out = message->data;
2170 	message->data = NULL;
2171 
2172 	zbx_ipc_message_free(message);
2173 	ret = SUCCEED;
2174 fail:
2175 	zbx_ipc_async_socket_close(&asocket);
2176 out:
2177 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
2178 	return ret;
2179 }
2180 
2181 #endif
2182