1 #include "common.h"
2 
3 #ifdef HAVE_IPCSERVICE
4 
5 #ifdef HAVE_LIBEVENT
6 #	include <event.h>
7 #endif
8 
9 #include "zbxtypes.h"
10 #include "zbxalgo.h"
11 #include "log.h"
12 #include "zbxipcservice.h"
13 
14 #define ZBX_IPC_PATH_MAX	sizeof(((struct sockaddr_un *)0)->sun_path)
15 
16 #define ZBX_IPC_DATA_DUMP_SIZE		128
17 
18 static char	ipc_path[ZBX_IPC_PATH_MAX] = {0};
19 static size_t	ipc_path_root_len = 0;
20 
21 #define ZBX_IPC_CLIENT_STATE_NONE	0
22 #define ZBX_IPC_CLIENT_STATE_QUEUED	1
23 
24 #define ZBX_IPC_ASYNC_SOCKET_STATE_NONE		0
25 #define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT	1
26 #define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR	2
27 
28 extern unsigned char	program_type;
29 
30 /* IPC client, providing nonblocking connections through socket */
31 struct zbx_ipc_client
32 {
33 	zbx_ipc_socket_t	csocket;	/* client socket, closed by ipc_client_free() */
34 	zbx_ipc_service_t	*service;	/* presumably the service owning this client - TODO confirm */
35 
36 	zbx_uint32_t		rx_header[2];	/* header of the message being received: [code, data size] */
37 	unsigned char		*rx_data;	/* data of the message being received, NULL if none */
38 	zbx_uint32_t		rx_bytes;	/* bytes received for the current message, header included */
39 	zbx_queue_ptr_t		rx_queue;	/* completely received messages */
40 	struct event		*rx_event;	/* libevent event, presumably for socket reads - TODO confirm */
41 
42 	zbx_uint32_t		tx_header[2];	/* header of the message being sent: [code, data size] */
43 	unsigned char		*tx_data;	/* data of the message being sent, NULL if none */
44 	zbx_uint32_t		tx_bytes;	/* bytes of the current message (header included) left to send */
45 	zbx_queue_ptr_t		tx_queue;	/* messages waiting to be sent */
46 	struct event		*tx_event;	/* libevent event, presumably for socket writes - TODO confirm */
47 
48 	zbx_uint64_t		id;		/* client identifier; assignment is not visible here */
49 	unsigned char		state;		/* one of ZBX_IPC_CLIENT_STATE_* values */
50 
51 	zbx_uint32_t		refcount;	/* presumably a reference counter - TODO confirm usage */
52 };
53 
54 /*
55  * Private API
56  */
57 
58 #define ZBX_IPC_HEADER_SIZE	(int)(sizeof(zbx_uint32_t) * 2)
59 
60 #define ZBX_IPC_MESSAGE_CODE	0
61 #define ZBX_IPC_MESSAGE_SIZE	1
62 
63 #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
64 typedef int evutil_socket_t;
65 
/* Replacement for event_new(), compiled only when the libevent version is  */
/* older than 2.0 (or unknown): allocates an event, initializes it with the */
/* given descriptor, flags and callback, and attaches it to the event base. */
/* The result is expected to be released with event_free().                 */
static struct event	*event_new(struct event_base *ev, evutil_socket_t fd, short what,
		void(*cb_func)(int, short, void *), void *cb_arg)
{
	struct event	*created = zbx_malloc(NULL, sizeof(struct event));

	event_set(created, fd, what, cb_func, cb_arg);
	event_base_set(ev, created);

	return created;
}
77 
/* Replacement for event_free(), compiled only when the libevent version is */
/* older than 2.0 (or unknown): removes the event created by event_new()    */
/* with event_del() and then releases its memory.                           */
static void	event_free(struct event *event)
{
	/* the event must be removed before its memory is released */
	event_del(event);

	zbx_free(event);
}
83 
84 #endif
85 
86 static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
87 static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
88 
ipc_get_path(void)89 static const char	*ipc_get_path(void)
90 {
91 	ipc_path[ipc_path_root_len] = '\0';
92 
93 	return ipc_path;
94 }
95 
96 #define ZBX_IPC_SOCKET_PREFIX	"/zabbix_"
97 #define ZBX_IPC_SOCKET_SUFFIX	".sock"
98 
99 #define ZBX_IPC_CLASS_PREFIX_NONE	""
100 #define ZBX_IPC_CLASS_PREFIX_SERVER	"server_"
101 #define ZBX_IPC_CLASS_PREFIX_PROXY	"proxy_"
102 #define ZBX_IPC_CLASS_PREFIX_AGENT	"agent_"
103 
104 /******************************************************************************
105  *                                                                            *
106  * Function: ipc_make_path                                                    *
107  *                                                                            *
108  * Purpose: makes socket path from the service name                           *
109  *                                                                            *
110  * Parameters: service_name - [IN] the service name                           *
111  *             error        - [OUT] the error message                         *
112  *                                                                            *
113  * Return value: The created path or NULL if the path exceeds unix domain     *
114  *               socket path maximum length                                   *
115  *                                                                            *
116  ******************************************************************************/
ipc_make_path(const char * service_name,char ** error)117 static const char	*ipc_make_path(const char *service_name, char **error)
118 {
119 	const char	*prefix;
120 	size_t		path_len, offset, prefix_len;
121 
122 	path_len = strlen(service_name);
123 
124 	switch (program_type)
125 	{
126 		case ZBX_PROGRAM_TYPE_SERVER:
127 			prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
128 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
129 			break;
130 		case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
131 		case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
132 			prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
133 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
134 			break;
135 		case ZBX_PROGRAM_TYPE_AGENTD:
136 			prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
137 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
138 			break;
139 		default:
140 			prefix = ZBX_IPC_CLASS_PREFIX_NONE;
141 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
142 			break;
143 	}
144 
145 	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
146 			ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + prefix_len)
147 	{
148 		*error = zbx_dsprintf(*error,
149 				"Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
150 				ipc_path, ZBX_IPC_SOCKET_PREFIX, prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
151 		return NULL;
152 	}
153 
154 	offset = ipc_path_root_len;
155 	memcpy(ipc_path + offset , ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
156 	offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
157 	memcpy(ipc_path + offset, prefix, prefix_len);
158 	offset += prefix_len;
159 	memcpy(ipc_path + offset, service_name, path_len);
160 	offset += path_len;
161 	memcpy(ipc_path + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
162 
163 	return ipc_path;
164 }
165 
166 /******************************************************************************
167  *                                                                            *
168  * Function: ipc_write_data                                                   *
169  *                                                                            *
170  * Purpose: writes data to a socket                                           *
171  *                                                                            *
172  * Parameters: fd        - [IN] the socket file descriptor                    *
173  *             data      - [IN] the data                                      *
174  *             size      - [IN] the data size                                 *
175  *             size_sent - [IN] the actual size written to socket             *
176  *                                                                            *
177  * Return value: SUCCEED - no socket errors were detected. Either the data or *
178  *                         a part of it was written to socket or a write to   *
179  *                         non-blocking socket would block                    *
180  *               FAIL    - otherwise                                          *
181  *                                                                            *
182  ******************************************************************************/
ipc_write_data(int fd,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * size_sent)183 static int	ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
184 {
185 	zbx_uint32_t	offset = 0;
186 	int		n, ret = SUCCEED;
187 
188 	while (offset != size)
189 	{
190 		n = write(fd, data + offset, size - offset);
191 
192 		if (-1 == n)
193 		{
194 			if (EINTR == errno)
195 				continue;
196 
197 			if (EWOULDBLOCK == errno || EAGAIN == errno)
198 				break;
199 
200 			zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
201 			ret = FAIL;
202 			break;
203 		}
204 		offset += n;
205 	}
206 
207 	*size_sent = offset;
208 
209 	return ret;
210 }
211 
212 /******************************************************************************
213  *                                                                            *
214  * Function: ipc_read_data                                                    *
215  *                                                                            *
216  * Purpose: reads data from a socket                                          *
217  *                                                                            *
218  * Parameters: fd        - [IN] the socket file descriptor                    *
219  *             data      - [IN] the data                                      *
220  *             size      - [IN] the data size                                 *
221  *             size_sent - [IN] the actual size read from socket              *
222  *                                                                            *
223  * Return value: SUCCEED - the data was successfully read                     *
224  *               FAIL    - otherwise                                          *
225  *                                                                            *
226  * Comments: When reading data from non-blocking sockets SUCCEED will be      *
227  *           returned also if there were no more data to read.                *
228  *                                                                            *
229  ******************************************************************************/
ipc_read_data(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)230 static int	ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
231 {
232 	int	n;
233 
234 	*read_size = 0;
235 
236 	while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
237 	{
238 		if (EINTR == errno)
239 			continue;
240 
241 		if (EWOULDBLOCK == errno || EAGAIN == errno)
242 			return SUCCEED;
243 
244 		return FAIL;
245 	}
246 
247 	if (0 == n)
248 		return FAIL;
249 
250 	*read_size += n;
251 
252 	return SUCCEED;
253 }
254 
255 /******************************************************************************
256  *                                                                            *
257  * Function: ipc_read_data_full                                               *
258  *                                                                            *
259  * Purpose: reads data from a socket until the requested data has been read   *
260  *                                                                            *
261  * Parameters: fd        - [IN] the socket file descriptor                    *
262  *             buffer    - [IN] the data                                      *
263  *             size      - [IN] the data size                                 *
264  *             read_size - [IN] the actual size read from socket              *
265  *                                                                            *
266  * Return value: SUCCEED - the data was successfully read                     *
267  *               FAIL    - otherwise                                          *
268  *                                                                            *
269  * Comments: When reading data from non-blocking sockets this function will   *
270  *           return SUCCEED if there are no data to read, even if not all of  *
271  *           the requested data has been read.                                *
272  *                                                                            *
273  ******************************************************************************/
ipc_read_data_full(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)274 static int	ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
275 {
276 	int		ret = FAIL;
277 	zbx_uint32_t	offset = 0, chunk_size;
278 
279 	*read_size = 0;
280 
281 	while (offset < size)
282 	{
283 		if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
284 			goto out;
285 
286 		if (0 == chunk_size)
287 			break;
288 
289 		offset += chunk_size;
290 	}
291 
292 	ret = SUCCEED;
293 out:
294 	*read_size = offset;
295 
296 	return ret;
297 }
298 
299 /******************************************************************************
300  *                                                                            *
301  * Function: ipc_socket_write_message                                         *
302  *                                                                            *
303  * Purpose: writes IPC message to socket                                      *
304  *                                                                            *
305  * Parameters: csocket - [IN] the IPC socket                                  *
306  *             code    - [IN] the message code                                *
307  *             data    - [IN] the data                                        *
308  *             size    - [IN] the data size                                   *
309  *             tx_size - [IN] the actual size written to socket               *
310  *                                                                            *
311  * Return value: SUCCEED - no socket errors were detected. Either the data or *
312  *                         a part of it was written to socket or a write to   *
313  *                         non-blocking socket would block                    *
314  *               FAIL    - otherwise                                          *
315  *                                                                            *
316  * Comments: When using non-blocking sockets the tx_size parameter must be    *
317  *           checked in addition to return value to tell if the message was   *
318  *           sent successfully.                                               *
319  *                                                                            *
320  ******************************************************************************/
ipc_socket_write_message(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * tx_size)321 static int	ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
322 		zbx_uint32_t size, zbx_uint32_t *tx_size)
323 {
324 	int		ret;
325 	zbx_uint32_t	size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
326 
327 	buffer[0] = code;
328 	buffer[1] = size;
329 
330 	if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
331 	{
332 		if (0 != size)
333 			memcpy(buffer + 2, data, size);
334 
335 		return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
336 	}
337 
338 	if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
339 		return FAIL;
340 
341 	/* in the case of non-blocking sockets only a part of the header might be sent */
342 	if (ZBX_IPC_HEADER_SIZE != *tx_size)
343 		return SUCCEED;
344 
345 	ret = ipc_write_data(csocket->fd, data, size, &size_data);
346 	*tx_size += size_data;
347 
348 	return ret;
349 }
350 
351 /******************************************************************************
352  *                                                                            *
353  * Function: ipc_read_buffer                                                  *
354  *                                                                            *
355  * Purpose: reads message header and data from buffer                         *
356  *                                                                            *
357  * Parameters: header      - [IN/OUT] the message header                      *
358  *             data        - [OUT] the message data                           *
359  *             rx_bytes    - [IN] the number of bytes stored in message       *
360  *                                (including header)                          *
361  *             buffer      - [IN] the buffer to parse                         *
362  *             size        - [IN] the number of bytes to parse                *
363  *             read_size   - [OUT] the number of bytes read                   *
364  *                                                                            *
365  * Return value: SUCCEED - message was successfully parsed                    *
366  *               FAIL - not enough data                                       *
367  *                                                                            *
368  ******************************************************************************/
ipc_read_buffer(zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t rx_bytes,const unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)369 static int	ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
370 		const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
371 {
372 	zbx_uint32_t	copy_size, data_size, data_offset;
373 
374 	*read_size = 0;
375 
376 	if (ZBX_IPC_HEADER_SIZE > rx_bytes)
377 	{
378 		copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
379 		memcpy((char *)header + rx_bytes, buffer, copy_size);
380 		*read_size += copy_size;
381 
382 		if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
383 			return FAIL;
384 
385 		data_size = header[ZBX_IPC_MESSAGE_SIZE];
386 
387 		if (0 == data_size)
388 		{
389 			*data = NULL;
390 			return SUCCEED;
391 		}
392 
393 		*data = (unsigned char *)zbx_malloc(NULL, data_size);
394 		data_offset = 0;
395 	}
396 	else
397 	{
398 		data_size = header[ZBX_IPC_MESSAGE_SIZE];
399 		data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
400 	}
401 
402 	copy_size = MIN(data_size - data_offset, size - *read_size);
403 	memcpy(*data + data_offset, buffer + *read_size, copy_size);
404 	*read_size += copy_size;
405 
406 	return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
407 }
408 
409 /******************************************************************************
410  *                                                                            *
411  * Function: ipc_message_is_completed                                         *
412  *                                                                            *
413  * Purpose: checks if IPC message has been completed                          *
414  *                                                                            *
415  * Parameters: header   - [IN] the message header                             *
416  *             rx_bytes - [IN] the number of bytes set in message             *
417  *                             (including header)                             *
418  *                                                                            *
419  * Return value:  SUCCEED - message has been completed                        *
420  *                FAIL - otherwise                                            *
421  *                                                                            *
422  ******************************************************************************/
ipc_message_is_completed(const zbx_uint32_t * header,zbx_uint32_t rx_bytes)423 static int	ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
424 {
425 	if (ZBX_IPC_HEADER_SIZE > rx_bytes)
426 		return FAIL;
427 
428 	if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
429 		return FAIL;
430 
431 	return SUCCEED;
432 }
433 
434 /******************************************************************************
435  *                                                                            *
436  * Function: ipc_socket_read_message                                          *
437  *                                                                            *
438  * Purpose: reads IPC message from buffered client socket                     *
439  *                                                                            *
440  * Parameters: csocket  - [IN] the source socket                              *
441  *             header   - [OUT] the header of the message                     *
442  *             data     - [OUT] the data of the message                       *
443  *             rx_bytes - [IN/OUT] the total message size read (including     *
444  *                                 header                                     *
445  *                                                                            *
446  * Return value:  SUCCEED - data was read successfully, check rx_bytes to     *
447  *                          determine if the message was completed.           *
448  *                FAIL - failed to read message (socket error or connection   *
449  *                       was closed).                                         *
450  *                                                                            *
451  ******************************************************************************/
ipc_socket_read_message(zbx_ipc_socket_t * csocket,zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t * rx_bytes)452 static int	ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
453 		zbx_uint32_t *rx_bytes)
454 {
455 	zbx_uint32_t	data_size, offset, read_size = 0;
456 	int		ret = FAIL;
457 
458 	/* try to read message from socket buffer */
459 	if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
460 	{
461 		ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
462 				csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
463 
464 		csocket->rx_buffer_offset += read_size;
465 		*rx_bytes += read_size;
466 
467 		if (SUCCEED == ret)
468 			goto out;
469 	}
470 
471 	/* not enough data in socket buffer, try to read more until message is completed or no data to read */
472 	while (SUCCEED != ret)
473 	{
474 		csocket->rx_buffer_offset = 0;
475 		csocket->rx_buffer_bytes = 0;
476 
477 		if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
478 		{
479 			offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
480 			data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
481 
482 			/* long messages will be read directly into message buffer */
483 			if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
484 			{
485 				ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
486 				*rx_bytes += read_size;
487 				goto out;
488 			}
489 		}
490 
491 		if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
492 			goto out;
493 
494 		/* it's possible that nothing will be read on non-blocking sockets, return success */
495 		if (0 == read_size)
496 		{
497 			ret = SUCCEED;
498 			goto out;
499 		}
500 
501 		csocket->rx_buffer_bytes = read_size;
502 
503 		ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
504 				&read_size);
505 
506 		csocket->rx_buffer_offset += read_size;
507 		*rx_bytes += read_size;
508 	}
509 out:
510 	return ret;
511 }
512 
513 /******************************************************************************
514  *                                                                            *
515  * Function: ipc_client_free_event                                            *
516  *                                                                            *
517  * Purpose: frees client's libevent event                                     *
518  *                                                                            *
519  * Parameters: client - [IN] the client                                       *
520  *                                                                            *
521  ******************************************************************************/
ipc_client_free_events(zbx_ipc_client_t * client)522 static void	ipc_client_free_events(zbx_ipc_client_t *client)
523 {
524 	if (NULL != client->rx_event)
525 	{
526 		event_free(client->rx_event);
527 		client->rx_event = NULL;
528 	}
529 
530 	if (NULL != client->tx_event)
531 	{
532 		event_free(client->tx_event);
533 		client->tx_event = NULL;
534 	}
535 }
536 
537 /******************************************************************************
538  *                                                                            *
539  * Function: ipc_client_free                                                  *
540  *                                                                            *
541  * Purpose: frees IPC service client                                          *
542  *                                                                            *
543  * Parameters: client - [IN] the client to free                               *
544  *                                                                            *
545  ******************************************************************************/
ipc_client_free(zbx_ipc_client_t * client)546 static void	ipc_client_free(zbx_ipc_client_t *client)
547 {
548 	zbx_ipc_message_t	*message;
549 
550 	ipc_client_free_events(client);
551 	zbx_ipc_socket_close(&client->csocket);
552 
553 	while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
554 		zbx_ipc_message_free(message);
555 
556 	zbx_queue_ptr_destroy(&client->rx_queue);
557 	zbx_free(client->rx_data);
558 
559 	while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
560 		zbx_ipc_message_free(message);
561 
562 	zbx_queue_ptr_destroy(&client->tx_queue);
563 	zbx_free(client->tx_data);
564 
565 	ipc_client_free_events(client);
566 
567 	zbx_free(client);
568 }
569 
570 /******************************************************************************
571  *                                                                            *
572  * Function: ipc_client_push_rx_message                                       *
573  *                                                                            *
574  * Purpose: adds message to received messages queue                           *
575  *                                                                            *
576  * Parameters: client - [IN] the client to read                               *
577  *                                                                            *
578  ******************************************************************************/
ipc_client_push_rx_message(zbx_ipc_client_t * client)579 static void	ipc_client_push_rx_message(zbx_ipc_client_t *client)
580 {
581 	zbx_ipc_message_t	*message;
582 
583 	message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
584 	message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
585 	message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
586 	message->data = client->rx_data;
587 	zbx_queue_ptr_push(&client->rx_queue, message);
588 
589 	client->rx_data = NULL;
590 	client->rx_bytes = 0;
591 }
592 
593 /******************************************************************************
594  *                                                                            *
595  * Function: ipc_client_pop_tx_message                                        *
596  *                                                                            *
597  * Purpose: prepares to send the next message in send queue                   *
598  *                                                                            *
599  * Parameters: client - [IN] the client                                       *
600  *                                                                            *
601  ******************************************************************************/
ipc_client_pop_tx_message(zbx_ipc_client_t * client)602 static void	ipc_client_pop_tx_message(zbx_ipc_client_t *client)
603 {
604 	zbx_ipc_message_t	*message;
605 
606 	zbx_free(client->tx_data);
607 	client->tx_bytes = 0;
608 
609 	if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
610 		return;
611 
612 	client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
613 	client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
614 	client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
615 	client->tx_data = message->data;
616 	zbx_free(message);
617 }
618 
619 /******************************************************************************
620  *                                                                            *
621  * Function: ipc_client_read                                                  *
622  *                                                                            *
623  * Purpose: reads data from IPC service client                                *
624  *                                                                            *
625  * Parameters: client - [IN] the client to read                               *
626  *                                                                            *
627  * Return value:  FAIL - read error/connection was closed                     *
628  *                                                                            *
629  * Comments: This function reads data from socket, parses it and adds         *
630  *           parsed messages to received messages queue.                      *
631  *                                                                            *
632  ******************************************************************************/
ipc_client_read(zbx_ipc_client_t * client)633 static int	ipc_client_read(zbx_ipc_client_t *client)
634 {
635 	int	rc;
636 
637 	do
638 	{
639 		if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
640 				&client->rx_bytes))
641 		{
642 			zbx_free(client->rx_data);
643 			client->rx_bytes = 0;
644 			return FAIL;
645 		}
646 
647 		if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
648 			ipc_client_push_rx_message(client);
649 	}
650 
651 	while (SUCCEED == rc);
652 
653 	return SUCCEED;
654 }
655 
656 /******************************************************************************
657  *                                                                            *
658  * Function: ipc_client_write                                                 *
659  *                                                                            *
660  * Purpose: writes queued data to IPC service client                          *
661  *                                                                            *
662  * Parameters: client - [IN] the client                                       *
663  *                                                                            *
664  * Return value: SUCCEED - the data was sent successfully                     *
665  *               FAIL    - otherwise                                          *
666  *                                                                            *
667  ******************************************************************************/
ipc_client_write(zbx_ipc_client_t * client)668 static int	ipc_client_write(zbx_ipc_client_t *client)
669 {
670 	zbx_uint32_t	data_size, write_size;
671 
672 	data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
673 
674 	if (data_size < client->tx_bytes)
675 	{
676 		zbx_uint32_t	size, offset;
677 
678 		size = client->tx_bytes - data_size;
679 		offset = ZBX_IPC_HEADER_SIZE - size;
680 
681 		if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
682 				&write_size))
683 		{
684 			return FAIL;
685 		}
686 
687 		client->tx_bytes -= write_size;
688 
689 		if (data_size < client->tx_bytes)
690 			return SUCCEED;
691 	}
692 
693 	while (0 < client->tx_bytes)
694 	{
695 		if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
696 				client->tx_bytes, &write_size))
697 		{
698 			return FAIL;
699 		}
700 
701 		if (0 == write_size)
702 			return SUCCEED;
703 
704 		client->tx_bytes -= write_size;
705 	}
706 
707 	if (0 == client->tx_bytes)
708 		ipc_client_pop_tx_message(client);
709 
710 	return SUCCEED;
711 }
712 
713 /******************************************************************************
714  *                                                                            *
715  * Function: ipc_service_pop_client                                           *
716  *                                                                            *
717  * Purpose: gets the next client with messages/closed socket from recv queue  *
718  *                                                                            *
719  * Parameters: service - [IN] the IPC service                                 *
720  *                                                                            *
721  * Return value: The client with messages/closed socket                       *
722  *                                                                            *
723  ******************************************************************************/
ipc_service_pop_client(zbx_ipc_service_t * service)724 static zbx_ipc_client_t	*ipc_service_pop_client(zbx_ipc_service_t *service)
725 {
726 	zbx_ipc_client_t	*client;
727 
728 	if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
729 		client->state = ZBX_IPC_CLIENT_STATE_NONE;
730 
731 	return client;
732 }
733 
734 /******************************************************************************
735  *                                                                            *
736  * Function: ipc_service_push_client                                          *
737  *                                                                            *
738  * Purpose: pushes client to the recv queue if needed                         *
739  *                                                                            *
740  * Parameters: service - [IN] the IPC service                                 *
741  *             client  - [IN] the IPC client                                  *
742  *                                                                            *
743  * Comments: The client is pushed to the recv queue if it isn't already there *
744  *           and there is messages to return or the client connection was     *
745  *           closed.                                                          *
746  *                                                                            *
747  ******************************************************************************/
ipc_service_push_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)748 static void	ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
749 {
750 	if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
751 		return;
752 
753 	if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
754 		return;
755 
756 	client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
757 	zbx_queue_ptr_push(&service->clients_recv, client);
758 }
759 
760 /******************************************************************************
761  *                                                                            *
762  * Function: ipc_service_add_client                                           *
763  *                                                                            *
764  * Purpose: adds a new IPC service client                                     *
765  *                                                                            *
766  * Parameters: service - [IN] the IPC service                                 *
767  *             fd      - [IN] the client socket descriptor                    *
768  *                                                                            *
769  ******************************************************************************/
ipc_service_add_client(zbx_ipc_service_t * service,int fd)770 static void	ipc_service_add_client(zbx_ipc_service_t *service, int fd)
771 {
772 	static zbx_uint64_t	next_clientid = 1;
773 	zbx_ipc_client_t	*client;
774 	int			flags;
775 
776 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
777 
778 	client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
779 	memset(client, 0, sizeof(zbx_ipc_client_t));
780 
781 	if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
782 	{
783 		zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
784 		exit(EXIT_FAILURE);
785 	}
786 
787 	if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
788 	{
789 		zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
790 		exit(EXIT_FAILURE);
791 	}
792 
793 	client->csocket.fd = fd;
794 	client->csocket.rx_buffer_bytes = 0;
795 	client->csocket.rx_buffer_offset = 0;
796 	client->id = next_clientid++;
797 	client->state = ZBX_IPC_CLIENT_STATE_NONE;
798 	client->refcount = 1;
799 
800 	zbx_queue_ptr_create(&client->rx_queue);
801 	zbx_queue_ptr_create(&client->tx_queue);
802 
803 	client->service = service;
804 	client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
805 	client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
806 	event_add(client->rx_event, NULL);
807 
808 	zbx_vector_ptr_append(&service->clients, client);
809 
810 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __func__, client->id);
811 }
812 
813 /******************************************************************************
814  *                                                                            *
815  * Function: ipc_service_remove_client                                        *
816  *                                                                            *
817  * Purpose: removes IPC service client                                        *
818  *                                                                            *
819  * Parameters: service - [IN] the IPC service                                 *
820  *             client  - [IN] the client to remove                            *
821  *                                                                            *
822  ******************************************************************************/
ipc_service_remove_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)823 static void	ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
824 {
825 	int		i;
826 
827 	for (i = 0; i < service->clients.values_num; i++)
828 	{
829 		if (service->clients.values[i] == client)
830 			zbx_vector_ptr_remove_noorder(&service->clients, i);
831 	}
832 }
833 
834 /******************************************************************************
835  *                                                                            *
836  * Function: zbx_ipc_client_by_id                                             *
837  *                                                                            *
838  * Purpose: to find connected client when only it's ID is known               *
839  *                                                                            *
840  * Parameters: service - [IN] the IPC service                                 *
841  *             id      - [IN] ID of client                                    *
842  *                                                                            *
843  * Return value: address of client or NULL if client has already disconnected *
844  *                                                                            *
845  ******************************************************************************/
zbx_ipc_client_by_id(const zbx_ipc_service_t * service,zbx_uint64_t id)846 zbx_ipc_client_t	*zbx_ipc_client_by_id(const zbx_ipc_service_t *service, zbx_uint64_t id)
847 {
848 	int			i;
849 	zbx_ipc_client_t	*client;
850 
851 	for (i = 0; i < service->clients.values_num; i++)
852 	{
853 		client = (zbx_ipc_client_t *) service->clients.values[i];
854 
855 		if (id == client->id)
856 			return client;
857 	}
858 
859 	return NULL;
860 }
861 
862 /******************************************************************************
863  *                                                                            *
864  * Function: ipc_client_read_event_cb                                         *
865  *                                                                            *
866  * Purpose: service client read event libevent callback                       *
867  *                                                                            *
868  ******************************************************************************/
ipc_client_read_event_cb(evutil_socket_t fd,short what,void * arg)869 static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
870 {
871 	zbx_ipc_client_t	*client = (zbx_ipc_client_t *)arg;
872 
873 	ZBX_UNUSED(fd);
874 	ZBX_UNUSED(what);
875 
876 	if (SUCCEED != ipc_client_read(client))
877 	{
878 		ipc_client_free_events(client);
879 		ipc_service_remove_client(client->service, client);
880 	}
881 
882 	ipc_service_push_client(client->service, client);
883 }
884 
885 /******************************************************************************
886  *                                                                            *
887  * Function: ipc_client_write_event_cb                                        *
888  *                                                                            *
889  * Purpose: service client write event libevent callback                      *
890  *                                                                            *
891  ******************************************************************************/
ipc_client_write_event_cb(evutil_socket_t fd,short what,void * arg)892 static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
893 {
894 	zbx_ipc_client_t	*client = (zbx_ipc_client_t *)arg;
895 
896 	ZBX_UNUSED(fd);
897 	ZBX_UNUSED(what);
898 
899 	if (SUCCEED != ipc_client_write(client))
900 	{
901 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
902 		zbx_ipc_client_close(client);
903 		return;
904 	}
905 
906 	if (0 == client->tx_bytes)
907 		event_del(client->tx_event);
908 }
909 
910 /******************************************************************************
911  *                                                                            *
912  * Function: ipc_async_socket_write_event_cb                                  *
913  *                                                                            *
914  * Purpose: asynchronous socket write event libevent callback                 *
915  *                                                                            *
916  ******************************************************************************/
ipc_async_socket_write_event_cb(evutil_socket_t fd,short what,void * arg)917 static void	ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
918 {
919 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
920 
921 	ZBX_UNUSED(fd);
922 	ZBX_UNUSED(what);
923 
924 	if (SUCCEED != ipc_client_write(asocket->client))
925 	{
926 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
927 		ipc_client_free_events(asocket->client);
928 		zbx_ipc_socket_close(&asocket->client->csocket);
929 		asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
930 		return;
931 	}
932 
933 	if (0 == asocket->client->tx_bytes)
934 		event_del(asocket->client->tx_event);
935 }
936 
937 /******************************************************************************
938  *                                                                            *
939  * Function: ipc_async_socket_read_event_cb                                   *
940  *                                                                            *
941  * Purpose: asynchronous socket read event libevent callback                  *
942  *                                                                            *
943  ******************************************************************************/
ipc_async_socket_read_event_cb(evutil_socket_t fd,short what,void * arg)944 static void	ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
945 {
946 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
947 
948 	ZBX_UNUSED(fd);
949 	ZBX_UNUSED(what);
950 
951 	if (SUCCEED != ipc_client_read(asocket->client))
952 	{
953 		ipc_client_free_events(asocket->client);
954 		asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
955 	}
956 }
957 
958 /******************************************************************************
959  *                                                                            *
960  * Function: ipc_async_socket_timer_cb                                        *
961  *                                                                            *
962  * Purpose: timer callback                                                    *
963  *                                                                            *
964  ******************************************************************************/
ipc_async_socket_timer_cb(evutil_socket_t fd,short what,void * arg)965 static void	ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
966 {
967 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
968 
969 	ZBX_UNUSED(fd);
970 	ZBX_UNUSED(what);
971 
972 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
973 }
974 
975 /******************************************************************************
976  *                                                                            *
977  * Function: ipc_service_accept                                               *
978  *                                                                            *
979  * Purpose: accepts a new client connection                                   *
980  *                                                                            *
981  * Parameters: service - [IN] the IPC service                                 *
982  *                                                                            *
983  ******************************************************************************/
ipc_service_accept(zbx_ipc_service_t * service)984 static void	ipc_service_accept(zbx_ipc_service_t *service)
985 {
986 	int	fd;
987 
988 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
989 
990 	while (-1 == (fd = accept(service->fd, NULL, NULL)))
991 	{
992 		if (EINTR != errno)
993 		{
994 			/* If there is unaccepted connection libevent will call registered callback function over and */
995 			/* over again. It is better to exit straight away and cause all other processes to stop. */
996 			zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
997 			exit(EXIT_FAILURE);
998 		}
999 	}
1000 
1001 	ipc_service_add_client(service, fd);
1002 
1003 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1004 }
1005 
1006 /******************************************************************************
1007  *                                                                            *
1008  * Function: ipc_message_create                                               *
1009  *                                                                            *
1010  * Purpose: creates IPC message                                               *
1011  *                                                                            *
1012  * Parameters: code    - [IN] the message code                                *
1013  *             data    - [IN] the data                                        *
1014  *             size    - [IN] the data size                                   *
1015  *                                                                            *
1016  * Return value: The created message.                                         *
1017  *                                                                            *
1018  ******************************************************************************/
ipc_message_create(zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1019 static zbx_ipc_message_t	*ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1020 {
1021 	zbx_ipc_message_t	*message;
1022 
1023 	message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
1024 
1025 	message->code = code;
1026 	message->size = size;
1027 
1028 	if (0 != size)
1029 	{
1030 		message->data = (unsigned char *)zbx_malloc(NULL, size);
1031 		memcpy(message->data, data, size);
1032 	}
1033 	else
1034 		message->data = NULL;
1035 
1036 	return message;
1037 }
1038 
1039 /******************************************************************************
1040  *                                                                            *
1041  * Function: ipc_service_event_log                                            *
1042  *                                                                            *
1043  * Purpose: libevent logging callback                                         *
1044  *                                                                            *
1045  ******************************************************************************/
ipc_service_event_log_cb(int severity,const char * msg)1046 static void ipc_service_event_log_cb(int severity, const char *msg)
1047 {
1048 	int	loglevel;
1049 
1050 	switch (severity)
1051 	{
1052 		case _EVENT_LOG_DEBUG:
1053 			loglevel = LOG_LEVEL_TRACE;
1054 			break;
1055 		case _EVENT_LOG_MSG:
1056 			loglevel = LOG_LEVEL_DEBUG;
1057 			break;
1058 		case _EVENT_LOG_WARN:
1059 			loglevel = LOG_LEVEL_WARNING;
1060 			break;
1061 		case _EVENT_LOG_ERR:
1062 			loglevel = LOG_LEVEL_DEBUG;
1063 			break;
1064 		default:
1065 			loglevel = LOG_LEVEL_DEBUG;
1066 			break;
1067 	}
1068 
1069 	zabbix_log(loglevel, "IPC service: %s", msg);
1070 }
1071 
1072 /******************************************************************************
1073  *                                                                            *
1074  * Function: ipc_service_init_libevent                                        *
1075  *                                                                            *
1076  * Purpose: initialize libevent library                                       *
1077  *                                                                            *
1078  ******************************************************************************/
ipc_service_init_libevent(void)1079 static void	ipc_service_init_libevent(void)
1080 {
1081 	event_set_log_callback(ipc_service_event_log_cb);
1082 }
1083 
/******************************************************************************
 *                                                                            *
 * Function: ipc_service_free_libevent                                        *
 *                                                                            *
 * Purpose: uninitialize libevent library (currently nothing to do)           *
 *                                                                            *
 ******************************************************************************/
static void	ipc_service_free_libevent(void)
{
	/* ipc_service_init_libevent() only sets a log callback, nothing has to be released */
}
1094 
1095 /******************************************************************************
1096  *                                                                            *
1097  * Function: ipc_service_client_connected_cb                                  *
1098  *                                                                            *
1099  * Purpose: libevent listener callback                                        *
1100  *                                                                            *
1101  ******************************************************************************/
ipc_service_client_connected_cb(evutil_socket_t fd,short what,void * arg)1102 static void	ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
1103 {
1104 	zbx_ipc_service_t	*service = (zbx_ipc_service_t *)arg;
1105 
1106 	ZBX_UNUSED(fd);
1107 	ZBX_UNUSED(what);
1108 
1109 	ipc_service_accept(service);
1110 }
1111 
1112 /******************************************************************************
1113  *                                                                            *
1114  * Function: ipc_service_timer_cb                                             *
1115  *                                                                            *
1116  * Purpose: timer callback                                                    *
1117  *                                                                            *
1118  ******************************************************************************/
ipc_service_timer_cb(evutil_socket_t fd,short what,void * arg)1119 static void	ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
1120 {
1121 	ZBX_UNUSED(fd);
1122 	ZBX_UNUSED(what);
1123 	ZBX_UNUSED(arg);
1124 }
1125 
1126 /******************************************************************************
1127  *                                                                            *
1128  * Function: ipc_check_running_service                                        *
1129  *                                                                            *
1130  * Purpose: checks if an IPC service is already running                       *
1131  *                                                                            *
1132  * Parameters: service_name - [IN]                                            *
1133  *                                                                            *
1134  ******************************************************************************/
ipc_check_running_service(const char * service_name)1135 static int	ipc_check_running_service(const char *service_name)
1136 {
1137 	zbx_ipc_socket_t	csocket;
1138 	int			ret;
1139 	char			*error = NULL;
1140 
1141 	if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
1142 		zbx_ipc_socket_close(&csocket);
1143 	else
1144 		zbx_free(error);
1145 
1146 	return ret;
1147 }
1148 
1149 /*
1150  * Public client API
1151  */
1152 
1153 /******************************************************************************
1154  *                                                                            *
1155  * Function: zbx_ipc_socket_open                                              *
1156  *                                                                            *
1157  * Purpose: opens socket to an IPC service listening on the specified path    *
1158  *                                                                            *
1159  * Parameters: csocket      - [OUT] the IPC socket to the service             *
1160  *             service_name - [IN] the IPC service name                       *
1161  *             timeout      - [IN] the connection timeout                     *
1162  *             error        - [OUT] the error message                         *
1163  *                                                                            *
1164  * Return value: SUCCEED - the socket was successfully opened                 *
1165  *               FAIL    - otherwise                                          *
1166  *                                                                            *
1167  ******************************************************************************/
zbx_ipc_socket_open(zbx_ipc_socket_t * csocket,const char * service_name,int timeout,char ** error)1168 int	zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
1169 {
1170 	struct sockaddr_un	addr;
1171 	time_t			start;
1172 	struct timespec		ts = {0, 100000000};
1173 	const char		*socket_path;
1174 	int			ret = FAIL;
1175 
1176 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1177 
1178 	if (NULL == (socket_path = ipc_make_path(service_name, error)))
1179 		goto out;
1180 
1181 	if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1182 	{
1183 		*error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
1184 		goto out;
1185 	}
1186 
1187 	memset(&addr, 0, sizeof(addr));
1188 	addr.sun_family = AF_UNIX;
1189 	memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1190 
1191 	start = time(NULL);
1192 
1193 	while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
1194 	{
1195 		if (0 == timeout || time(NULL) - start > timeout)
1196 		{
1197 			*error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
1198 					zbx_strerror(errno));
1199 			close(csocket->fd);
1200 			goto out;
1201 		}
1202 
1203 		nanosleep(&ts, NULL);
1204 	}
1205 
1206 	csocket->rx_buffer_bytes = 0;
1207 	csocket->rx_buffer_offset = 0;
1208 
1209 	ret = SUCCEED;
1210 out:
1211 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1212 	return ret;
1213 }
1214 
1215 /******************************************************************************
1216  *                                                                            *
1217  * Function: zbx_ipc_socket_close                                             *
1218  *                                                                            *
1219  * Purpose: closes socket to an IPC service                                   *
1220  *                                                                            *
1221  * Parameters: csocket - [IN/OUT] the IPC socket to close                     *
1222  *                                                                            *
1223  ******************************************************************************/
zbx_ipc_socket_close(zbx_ipc_socket_t * csocket)1224 void	zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
1225 {
1226 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1227 
1228 	if (-1 != csocket->fd)
1229 	{
1230 		close(csocket->fd);
1231 		csocket->fd = -1;
1232 	}
1233 
1234 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1235 }
1236 
1237 /******************************************************************************
1238  *                                                                            *
1239  * Function: zbx_ipc_socket_write                                             *
1240  *                                                                            *
1241  * Purpose: writes a message to IPC service                                   *
1242  *                                                                            *
1243  * Parameters: csocket - [IN] an opened IPC socket to the service             *
1244  *             code    - [IN] the message code                                *
1245  *             data    - [IN] the data                                        *
1246  *             size    - [IN] the data size                                   *
1247  *                                                                            *
1248  * Return value: SUCCEED - the message was successfully written               *
1249  *               FAIL    - otherwise                                          *
1250  *                                                                            *
1251  ******************************************************************************/
zbx_ipc_socket_write(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1252 int	zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1253 {
1254 	int		ret;
1255 	zbx_uint32_t	size_sent;
1256 
1257 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1258 
1259 	if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) &&
1260 			size_sent == size + ZBX_IPC_HEADER_SIZE)
1261 	{
1262 		ret = SUCCEED;
1263 	}
1264 	else
1265 		ret = FAIL;
1266 
1267 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1268 
1269 	return ret;
1270 }
1271 
1272 /******************************************************************************
1273  *                                                                            *
1274  * Function: zbx_ipc_socket_read                                              *
1275  *                                                                            *
1276  * Purpose: reads a message from IPC service                                  *
1277  *                                                                            *
1278  * Parameters: csocket - [IN] an opened IPC socket to the service             *
1279  *             message - [OUT] the received message                           *
1280  *                                                                            *
1281  * Return value: SUCCEED - the message was successfully received              *
1282  *               FAIL    - otherwise                                          *
1283  *                                                                            *
1284  * Comments: If this function succeeds the message must be cleaned/freed by   *
1285  *           the caller.                                                      *
1286  *                                                                            *
1287  ******************************************************************************/
zbx_ipc_socket_read(zbx_ipc_socket_t * csocket,zbx_ipc_message_t * message)1288 int	zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
1289 {
1290 	int		ret = FAIL;
1291 	zbx_uint32_t	rx_bytes = 0, header[2];
1292 	unsigned char	*data = NULL;
1293 
1294 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1295 
1296 	if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes))
1297 		goto out;
1298 
1299 	if (SUCCEED != ipc_message_is_completed(header, rx_bytes))
1300 	{
1301 		zbx_free(data);
1302 		goto out;
1303 	}
1304 
1305 	message->code = header[ZBX_IPC_MESSAGE_CODE];
1306 	message->size = header[ZBX_IPC_MESSAGE_SIZE];
1307 	message->data = data;
1308 
1309 	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1310 	{
1311 		char	*msg = NULL;
1312 
1313 		zbx_ipc_message_format(message, &msg);
1314 
1315 		zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, msg);
1316 
1317 		zbx_free(msg);
1318 	}
1319 
1320 	ret = SUCCEED;
1321 out:
1322 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1323 
1324 	return ret;
1325 }
1326 
1327 /******************************************************************************
1328  *                                                                            *
1329  * Function: zbx_ipc_message_free                                             *
1330  *                                                                            *
1331  * Purpose: frees the resources allocated to store IPC message data           *
1332  *                                                                            *
1333  * Parameters: message - [IN] the message to free                             *
1334  *                                                                            *
1335  ******************************************************************************/
zbx_ipc_message_free(zbx_ipc_message_t * message)1336 void	zbx_ipc_message_free(zbx_ipc_message_t *message)
1337 {
1338 	if (NULL != message)
1339 	{
1340 		zbx_free(message->data);
1341 		zbx_free(message);
1342 	}
1343 }
1344 
1345 /******************************************************************************
1346  *                                                                            *
1347  * Function: zbx_ipc_message_clean                                            *
1348  *                                                                            *
1349  * Purpose: frees the resources allocated to store IPC message data           *
1350  *                                                                            *
1351  * Parameters: message - [IN] the message to clean                            *
1352  *                                                                            *
1353  ******************************************************************************/
zbx_ipc_message_clean(zbx_ipc_message_t * message)1354 void	zbx_ipc_message_clean(zbx_ipc_message_t *message)
1355 {
1356 	zbx_free(message->data);
1357 }
1358 
1359 /******************************************************************************
1360  *                                                                            *
1361  * Function: zbx_ipc_message_init                                             *
1362  *                                                                            *
1363  * Purpose: initializes IPC message                                           *
1364  *                                                                            *
1365  * Parameters: message - [IN] the message to initialize                       *
1366  *                                                                            *
1367  ******************************************************************************/
zbx_ipc_message_init(zbx_ipc_message_t * message)1368 void	zbx_ipc_message_init(zbx_ipc_message_t *message)
1369 {
1370 	memset(message, 0, sizeof(zbx_ipc_message_t));
1371 }
1372 
1373 /******************************************************************************
1374  *                                                                            *
1375  * Function: zbx_ipc_message_format                                           *
1376  *                                                                            *
1377  * Purpose: formats message to readable format for debug messages             *
1378  *                                                                            *
1379  * Parameters: message - [IN] the message                                     *
1380  *             data    - [OUT] the formatted message                          *
1381  *                                                                            *
1382  ******************************************************************************/
zbx_ipc_message_format(const zbx_ipc_message_t * message,char ** data)1383 void	zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
1384 {
1385 	size_t		data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
1386 	zbx_uint32_t	i, data_num;
1387 
1388 	if (NULL == message)
1389 		return;
1390 
1391 	data_num = message->size;
1392 
1393 	if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
1394 		data_num = ZBX_IPC_DATA_DUMP_SIZE;
1395 
1396 	*data = (char *)zbx_malloc(*data, data_alloc);
1397 	zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
1398 
1399 	for (i = 0; i < data_num; i++)
1400 	{
1401 		if (0 != i)
1402 			zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
1403 
1404 		zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
1405 	}
1406 
1407 	(*data)[data_offset] = '\0';
1408 }
1409 
1410 /******************************************************************************
1411  *                                                                            *
1412  * Function: zbx_ipc_message_copy                                             *
1413  *                                                                            *
1414  * Purpose: copies ipc message                                                *
1415  *                                                                            *
1416  * Parameters: dst - [IN] the destination message                             *
1417  *             src - [IN] the source message                                  *
1418  *                                                                            *
1419  ******************************************************************************/
zbx_ipc_message_copy(zbx_ipc_message_t * dst,const zbx_ipc_message_t * src)1420 void	zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
1421 {
1422 	dst->code = src->code;
1423 	dst->size = src->size;
1424 	dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
1425 	memcpy(dst->data, src->data, src->size);
1426 }
1427 
1428 /*
1429  * Public service API
1430  */
1431 
1432 /******************************************************************************
1433  *                                                                            *
1434  * Function: zbx_ipc_service_init_env                                         *
1435  *                                                                            *
1436  * Purpose: initializes IPC service environment                               *
1437  *                                                                            *
1438  * Parameters: path    - [IN] the service root path                           *
1439  *             error   - [OUT] the error message                              *
1440  *                                                                            *
1441  * Return value: SUCCEED - the environment was initialized successfully.      *
1442  *               FAIL    - otherwise                                          *
1443  *                                                                            *
1444  ******************************************************************************/
zbx_ipc_service_init_env(const char * path,char ** error)1445 int	zbx_ipc_service_init_env(const char *path, char **error)
1446 {
1447 	struct stat	fs;
1448 	int		ret = FAIL;
1449 
1450 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, path);
1451 
1452 	if (0 != ipc_path_root_len)
1453 	{
1454 		*error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
1455 				" root directory at \"%s\".", ipc_get_path());
1456 		goto out;
1457 	}
1458 
1459 	if (0 != stat(path, &fs))
1460 	{
1461 		*error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
1462 				zbx_strerror(errno));
1463 		goto out;
1464 	}
1465 
1466 	if (0 == S_ISDIR(fs.st_mode))
1467 	{
1468 		*error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
1469 		goto out;
1470 	}
1471 
1472 	if (0 != access(path, W_OK | R_OK))
1473 	{
1474 		*error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
1475 		goto out;
1476 	}
1477 
1478 	ipc_path_root_len = strlen(path);
1479 	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
1480 	{
1481 		*error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
1482 		goto out;
1483 	}
1484 
1485 	memcpy(ipc_path, path, ipc_path_root_len + 1);
1486 
1487 	while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
1488 		ipc_path[--ipc_path_root_len] = '\0';
1489 
1490 	ipc_service_init_libevent();
1491 
1492 	ret = SUCCEED;
1493 out:
1494 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1495 
1496 	return ret;
1497 }
1498 
/******************************************************************************
 *                                                                            *
 * Function: zbx_ipc_service_free_env                                         *
 *                                                                            *
 * Purpose: frees IPC service environment (delegates the work to              *
 *          ipc_service_free_libevent())                                      *
 *                                                                            *
 ******************************************************************************/
