1 #include "common.h"
2 
3 #ifdef HAVE_IPCSERVICE
4 
5 #ifdef HAVE_LIBEVENT
6 #	include <event.h>
7 #endif
8 
9 #include "zbxtypes.h"
10 #include "zbxalgo.h"
11 #include "log.h"
12 #include "zbxipcservice.h"
13 
14 #define ZBX_IPC_PATH_MAX	sizeof(((struct sockaddr_un *)0)->sun_path)
15 
16 #define ZBX_IPC_DATA_DUMP_SIZE		128
17 
18 static char	ipc_path[ZBX_IPC_PATH_MAX] = {0};
19 static size_t	ipc_path_root_len = 0;
20 
21 #define ZBX_IPC_CLIENT_STATE_NONE	0
22 #define ZBX_IPC_CLIENT_STATE_QUEUED	1
23 
24 #define ZBX_IPC_ASYNC_SOCKET_STATE_NONE		0
25 #define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT	1
26 #define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR	2
27 
28 extern unsigned char	program_type;
29 
30 /* IPC client, providing nonblocking connections through socket */
31 struct zbx_ipc_client
32 {
33 	zbx_ipc_socket_t	csocket;
34 	zbx_ipc_service_t	*service;
35 
36 	zbx_uint32_t		rx_header[2];
37 	unsigned char		*rx_data;
38 	zbx_uint32_t		rx_bytes;
39 	zbx_queue_ptr_t		rx_queue;
40 	struct event		*rx_event;
41 
42 	zbx_uint32_t		tx_header[2];
43 	unsigned char		*tx_data;
44 	zbx_uint32_t		tx_bytes;
45 	zbx_queue_ptr_t		tx_queue;
46 	struct event		*tx_event;
47 
48 	zbx_uint64_t		id;
49 	unsigned char		state;
50 
51 	zbx_uint32_t		refcount;
52 };
53 
54 /*
55  * Private API
56  */
57 
58 #define ZBX_IPC_HEADER_SIZE	(int)(sizeof(zbx_uint32_t) * 2)
59 
60 #define ZBX_IPC_MESSAGE_CODE	0
61 #define ZBX_IPC_MESSAGE_SIZE	1
62 
63 #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
64 typedef int evutil_socket_t;
65 
event_new(struct event_base * ev,evutil_socket_t fd,short what,void (* cb_func)(int,short,void *),void * cb_arg)66 static struct event	*event_new(struct event_base *ev, evutil_socket_t fd, short what,
67 		void(*cb_func)(int, short, void *), void *cb_arg)
68 {
69 	struct event	*event;
70 
71 	event = zbx_malloc(NULL, sizeof(struct event));
72 	event_set(event, fd, what, cb_func, cb_arg);
73 	event_base_set(ev, event);
74 
75 	return event;
76 }
77 
event_free(struct event * event)78 static void	event_free(struct event *event)
79 {
80 	event_del(event);
81 	zbx_free(event);
82 }
83 
84 #endif
85 
86 static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
87 static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
88 
ipc_get_path(void)89 static const char	*ipc_get_path(void)
90 {
91 	ipc_path[ipc_path_root_len] = '\0';
92 
93 	return ipc_path;
94 }
95 
96 #define ZBX_IPC_SOCKET_PREFIX	"/zabbix_"
97 #define ZBX_IPC_SOCKET_SUFFIX	".sock"
98 
99 #define ZBX_IPC_CLASS_PREFIX_NONE	""
100 #define ZBX_IPC_CLASS_PREFIX_SERVER	"server_"
101 #define ZBX_IPC_CLASS_PREFIX_PROXY	"proxy_"
102 #define ZBX_IPC_CLASS_PREFIX_AGENT	"agent_"
103 
104 /******************************************************************************
105  *                                                                            *
106  * Function: ipc_make_path                                                    *
107  *                                                                            *
108  * Purpose: makes socket path from the service name                           *
109  *                                                                            *
110  * Parameters: service_name - [IN] the service name                           *
111  *             error        - [OUT] the error message                         *
112  *                                                                            *
113  * Return value: The created path or NULL if the path exceeds unix domain     *
114  *               socket path maximum length                                   *
115  *                                                                            *
116  ******************************************************************************/
ipc_make_path(const char * service_name,char ** error)117 static const char	*ipc_make_path(const char *service_name, char **error)
118 {
119 	const char	*prefix;
120 	size_t		path_len, offset, prefix_len;
121 
122 	path_len = strlen(service_name);
123 
124 	switch (program_type)
125 	{
126 		case ZBX_PROGRAM_TYPE_SERVER:
127 			prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
128 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
129 			break;
130 		case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
131 		case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
132 			prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
133 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
134 			break;
135 		case ZBX_PROGRAM_TYPE_AGENTD:
136 			prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
137 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
138 			break;
139 		default:
140 			prefix = ZBX_IPC_CLASS_PREFIX_NONE;
141 			prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
142 			break;
143 	}
144 
145 	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
146 			ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + prefix_len)
147 	{
148 		*error = zbx_dsprintf(*error,
149 				"Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
150 				ipc_path, ZBX_IPC_SOCKET_PREFIX, prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
151 		return NULL;
152 	}
153 
154 	offset = ipc_path_root_len;
155 	memcpy(ipc_path + offset , ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
156 	offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
157 	memcpy(ipc_path + offset, prefix, prefix_len);
158 	offset += prefix_len;
159 	memcpy(ipc_path + offset, service_name, path_len);
160 	offset += path_len;
161 	memcpy(ipc_path + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
162 
163 	return ipc_path;
164 }
165 
166 /******************************************************************************
167  *                                                                            *
168  * Function: ipc_write_data                                                   *
169  *                                                                            *
170  * Purpose: writes data to a socket                                           *
171  *                                                                            *
172  * Parameters: fd        - [IN] the socket file descriptor                    *
173  *             data      - [IN] the data                                      *
174  *             size      - [IN] the data size                                 *
175  *             size_sent - [IN] the actual size written to socket             *
176  *                                                                            *
177  * Return value: SUCCEED - no socket errors were detected. Either the data or *
178  *                         a part of it was written to socket or a write to   *
179  *                         non-blocking socket would block                    *
180  *               FAIL    - otherwise                                          *
181  *                                                                            *
182  ******************************************************************************/
ipc_write_data(int fd,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * size_sent)183 static int	ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
184 {
185 	zbx_uint32_t	offset = 0;
186 	int		n, ret = SUCCEED;
187 
188 	while (offset != size)
189 	{
190 		n = write(fd, data + offset, size - offset);
191 
192 		if (-1 == n)
193 		{
194 			if (EINTR == errno)
195 				continue;
196 
197 			if (EWOULDBLOCK == errno || EAGAIN == errno)
198 				break;
199 
200 			zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
201 			ret = FAIL;
202 			break;
203 		}
204 		offset += n;
205 	}
206 
207 	*size_sent = offset;
208 
209 	return ret;
210 }
211 
212 /******************************************************************************
213  *                                                                            *
214  * Function: ipc_read_data                                                    *
215  *                                                                            *
216  * Purpose: reads data from a socket                                          *
217  *                                                                            *
218  * Parameters: fd        - [IN] the socket file descriptor                    *
219  *             data      - [IN] the data                                      *
220  *             size      - [IN] the data size                                 *
221  *             size_sent - [IN] the actual size read from socket              *
222  *                                                                            *
223  * Return value: SUCCEED - the data was successfully read                     *
224  *               FAIL    - otherwise                                          *
225  *                                                                            *
226  * Comments: When reading data from non-blocking sockets SUCCEED will be      *
227  *           returned also if there were no more data to read.                *
228  *                                                                            *
229  ******************************************************************************/
ipc_read_data(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)230 static int	ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
231 {
232 	int	n;
233 
234 	*read_size = 0;
235 
236 	while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
237 	{
238 		if (EINTR == errno)
239 			continue;
240 
241 		if (EWOULDBLOCK == errno || EAGAIN == errno)
242 			return SUCCEED;
243 
244 		return FAIL;
245 	}
246 
247 	if (0 == n)
248 		return FAIL;
249 
250 	*read_size += n;
251 
252 	return SUCCEED;
253 }
254 
255 /******************************************************************************
256  *                                                                            *
257  * Function: ipc_read_data_full                                               *
258  *                                                                            *
259  * Purpose: reads data from a socket until the requested data has been read   *
260  *                                                                            *
261  * Parameters: fd        - [IN] the socket file descriptor                    *
262  *             buffer    - [IN] the data                                      *
263  *             size      - [IN] the data size                                 *
264  *             read_size - [IN] the actual size read from socket              *
265  *                                                                            *
266  * Return value: SUCCEED - the data was successfully read                     *
267  *               FAIL    - otherwise                                          *
268  *                                                                            *
269  * Comments: When reading data from non-blocking sockets this function will   *
270  *           return SUCCEED if there are no data to read, even if not all of  *
271  *           the requested data has been read.                                *
272  *                                                                            *
273  ******************************************************************************/
ipc_read_data_full(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)274 static int	ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
275 {
276 	int		ret = FAIL;
277 	zbx_uint32_t	offset = 0, chunk_size;
278 
279 	*read_size = 0;
280 
281 	while (offset < size)
282 	{
283 		if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
284 			goto out;
285 
286 		if (0 == chunk_size)
287 			break;
288 
289 		offset += chunk_size;
290 	}
291 
292 	ret = SUCCEED;
293 out:
294 	*read_size = offset;
295 
296 	return ret;
297 }
298 
299 /******************************************************************************
300  *                                                                            *
301  * Function: ipc_socket_write_message                                         *
302  *                                                                            *
303  * Purpose: writes IPC message to socket                                      *
304  *                                                                            *
305  * Parameters: csocket - [IN] the IPC socket                                  *
306  *             code    - [IN] the message code                                *
307  *             data    - [IN] the data                                        *
308  *             size    - [IN] the data size                                   *
309  *             tx_size - [IN] the actual size written to socket               *
310  *                                                                            *
311  * Return value: SUCCEED - no socket errors were detected. Either the data or *
312  *                         a part of it was written to socket or a write to   *
313  *                         non-blocking socket would block                    *
314  *               FAIL    - otherwise                                          *
315  *                                                                            *
316  * Comments: When using non-blocking sockets the tx_size parameter must be    *
317  *           checked in addition to return value to tell if the message was   *
318  *           sent successfully.                                               *
319  *                                                                            *
320  ******************************************************************************/
ipc_socket_write_message(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * tx_size)321 static int	ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
322 		zbx_uint32_t size, zbx_uint32_t *tx_size)
323 {
324 	int		ret;
325 	zbx_uint32_t	size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
326 
327 	buffer[0] = code;
328 	buffer[1] = size;
329 
330 	if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
331 	{
332 		if (0 != size)
333 			memcpy(buffer + 2, data, size);
334 
335 		return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
336 	}
337 
338 	if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
339 		return FAIL;
340 
341 	/* in the case of non-blocking sockets only a part of the header might be sent */
342 	if (ZBX_IPC_HEADER_SIZE != *tx_size)
343 		return SUCCEED;
344 
345 	ret = ipc_write_data(csocket->fd, data, size, &size_data);
346 	*tx_size += size_data;
347 
348 	return ret;
349 }
350 
351 /******************************************************************************
352  *                                                                            *
353  * Function: ipc_read_buffer                                                  *
354  *                                                                            *
355  * Purpose: reads message header and data from buffer                         *
356  *                                                                            *
357  * Parameters: header      - [IN/OUT] the message header                      *
358  *             data        - [OUT] the message data                           *
359  *             rx_bytes    - [IN] the number of bytes stored in message       *
360  *                                (including header)                          *
361  *             buffer      - [IN] the buffer to parse                         *
362  *             size        - [IN] the number of bytes to parse                *
363  *             read_size   - [OUT] the number of bytes read                   *
364  *                                                                            *
365  * Return value: SUCCEED - message was successfully parsed                    *
366  *               FAIL - not enough data                                       *
367  *                                                                            *
368  ******************************************************************************/
ipc_read_buffer(zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t rx_bytes,const unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)369 static int	ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
370 		const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
371 {
372 	zbx_uint32_t	copy_size, data_size, data_offset;
373 
374 	*read_size = 0;
375 
376 	if (ZBX_IPC_HEADER_SIZE > rx_bytes)
377 	{
378 		copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
379 		memcpy((char *)header + rx_bytes, buffer, copy_size);
380 		*read_size += copy_size;
381 
382 		if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
383 			return FAIL;
384 
385 		data_size = header[ZBX_IPC_MESSAGE_SIZE];
386 
387 		if (0 == data_size)
388 		{
389 			*data = NULL;
390 			return SUCCEED;
391 		}
392 
393 		*data = (unsigned char *)zbx_malloc(NULL, data_size);
394 		data_offset = 0;
395 	}
396 	else
397 	{
398 		data_size = header[ZBX_IPC_MESSAGE_SIZE];
399 		data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
400 	}
401 
402 	copy_size = MIN(data_size - data_offset, size - *read_size);
403 	memcpy(*data + data_offset, buffer + *read_size, copy_size);
404 	*read_size += copy_size;
405 
406 	return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
407 }
408 
409 /******************************************************************************
410  *                                                                            *
411  * Function: ipc_message_is_completed                                         *
412  *                                                                            *
413  * Purpose: checks if IPC message has been completed                          *
414  *                                                                            *
415  * Parameters: header   - [IN] the message header                             *
416  *             rx_bytes - [IN] the number of bytes set in message             *
417  *                             (including header)                             *
418  *                                                                            *
419  * Return value:  SUCCEED - message has been completed                        *
420  *                FAIL - otherwise                                            *
421  *                                                                            *
422  ******************************************************************************/
ipc_message_is_completed(const zbx_uint32_t * header,zbx_uint32_t rx_bytes)423 static int	ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
424 {
425 	if (ZBX_IPC_HEADER_SIZE > rx_bytes)
426 		return FAIL;
427 
428 	if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
429 		return FAIL;
430 
431 	return SUCCEED;
432 }
433 
434 /******************************************************************************
435  *                                                                            *
436  * Function: ipc_socket_read_message                                          *
437  *                                                                            *
438  * Purpose: reads IPC message from buffered client socket                     *
439  *                                                                            *
440  * Parameters: csocket  - [IN] the source socket                              *
441  *             header   - [OUT] the header of the message                     *
442  *             data     - [OUT] the data of the message                       *
443  *             rx_bytes - [IN/OUT] the total message size read (including     *
444  *                                 header                                     *
445  *                                                                            *
446  * Return value:  SUCCEED - data was read successfully, check rx_bytes to     *
447  *                          determine if the message was completed.           *
448  *                FAIL - failed to read message (socket error or connection   *
449  *                       was closed).                                         *
450  *                                                                            *
451  ******************************************************************************/
ipc_socket_read_message(zbx_ipc_socket_t * csocket,zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t * rx_bytes)452 static int	ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
453 		zbx_uint32_t *rx_bytes)
454 {
455 	zbx_uint32_t	data_size, offset, read_size = 0;
456 	int		ret = FAIL;
457 
458 	/* try to read message from socket buffer */
459 	if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
460 	{
461 		ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
462 				csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
463 
464 		csocket->rx_buffer_offset += read_size;
465 		*rx_bytes += read_size;
466 
467 		if (SUCCEED == ret)
468 			goto out;
469 	}
470 
471 	/* not enough data in socket buffer, try to read more until message is completed or no data to read */
472 	while (SUCCEED != ret)
473 	{
474 		csocket->rx_buffer_offset = 0;
475 		csocket->rx_buffer_bytes = 0;
476 
477 		if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
478 		{
479 			offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
480 			data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
481 
482 			/* long messages will be read directly into message buffer */
483 			if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
484 			{
485 				ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
486 				*rx_bytes += read_size;
487 				goto out;
488 			}
489 		}
490 
491 		if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
492 			goto out;
493 
494 		/* it's possible that nothing will be read on non-blocking sockets, return success */
495 		if (0 == read_size)
496 		{
497 			ret = SUCCEED;
498 			goto out;
499 		}
500 
501 		csocket->rx_buffer_bytes = read_size;
502 
503 		ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
504 				&read_size);
505 
506 		csocket->rx_buffer_offset += read_size;
507 		*rx_bytes += read_size;
508 	}
509 out:
510 	return ret;
511 }
512 
513 /******************************************************************************
514  *                                                                            *
515  * Function: ipc_client_free_event                                            *
516  *                                                                            *
517  * Purpose: frees client's libevent event                                     *
518  *                                                                            *
519  * Parameters: client - [IN] the client                                       *
520  *                                                                            *
521  ******************************************************************************/
ipc_client_free_events(zbx_ipc_client_t * client)522 static void	ipc_client_free_events(zbx_ipc_client_t *client)
523 {
524 	if (NULL != client->rx_event)
525 	{
526 		event_free(client->rx_event);
527 		client->rx_event = NULL;
528 	}
529 
530 	if (NULL != client->tx_event)
531 	{
532 		event_free(client->tx_event);
533 		client->tx_event = NULL;
534 	}
535 }
536 
537 /******************************************************************************
538  *                                                                            *
539  * Function: ipc_client_free                                                  *
540  *                                                                            *
541  * Purpose: frees IPC service client                                          *
542  *                                                                            *
543  * Parameters: client - [IN] the client to free                               *
544  *                                                                            *
545  ******************************************************************************/
ipc_client_free(zbx_ipc_client_t * client)546 static void	ipc_client_free(zbx_ipc_client_t *client)
547 {
548 	zbx_ipc_message_t	*message;
549 
550 	ipc_client_free_events(client);
551 	zbx_ipc_socket_close(&client->csocket);
552 
553 	while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
554 		zbx_ipc_message_free(message);
555 
556 	zbx_queue_ptr_destroy(&client->rx_queue);
557 	zbx_free(client->rx_data);
558 
559 	while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
560 		zbx_ipc_message_free(message);
561 
562 	zbx_queue_ptr_destroy(&client->tx_queue);
563 	zbx_free(client->tx_data);
564 
565 	ipc_client_free_events(client);
566 
567 	zbx_free(client);
568 }
569 
570 /******************************************************************************
571  *                                                                            *
572  * Function: ipc_client_push_rx_message                                       *
573  *                                                                            *
574  * Purpose: adds message to received messages queue                           *
575  *                                                                            *
576  * Parameters: client - [IN] the client to read                               *
577  *                                                                            *
578  ******************************************************************************/
ipc_client_push_rx_message(zbx_ipc_client_t * client)579 static void	ipc_client_push_rx_message(zbx_ipc_client_t *client)
580 {
581 	zbx_ipc_message_t	*message;
582 
583 	message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
584 	message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
585 	message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
586 	message->data = client->rx_data;
587 	zbx_queue_ptr_push(&client->rx_queue, message);
588 
589 	client->rx_data = NULL;
590 	client->rx_bytes = 0;
591 }
592 
593 /******************************************************************************
594  *                                                                            *
595  * Function: ipc_client_pop_tx_message                                        *
596  *                                                                            *
597  * Purpose: prepares to send the next message in send queue                   *
598  *                                                                            *
599  * Parameters: client - [IN] the client                                       *
600  *                                                                            *
601  ******************************************************************************/
ipc_client_pop_tx_message(zbx_ipc_client_t * client)602 static void	ipc_client_pop_tx_message(zbx_ipc_client_t *client)
603 {
604 	zbx_ipc_message_t	*message;
605 
606 	zbx_free(client->tx_data);
607 	client->tx_bytes = 0;
608 
609 	if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
610 		return;
611 
612 	client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
613 	client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
614 	client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
615 	client->tx_data = message->data;
616 	zbx_free(message);
617 }
618 
619 /******************************************************************************
620  *                                                                            *
621  * Function: ipc_client_read                                                  *
622  *                                                                            *
623  * Purpose: reads data from IPC service client                                *
624  *                                                                            *
625  * Parameters: client - [IN] the client to read                               *
626  *                                                                            *
627  * Return value:  FAIL - read error/connection was closed                     *
628  *                                                                            *
629  * Comments: This function reads data from socket, parses it and adds         *
630  *           parsed messages to received messages queue.                      *
631  *                                                                            *
632  ******************************************************************************/
ipc_client_read(zbx_ipc_client_t * client)633 static int	ipc_client_read(zbx_ipc_client_t *client)
634 {
635 	int	rc;
636 
637 	do
638 	{
639 		if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
640 				&client->rx_bytes))
641 		{
642 			zbx_free(client->rx_data);
643 			client->rx_bytes = 0;
644 			return FAIL;
645 		}
646 
647 		if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
648 			ipc_client_push_rx_message(client);
649 	}
650 
651 	while (SUCCEED == rc);
652 
653 	return SUCCEED;
654 }
655 
656 /******************************************************************************
657  *                                                                            *
658  * Function: ipc_client_write                                                 *
659  *                                                                            *
660  * Purpose: writes queued data to IPC service client                          *
661  *                                                                            *
662  * Parameters: client - [IN] the client                                       *
663  *                                                                            *
664  * Return value: SUCCEED - the data was sent successfully                     *
665  *               FAIL    - otherwise                                          *
666  *                                                                            *
667  ******************************************************************************/
ipc_client_write(zbx_ipc_client_t * client)668 static int	ipc_client_write(zbx_ipc_client_t *client)
669 {
670 	zbx_uint32_t	data_size, write_size;
671 
672 	data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
673 
674 	if (data_size < client->tx_bytes)
675 	{
676 		zbx_uint32_t	size, offset;
677 
678 		size = client->tx_bytes - data_size;
679 		offset = ZBX_IPC_HEADER_SIZE - size;
680 
681 		if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
682 				&write_size))
683 		{
684 			return FAIL;
685 		}
686 
687 		client->tx_bytes -= write_size;
688 
689 		if (data_size < client->tx_bytes)
690 			return SUCCEED;
691 	}
692 
693 	while (0 < client->tx_bytes)
694 	{
695 		if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
696 				client->tx_bytes, &write_size))
697 		{
698 			return FAIL;
699 		}
700 
701 		if (0 == write_size)
702 			return SUCCEED;
703 
704 		client->tx_bytes -= write_size;
705 	}
706 
707 	if (0 == client->tx_bytes)
708 		ipc_client_pop_tx_message(client);
709 
710 	return SUCCEED;
711 }
712 
713 /******************************************************************************
714  *                                                                            *
715  * Function: ipc_service_pop_client                                           *
716  *                                                                            *
717  * Purpose: gets the next client with messages/closed socket from recv queue  *
718  *                                                                            *
719  * Parameters: service - [IN] the IPC service                                 *
720  *                                                                            *
721  * Return value: The client with messages/closed socket                       *
722  *                                                                            *
723  ******************************************************************************/
ipc_service_pop_client(zbx_ipc_service_t * service)724 static zbx_ipc_client_t	*ipc_service_pop_client(zbx_ipc_service_t *service)
725 {
726 	zbx_ipc_client_t	*client;
727 
728 	if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
729 		client->state = ZBX_IPC_CLIENT_STATE_NONE;
730 
731 	return client;
732 }
733 
734 /******************************************************************************
735  *                                                                            *
736  * Function: ipc_service_push_client                                          *
737  *                                                                            *
738  * Purpose: pushes client to the recv queue if needed                         *
739  *                                                                            *
740  * Parameters: service - [IN] the IPC service                                 *
741  *             client  - [IN] the IPC client                                  *
742  *                                                                            *
743  * Comments: The client is pushed to the recv queue if it isn't already there *
744  *           and there is messages to return or the client connection was     *
745  *           closed.                                                          *
746  *                                                                            *
747  ******************************************************************************/
ipc_service_push_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)748 static void	ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
749 {
750 	if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
751 		return;
752 
753 	if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
754 		return;
755 
756 	client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
757 	zbx_queue_ptr_push(&service->clients_recv, client);
758 }
759 
760 /******************************************************************************
761  *                                                                            *
762  * Function: ipc_service_add_client                                           *
763  *                                                                            *
764  * Purpose: adds a new IPC service client                                     *
765  *                                                                            *
766  * Parameters: service - [IN] the IPC service                                 *
767  *             fd      - [IN] the client socket descriptor                    *
768  *                                                                            *
769  ******************************************************************************/
ipc_service_add_client(zbx_ipc_service_t * service,int fd)770 static void	ipc_service_add_client(zbx_ipc_service_t *service, int fd)
771 {
772 	const char		*__function_name = "ipc_service_add_client";
773 	static zbx_uint64_t	next_clientid = 1;
774 	zbx_ipc_client_t	*client;
775 	int			flags;
776 
777 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
778 
779 	client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
780 	memset(client, 0, sizeof(zbx_ipc_client_t));
781 
782 	if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
783 	{
784 		zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
785 		exit(EXIT_FAILURE);
786 	}
787 
788 	if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
789 	{
790 		zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
791 		exit(EXIT_FAILURE);
792 	}
793 
794 	client->csocket.fd = fd;
795 	client->csocket.rx_buffer_bytes = 0;
796 	client->csocket.rx_buffer_offset = 0;
797 	client->id = next_clientid++;
798 	client->state = ZBX_IPC_CLIENT_STATE_NONE;
799 	client->refcount = 1;
800 
801 	zbx_queue_ptr_create(&client->rx_queue);
802 	zbx_queue_ptr_create(&client->tx_queue);
803 
804 	client->service = service;
805 	client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
806 	client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
807 	event_add(client->rx_event, NULL);
808 
809 	zbx_vector_ptr_append(&service->clients, client);
810 
811 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __function_name, client->id);
812 }
813 
814 /******************************************************************************
815  *                                                                            *
816  * Function: ipc_service_remove_client                                        *
817  *                                                                            *
818  * Purpose: removes IPC service client                                        *
819  *                                                                            *
820  * Parameters: service - [IN] the IPC service                                 *
821  *             client  - [IN] the client to remove                            *
822  *                                                                            *
823  ******************************************************************************/
ipc_service_remove_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)824 static void	ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
825 {
826 	int		i;
827 
828 	for (i = 0; i < service->clients.values_num; i++)
829 	{
830 		if (service->clients.values[i] == client)
831 			zbx_vector_ptr_remove_noorder(&service->clients, i);
832 	}
833 }
834 
835 /******************************************************************************
836  *                                                                            *
837  * Function: ipc_client_read_event_cb                                         *
838  *                                                                            *
839  * Purpose: service client read event libevent callback                       *
840  *                                                                            *
841  ******************************************************************************/
ipc_client_read_event_cb(evutil_socket_t fd,short what,void * arg)842 static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
843 {
844 	zbx_ipc_client_t	*client = (zbx_ipc_client_t *)arg;
845 
846 	ZBX_UNUSED(fd);
847 	ZBX_UNUSED(what);
848 
849 	if (SUCCEED != ipc_client_read(client))
850 	{
851 		ipc_client_free_events(client);
852 		ipc_service_remove_client(client->service, client);
853 	}
854 
855 	ipc_service_push_client(client->service, client);
856 }
857 
858 /******************************************************************************
859  *                                                                            *
860  * Function: ipc_client_write_event_cb                                        *
861  *                                                                            *
862  * Purpose: service client write event libevent callback                      *
863  *                                                                            *
864  ******************************************************************************/
ipc_client_write_event_cb(evutil_socket_t fd,short what,void * arg)865 static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
866 {
867 	zbx_ipc_client_t	*client = (zbx_ipc_client_t *)arg;
868 
869 	ZBX_UNUSED(fd);
870 	ZBX_UNUSED(what);
871 
872 	if (SUCCEED != ipc_client_write(client))
873 	{
874 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
875 		zbx_ipc_client_close(client);
876 		return;
877 	}
878 
879 	if (0 == client->tx_bytes)
880 		event_del(client->tx_event);
881 }
882 
883 /******************************************************************************
884  *                                                                            *
885  * Function: ipc_async_socket_write_event_cb                                  *
886  *                                                                            *
887  * Purpose: asynchronous socket write event libevent callback                 *
888  *                                                                            *
889  ******************************************************************************/
ipc_async_socket_write_event_cb(evutil_socket_t fd,short what,void * arg)890 static void	ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
891 {
892 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
893 
894 	ZBX_UNUSED(fd);
895 	ZBX_UNUSED(what);
896 
897 	if (SUCCEED != ipc_client_write(asocket->client))
898 	{
899 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
900 		ipc_client_free_events(asocket->client);
901 		zbx_ipc_socket_close(&asocket->client->csocket);
902 		asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
903 		return;
904 	}
905 
906 	if (0 == asocket->client->tx_bytes)
907 		event_del(asocket->client->tx_event);
908 }
909 
910 /******************************************************************************
911  *                                                                            *
912  * Function: ipc_async_socket_read_event_cb                                   *
913  *                                                                            *
914  * Purpose: asynchronous socket read event libevent callback                  *
915  *                                                                            *
916  ******************************************************************************/
ipc_async_socket_read_event_cb(evutil_socket_t fd,short what,void * arg)917 static void	ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
918 {
919 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
920 
921 	ZBX_UNUSED(fd);
922 	ZBX_UNUSED(what);
923 
924 	if (SUCCEED != ipc_client_read(asocket->client))
925 	{
926 		ipc_client_free_events(asocket->client);
927 		asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
928 	}
929 }
930 
931 /******************************************************************************
932  *                                                                            *
933  * Function: ipc_async_socket_timer_cb                                        *
934  *                                                                            *
935  * Purpose: timer callback                                                    *
936  *                                                                            *
937  ******************************************************************************/
ipc_async_socket_timer_cb(evutil_socket_t fd,short what,void * arg)938 static void	ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
939 {
940 	zbx_ipc_async_socket_t	*asocket = (zbx_ipc_async_socket_t *)arg;
941 
942 	ZBX_UNUSED(fd);
943 	ZBX_UNUSED(what);
944 
945 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
946 }
947 
948 /******************************************************************************
949  *                                                                            *
950  * Function: ipc_service_accept                                               *
951  *                                                                            *
952  * Purpose: accepts a new client connection                                   *
953  *                                                                            *
954  * Parameters: service - [IN] the IPC service                                 *
955  *                                                                            *
956  ******************************************************************************/
ipc_service_accept(zbx_ipc_service_t * service)957 static void	ipc_service_accept(zbx_ipc_service_t *service)
958 {
959 	const char	*__function_name = "ipc_service_accept";
960 	int		fd;
961 
962 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
963 
964 	while (-1 == (fd = accept(service->fd, NULL, NULL)))
965 	{
966 		if (EINTR != errno)
967 		{
968 			/* If there is unaccepted connection libevent will call registered callback function over and */
969 			/* over again. It is better to exit straight away and cause all other processes to stop. */
970 			zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
971 			exit(EXIT_FAILURE);
972 		}
973 	}
974 
975 	ipc_service_add_client(service, fd);
976 
977 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
978 }
979 
980 /******************************************************************************
981  *                                                                            *
982  * Function: ipc_message_create                                               *
983  *                                                                            *
984  * Purpose: creates IPC message                                               *
985  *                                                                            *
986  * Parameters: code    - [IN] the message code                                *
987  *             data    - [IN] the data                                        *
988  *             size    - [IN] the data size                                   *
989  *                                                                            *
990  * Return value: The created message.                                         *
991  *                                                                            *
992  ******************************************************************************/
ipc_message_create(zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)993 static zbx_ipc_message_t	*ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
994 {
995 	zbx_ipc_message_t	*message;
996 
997 	message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
998 
999 	message->code = code;
1000 	message->size = size;
1001 
1002 	if (0 != size)
1003 	{
1004 		message->data = (unsigned char *)zbx_malloc(NULL, size);
1005 		memcpy(message->data, data, size);
1006 	}
1007 	else
1008 		message->data = NULL;
1009 
1010 	return message;
1011 }
1012 
1013 /******************************************************************************
1014  *                                                                            *
1015  * Function: ipc_service_event_log                                            *
1016  *                                                                            *
1017  * Purpose: libevent logging callback                                         *
1018  *                                                                            *
1019  ******************************************************************************/
ipc_service_event_log_cb(int severity,const char * msg)1020 static void ipc_service_event_log_cb(int severity, const char *msg)
1021 {
1022 	int	loglevel;
1023 
1024 	switch (severity)
1025 	{
1026 		case _EVENT_LOG_DEBUG:
1027 			loglevel = LOG_LEVEL_TRACE;
1028 			break;
1029 		case _EVENT_LOG_MSG:
1030 			loglevel = LOG_LEVEL_DEBUG;
1031 			break;
1032 		case _EVENT_LOG_WARN:
1033 			loglevel = LOG_LEVEL_WARNING;
1034 			break;
1035 		case _EVENT_LOG_ERR:
1036 			loglevel = LOG_LEVEL_DEBUG;
1037 			break;
1038 		default:
1039 			loglevel = LOG_LEVEL_DEBUG;
1040 			break;
1041 	}
1042 
1043 	zabbix_log(loglevel, "IPC service: %s", msg);
1044 }
1045 
1046 /******************************************************************************
1047  *                                                                            *
1048  * Function: ipc_service_init_libevent                                        *
1049  *                                                                            *
1050  * Purpose: initialize libevent library                                       *
1051  *                                                                            *
1052  ******************************************************************************/
ipc_service_init_libevent(void)1053 static void	ipc_service_init_libevent(void)
1054 {
1055 	event_set_log_callback(ipc_service_event_log_cb);
1056 }
1057 
1058 /******************************************************************************
1059  *                                                                            *
1060  * Function: ipc_service_free_libevent                                        *
1061  *                                                                            *
1062  * Purpose: uninitialize libevent library                                     *
1063  *                                                                            *
1064  ******************************************************************************/
ipc_service_free_libevent(void)1065 static void	ipc_service_free_libevent(void)
1066 {
1067 }
1068 
1069 /******************************************************************************
1070  *                                                                            *
1071  * Function: ipc_service_client_connected_cb                                  *
1072  *                                                                            *
1073  * Purpose: libevent listener callback                                        *
1074  *                                                                            *
1075  ******************************************************************************/
ipc_service_client_connected_cb(evutil_socket_t fd,short what,void * arg)1076 static void	ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
1077 {
1078 	zbx_ipc_service_t	*service = (zbx_ipc_service_t *)arg;
1079 
1080 	ZBX_UNUSED(fd);
1081 	ZBX_UNUSED(what);
1082 
1083 	ipc_service_accept(service);
1084 }
1085 
1086 /******************************************************************************
1087  *                                                                            *
1088  * Function: ipc_service_timer_cb                                             *
1089  *                                                                            *
1090  * Purpose: timer callback                                                    *
1091  *                                                                            *
1092  ******************************************************************************/
ipc_service_timer_cb(evutil_socket_t fd,short what,void * arg)1093 static void	ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
1094 {
1095 	ZBX_UNUSED(fd);
1096 	ZBX_UNUSED(what);
1097 	ZBX_UNUSED(arg);
1098 }
1099 
1100 /******************************************************************************
1101  *                                                                            *
1102  * Function: ipc_check_running_service                                        *
1103  *                                                                            *
1104  * Purpose: checks if an IPC service is already running                       *
1105  *                                                                            *
1106  * Parameters: service_name - [IN]                                            *
1107  *                                                                            *
1108  ******************************************************************************/
ipc_check_running_service(const char * service_name)1109 static int	ipc_check_running_service(const char *service_name)
1110 {
1111 	zbx_ipc_socket_t	csocket;
1112 	int			ret;
1113 	char			*error = NULL;
1114 
1115 	if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
1116 		zbx_ipc_socket_close(&csocket);
1117 	else
1118 		zbx_free(error);
1119 
1120 	return ret;
1121 }
1122 
1123 /*
1124  * Public client API
1125  */
1126 
1127 /******************************************************************************
1128  *                                                                            *
1129  * Function: zbx_ipc_socket_open                                              *
1130  *                                                                            *
1131  * Purpose: opens socket to an IPC service listening on the specified path    *
1132  *                                                                            *
1133  * Parameters: csocket      - [OUT] the IPC socket to the service             *
1134  *             service_name - [IN] the IPC service name                       *
1135  *             timeout      - [IN] the connection timeout                     *
1136  *             error        - [OUT] the error message                         *
1137  *                                                                            *
1138  * Return value: SUCCEED - the socket was successfully opened                 *
1139  *               FAIL    - otherwise                                          *
1140  *                                                                            *
1141  ******************************************************************************/
zbx_ipc_socket_open(zbx_ipc_socket_t * csocket,const char * service_name,int timeout,char ** error)1142 int	zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
1143 {
1144 	const char		*__function_name = "zbx_ipc_socket_open";
1145 	struct sockaddr_un	addr;
1146 	time_t			start;
1147 	struct timespec		ts = {0, 100000000};
1148 	const char		*socket_path;
1149 	int			ret = FAIL;
1150 
1151 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1152 
1153 	if (NULL == (socket_path = ipc_make_path(service_name, error)))
1154 		goto out;
1155 
1156 	if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1157 	{
1158 		*error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
1159 		goto out;
1160 	}
1161 
1162 	memset(&addr, 0, sizeof(addr));
1163 	addr.sun_family = AF_UNIX;
1164 	memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1165 
1166 	start = time(NULL);
1167 
1168 	while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
1169 	{
1170 		if (0 == timeout || time(NULL) - start > timeout)
1171 		{
1172 			*error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
1173 					zbx_strerror(errno));
1174 			close(csocket->fd);
1175 			goto out;
1176 		}
1177 
1178 		nanosleep(&ts, NULL);
1179 	}
1180 
1181 	csocket->rx_buffer_bytes = 0;
1182 	csocket->rx_buffer_offset = 0;
1183 
1184 	ret = SUCCEED;
1185 out:
1186 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1187 	return ret;
1188 }
1189 
1190 /******************************************************************************
1191  *                                                                            *
1192  * Function: zbx_ipc_socket_close                                             *
1193  *                                                                            *
1194  * Purpose: closes socket to an IPC service                                   *
1195  *                                                                            *
1196  * Parameters: csocket - [IN/OUT] the IPC socket to close                     *
1197  *                                                                            *
1198  ******************************************************************************/
zbx_ipc_socket_close(zbx_ipc_socket_t * csocket)1199 void	zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
1200 {
1201 	const char	*__function_name = "zbx_ipc_socket_close";
1202 
1203 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1204 
1205 	if (-1 != csocket->fd)
1206 	{
1207 		close(csocket->fd);
1208 		csocket->fd = -1;
1209 	}
1210 
1211 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1212 }
1213 
1214 /******************************************************************************
1215  *                                                                            *
1216  * Function: zbx_ipc_socket_write                                             *
1217  *                                                                            *
1218  * Purpose: writes a message to IPC service                                   *
1219  *                                                                            *
1220  * Parameters: csocket - [IN] an opened IPC socket to the service             *
1221  *             code    - [IN] the message code                                *
1222  *             data    - [IN] the data                                        *
1223  *             size    - [IN] the data size                                   *
1224  *                                                                            *
1225  * Return value: SUCCEED - the message was successfully written               *
1226  *               FAIL    - otherwise                                          *
1227  *                                                                            *
1228  ******************************************************************************/
zbx_ipc_socket_write(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1229 int	zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1230 {
1231 	const char	*__function_name = "zbx_ipc_socket_write";
1232 	int		ret;
1233 	zbx_uint32_t	size_sent;
1234 
1235 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1236 
1237 	if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) &&
1238 			size_sent == size + ZBX_IPC_HEADER_SIZE)
1239 	{
1240 		ret = SUCCEED;
1241 	}
1242 	else
1243 		ret = FAIL;
1244 
1245 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1246 
1247 	return ret;
1248 }
1249 
1250 /******************************************************************************
1251  *                                                                            *
1252  * Function: zbx_ipc_socket_read                                              *
1253  *                                                                            *
1254  * Purpose: reads a message from IPC service                                  *
1255  *                                                                            *
1256  * Parameters: csocket - [IN] an opened IPC socket to the service             *
1257  *             message - [OUT] the received message                           *
1258  *                                                                            *
1259  * Return value: SUCCEED - the message was successfully received              *
1260  *               FAIL    - otherwise                                          *
1261  *                                                                            *
1262  * Comments: If this function succeeds the message must be cleaned/freed by   *
1263  *           the caller.                                                      *
1264  *                                                                            *
1265  ******************************************************************************/
zbx_ipc_socket_read(zbx_ipc_socket_t * csocket,zbx_ipc_message_t * message)1266 int	zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
1267 {
1268 	const char	*__function_name = "zbx_ipc_socket_read";
1269 	int		ret = FAIL;
1270 	zbx_uint32_t	rx_bytes = 0, header[2];
1271 	unsigned char	*data = NULL;
1272 
1273 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1274 
1275 	if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes))
1276 		goto out;
1277 
1278 	if (SUCCEED != ipc_message_is_completed(header, rx_bytes))
1279 	{
1280 		zbx_free(data);
1281 		goto out;
1282 	}
1283 
1284 	message->code = header[ZBX_IPC_MESSAGE_CODE];
1285 	message->size = header[ZBX_IPC_MESSAGE_SIZE];
1286 	message->data = data;
1287 
1288 	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1289 	{
1290 		char	*msg = NULL;
1291 
1292 		zbx_ipc_message_format(message, &msg);
1293 
1294 		zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __function_name, msg);
1295 
1296 		zbx_free(msg);
1297 	}
1298 
1299 	ret = SUCCEED;
1300 out:
1301 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1302 
1303 	return ret;
1304 }
1305 
1306 /******************************************************************************
1307  *                                                                            *
1308  * Function: zbx_ipc_message_free                                             *
1309  *                                                                            *
1310  * Purpose: frees the resources allocated to store IPC message data           *
1311  *                                                                            *
1312  * Parameters: message - [IN] the message to free                             *
1313  *                                                                            *
1314  ******************************************************************************/
zbx_ipc_message_free(zbx_ipc_message_t * message)1315 void	zbx_ipc_message_free(zbx_ipc_message_t *message)
1316 {
1317 	if (NULL != message)
1318 	{
1319 		zbx_free(message->data);
1320 		zbx_free(message);
1321 	}
1322 }
1323 
1324 /******************************************************************************
1325  *                                                                            *
1326  * Function: zbx_ipc_message_clean                                            *
1327  *                                                                            *
1328  * Purpose: frees the resources allocated to store IPC message data           *
1329  *                                                                            *
1330  * Parameters: message - [IN] the message to clean                            *
1331  *                                                                            *
1332  ******************************************************************************/
zbx_ipc_message_clean(zbx_ipc_message_t * message)1333 void	zbx_ipc_message_clean(zbx_ipc_message_t *message)
1334 {
1335 	zbx_free(message->data);
1336 }
1337 
1338 /******************************************************************************
1339  *                                                                            *
1340  * Function: zbx_ipc_message_init                                             *
1341  *                                                                            *
1342  * Purpose: initializes IPC message                                           *
1343  *                                                                            *
1344  * Parameters: message - [IN] the message to initialize                       *
1345  *                                                                            *
1346  ******************************************************************************/
zbx_ipc_message_init(zbx_ipc_message_t * message)1347 void	zbx_ipc_message_init(zbx_ipc_message_t *message)
1348 {
1349 	memset(message, 0, sizeof(zbx_ipc_message_t));
1350 }
1351 
1352 /******************************************************************************
1353  *                                                                            *
1354  * Function: zbx_ipc_message_format                                           *
1355  *                                                                            *
1356  * Purpose: formats message to readable format for debug messages             *
1357  *                                                                            *
1358  * Parameters: message - [IN] the message                                     *
1359  *             data    - [OUT] the formatted message                          *
1360  *                                                                            *
1361  ******************************************************************************/
zbx_ipc_message_format(const zbx_ipc_message_t * message,char ** data)1362 void	zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
1363 {
1364 	size_t		data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
1365 	zbx_uint32_t	i, data_num;
1366 
1367 	if (NULL == message)
1368 		return;
1369 
1370 	data_num = message->size;
1371 
1372 	if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
1373 		data_num = ZBX_IPC_DATA_DUMP_SIZE;
1374 
1375 	*data = (char *)zbx_malloc(*data, data_alloc);
1376 	zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
1377 
1378 	for (i = 0; i < data_num; i++)
1379 	{
1380 		if (0 != i)
1381 			zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
1382 
1383 		zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
1384 	}
1385 
1386 	(*data)[data_offset] = '\0';
1387 }
1388 
1389 /******************************************************************************
1390  *                                                                            *
1391  * Function: zbx_ipc_message_copy                                             *
1392  *                                                                            *
1393  * Purpose: copies ipc message                                                *
1394  *                                                                            *
1395  * Parameters: dst - [IN] the destination message                             *
1396  *             src - [IN] the source message                                  *
1397  *                                                                            *
1398  ******************************************************************************/
zbx_ipc_message_copy(zbx_ipc_message_t * dst,const zbx_ipc_message_t * src)1399 void	zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
1400 {
1401 	dst->code = src->code;
1402 	dst->size = src->size;
1403 	dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
1404 	memcpy(dst->data, src->data, src->size);
1405 }
1406 
1407 /*
1408  * Public service API
1409  */
1410 
1411 /******************************************************************************
1412  *                                                                            *
1413  * Function: zbx_ipc_service_init_env                                         *
1414  *                                                                            *
1415  * Purpose: initializes IPC service environment                               *
1416  *                                                                            *
1417  * Parameters: path    - [IN] the service root path                           *
1418  *             error   - [OUT] the error message                              *
1419  *                                                                            *
1420  * Return value: SUCCEED - the environment was initialized successfully.      *
1421  *               FAIL    - otherwise                                          *
1422  *                                                                            *
1423  ******************************************************************************/
zbx_ipc_service_init_env(const char * path,char ** error)1424 int	zbx_ipc_service_init_env(const char *path, char **error)
1425 {
1426 	const char	*__function_name = "zbx_ipc_service_init_env";
1427 	struct stat	fs;
1428 	int		ret = FAIL;
1429 
1430 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __function_name, path);
1431 
1432 	if (0 != ipc_path_root_len)
1433 	{
1434 		*error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
1435 				" root directory at \"%s\".", ipc_get_path());
1436 		goto out;
1437 	}
1438 
1439 	if (0 != stat(path, &fs))
1440 	{
1441 		*error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
1442 				zbx_strerror(errno));
1443 		goto out;
1444 	}
1445 
1446 	if (0 == S_ISDIR(fs.st_mode))
1447 	{
1448 		*error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
1449 		goto out;
1450 	}
1451 
1452 	if (0 != access(path, W_OK | R_OK))
1453 	{
1454 		*error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
1455 		goto out;
1456 	}
1457 
1458 	ipc_path_root_len = strlen(path);
1459 	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
1460 	{
1461 		*error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
1462 		goto out;
1463 	}
1464 
1465 	memcpy(ipc_path, path, ipc_path_root_len + 1);
1466 
1467 	while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
1468 		ipc_path[--ipc_path_root_len] = '\0';
1469 
1470 	ipc_service_init_libevent();
1471 
1472 	ret = SUCCEED;
1473 out:
1474 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1475 
1476 	return ret;
1477 }
1478 
1479 /******************************************************************************
1480  *                                                                            *
1481  * Function: zbx_ipc_service_free_env                                         *
1482  *                                                                            *
1483  * Purpose: frees IPC service environment                                     *
1484  *                                                                            *
1485  ******************************************************************************/
zbx_ipc_service_free_env(void)1486 void	zbx_ipc_service_free_env(void)
1487 {
1488 	ipc_service_free_libevent();
1489 }
1490 
1491 
1492 /******************************************************************************
1493  *                                                                            *
1494  * Function: zbx_ipc_service_start                                            *
1495  *                                                                            *
1496  * Purpose: starts IPC service on the specified path                          *
1497  *                                                                            *
1498  * Parameters: service      - [IN/OUT] the IPC service                        *
1499  *             service_name - [IN] the unix domain socket path                *
1500  *             error        - [OUT] the error message                         *
1501  *                                                                            *
1502  * Return value: SUCCEED - the service was initialized successfully.          *
1503  *               FAIL    - otherwise                                          *
1504  *                                                                            *
1505  ******************************************************************************/
zbx_ipc_service_start(zbx_ipc_service_t * service,const char * service_name,char ** error)1506 int	zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
1507 {
1508 	const char		*__function_name = "zbx_ipc_service_start";
1509 	struct sockaddr_un	addr;
1510 	const char		*socket_path;
1511 	int			ret = FAIL;
1512 	mode_t			mode;
1513 
1514 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __function_name, service_name);
1515 
1516 	mode = umask(077);
1517 
1518 	if (NULL == (socket_path = ipc_make_path(service_name, error)))
1519 		goto out;
1520 
1521 	if (0 == access(socket_path, F_OK))
1522 	{
1523 		if (0 != access(socket_path, W_OK))
1524 		{
1525 			*error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
1526 			goto out;
1527 		}
1528 
1529 		if (SUCCEED == ipc_check_running_service(service_name))
1530 		{
1531 			*error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
1532 			goto out;
1533 		}
1534 
1535 		unlink(socket_path);
1536 	}
1537 
1538 	if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1539 	{
1540 		*error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
1541 		goto out;
1542 	}
1543 
1544 	memset(&addr, 0, sizeof(addr));
1545 	addr.sun_family = AF_UNIX;
1546 	memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1547 
1548 	if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
1549 	{
1550 		*error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
1551 		goto out;
1552 	}
1553 
1554 	if (0 != listen(service->fd, SOMAXCONN))
1555 	{
1556 		*error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
1557 		goto out;
1558 	}
1559 
1560 	service->path = zbx_strdup(NULL, service_name);
1561 	zbx_vector_ptr_create(&service->clients);
1562 	zbx_queue_ptr_create(&service->clients_recv);
1563 
1564 	service->ev = event_base_new();
1565 	service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
1566 			ipc_service_client_connected_cb, service);
1567 	event_add(service->ev_listener, NULL);
1568 
1569 	service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
1570 
1571 	ret = SUCCEED;
1572 out:
1573 	umask(mode);
1574 
1575 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1576 
1577 	return ret;
1578 }
1579 
1580 /******************************************************************************
1581  *                                                                            *
1582  * Function: zbx_ipc_service_close                                            *
1583  *                                                                            *
1584  * Purpose: closes IPC service and frees the resources allocated by it        *
1585  *                                                                            *
1586  * Parameters: service - [IN/OUT] the IPC service                             *
1587  *                                                                            *
1588  ******************************************************************************/
zbx_ipc_service_close(zbx_ipc_service_t * service)1589 void	zbx_ipc_service_close(zbx_ipc_service_t *service)
1590 {
1591 	const char	*__function_name = "zbx_ipc_service_close";
1592 	int		i;
1593 
1594 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __function_name, service->path);
1595 
1596 	close(service->fd);
1597 
1598 	for (i = 0; i < service->clients.values_num; i++)
1599 		ipc_client_free((zbx_ipc_client_t *)service->clients.values[i]);
1600 
1601 	zbx_free(service->path);
1602 
1603 	zbx_vector_ptr_destroy(&service->clients);
1604 	zbx_queue_ptr_destroy(&service->clients_recv);
1605 
1606 	event_free(service->ev_timer);
1607 	event_free(service->ev_listener);
1608 	event_base_free(service->ev);
1609 
1610 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1611 }
1612 
1613 /******************************************************************************
1614  *                                                                            *
1615  * Function: zbx_ipc_service_recv                                             *
1616  *                                                                            *
1617  * Purpose: receives ipc message from a connected client                      *
1618  *                                                                            *
1619  * Parameters: service - [IN] the IPC service                                 *
1620  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1621  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1622  *                            used for blocking call without timeout          *
1623  *             client  - [OUT] the client that sent the message or            *
1624  *                             NULL if there are no messages and the          *
1625  *                             specified timeout passed.                      *
1626  *                             The client must be released by caller with     *
1627  *                             zbx_ipc_client_release() function.             *
1628  *             message - [OUT] the received message or NULL if the client     *
1629  *                             connection was closed.                         *
1630  *                             The message must be freed by caller with       *
1631  *                             ipc_message_free() function.                   *
1632  *                                                                            *
1633  * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without        *
1634  *                                        waiting for socket events           *
1635  *                                        (pending events are processed)      *
1636  *               ZBX_IPC_RECV_WAIT      - returned after receiving socket     *
1637  *                                        event                               *
1638  *               ZBX_IPC_RECV_TIMEOUT   - returned after timeout expired      *
1639  *                                                                            *
1640  ******************************************************************************/
zbx_ipc_service_recv(zbx_ipc_service_t * service,int timeout,zbx_ipc_client_t ** client,zbx_ipc_message_t ** message)1641 int	zbx_ipc_service_recv(zbx_ipc_service_t *service, int timeout, zbx_ipc_client_t **client,
1642 		zbx_ipc_message_t **message)
1643 {
1644 	const char	*__function_name = "zbx_ipc_service_recv";
1645 
1646 	int		ret, flags;
1647 
1648 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __function_name, timeout);
1649 
1650 	if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
1651 	{
1652 		if (ZBX_IPC_WAIT_FOREVER != timeout)
1653 		{
1654 			struct timeval	tv = {timeout, 0};
1655 			evtimer_add(service->ev_timer, &tv);
1656 		}
1657 		flags = EVLOOP_ONCE;
1658 	}
1659 	else
1660 		flags = EVLOOP_NONBLOCK;
1661 
1662 	event_base_loop(service->ev, flags);
1663 
1664 	if (NULL != (*client = ipc_service_pop_client(service)))
1665 	{
1666 		if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue)))
1667 		{
1668 			if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1669 			{
1670 				char	*data = NULL;
1671 
1672 				zbx_ipc_message_format(*message, &data);
1673 				zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __function_name, data);
1674 
1675 				zbx_free(data);
1676 			}
1677 
1678 			ipc_service_push_client(service, *client);
1679 			zbx_ipc_client_addref(*client);
1680 		}
1681 
1682 		ret = (EVLOOP_NONBLOCK == flags ? ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT);
1683 	}
1684 	else
1685 	{	ret = ZBX_IPC_RECV_TIMEOUT;
1686 		*client = NULL;
1687 		*message = NULL;
1688 	}
1689 
1690 	evtimer_del(service->ev_timer);
1691 
1692 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __function_name, ret);
1693 
1694 	return ret;
1695 }
1696 
1697 /******************************************************************************
1698  *                                                                            *
1699  * Function: zbx_ipc_client_send                                              *
1700  *                                                                            *
1701  * Purpose: Sends IPC message to client                                       *
1702  *                                                                            *
1703  * Parameters: client - [IN] the IPC client                                   *
1704  *             code   - [IN] the message code                                 *
1705  *             data   - [IN] the data                                         *
1706  *             size   - [IN] the data size                                    *
1707  *                                                                            *
1708  * Comments: If data can't be written directly to socket (buffer full) then   *
1709  *           the message is queued and sent during zbx_ipc_service_recv()     *
1710  *           messaging loop whenever socket becomes ready.                    *
1711  *                                                                            *
1712  ******************************************************************************/
zbx_ipc_client_send(zbx_ipc_client_t * client,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1713 int	zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1714 {
1715 	const char		*__function_name = "zbx_ipc_client_send";
1716 	zbx_uint32_t		tx_size = 0;
1717 	zbx_ipc_message_t	*message;
1718 	int			ret = FAIL;
1719 
1720 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __function_name, client->id);
1721 
1722 	if (0 != client->tx_bytes)
1723 	{
1724 		message = ipc_message_create(code, data, size);
1725 		zbx_queue_ptr_push(&client->tx_queue, message);
1726 		ret = SUCCEED;
1727 		goto out;
1728 	}
1729 
1730 	if (FAIL == ipc_socket_write_message(&client->csocket, code, data, size, &tx_size))
1731 		goto out;
1732 
1733 	if (tx_size != ZBX_IPC_HEADER_SIZE + size)
1734 	{
1735 		client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
1736 		client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
1737 		client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
1738 		memcpy(client->tx_data, data, size);
1739 		client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - tx_size;
1740 		event_add(client->tx_event, NULL);
1741 	}
1742 
1743 	ret = SUCCEED;
1744 out:
1745 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1746 
1747 	return ret;
1748 }
1749 
1750 /******************************************************************************
1751  *                                                                            *
1752  * Function: zbx_ipc_client_close                                             *
1753  *                                                                            *
1754  * Purpose: closes client socket and frees resources allocated for client     *
1755  *                                                                            *
1756  * Parameters: client - [IN] the IPC client                                   *
1757  *                                                                            *
1758  ******************************************************************************/
zbx_ipc_client_close(zbx_ipc_client_t * client)1759 void	zbx_ipc_client_close(zbx_ipc_client_t *client)
1760 {
1761 	const char	*__function_name = "zbx_ipc_client_close";
1762 
1763 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1764 
1765 	ipc_client_free_events(client);
1766 	zbx_ipc_socket_close(&client->csocket);
1767 
1768 	ipc_service_remove_client(client->service, client);
1769 	zbx_queue_ptr_remove_value(&client->service->clients_recv, client);
1770 	zbx_ipc_client_release(client);
1771 
1772 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1773 }
1774 
zbx_ipc_client_addref(zbx_ipc_client_t * client)1775 void	zbx_ipc_client_addref(zbx_ipc_client_t *client)
1776 {
1777 	client->refcount++;
1778 }
1779 
zbx_ipc_client_release(zbx_ipc_client_t * client)1780 void	zbx_ipc_client_release(zbx_ipc_client_t *client)
1781 {
1782 	if (0 == --client->refcount)
1783 		ipc_client_free(client);
1784 }
1785 
zbx_ipc_client_connected(zbx_ipc_client_t * client)1786 int	zbx_ipc_client_connected(zbx_ipc_client_t *client)
1787 {
1788 	return (NULL == client->rx_event ? FAIL : SUCCEED);
1789 }
1790 
1791 /******************************************************************************
1792  *                                                                            *
1793  * Function: zbx_ipc_async_socket_open                                        *
1794  *                                                                            *
1795  * Purpose: opens asynchronous socket to IPC service client                   *
1796  *                                                                            *
1797  * Parameters: client       - [OUT] the IPC service client                    *
1798  *             service_name - [IN] the IPC service name                       *
1799  *             timeout      - [IN] the connection timeout                     *
1800  *             error        - [OUT] the error message                         *
1801  *                                                                            *
1802  * Return value: SUCCEED - the socket was successfully opened                 *
1803  *               FAIL    - otherwise                                          *
1804  *                                                                            *
1805  ******************************************************************************/
zbx_ipc_async_socket_open(zbx_ipc_async_socket_t * asocket,const char * service_name,int timeout,char ** error)1806 int	zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
1807 {
1808 	const char		*__function_name = "zbx_ipc_async_socket_open";
1809 	int			ret = FAIL, flags;
1810 
1811 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1812 
1813 	memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
1814 	asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
1815 	memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
1816 
1817 	if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
1818 	{
1819 		zbx_free(asocket->client);
1820 		goto out;
1821 	}
1822 
1823 	if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
1824 	{
1825 		zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
1826 		exit(EXIT_FAILURE);
1827 	}
1828 
1829 	if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
1830 	{
1831 		zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
1832 		exit(EXIT_FAILURE);
1833 	}
1834 
1835 	asocket->ev = event_base_new();
1836 	asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
1837 	asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
1838 			ipc_async_socket_read_event_cb, (void *)asocket);
1839 	asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
1840 			ipc_async_socket_write_event_cb, (void *)asocket);
1841 	event_add(asocket->client->rx_event, NULL);
1842 
1843 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1844 
1845 	ret = SUCCEED;
1846 out:
1847 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1848 	return ret;
1849 }
1850 
1851 /******************************************************************************
1852  *                                                                            *
1853  * Function: zbx_ipc_async_socket_close                                       *
1854  *                                                                            *
1855  * Purpose: closes asynchronous IPC socket and frees allocated resources      *
1856  *                                                                            *
1857  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1858  *                                                                            *
1859  ******************************************************************************/
zbx_ipc_async_socket_close(zbx_ipc_async_socket_t * asocket)1860 void	zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
1861 {
1862 	const char	*__function_name = "zbx_ipc_async_socket_close";
1863 
1864 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1865 
1866 	ipc_client_free(asocket->client);
1867 
1868 	event_free(asocket->ev_timer);
1869 	event_base_free(asocket->ev);
1870 
1871 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1872 }
1873 
1874 /******************************************************************************
1875  *                                                                            *
1876  * Function: zbx_ipc_async_socket_send                                        *
1877  *                                                                            *
1878  * Purpose: Sends message through asynchronous IPC socket                     *
1879  *                                                                            *
1880  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1881  *             code    - [IN] the message code                                *
1882  *             data    - [IN] the data                                        *
1883  *             size    - [IN] the data size                                   *
1884  *                                                                            *
1885  * Comments: If data can't be written directly to socket (buffer full) then   *
1886  *           the message is queued and sent during zbx_ipc_async_socket_recv()*
1887  *           or zbx_ipc_async_socket_flush() functions whenever socket becomes*
1888  *           ready.                                                           *
1889  *                                                                            *
1890  ******************************************************************************/
zbx_ipc_async_socket_send(zbx_ipc_async_socket_t * asocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1891 int	zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
1892 		zbx_uint32_t size)
1893 {
1894 	const char		*__function_name = "zbx_ipc_async_socket_send";
1895 	int			ret = FAIL;
1896 
1897 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1898 
1899 	ret = zbx_ipc_client_send(asocket->client, code, data, size);
1900 
1901 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1902 
1903 	return ret;
1904 }
1905 
1906 /******************************************************************************
1907  *                                                                            *
1908  * Function: zbx_ipc_async_socket_recv                                        *
1909  *                                                                            *
1910  * Purpose: receives message through asynchronous IPC socket                  *
1911  *                                                                            *
1912  * Parameters: asocket - [IN] the asynchronous IPC socket                     *
1913  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1914  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1915  *                            used for blocking call without timeout          *
1916  *             message - [OUT] the received message or NULL if the client     *
1917  *                             connection was closed.                         *
1918  *                             The message must be freed by caller with       *
1919  *                             ipc_message_free() function.                   *
1920  *                                                                            *
1921  * Return value: SUCCEED - the message was read successfully or timeout       *
1922  *                         occurred                                           *
1923  *               FAIL    - otherwise                                          *
1924  *                                                                            *
1925  * Comments: After socket has been closed (or connection error has occurred)  *
1926  *           calls to zbx_ipc_client_read() will return success with buffered *
1927  *           messages, until all buffered messages are retrieved.             *
1928  *                                                                            *
1929  ******************************************************************************/
zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t * asocket,int timeout,zbx_ipc_message_t ** message)1930 int	zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
1931 {
1932 	const char	*__function_name = "zbx_ipc_async_socket_recv";
1933 
1934 	int		ret, flags;
1935 
1936 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __function_name, timeout);
1937 
1938 	if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
1939 	{
1940 		if (ZBX_IPC_WAIT_FOREVER != timeout)
1941 		{
1942 			struct timeval	tv = {timeout, 0};
1943 			evtimer_add(asocket->ev_timer, &tv);
1944 		}
1945 		flags = EVLOOP_ONCE;
1946 	}
1947 	else
1948 		flags = EVLOOP_NONBLOCK;
1949 
1950 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1951 		event_base_loop(asocket->ev, flags);
1952 
1953 	if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue)))
1954 	{
1955 		if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1956 		{
1957 			char	*data = NULL;
1958 
1959 			zbx_ipc_message_format(*message, &data);
1960 			zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __function_name, data);
1961 
1962 			zbx_free(data);
1963 		}
1964 	}
1965 
1966 	if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1967 		ret = SUCCEED;
1968 	else
1969 		ret = FAIL;
1970 
1971 	evtimer_del(asocket->ev_timer);
1972 
1973 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __function_name, ret);
1974 
1975 	return ret;
1976 }
1977 
1978 /******************************************************************************
1979  *                                                                            *
1980  * Function: zbx_ipc_async_socket_flush                                       *
1981  *                                                                            *
1982  * Purpose: flushes unsent through asynchronous IPC socket                    *
1983  *                                                                            *
1984  * Parameters: asocket - [IN] the asynchronous IPC service socket             *
1985  *             timeout - [IN] the timeout in seconds, 0 is used for           *
1986  *                            nonblocking call and ZBX_IPC_WAIT_FOREVER is    *
1987  *                            used for blocking call without timeout          *
1988  *                                                                            *
1989  * Return value: SUCCEED - the data was flushed successfully or timeout       *
1990  *                         occurred. Use zbx_ipc_client_unsent_data() to      *
1991  *                         check if all data was sent.                        *
1992  *               FAIL    - failed to send data (connection was closed or an   *
1993  *                         error occurred).                                   *
1994  *                                                                            *
1995  ******************************************************************************/
zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t * asocket,int timeout)1996 int	zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
1997 {
1998 	const char	*__function_name = "zbx_ipc_async_socket_flush";
1999 
2000 	int		ret = FAIL, flags;
2001 
2002 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __function_name, timeout);
2003 
2004 	if (0 == asocket->client->tx_bytes)
2005 	{
2006 		ret = SUCCEED;
2007 		goto out;
2008 	}
2009 
2010 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
2011 		goto out;
2012 
2013 	asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
2014 
2015 	if (0 != timeout)
2016 	{
2017 		if (ZBX_IPC_WAIT_FOREVER != timeout)
2018 		{
2019 			struct timeval	tv = {timeout, 0};
2020 			evtimer_add(asocket->ev_timer, &tv);
2021 		}
2022 		flags = EVLOOP_ONCE;
2023 	}
2024 	else
2025 		flags = EVLOOP_NONBLOCK;
2026 
2027 	do
2028 	{
2029 		event_base_loop(asocket->ev, flags);
2030 
2031 		if (SUCCEED != zbx_ipc_client_connected(asocket->client))
2032 			goto out;
2033 	}
2034 	while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
2035 
2036 	if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
2037 	{
2038 		ret = SUCCEED;
2039 		asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
2040 	}
2041 out:
2042 	evtimer_del(asocket->ev_timer);
2043 
2044 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __function_name, ret);
2045 
2046 	return ret;
2047 }
2048 
2049 /******************************************************************************
2050  *                                                                            *
2051  * Function: zbx_ipc_async_socket_check_unsent                                *
2052  *                                                                            *
2053  * Purpose: checks if there are data to be sent                               *
2054  *                                                                            *
2055  * Parameters: client  - [IN] the IPC service client                          *
2056  *                                                                            *
2057  * Return value: SUCCEED - there are messages queued to be sent               *
2058  *               FAIL    - all data has been sent                             *
2059  *                                                                            *
2060  ******************************************************************************/
zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t * asocket)2061 int	zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
2062 {
2063 	return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
2064 }
2065 
2066 #endif
2067