void	zbx_ipc_service_free_env(void)
{
	ipc_service_free_libevent();
}
1510 
1511 
1512 /******************************************************************************
1513  *                                                                            *
1514  * Function: zbx_ipc_service_start                                            *
1515  *                                                                            *
1516  * Purpose: starts IPC service on the specified path                          *
1517  *                                                                            *
1518  * Parameters: service      - [IN/OUT] the IPC service                        *
1519  *             service_name - [IN] the unix domain socket path                *
1520  *             error        - [OUT] the error message                         *
1521  *                                                                            *
1522  * Return value: SUCCEED - the service was initialized successfully.          *
1523  *               FAIL    - otherwise                                          *
1524  *                                                                            *
1525  ******************************************************************************/
zbx_ipc_service_start(zbx_ipc_service_t * service,const char * service_name,char ** error)1526 int	zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
1527 {
1528 	struct sockaddr_un	addr;
1529 	const char		*socket_path;
1530 	int			ret = FAIL;
1531 	mode_t			mode;
1532 
1533 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __func__, service_name);
1534 
1535 	mode = umask(077);
1536 
1537 	if (NULL == (socket_path = ipc_make_path(service_name, error)))
1538 		goto out;
1539 
1540 	if (0 == access(socket_path, F_OK))
1541 	{
1542 		if (0 != access(socket_path, W_OK))
1543 		{
1544 			*error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
1545 			goto out;
1546 		}
1547 
1548 		if (SUCCEED == ipc_check_running_service(service_name))
1549 		{
1550 			*error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
1551 			goto out;
1552 		}
1553 
1554 		unlink(socket_path);
1555 	}
1556 
1557 	if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1558 	{
1559 		*error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
1560 		goto out;
1561 	}
1562 
1563 	memset(&addr, 0, sizeof(addr));
1564 	addr.sun_family = AF_UNIX;
1565 	memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1566 
1567 	if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
1568 	{
1569 		*error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
1570 		goto out;
1571 	}
1572 
1573 	if (0 != listen(service->fd, SOMAXCONN))
1574 	{
1575 		*error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
1576 		goto out;
1577 	}
1578 
1579 	service->path = zbx_strdup(NULL, service_name);
1580 	zbx_vector_ptr_create(&service->clients);
1581 	zbx_queue_ptr_create(&service->clients_recv);
1582 
1583 	service->ev = event_base_new();
1584 	service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
1585 			ipc_service_client_connected_cb, service);
1586 	event_add(service->ev_listener, NULL);
1587 
1588 	service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
1589 
1590 	ret = SUCCEED;
1591 out:
1592 	umask(mode);
1593 
1594 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1595 
1596 	return ret;
1597 }
1598 
1599 /******************************************************************************
1600  *                                                                            *
1601  * Function: zbx_ipc_service_close                                            *
1602  *                                                                            *
1603  * Purpose: closes IPC service and frees the resources allocated by it        *
1604  *                                                                            *
1605  * Parameters: service - [IN/OUT] the IPC service                             *
1606  *                                                                            *
1607  ******************************************************************************/
zbx_ipc_service_close(zbx_ipc_service_t * service)1608 void	zbx_ipc_service_close(zbx_ipc_service_t *service)
1609 {
1610 	int	i;
1611 
1612 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, service->path);
1613 
1614 	close(service->fd);
1615 
1616 	for (i = 0; i < service->clients.values_num; i++)
1617 		ipc_client_free((zbx_ipc_client_t *)service->clients.values[i]);
1618 
1619 	zbx_free(service->path);
1620 
1621 	zbx_vector_ptr_destroy(&service->clients);
1622 	zbx_queue_ptr_destroy(&service->clients_recv);
1623 
1624 	event_free(service->ev_timer);
1625 	event_free(service->ev_listener);
1626 	event_base_free(service->ev);
1627 
1628 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1629 }
1630 
1631 /******************************************************************************
1632  *                                                                            *
1633  * Function: zbx_ipc_service_recv                                             *
1634  *                                                                            *
1635  * Purpose: receives ipc message from a connected client                      *
1636  *                                                                            *
1637  * Parameters: service - [IN] the IPC service                                 *
1638  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1639  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1640  *                            used for blocking call without timeout          *
1641  *             client  - [OUT] the client that sent the message or            *
1642  *                             NULL if there are no messages and the          *
1643  *                             specified timeout passed.                      *
1644  *                             The client must be released by caller with     *
1645  *                             zbx_ipc_client_release() function.             *
1646  *             message - [OUT] the received message or NULL if the client     *
1647  *                             connection was closed.                         *
1648  *                             The message must be freed by caller with       *
1649  *                             ipc_message_free() function.                   *
1650  *                                                                            *
1651  * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without        *
1652  *                                        waiting for socket events           *
1653  *                                        (pending events are processed)      *
1654  *               ZBX_IPC_RECV_WAIT      - returned after receiving socket     *
1655  *                                        event                               *
1656  *               ZBX_IPC_RECV_TIMEOUT   - returned after timeout expired      *
1657  *                                                                            *
1658  ******************************************************************************/
zbx_ipc_service_recv(zbx_ipc_service_t * service,int timeout,zbx_ipc_client_t ** client,zbx_ipc_message_t ** message)1659 int	zbx_ipc_service_recv(zbx_ipc_service_t *service, int timeout, zbx_ipc_client_t **client,
1660 		zbx_ipc_message_t **message)
1661 {
1662 	int	ret, flags;
1663 
1664 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
1665 
1666 	if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
1667 	{
1668 		if (ZBX_IPC_WAIT_FOREVER != timeout)
1669 		{
1670 			struct timeval	tv = {timeout, 0};
1671 			evtimer_add(service->ev_timer, &tv);
1672 		}
1673 		flags = EVLOOP_ONCE;
1674 	}
1675 	else
1676 		flags = EVLOOP_NONBLOCK;
1677 
1678 	event_base_loop(service->ev, flags);
1679 
1680 	if (NULL != (*client = ipc_service_pop_client(service)))
1681 	{
1682 		if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue)))
1683 		{
1684 			if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1685 			{
1686 				char	*data = NULL;
1687 
1688 				zbx_ipc_message_format(*message, &data);
1689 				zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
1690 
1691 				zbx_free(data);
1692 			}
1693 
1694 			ipc_service_push_client(service, *client);
1695 			zbx_ipc_client_addref(*client);
1696 		}
1697 
1698 		ret = (EVLOOP_NONBLOCK == flags ? ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT);
1699 	}
1700 	else
1701 	{	ret = ZBX_IPC_RECV_TIMEOUT;
1702 		*client = NULL;
1703 		*message = NULL;
1704 	}
1705 
1706 	evtimer_del(service->ev_timer);
1707 
1708 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
1709 
1710 	return ret;
1711 }
1712 
1713 /******************************************************************************
1714  *                                                                            *
1715  * Function: zbx_ipc_client_send                                              *
1716  *                                                                            *
1717  * Purpose: Sends IPC message to client                                       *
1718  *                                                                            *
1719  * Parameters: client - [IN] the IPC client                                   *
1720  *             code   - [IN] the message code                                 *
1721  *             data   - [IN] the data                                         *
1722  *             size   - [IN] the data size                                    *
1723  *                                                                            *
1724  * Comments: If data can't be written directly to socket (buffer full) then   *
1725  *           the message is queued and sent during zbx_ipc_service_recv()     *
1726  *           messaging loop whenever socket becomes ready.                    *
1727  *                                                                            *
1728  ******************************************************************************/
zbx_ipc_client_send(zbx_ipc_client_t * client,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1729 int	zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1730 {
1731 	zbx_uint32_t		tx_size = 0;
1732 	zbx_ipc_message_t	*message;
1733 	int			ret = FAIL;
1734 
1735 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, client->id);
1736 
1737 	if (0 != client->tx_bytes)
1738 	{
1739 		message = ipc_message_create(code, data, size);
1740 		zbx_queue_ptr_push(&client->tx_queue, message);
1741 		ret = SUCCEED;
1742 		goto out;
1743 	}
1744 
1745 	if (FAIL == ipc_socket_write_message(&client->csocket, code, data, size, &tx_size))
1746 		goto out;
1747 
1748 	if (tx_size != ZBX_IPC_HEADER_SIZE + size)
1749 	{
1750 		client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
1751 		client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
1752 		client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
1753 		memcpy(client->tx_data, data, size);
1754 		client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - tx_size;
1755 		event_add(client->tx_event, NULL);
1756 	}
1757 
1758 	ret = SUCCEED;
1759 out:
1760 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1761 
1762 	return ret;
1763 }
1764 
1765 /******************************************************************************
1766  *                                                                            *
1767  * Function: zbx_ipc_client_close                                             *
1768  *                                                                            *
1769  * Purpose: closes client socket and frees resources allocated for client     *
1770  *                                                                            *
1771  * Parameters: client - [IN] the IPC client                                   *
1772  *                                                                            *
1773  ******************************************************************************/
zbx_ipc_client_close(zbx_ipc_client_t * client)1774 void	zbx_ipc_client_close(zbx_ipc_client_t *client)
1775 {
1776 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1777 
1778 	ipc_client_free_events(client);
1779 	zbx_ipc_socket_close(&client->csocket);
1780 
1781 	ipc_service_remove_client(client->service, client);
1782 	zbx_queue_ptr_remove_value(&client->service->clients_recv, client);
1783 	zbx_ipc_client_release(client);
1784 
1785 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1786 }
1787 
/******************************************************************************
 *                                                                            *
 * Function: zbx_ipc_client_addref                                            *
 *                                                                            *
 * Purpose: increments the client reference count                             *
 *                                                                            *
 * Parameters: client - [IN/OUT] the IPC client                               *
 *                                                                            *
 ******************************************************************************/
void	zbx_ipc_client_addref(zbx_ipc_client_t *client)
{
	client->refcount += 1;
}
1792 
/******************************************************************************
 *                                                                            *
 * Function: zbx_ipc_client_release                                           *
 *                                                                            *
 * Purpose: decrements the client reference count and frees the client        *
 *          when the count reaches zero                                       *
 *                                                                            *
 * Parameters: client - [IN/OUT] the IPC client                               *
 *                                                                            *
 ******************************************************************************/
void	zbx_ipc_client_release(zbx_ipc_client_t *client)
{
	client->refcount--;

	if (0 == client->refcount)
		ipc_client_free(client);
}
1798 
/******************************************************************************
 *                                                                            *
 * Function: zbx_ipc_client_connected                                         *
 *                                                                            *
 * Purpose: checks whether the client rx_event is set                         *
 *                                                                            *
 * Parameters: client - [IN] the IPC client                                   *
 *                                                                            *
 * Return value: SUCCEED - rx_event is not NULL                               *
 *               FAIL    - rx_event is NULL                                   *
 *                                                                            *
 ******************************************************************************/
int	zbx_ipc_client_connected(zbx_ipc_client_t *client)
{
	if (NULL != client->rx_event)
		return SUCCEED;

	return FAIL;
}
1803 
zbx_ipc_client_id(const zbx_ipc_client_t * client)1804 zbx_uint64_t	zbx_ipc_client_id(const zbx_ipc_client_t *client)
1805 {
1806 	return client->id;
1807 }
1808 
1809 /******************************************************************************
1810  *                                                                            *
1811  * Function: zbx_ipc_async_socket_open                                        *
1812  *                                                                            *
1813  * Purpose: opens asynchronous socket to IPC service client                   *
1814  *                                                                            *
1815  * Parameters: client       - [OUT] the IPC service client                    *
1816  *             service_name - [IN] the IPC service name                       *
1817  *             timeout      - [IN] the connection timeout                     *
1818  *             error        - [OUT] the error message                         *
1819  *                                                                            *
1820  * Return value: SUCCEED - the socket was successfully opened                 *
1821  *               FAIL    - otherwise                                          *
1822  *                                                                            *
1823  ******************************************************************************/
zbx_ipc_async_socket_open(zbx_ipc_async_socket_t * asocket,const char * service_name,int timeout,char ** error)1824 int	zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
1825 {
1826 	int	ret = FAIL, flags;
1827 
1828 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1829 
1830 	memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
1831 	asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
1832 	memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
1833 
1834 	if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
1835 	{
1836 		zbx_free(asocket->client);
1837 		goto out;
1838 	}
1839 
1840 	if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
1841 	{
1842 		zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
1843 		exit(EXIT_FAILURE);
1844 	}
1845 
1846 	if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
1847 	{
1848 		zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
1849 		exit(EXIT_FAILURE);
1850 	}
1851 
1852 	asocket->ev = event_base_new();
1853 	asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
1854 	asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
1855 			ipc_async_socket_read_event_cb, (void *)asocket);
1856 	asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
1857 			ipc_async_socket_write_event_cb, (void *)asocket);
1858 	event_add(asocket->client->rx_event, NULL);
1859 
1860 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1861 
1862 	ret = SUCCEED;
1863 out:
1864 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1865 	return ret;
1866 }
1867 
1868 /******************************************************************************
1869  *                                                                            *
1870  * Function: zbx_ipc_async_socket_close                                       *
1871  *                                                                            *
1872  * Purpose: closes asynchronous IPC socket and frees allocated resources      *
1873  *                                                                            *
1874  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1875  *                                                                            *
1876  ******************************************************************************/
zbx_ipc_async_socket_close(zbx_ipc_async_socket_t * asocket)1877 void	zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
1878 {
1879 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1880 
1881 	ipc_client_free(asocket->client);
1882 
1883 	event_free(asocket->ev_timer);
1884 	event_base_free(asocket->ev);
1885 
1886 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1887 }
1888 
1889 /******************************************************************************
1890  *                                                                            *
1891  * Function: zbx_ipc_async_socket_send                                        *
1892  *                                                                            *
1893  * Purpose: Sends message through asynchronous IPC socket                     *
1894  *                                                                            *
1895  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1896  *             code    - [IN] the message code                                *
1897  *             data    - [IN] the data                                        *
1898  *             size    - [IN] the data size                                   *
1899  *                                                                            *
1900  * Comments: If data can't be written directly to socket (buffer full) then   *
1901  *           the message is queued and sent during zbx_ipc_async_socket_recv()*
1902  *           or zbx_ipc_async_socket_flush() functions whenever socket becomes*
1903  *           ready.                                                           *
1904  *                                                                            *
1905  ******************************************************************************/
zbx_ipc_async_socket_send(zbx_ipc_async_socket_t * asocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1906 int	zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
1907 		zbx_uint32_t size)
1908 {
1909 	int	ret = FAIL;
1910 
1911 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1912 
1913 	ret = zbx_ipc_client_send(asocket->client, code, data, size);
1914 
1915 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1916 
1917 	return ret;
1918 }
1919 
1920 /******************************************************************************
1921  *                                                                            *
1922  * Function: zbx_ipc_async_socket_recv                                        *
1923  *                                                                            *
1924  * Purpose: receives message through asynchronous IPC socket                  *
1925  *                                                                            *
1926  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1927  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1928  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1929  *                            used for blocking call without timeout          *
1930  *             message - [OUT] the received message or NULL if the client     *
1931  *                             connection was closed.                         *
1932  *                             The message must be freed by caller with       *
1933  *                             ipc_message_free() function.                   *
1934  *                                                                            *
1935  * Return value: SUCCEED - the message was read successfully or timeout       *
1936  *                         occurred                                           *
1937  *               FAIL    - otherwise                                          *
1938  *                                                                            *
1939  * Comments: After socket has been closed (or connection error has occurred)  *
1940  *           calls to zbx_ipc_client_read() will return success with buffered *
1941  *           messages, until all buffered messages are retrieved.             *
1942  *                                                                            *
1943  ******************************************************************************/
zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t * asocket,int timeout,zbx_ipc_message_t ** message)1944 int	zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
1945 {
1946 	int	ret, flags;
1947 
1948 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
1949 
1950 	if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
1951 	{
1952 		if (ZBX_IPC_WAIT_FOREVER != timeout)
1953 		{
1954 			struct timeval	tv = {timeout, 0};
1955 			evtimer_add(asocket->ev_timer, &tv);
1956 		}
1957 		flags = EVLOOP_ONCE;
1958 	}
1959 	else
1960 		flags = EVLOOP_NONBLOCK;
1961 
1962 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1963 
1964 	do
1965 	{
1966 		event_base_loop(asocket->ev, flags);
1967 		*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue);
1968 	}
1969 	while (NULL == *message && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
1970 
1971 	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && NULL != *message)
1972 	{
1973 		char	*data = NULL;
1974 
1975 		zbx_ipc_message_format(*message, &data);
1976 		zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
1977 
1978 		zbx_free(data);
1979 	}
1980 
1981 	if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1982 		ret = SUCCEED;
1983 	else
1984 		ret = FAIL;
1985 
1986 	evtimer_del(asocket->ev_timer);
1987 
1988 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
1989 
1990 	return ret;
1991 }
1992 
1993 /******************************************************************************
1994  *                                                                            *
1995  * Function: zbx_ipc_async_socket_flush                                       *
1996  *                                                                            *
1997  * Purpose: flushes unsent through asynchronous IPC socket                    *
1998  *                                                                            *
1999  * Parameters: asocket - [IN] the asynchronous IPC service socket             *
2000  *             timeout - [IN] the timeout in seconds, 0 is used for           *
2001  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
2002  *                            used for blocking call without timeout          *
2003  *                                                                            *
2004  * Return value: SUCCEED - the data was flushed successfully or timeout       *
2005  *                         occurred. Use zbx_ipc_client_unsent_data() to      *
2006  *                         check if all data was sent.                        *
2007  *               FAIL    - failed to send data (connection was closed or an   *
2008  *                         error occurred).                                   *
2009  *                                                                            *
2010  ******************************************************************************/
zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t * asocket,int timeout)2011 int	zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
2012 {
2013 	int	ret = FAIL, flags;
2014 
2015 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
2016 
2017 	if (0 == asocket->client->tx_bytes)
2018 	{
2019 		ret = SUCCEED;
2020 		goto out;
2021 	}
2022 
2023 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
2024 		goto out;
2025 
2026 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
2027 
2028 	if (0 != timeout)
2029 	{
2030 		if (ZBX_IPC_WAIT_FOREVER != timeout)
2031 		{
2032 			struct timeval	tv = {timeout, 0};
2033 			evtimer_add(asocket->ev_timer, &tv);
2034 		}
2035 		flags = EVLOOP_ONCE;
2036 	}
2037 	else
2038 		flags = EVLOOP_NONBLOCK;
2039 
2040 	do
2041 	{
2042 		event_base_loop(asocket->ev, flags);
2043 
2044 		if (SUCCEED != zbx_ipc_client_connected(asocket->client))
2045 			goto out;
2046 	}
2047 	while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
2048 
2049 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
2050 	{
2051 		ret = SUCCEED;
2052 		asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
2053 	}
2054 out:
2055 	evtimer_del(asocket->ev_timer);
2056 
2057 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
2058 
2059 	return ret;
2060 }
2061 
2062 /******************************************************************************
2063  *                                                                            *
2064  * Function: zbx_ipc_async_socket_check_unsent                                *
2065  *                                                                            *
2066  * Purpose: checks if there are data to be sent                               *
2067  *                                                                            *
2068  * Parameters: client  - [IN] the IPC service client                          *
2069  *                                                                            *
2070  * Return value: SUCCEED - there are messages queued to be sent               *
2071  *               FAIL    - all data has been sent                             *
2072  *                                                                            *
2073  ******************************************************************************/
zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t * asocket)2074 int	zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
2075 {
2076 	return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
2077 }
2078 
2079 /******************************************************************************
2080  *                                                                            *
2081  * Function: zbx_ipc_async_exchange                                           *
2082  *                                                                            *
2083  * Purpose: connect, send message and receive response in a given timeout     *
2084  *                                                                            *
2085  * Parameters: service_name - [IN] the IPC service name                       *
2086  *             code         - [IN] the message code                           *
2087  *             timeout      - [IN] time allowed to be spent on receive, note  *
2088  *                                 that this does not include open, send and  *
2089  *                                 flush that have their own timeouts         *
2090  *             data         - [IN] the data                                   *
2091  *             size         - [IN] the data size                              *
2092  *             out          - [OUT] the received message or NULL on error     *
2093  *                                  The message must be freed by zbx_free()   *
2094  *             error        - [OUT] the error message                         *
2095  *                                                                            *
2096  * Return value: SUCCEED - successfully sent message and received response    *
2097  *               FAIL    - error occurred                                     *
2098  *                                                                            *
2099  ******************************************************************************/
zbx_ipc_async_exchange(const char * service_name,zbx_uint32_t code,int timeout,const unsigned char * data,zbx_uint32_t size,unsigned char ** out,char ** error)2100 int	zbx_ipc_async_exchange(const char *service_name, zbx_uint32_t code, int timeout, const unsigned char *data,
2101 		zbx_uint32_t size, unsigned char **out, char **error)
2102 {
2103 	zbx_ipc_message_t	*message;
2104 	zbx_ipc_async_socket_t	asocket;
2105 	int			ret = FAIL;
2106 
2107 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:'%s' code:%u timeout:%d", __func__, service_name, code, timeout);
2108 
2109 	if (FAIL == zbx_ipc_async_socket_open(&asocket, service_name, timeout, error))
2110 		goto out;
2111 
2112 	if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size))
2113 	{
2114 		*error = zbx_strdup(NULL, "Cannot send request");
2115 		goto fail;
2116 	}
2117 
2118 	if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout))
2119 	{
2120 		*error = zbx_strdup(NULL, "Cannot flush request");
2121 		goto fail;
2122 	}
2123 
2124 	if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message))
2125 	{
2126 		*error = zbx_strdup(NULL, "Cannot receive response");
2127 		goto fail;
2128 	}
2129 
2130 	if (NULL == message)
2131 	{
2132 		*error = zbx_strdup(NULL, "Timeout while waiting for response");
2133 		goto fail;
2134 	}
2135 
2136 	*out = message->data;
2137 	message->data = NULL;
2138 
2139 	zbx_ipc_message_free(message);
2140 	ret = SUCCEED;
2141 fail:
2142 	zbx_ipc_async_socket_close(&asocket);
2143 out:
2144 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
2145 	return ret;
2146 }
2147 
2148 #endif
2149