1 #include "common.h"
2
3 #ifdef HAVE_IPCSERVICE
4
5 #ifdef HAVE_LIBEVENT
6 # include <event.h>
7 #endif
8
9 #include "zbxtypes.h"
10 #include "zbxalgo.h"
11 #include "log.h"
12 #include "zbxipcservice.h"
13
14 #define ZBX_IPC_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
15
16 #define ZBX_IPC_DATA_DUMP_SIZE 128
17
18 static char ipc_path[ZBX_IPC_PATH_MAX] = {0};
19 static size_t ipc_path_root_len = 0;
20
21 #define ZBX_IPC_CLIENT_STATE_NONE 0
22 #define ZBX_IPC_CLIENT_STATE_QUEUED 1
23
24 #define ZBX_IPC_ASYNC_SOCKET_STATE_NONE 0
25 #define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT 1
26 #define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR 2
27
28 extern unsigned char program_type;
29
30 /* IPC client, providing nonblocking connections through socket */
31 struct zbx_ipc_client
32 {
33 zbx_ipc_socket_t csocket;
34 zbx_ipc_service_t *service;
35
36 zbx_uint32_t rx_header[2];
37 unsigned char *rx_data;
38 zbx_uint32_t rx_bytes;
39 zbx_queue_ptr_t rx_queue;
40 struct event *rx_event;
41
42 zbx_uint32_t tx_header[2];
43 unsigned char *tx_data;
44 zbx_uint32_t tx_bytes;
45 zbx_queue_ptr_t tx_queue;
46 struct event *tx_event;
47
48 zbx_uint64_t id;
49 unsigned char state;
50
51 zbx_uint32_t refcount;
52 };
53
54 /*
55 * Private API
56 */
57
58 #define ZBX_IPC_HEADER_SIZE (int)(sizeof(zbx_uint32_t) * 2)
59
60 #define ZBX_IPC_MESSAGE_CODE 0
61 #define ZBX_IPC_MESSAGE_SIZE 1
62
63 #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
64 typedef int evutil_socket_t;
65
event_new(struct event_base * ev,evutil_socket_t fd,short what,void (* cb_func)(int,short,void *),void * cb_arg)66 static struct event *event_new(struct event_base *ev, evutil_socket_t fd, short what,
67 void(*cb_func)(int, short, void *), void *cb_arg)
68 {
69 struct event *event;
70
71 event = zbx_malloc(NULL, sizeof(struct event));
72 event_set(event, fd, what, cb_func, cb_arg);
73 event_base_set(ev, event);
74
75 return event;
76 }
77
event_free(struct event * event)78 static void event_free(struct event *event)
79 {
80 event_del(event);
81 zbx_free(event);
82 }
83
84 #endif
85
86 static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
87 static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
88
ipc_get_path(void)89 static const char *ipc_get_path(void)
90 {
91 ipc_path[ipc_path_root_len] = '\0';
92
93 return ipc_path;
94 }
95
96 #define ZBX_IPC_SOCKET_PREFIX "/zabbix_"
97 #define ZBX_IPC_SOCKET_SUFFIX ".sock"
98
99 #define ZBX_IPC_CLASS_PREFIX_NONE ""
100 #define ZBX_IPC_CLASS_PREFIX_SERVER "server_"
101 #define ZBX_IPC_CLASS_PREFIX_PROXY "proxy_"
102 #define ZBX_IPC_CLASS_PREFIX_AGENT "agent_"
103
104 /******************************************************************************
105 * *
106 * Function: ipc_make_path *
107 * *
108 * Purpose: makes socket path from the service name *
109 * *
110 * Parameters: service_name - [IN] the service name *
111 * error - [OUT] the error message *
112 * *
113 * Return value: The created path or NULL if the path exceeds unix domain *
114 * socket path maximum length *
115 * *
116 ******************************************************************************/
ipc_make_path(const char * service_name,char ** error)117 static const char *ipc_make_path(const char *service_name, char **error)
118 {
119 const char *prefix;
120 size_t path_len, offset, prefix_len;
121
122 path_len = strlen(service_name);
123
124 switch (program_type)
125 {
126 case ZBX_PROGRAM_TYPE_SERVER:
127 prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
128 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
129 break;
130 case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
131 case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
132 prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
133 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
134 break;
135 case ZBX_PROGRAM_TYPE_AGENTD:
136 prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
137 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
138 break;
139 default:
140 prefix = ZBX_IPC_CLASS_PREFIX_NONE;
141 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
142 break;
143 }
144
145 if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
146 ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + prefix_len)
147 {
148 *error = zbx_dsprintf(*error,
149 "Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
150 ipc_path, ZBX_IPC_SOCKET_PREFIX, prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
151 return NULL;
152 }
153
154 offset = ipc_path_root_len;
155 memcpy(ipc_path + offset , ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
156 offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
157 memcpy(ipc_path + offset, prefix, prefix_len);
158 offset += prefix_len;
159 memcpy(ipc_path + offset, service_name, path_len);
160 offset += path_len;
161 memcpy(ipc_path + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
162
163 return ipc_path;
164 }
165
166 /******************************************************************************
167 * *
168 * Function: ipc_write_data *
169 * *
170 * Purpose: writes data to a socket *
171 * *
172 * Parameters: fd - [IN] the socket file descriptor *
173 * data - [IN] the data *
174 * size - [IN] the data size *
175 * size_sent - [IN] the actual size written to socket *
176 * *
177 * Return value: SUCCEED - no socket errors were detected. Either the data or *
178 * a part of it was written to socket or a write to *
179 * non-blocking socket would block *
180 * FAIL - otherwise *
181 * *
182 ******************************************************************************/
ipc_write_data(int fd,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * size_sent)183 static int ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
184 {
185 zbx_uint32_t offset = 0;
186 int n, ret = SUCCEED;
187
188 while (offset != size)
189 {
190 n = write(fd, data + offset, size - offset);
191
192 if (-1 == n)
193 {
194 if (EINTR == errno)
195 continue;
196
197 if (EWOULDBLOCK == errno || EAGAIN == errno)
198 break;
199
200 zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
201 ret = FAIL;
202 break;
203 }
204 offset += n;
205 }
206
207 *size_sent = offset;
208
209 return ret;
210 }
211
212 /******************************************************************************
213 * *
214 * Function: ipc_read_data *
215 * *
216 * Purpose: reads data from a socket *
217 * *
218 * Parameters: fd - [IN] the socket file descriptor *
219 * data - [IN] the data *
220 * size - [IN] the data size *
221 * size_sent - [IN] the actual size read from socket *
222 * *
223 * Return value: SUCCEED - the data was successfully read *
224 * FAIL - otherwise *
225 * *
226 * Comments: When reading data from non-blocking sockets SUCCEED will be *
227 * returned also if there were no more data to read. *
228 * *
229 ******************************************************************************/
ipc_read_data(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)230 static int ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
231 {
232 int n;
233
234 *read_size = 0;
235
236 while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
237 {
238 if (EINTR == errno)
239 continue;
240
241 if (EWOULDBLOCK == errno || EAGAIN == errno)
242 return SUCCEED;
243
244 return FAIL;
245 }
246
247 if (0 == n)
248 return FAIL;
249
250 *read_size += n;
251
252 return SUCCEED;
253 }
254
255 /******************************************************************************
256 * *
257 * Function: ipc_read_data_full *
258 * *
259 * Purpose: reads data from a socket until the requested data has been read *
260 * *
261 * Parameters: fd - [IN] the socket file descriptor *
262 * buffer - [IN] the data *
263 * size - [IN] the data size *
264 * read_size - [IN] the actual size read from socket *
265 * *
266 * Return value: SUCCEED - the data was successfully read *
267 * FAIL - otherwise *
268 * *
269 * Comments: When reading data from non-blocking sockets this function will *
270 * return SUCCEED if there are no data to read, even if not all of *
271 * the requested data has been read. *
272 * *
273 ******************************************************************************/
ipc_read_data_full(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)274 static int ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
275 {
276 int ret = FAIL;
277 zbx_uint32_t offset = 0, chunk_size;
278
279 *read_size = 0;
280
281 while (offset < size)
282 {
283 if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
284 goto out;
285
286 if (0 == chunk_size)
287 break;
288
289 offset += chunk_size;
290 }
291
292 ret = SUCCEED;
293 out:
294 *read_size = offset;
295
296 return ret;
297 }
298
299 /******************************************************************************
300 * *
301 * Function: ipc_socket_write_message *
302 * *
303 * Purpose: writes IPC message to socket *
304 * *
305 * Parameters: csocket - [IN] the IPC socket *
306 * code - [IN] the message code *
307 * data - [IN] the data *
308 * size - [IN] the data size *
309 * tx_size - [IN] the actual size written to socket *
310 * *
311 * Return value: SUCCEED - no socket errors were detected. Either the data or *
312 * a part of it was written to socket or a write to *
313 * non-blocking socket would block *
314 * FAIL - otherwise *
315 * *
316 * Comments: When using non-blocking sockets the tx_size parameter must be *
317 * checked in addition to return value to tell if the message was *
318 * sent successfully. *
319 * *
320 ******************************************************************************/
ipc_socket_write_message(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * tx_size)321 static int ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
322 zbx_uint32_t size, zbx_uint32_t *tx_size)
323 {
324 int ret;
325 zbx_uint32_t size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
326
327 buffer[0] = code;
328 buffer[1] = size;
329
330 if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
331 {
332 if (0 != size)
333 memcpy(buffer + 2, data, size);
334
335 return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
336 }
337
338 if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
339 return FAIL;
340
341 /* in the case of non-blocking sockets only a part of the header might be sent */
342 if (ZBX_IPC_HEADER_SIZE != *tx_size)
343 return SUCCEED;
344
345 ret = ipc_write_data(csocket->fd, data, size, &size_data);
346 *tx_size += size_data;
347
348 return ret;
349 }
350
351 /******************************************************************************
352 * *
353 * Function: ipc_read_buffer *
354 * *
355 * Purpose: reads message header and data from buffer *
356 * *
357 * Parameters: header - [IN/OUT] the message header *
358 * data - [OUT] the message data *
359 * rx_bytes - [IN] the number of bytes stored in message *
360 * (including header) *
361 * buffer - [IN] the buffer to parse *
362 * size - [IN] the number of bytes to parse *
363 * read_size - [OUT] the number of bytes read *
364 * *
365 * Return value: SUCCEED - message was successfully parsed *
366 * FAIL - not enough data *
367 * *
368 ******************************************************************************/
ipc_read_buffer(zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t rx_bytes,const unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)369 static int ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
370 const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
371 {
372 zbx_uint32_t copy_size, data_size, data_offset;
373
374 *read_size = 0;
375
376 if (ZBX_IPC_HEADER_SIZE > rx_bytes)
377 {
378 copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
379 memcpy((char *)header + rx_bytes, buffer, copy_size);
380 *read_size += copy_size;
381
382 if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
383 return FAIL;
384
385 data_size = header[ZBX_IPC_MESSAGE_SIZE];
386
387 if (0 == data_size)
388 {
389 *data = NULL;
390 return SUCCEED;
391 }
392
393 *data = (unsigned char *)zbx_malloc(NULL, data_size);
394 data_offset = 0;
395 }
396 else
397 {
398 data_size = header[ZBX_IPC_MESSAGE_SIZE];
399 data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
400 }
401
402 copy_size = MIN(data_size - data_offset, size - *read_size);
403 memcpy(*data + data_offset, buffer + *read_size, copy_size);
404 *read_size += copy_size;
405
406 return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
407 }
408
409 /******************************************************************************
410 * *
411 * Function: ipc_message_is_completed *
412 * *
413 * Purpose: checks if IPC message has been completed *
414 * *
415 * Parameters: header - [IN] the message header *
416 * rx_bytes - [IN] the number of bytes set in message *
417 * (including header) *
418 * *
419 * Return value: SUCCEED - message has been completed *
420 * FAIL - otherwise *
421 * *
422 ******************************************************************************/
ipc_message_is_completed(const zbx_uint32_t * header,zbx_uint32_t rx_bytes)423 static int ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
424 {
425 if (ZBX_IPC_HEADER_SIZE > rx_bytes)
426 return FAIL;
427
428 if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
429 return FAIL;
430
431 return SUCCEED;
432 }
433
434 /******************************************************************************
435 * *
436 * Function: ipc_socket_read_message *
437 * *
438 * Purpose: reads IPC message from buffered client socket *
439 * *
440 * Parameters: csocket - [IN] the source socket *
441 * header - [OUT] the header of the message *
442 * data - [OUT] the data of the message *
443 * rx_bytes - [IN/OUT] the total message size read (including *
444 * header *
445 * *
446 * Return value: SUCCEED - data was read successfully, check rx_bytes to *
447 * determine if the message was completed. *
448 * FAIL - failed to read message (socket error or connection *
449 * was closed). *
450 * *
451 ******************************************************************************/
ipc_socket_read_message(zbx_ipc_socket_t * csocket,zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t * rx_bytes)452 static int ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
453 zbx_uint32_t *rx_bytes)
454 {
455 zbx_uint32_t data_size, offset, read_size = 0;
456 int ret = FAIL;
457
458 /* try to read message from socket buffer */
459 if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
460 {
461 ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
462 csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
463
464 csocket->rx_buffer_offset += read_size;
465 *rx_bytes += read_size;
466
467 if (SUCCEED == ret)
468 goto out;
469 }
470
471 /* not enough data in socket buffer, try to read more until message is completed or no data to read */
472 while (SUCCEED != ret)
473 {
474 csocket->rx_buffer_offset = 0;
475 csocket->rx_buffer_bytes = 0;
476
477 if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
478 {
479 offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
480 data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
481
482 /* long messages will be read directly into message buffer */
483 if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
484 {
485 ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
486 *rx_bytes += read_size;
487 goto out;
488 }
489 }
490
491 if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
492 goto out;
493
494 /* it's possible that nothing will be read on non-blocking sockets, return success */
495 if (0 == read_size)
496 {
497 ret = SUCCEED;
498 goto out;
499 }
500
501 csocket->rx_buffer_bytes = read_size;
502
503 ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
504 &read_size);
505
506 csocket->rx_buffer_offset += read_size;
507 *rx_bytes += read_size;
508 }
509 out:
510 return ret;
511 }
512
513 /******************************************************************************
514 * *
515 * Function: ipc_client_free_event *
516 * *
517 * Purpose: frees client's libevent event *
518 * *
519 * Parameters: client - [IN] the client *
520 * *
521 ******************************************************************************/
ipc_client_free_events(zbx_ipc_client_t * client)522 static void ipc_client_free_events(zbx_ipc_client_t *client)
523 {
524 if (NULL != client->rx_event)
525 {
526 event_free(client->rx_event);
527 client->rx_event = NULL;
528 }
529
530 if (NULL != client->tx_event)
531 {
532 event_free(client->tx_event);
533 client->tx_event = NULL;
534 }
535 }
536
537 /******************************************************************************
538 * *
539 * Function: ipc_client_free *
540 * *
541 * Purpose: frees IPC service client *
542 * *
543 * Parameters: client - [IN] the client to free *
544 * *
545 ******************************************************************************/
ipc_client_free(zbx_ipc_client_t * client)546 static void ipc_client_free(zbx_ipc_client_t *client)
547 {
548 zbx_ipc_message_t *message;
549
550 ipc_client_free_events(client);
551 zbx_ipc_socket_close(&client->csocket);
552
553 while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
554 zbx_ipc_message_free(message);
555
556 zbx_queue_ptr_destroy(&client->rx_queue);
557 zbx_free(client->rx_data);
558
559 while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
560 zbx_ipc_message_free(message);
561
562 zbx_queue_ptr_destroy(&client->tx_queue);
563 zbx_free(client->tx_data);
564
565 ipc_client_free_events(client);
566
567 zbx_free(client);
568 }
569
570 /******************************************************************************
571 * *
572 * Function: ipc_client_push_rx_message *
573 * *
574 * Purpose: adds message to received messages queue *
575 * *
576 * Parameters: client - [IN] the client to read *
577 * *
578 ******************************************************************************/
ipc_client_push_rx_message(zbx_ipc_client_t * client)579 static void ipc_client_push_rx_message(zbx_ipc_client_t *client)
580 {
581 zbx_ipc_message_t *message;
582
583 message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
584 message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
585 message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
586 message->data = client->rx_data;
587 zbx_queue_ptr_push(&client->rx_queue, message);
588
589 client->rx_data = NULL;
590 client->rx_bytes = 0;
591 }
592
593 /******************************************************************************
594 * *
595 * Function: ipc_client_pop_tx_message *
596 * *
597 * Purpose: prepares to send the next message in send queue *
598 * *
599 * Parameters: client - [IN] the client *
600 * *
601 ******************************************************************************/
ipc_client_pop_tx_message(zbx_ipc_client_t * client)602 static void ipc_client_pop_tx_message(zbx_ipc_client_t *client)
603 {
604 zbx_ipc_message_t *message;
605
606 zbx_free(client->tx_data);
607 client->tx_bytes = 0;
608
609 if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
610 return;
611
612 client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
613 client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
614 client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
615 client->tx_data = message->data;
616 zbx_free(message);
617 }
618
619 /******************************************************************************
620 * *
621 * Function: ipc_client_read *
622 * *
623 * Purpose: reads data from IPC service client *
624 * *
625 * Parameters: client - [IN] the client to read *
626 * *
627 * Return value: FAIL - read error/connection was closed *
628 * *
629 * Comments: This function reads data from socket, parses it and adds *
630 * parsed messages to received messages queue. *
631 * *
632 ******************************************************************************/
ipc_client_read(zbx_ipc_client_t * client)633 static int ipc_client_read(zbx_ipc_client_t *client)
634 {
635 int rc;
636
637 do
638 {
639 if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
640 &client->rx_bytes))
641 {
642 zbx_free(client->rx_data);
643 client->rx_bytes = 0;
644 return FAIL;
645 }
646
647 if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
648 ipc_client_push_rx_message(client);
649 }
650
651 while (SUCCEED == rc);
652
653 return SUCCEED;
654 }
655
656 /******************************************************************************
657 * *
658 * Function: ipc_client_write *
659 * *
660 * Purpose: writes queued data to IPC service client *
661 * *
662 * Parameters: client - [IN] the client *
663 * *
664 * Return value: SUCCEED - the data was sent successfully *
665 * FAIL - otherwise *
666 * *
667 ******************************************************************************/
ipc_client_write(zbx_ipc_client_t * client)668 static int ipc_client_write(zbx_ipc_client_t *client)
669 {
670 zbx_uint32_t data_size, write_size;
671
672 data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
673
674 if (data_size < client->tx_bytes)
675 {
676 zbx_uint32_t size, offset;
677
678 size = client->tx_bytes - data_size;
679 offset = ZBX_IPC_HEADER_SIZE - size;
680
681 if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
682 &write_size))
683 {
684 return FAIL;
685 }
686
687 client->tx_bytes -= write_size;
688
689 if (data_size < client->tx_bytes)
690 return SUCCEED;
691 }
692
693 while (0 < client->tx_bytes)
694 {
695 if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
696 client->tx_bytes, &write_size))
697 {
698 return FAIL;
699 }
700
701 if (0 == write_size)
702 return SUCCEED;
703
704 client->tx_bytes -= write_size;
705 }
706
707 if (0 == client->tx_bytes)
708 ipc_client_pop_tx_message(client);
709
710 return SUCCEED;
711 }
712
713 /******************************************************************************
714 * *
715 * Function: ipc_service_pop_client *
716 * *
717 * Purpose: gets the next client with messages/closed socket from recv queue *
718 * *
719 * Parameters: service - [IN] the IPC service *
720 * *
721 * Return value: The client with messages/closed socket *
722 * *
723 ******************************************************************************/
ipc_service_pop_client(zbx_ipc_service_t * service)724 static zbx_ipc_client_t *ipc_service_pop_client(zbx_ipc_service_t *service)
725 {
726 zbx_ipc_client_t *client;
727
728 if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
729 client->state = ZBX_IPC_CLIENT_STATE_NONE;
730
731 return client;
732 }
733
734 /******************************************************************************
735 * *
736 * Function: ipc_service_push_client *
737 * *
738 * Purpose: pushes client to the recv queue if needed *
739 * *
740 * Parameters: service - [IN] the IPC service *
741 * client - [IN] the IPC client *
742 * *
743 * Comments: The client is pushed to the recv queue if it isn't already there *
744 * and there is messages to return or the client connection was *
745 * closed. *
746 * *
747 ******************************************************************************/
ipc_service_push_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)748 static void ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
749 {
750 if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
751 return;
752
753 if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
754 return;
755
756 client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
757 zbx_queue_ptr_push(&service->clients_recv, client);
758 }
759
760 /******************************************************************************
761 * *
762 * Function: ipc_service_add_client *
763 * *
764 * Purpose: adds a new IPC service client *
765 * *
766 * Parameters: service - [IN] the IPC service *
767 * fd - [IN] the client socket descriptor *
768 * *
769 ******************************************************************************/
ipc_service_add_client(zbx_ipc_service_t * service,int fd)770 static void ipc_service_add_client(zbx_ipc_service_t *service, int fd)
771 {
772 const char *__function_name = "ipc_service_add_client";
773 static zbx_uint64_t next_clientid = 1;
774 zbx_ipc_client_t *client;
775 int flags;
776
777 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
778
779 client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
780 memset(client, 0, sizeof(zbx_ipc_client_t));
781
782 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
783 {
784 zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
785 exit(EXIT_FAILURE);
786 }
787
788 if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
789 {
790 zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
791 exit(EXIT_FAILURE);
792 }
793
794 client->csocket.fd = fd;
795 client->csocket.rx_buffer_bytes = 0;
796 client->csocket.rx_buffer_offset = 0;
797 client->id = next_clientid++;
798 client->state = ZBX_IPC_CLIENT_STATE_NONE;
799 client->refcount = 1;
800
801 zbx_queue_ptr_create(&client->rx_queue);
802 zbx_queue_ptr_create(&client->tx_queue);
803
804 client->service = service;
805 client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
806 client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
807 event_add(client->rx_event, NULL);
808
809 zbx_vector_ptr_append(&service->clients, client);
810
811 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __function_name, client->id);
812 }
813
814 /******************************************************************************
815 * *
816 * Function: ipc_service_remove_client *
817 * *
818 * Purpose: removes IPC service client *
819 * *
820 * Parameters: service - [IN] the IPC service *
821 * client - [IN] the client to remove *
822 * *
823 ******************************************************************************/
ipc_service_remove_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)824 static void ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
825 {
826 int i;
827
828 for (i = 0; i < service->clients.values_num; i++)
829 {
830 if (service->clients.values[i] == client)
831 zbx_vector_ptr_remove_noorder(&service->clients, i);
832 }
833 }
834
835 /******************************************************************************
836 * *
837 * Function: ipc_client_read_event_cb *
838 * *
839 * Purpose: service client read event libevent callback *
840 * *
841 ******************************************************************************/
ipc_client_read_event_cb(evutil_socket_t fd,short what,void * arg)842 static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
843 {
844 zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
845
846 ZBX_UNUSED(fd);
847 ZBX_UNUSED(what);
848
849 if (SUCCEED != ipc_client_read(client))
850 {
851 ipc_client_free_events(client);
852 ipc_service_remove_client(client->service, client);
853 }
854
855 ipc_service_push_client(client->service, client);
856 }
857
858 /******************************************************************************
859 * *
860 * Function: ipc_client_write_event_cb *
861 * *
862 * Purpose: service client write event libevent callback *
863 * *
864 ******************************************************************************/
ipc_client_write_event_cb(evutil_socket_t fd,short what,void * arg)865 static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
866 {
867 zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
868
869 ZBX_UNUSED(fd);
870 ZBX_UNUSED(what);
871
872 if (SUCCEED != ipc_client_write(client))
873 {
874 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
875 zbx_ipc_client_close(client);
876 return;
877 }
878
879 if (0 == client->tx_bytes)
880 event_del(client->tx_event);
881 }
882
883 /******************************************************************************
884 * *
885 * Function: ipc_async_socket_write_event_cb *
886 * *
887 * Purpose: asynchronous socket write event libevent callback *
888 * *
889 ******************************************************************************/
ipc_async_socket_write_event_cb(evutil_socket_t fd,short what,void * arg)890 static void ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
891 {
892 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
893
894 ZBX_UNUSED(fd);
895 ZBX_UNUSED(what);
896
897 if (SUCCEED != ipc_client_write(asocket->client))
898 {
899 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
900 ipc_client_free_events(asocket->client);
901 zbx_ipc_socket_close(&asocket->client->csocket);
902 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
903 return;
904 }
905
906 if (0 == asocket->client->tx_bytes)
907 event_del(asocket->client->tx_event);
908 }
909
910 /******************************************************************************
911 * *
912 * Function: ipc_async_socket_read_event_cb *
913 * *
914 * Purpose: asynchronous socket read event libevent callback *
915 * *
916 ******************************************************************************/
ipc_async_socket_read_event_cb(evutil_socket_t fd,short what,void * arg)917 static void ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
918 {
919 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
920
921 ZBX_UNUSED(fd);
922 ZBX_UNUSED(what);
923
924 if (SUCCEED != ipc_client_read(asocket->client))
925 {
926 ipc_client_free_events(asocket->client);
927 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
928 }
929 }
930
931 /******************************************************************************
932 * *
933 * Function: ipc_async_socket_timer_cb *
934 * *
935 * Purpose: timer callback *
936 * *
937 ******************************************************************************/
ipc_async_socket_timer_cb(evutil_socket_t fd,short what,void * arg)938 static void ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
939 {
940 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
941
942 ZBX_UNUSED(fd);
943 ZBX_UNUSED(what);
944
945 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
946 }
947
948 /******************************************************************************
949 * *
950 * Function: ipc_service_accept *
951 * *
952 * Purpose: accepts a new client connection *
953 * *
954 * Parameters: service - [IN] the IPC service *
955 * *
956 ******************************************************************************/
ipc_service_accept(zbx_ipc_service_t * service)957 static void ipc_service_accept(zbx_ipc_service_t *service)
958 {
959 const char *__function_name = "ipc_service_accept";
960 int fd;
961
962 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
963
964 while (-1 == (fd = accept(service->fd, NULL, NULL)))
965 {
966 if (EINTR != errno)
967 {
968 /* If there is unaccepted connection libevent will call registered callback function over and */
969 /* over again. It is better to exit straight away and cause all other processes to stop. */
970 zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
971 exit(EXIT_FAILURE);
972 }
973 }
974
975 ipc_service_add_client(service, fd);
976
977 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
978 }
979
980 /******************************************************************************
981 * *
982 * Function: ipc_message_create *
983 * *
984 * Purpose: creates IPC message *
985 * *
986 * Parameters: code - [IN] the message code *
987 * data - [IN] the data *
988 * size - [IN] the data size *
989 * *
990 * Return value: The created message. *
991 * *
992 ******************************************************************************/
ipc_message_create(zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)993 static zbx_ipc_message_t *ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
994 {
995 zbx_ipc_message_t *message;
996
997 message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
998
999 message->code = code;
1000 message->size = size;
1001
1002 if (0 != size)
1003 {
1004 message->data = (unsigned char *)zbx_malloc(NULL, size);
1005 memcpy(message->data, data, size);
1006 }
1007 else
1008 message->data = NULL;
1009
1010 return message;
1011 }
1012
1013 /******************************************************************************
1014 * *
1015 * Function: ipc_service_event_log *
1016 * *
1017 * Purpose: libevent logging callback *
1018 * *
1019 ******************************************************************************/
ipc_service_event_log_cb(int severity,const char * msg)1020 static void ipc_service_event_log_cb(int severity, const char *msg)
1021 {
1022 int loglevel;
1023
1024 switch (severity)
1025 {
1026 case _EVENT_LOG_DEBUG:
1027 loglevel = LOG_LEVEL_TRACE;
1028 break;
1029 case _EVENT_LOG_MSG:
1030 loglevel = LOG_LEVEL_DEBUG;
1031 break;
1032 case _EVENT_LOG_WARN:
1033 loglevel = LOG_LEVEL_WARNING;
1034 break;
1035 case _EVENT_LOG_ERR:
1036 loglevel = LOG_LEVEL_DEBUG;
1037 break;
1038 default:
1039 loglevel = LOG_LEVEL_DEBUG;
1040 break;
1041 }
1042
1043 zabbix_log(loglevel, "IPC service: %s", msg);
1044 }
1045
1046 /******************************************************************************
1047 * *
1048 * Function: ipc_service_init_libevent *
1049 * *
1050 * Purpose: initialize libevent library *
1051 * *
1052 ******************************************************************************/
ipc_service_init_libevent(void)1053 static void ipc_service_init_libevent(void)
1054 {
1055 event_set_log_callback(ipc_service_event_log_cb);
1056 }
1057
1058 /******************************************************************************
1059 * *
1060 * Function: ipc_service_free_libevent *
1061 * *
1062 * Purpose: uninitialize libevent library *
1063 * *
1064 ******************************************************************************/
ipc_service_free_libevent(void)1065 static void ipc_service_free_libevent(void)
1066 {
1067 }
1068
1069 /******************************************************************************
1070 * *
1071 * Function: ipc_service_client_connected_cb *
1072 * *
1073 * Purpose: libevent listener callback *
1074 * *
1075 ******************************************************************************/
ipc_service_client_connected_cb(evutil_socket_t fd,short what,void * arg)1076 static void ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
1077 {
1078 zbx_ipc_service_t *service = (zbx_ipc_service_t *)arg;
1079
1080 ZBX_UNUSED(fd);
1081 ZBX_UNUSED(what);
1082
1083 ipc_service_accept(service);
1084 }
1085
1086 /******************************************************************************
1087 * *
1088 * Function: ipc_service_timer_cb *
1089 * *
1090 * Purpose: timer callback *
1091 * *
1092 ******************************************************************************/
ipc_service_timer_cb(evutil_socket_t fd,short what,void * arg)1093 static void ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
1094 {
1095 ZBX_UNUSED(fd);
1096 ZBX_UNUSED(what);
1097 ZBX_UNUSED(arg);
1098 }
1099
1100 /******************************************************************************
1101 * *
1102 * Function: ipc_check_running_service *
1103 * *
1104 * Purpose: checks if an IPC service is already running *
1105 * *
1106 * Parameters: service_name - [IN] *
1107 * *
1108 ******************************************************************************/
ipc_check_running_service(const char * service_name)1109 static int ipc_check_running_service(const char *service_name)
1110 {
1111 zbx_ipc_socket_t csocket;
1112 int ret;
1113 char *error = NULL;
1114
1115 if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
1116 zbx_ipc_socket_close(&csocket);
1117 else
1118 zbx_free(error);
1119
1120 return ret;
1121 }
1122
1123 /*
1124 * Public client API
1125 */
1126
1127 /******************************************************************************
1128 * *
1129 * Function: zbx_ipc_socket_open *
1130 * *
1131 * Purpose: opens socket to an IPC service listening on the specified path *
1132 * *
1133 * Parameters: csocket - [OUT] the IPC socket to the service *
1134 * service_name - [IN] the IPC service name *
1135 * timeout - [IN] the connection timeout *
1136 * error - [OUT] the error message *
1137 * *
1138 * Return value: SUCCEED - the socket was successfully opened *
1139 * FAIL - otherwise *
1140 * *
1141 ******************************************************************************/
zbx_ipc_socket_open(zbx_ipc_socket_t * csocket,const char * service_name,int timeout,char ** error)1142 int zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
1143 {
1144 const char *__function_name = "zbx_ipc_socket_open";
1145 struct sockaddr_un addr;
1146 time_t start;
1147 struct timespec ts = {0, 100000000};
1148 const char *socket_path;
1149 int ret = FAIL;
1150
1151 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1152
1153 if (NULL == (socket_path = ipc_make_path(service_name, error)))
1154 goto out;
1155
1156 if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1157 {
1158 *error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
1159 goto out;
1160 }
1161
1162 memset(&addr, 0, sizeof(addr));
1163 addr.sun_family = AF_UNIX;
1164 memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1165
1166 start = time(NULL);
1167
1168 while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
1169 {
1170 if (0 == timeout || time(NULL) - start > timeout)
1171 {
1172 *error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
1173 zbx_strerror(errno));
1174 close(csocket->fd);
1175 goto out;
1176 }
1177
1178 nanosleep(&ts, NULL);
1179 }
1180
1181 csocket->rx_buffer_bytes = 0;
1182 csocket->rx_buffer_offset = 0;
1183
1184 ret = SUCCEED;
1185 out:
1186 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1187 return ret;
1188 }
1189
1190 /******************************************************************************
1191 * *
1192 * Function: zbx_ipc_socket_close *
1193 * *
1194 * Purpose: closes socket to an IPC service *
1195 * *
1196 * Parameters: csocket - [IN/OUT] the IPC socket to close *
1197 * *
1198 ******************************************************************************/
zbx_ipc_socket_close(zbx_ipc_socket_t * csocket)1199 void zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
1200 {
1201 const char *__function_name = "zbx_ipc_socket_close";
1202
1203 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1204
1205 if (-1 != csocket->fd)
1206 {
1207 close(csocket->fd);
1208 csocket->fd = -1;
1209 }
1210
1211 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1212 }
1213
1214 /******************************************************************************
1215 * *
1216 * Function: zbx_ipc_socket_write *
1217 * *
1218 * Purpose: writes a message to IPC service *
1219 * *
1220 * Parameters: csocket - [IN] an opened IPC socket to the service *
1221 * code - [IN] the message code *
1222 * data - [IN] the data *
1223 * size - [IN] the data size *
1224 * *
1225 * Return value: SUCCEED - the message was successfully written *
1226 * FAIL - otherwise *
1227 * *
1228 ******************************************************************************/
zbx_ipc_socket_write(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1229 int zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1230 {
1231 const char *__function_name = "zbx_ipc_socket_write";
1232 int ret;
1233 zbx_uint32_t size_sent;
1234
1235 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1236
1237 if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) &&
1238 size_sent == size + ZBX_IPC_HEADER_SIZE)
1239 {
1240 ret = SUCCEED;
1241 }
1242 else
1243 ret = FAIL;
1244
1245 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1246
1247 return ret;
1248 }
1249
1250 /******************************************************************************
1251 * *
1252 * Function: zbx_ipc_socket_read *
1253 * *
1254 * Purpose: reads a message from IPC service *
1255 * *
1256 * Parameters: csocket - [IN] an opened IPC socket to the service *
1257 * message - [OUT] the received message *
1258 * *
1259 * Return value: SUCCEED - the message was successfully received *
1260 * FAIL - otherwise *
1261 * *
1262 * Comments: If this function succeeds the message must be cleaned/freed by *
1263 * the caller. *
1264 * *
1265 ******************************************************************************/
zbx_ipc_socket_read(zbx_ipc_socket_t * csocket,zbx_ipc_message_t * message)1266 int zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
1267 {
1268 const char *__function_name = "zbx_ipc_socket_read";
1269 int ret = FAIL;
1270 zbx_uint32_t rx_bytes = 0, header[2];
1271 unsigned char *data = NULL;
1272
1273 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1274
1275 if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes))
1276 goto out;
1277
1278 if (SUCCEED != ipc_message_is_completed(header, rx_bytes))
1279 {
1280 zbx_free(data);
1281 goto out;
1282 }
1283
1284 message->code = header[ZBX_IPC_MESSAGE_CODE];
1285 message->size = header[ZBX_IPC_MESSAGE_SIZE];
1286 message->data = data;
1287
1288 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1289 {
1290 char *msg = NULL;
1291
1292 zbx_ipc_message_format(message, &msg);
1293
1294 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __function_name, msg);
1295
1296 zbx_free(msg);
1297 }
1298
1299 ret = SUCCEED;
1300 out:
1301 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1302
1303 return ret;
1304 }
1305
1306 /******************************************************************************
1307 * *
1308 * Function: zbx_ipc_message_free *
1309 * *
1310 * Purpose: frees the resources allocated to store IPC message data *
1311 * *
1312 * Parameters: message - [IN] the message to free *
1313 * *
1314 ******************************************************************************/
zbx_ipc_message_free(zbx_ipc_message_t * message)1315 void zbx_ipc_message_free(zbx_ipc_message_t *message)
1316 {
1317 if (NULL != message)
1318 {
1319 zbx_free(message->data);
1320 zbx_free(message);
1321 }
1322 }
1323
1324 /******************************************************************************
1325 * *
1326 * Function: zbx_ipc_message_clean *
1327 * *
1328 * Purpose: frees the resources allocated to store IPC message data *
1329 * *
1330 * Parameters: message - [IN] the message to clean *
1331 * *
1332 ******************************************************************************/
zbx_ipc_message_clean(zbx_ipc_message_t * message)1333 void zbx_ipc_message_clean(zbx_ipc_message_t *message)
1334 {
1335 zbx_free(message->data);
1336 }
1337
1338 /******************************************************************************
1339 * *
1340 * Function: zbx_ipc_message_init *
1341 * *
1342 * Purpose: initializes IPC message *
1343 * *
1344 * Parameters: message - [IN] the message to initialize *
1345 * *
1346 ******************************************************************************/
zbx_ipc_message_init(zbx_ipc_message_t * message)1347 void zbx_ipc_message_init(zbx_ipc_message_t *message)
1348 {
1349 memset(message, 0, sizeof(zbx_ipc_message_t));
1350 }
1351
1352 /******************************************************************************
1353 * *
1354 * Function: zbx_ipc_message_format *
1355 * *
1356 * Purpose: formats message to readable format for debug messages *
1357 * *
1358 * Parameters: message - [IN] the message *
1359 * data - [OUT] the formatted message *
1360 * *
1361 ******************************************************************************/
zbx_ipc_message_format(const zbx_ipc_message_t * message,char ** data)1362 void zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
1363 {
1364 size_t data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
1365 zbx_uint32_t i, data_num;
1366
1367 if (NULL == message)
1368 return;
1369
1370 data_num = message->size;
1371
1372 if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
1373 data_num = ZBX_IPC_DATA_DUMP_SIZE;
1374
1375 *data = (char *)zbx_malloc(*data, data_alloc);
1376 zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
1377
1378 for (i = 0; i < data_num; i++)
1379 {
1380 if (0 != i)
1381 zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
1382
1383 zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
1384 }
1385
1386 (*data)[data_offset] = '\0';
1387 }
1388
1389 /******************************************************************************
1390 * *
1391 * Function: zbx_ipc_message_copy *
1392 * *
1393 * Purpose: copies ipc message *
1394 * *
1395 * Parameters: dst - [IN] the destination message *
1396 * src - [IN] the source message *
1397 * *
1398 ******************************************************************************/
zbx_ipc_message_copy(zbx_ipc_message_t * dst,const zbx_ipc_message_t * src)1399 void zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
1400 {
1401 dst->code = src->code;
1402 dst->size = src->size;
1403 dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
1404 memcpy(dst->data, src->data, src->size);
1405 }
1406
1407 /*
1408 * Public service API
1409 */
1410
1411 /******************************************************************************
1412 * *
1413 * Function: zbx_ipc_service_init_env *
1414 * *
1415 * Purpose: initializes IPC service environment *
1416 * *
1417 * Parameters: path - [IN] the service root path *
1418 * error - [OUT] the error message *
1419 * *
1420 * Return value: SUCCEED - the environment was initialized successfully. *
1421 * FAIL - otherwise *
1422 * *
1423 ******************************************************************************/
zbx_ipc_service_init_env(const char * path,char ** error)1424 int zbx_ipc_service_init_env(const char *path, char **error)
1425 {
1426 const char *__function_name = "zbx_ipc_service_init_env";
1427 struct stat fs;
1428 int ret = FAIL;
1429
1430 zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __function_name, path);
1431
1432 if (0 != ipc_path_root_len)
1433 {
1434 *error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
1435 " root directory at \"%s\".", ipc_get_path());
1436 goto out;
1437 }
1438
1439 if (0 != stat(path, &fs))
1440 {
1441 *error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
1442 zbx_strerror(errno));
1443 goto out;
1444 }
1445
1446 if (0 == S_ISDIR(fs.st_mode))
1447 {
1448 *error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
1449 goto out;
1450 }
1451
1452 if (0 != access(path, W_OK | R_OK))
1453 {
1454 *error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
1455 goto out;
1456 }
1457
1458 ipc_path_root_len = strlen(path);
1459 if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
1460 {
1461 *error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
1462 goto out;
1463 }
1464
1465 memcpy(ipc_path, path, ipc_path_root_len + 1);
1466
1467 while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
1468 ipc_path[--ipc_path_root_len] = '\0';
1469
1470 ipc_service_init_libevent();
1471
1472 ret = SUCCEED;
1473 out:
1474 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1475
1476 return ret;
1477 }
1478
1479 /******************************************************************************
1480 * *
1481 * Function: zbx_ipc_service_free_env *
1482 * *
1483 * Purpose: frees IPC service environment *
1484 * *
1485 ******************************************************************************/
zbx_ipc_service_free_env(void)1486 void zbx_ipc_service_free_env(void)
1487 {
1488 ipc_service_free_libevent();
1489 }
1490
1491
1492 /******************************************************************************
1493 * *
1494 * Function: zbx_ipc_service_start *
1495 * *
1496 * Purpose: starts IPC service on the specified path *
1497 * *
1498 * Parameters: service - [IN/OUT] the IPC service *
1499 * service_name - [IN] the unix domain socket path *
1500 * error - [OUT] the error message *
1501 * *
1502 * Return value: SUCCEED - the service was initialized successfully. *
1503 * FAIL - otherwise *
1504 * *
1505 ******************************************************************************/
zbx_ipc_service_start(zbx_ipc_service_t * service,const char * service_name,char ** error)1506 int zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
1507 {
1508 const char *__function_name = "zbx_ipc_service_start";
1509 struct sockaddr_un addr;
1510 const char *socket_path;
1511 int ret = FAIL;
1512 mode_t mode;
1513
1514 zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __function_name, service_name);
1515
1516 mode = umask(077);
1517
1518 if (NULL == (socket_path = ipc_make_path(service_name, error)))
1519 goto out;
1520
1521 if (0 == access(socket_path, F_OK))
1522 {
1523 if (0 != access(socket_path, W_OK))
1524 {
1525 *error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
1526 goto out;
1527 }
1528
1529 if (SUCCEED == ipc_check_running_service(service_name))
1530 {
1531 *error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
1532 goto out;
1533 }
1534
1535 unlink(socket_path);
1536 }
1537
1538 if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1539 {
1540 *error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
1541 goto out;
1542 }
1543
1544 memset(&addr, 0, sizeof(addr));
1545 addr.sun_family = AF_UNIX;
1546 memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1547
1548 if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
1549 {
1550 *error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
1551 goto out;
1552 }
1553
1554 if (0 != listen(service->fd, SOMAXCONN))
1555 {
1556 *error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
1557 goto out;
1558 }
1559
1560 service->path = zbx_strdup(NULL, service_name);
1561 zbx_vector_ptr_create(&service->clients);
1562 zbx_queue_ptr_create(&service->clients_recv);
1563
1564 service->ev = event_base_new();
1565 service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
1566 ipc_service_client_connected_cb, service);
1567 event_add(service->ev_listener, NULL);
1568
1569 service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
1570
1571 ret = SUCCEED;
1572 out:
1573 umask(mode);
1574
1575 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1576
1577 return ret;
1578 }
1579
1580 /******************************************************************************
1581 * *
1582 * Function: zbx_ipc_service_close *
1583 * *
1584 * Purpose: closes IPC service and frees the resources allocated by it *
1585 * *
1586 * Parameters: service - [IN/OUT] the IPC service *
1587 * *
1588 ******************************************************************************/
zbx_ipc_service_close(zbx_ipc_service_t * service)1589 void zbx_ipc_service_close(zbx_ipc_service_t *service)
1590 {
1591 const char *__function_name = "zbx_ipc_service_close";
1592 int i;
1593
1594 zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __function_name, service->path);
1595
1596 close(service->fd);
1597
1598 for (i = 0; i < service->clients.values_num; i++)
1599 ipc_client_free((zbx_ipc_client_t *)service->clients.values[i]);
1600
1601 zbx_free(service->path);
1602
1603 zbx_vector_ptr_destroy(&service->clients);
1604 zbx_queue_ptr_destroy(&service->clients_recv);
1605
1606 event_free(service->ev_timer);
1607 event_free(service->ev_listener);
1608 event_base_free(service->ev);
1609
1610 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1611 }
1612
1613 /******************************************************************************
1614 * *
1615 * Function: zbx_ipc_service_recv *
1616 * *
1617 * Purpose: receives ipc message from a connected client *
1618 * *
1619 * Parameters: service - [IN] the IPC service *
1620 * timeout - [IN] the timeout in seconds, 0 is used for *
1621 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1622 * used for blocking call without timeout *
1623 * client - [OUT] the client that sent the message or *
1624 * NULL if there are no messages and the *
1625 * specified timeout passed. *
1626 * The client must be released by caller with *
1627 * zbx_ipc_client_release() function. *
1628 * message - [OUT] the received message or NULL if the client *
1629 * connection was closed. *
1630 * The message must be freed by caller with *
1631 * ipc_message_free() function. *
1632 * *
1633 * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without *
1634 * waiting for socket events *
1635 * (pending events are processed) *
1636 * ZBX_IPC_RECV_WAIT - returned after receiving socket *
1637 * event *
1638 * ZBX_IPC_RECV_TIMEOUT - returned after timeout expired *
1639 * *
1640 ******************************************************************************/
zbx_ipc_service_recv(zbx_ipc_service_t * service,int timeout,zbx_ipc_client_t ** client,zbx_ipc_message_t ** message)1641 int zbx_ipc_service_recv(zbx_ipc_service_t *service, int timeout, zbx_ipc_client_t **client,
1642 zbx_ipc_message_t **message)
1643 {
1644 const char *__function_name = "zbx_ipc_service_recv";
1645
1646 int ret, flags;
1647
1648 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __function_name, timeout);
1649
1650 if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
1651 {
1652 if (ZBX_IPC_WAIT_FOREVER != timeout)
1653 {
1654 struct timeval tv = {timeout, 0};
1655 evtimer_add(service->ev_timer, &tv);
1656 }
1657 flags = EVLOOP_ONCE;
1658 }
1659 else
1660 flags = EVLOOP_NONBLOCK;
1661
1662 event_base_loop(service->ev, flags);
1663
1664 if (NULL != (*client = ipc_service_pop_client(service)))
1665 {
1666 if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue)))
1667 {
1668 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1669 {
1670 char *data = NULL;
1671
1672 zbx_ipc_message_format(*message, &data);
1673 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __function_name, data);
1674
1675 zbx_free(data);
1676 }
1677
1678 ipc_service_push_client(service, *client);
1679 zbx_ipc_client_addref(*client);
1680 }
1681
1682 ret = (EVLOOP_NONBLOCK == flags ? ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT);
1683 }
1684 else
1685 { ret = ZBX_IPC_RECV_TIMEOUT;
1686 *client = NULL;
1687 *message = NULL;
1688 }
1689
1690 evtimer_del(service->ev_timer);
1691
1692 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __function_name, ret);
1693
1694 return ret;
1695 }
1696
1697 /******************************************************************************
1698 * *
1699 * Function: zbx_ipc_client_send *
1700 * *
1701 * Purpose: Sends IPC message to client *
1702 * *
1703 * Parameters: client - [IN] the IPC client *
1704 * code - [IN] the message code *
1705 * data - [IN] the data *
1706 * size - [IN] the data size *
1707 * *
1708 * Comments: If data can't be written directly to socket (buffer full) then *
1709 * the message is queued and sent during zbx_ipc_service_recv() *
1710 * messaging loop whenever socket becomes ready. *
1711 * *
1712 ******************************************************************************/
zbx_ipc_client_send(zbx_ipc_client_t * client,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1713 int zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1714 {
1715 const char *__function_name = "zbx_ipc_client_send";
1716 zbx_uint32_t tx_size = 0;
1717 zbx_ipc_message_t *message;
1718 int ret = FAIL;
1719
1720 zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __function_name, client->id);
1721
1722 if (0 != client->tx_bytes)
1723 {
1724 message = ipc_message_create(code, data, size);
1725 zbx_queue_ptr_push(&client->tx_queue, message);
1726 ret = SUCCEED;
1727 goto out;
1728 }
1729
1730 if (FAIL == ipc_socket_write_message(&client->csocket, code, data, size, &tx_size))
1731 goto out;
1732
1733 if (tx_size != ZBX_IPC_HEADER_SIZE + size)
1734 {
1735 client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
1736 client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
1737 client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
1738 memcpy(client->tx_data, data, size);
1739 client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - tx_size;
1740 event_add(client->tx_event, NULL);
1741 }
1742
1743 ret = SUCCEED;
1744 out:
1745 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1746
1747 return ret;
1748 }
1749
1750 /******************************************************************************
1751 * *
1752 * Function: zbx_ipc_client_close *
1753 * *
1754 * Purpose: closes client socket and frees resources allocated for client *
1755 * *
1756 * Parameters: client - [IN] the IPC client *
1757 * *
1758 ******************************************************************************/
zbx_ipc_client_close(zbx_ipc_client_t * client)1759 void zbx_ipc_client_close(zbx_ipc_client_t *client)
1760 {
1761 const char *__function_name = "zbx_ipc_client_close";
1762
1763 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1764
1765 ipc_client_free_events(client);
1766 zbx_ipc_socket_close(&client->csocket);
1767
1768 ipc_service_remove_client(client->service, client);
1769 zbx_queue_ptr_remove_value(&client->service->clients_recv, client);
1770 zbx_ipc_client_release(client);
1771
1772 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1773 }
1774
zbx_ipc_client_addref(zbx_ipc_client_t * client)1775 void zbx_ipc_client_addref(zbx_ipc_client_t *client)
1776 {
1777 client->refcount++;
1778 }
1779
zbx_ipc_client_release(zbx_ipc_client_t * client)1780 void zbx_ipc_client_release(zbx_ipc_client_t *client)
1781 {
1782 if (0 == --client->refcount)
1783 ipc_client_free(client);
1784 }
1785
zbx_ipc_client_connected(zbx_ipc_client_t * client)1786 int zbx_ipc_client_connected(zbx_ipc_client_t *client)
1787 {
1788 return (NULL == client->rx_event ? FAIL : SUCCEED);
1789 }
1790
1791 /******************************************************************************
1792 * *
1793 * Function: zbx_ipc_async_socket_open *
1794 * *
1795 * Purpose: opens asynchronous socket to IPC service client *
1796 * *
1797 * Parameters: client - [OUT] the IPC service client *
1798 * service_name - [IN] the IPC service name *
1799 * timeout - [IN] the connection timeout *
1800 * error - [OUT] the error message *
1801 * *
1802 * Return value: SUCCEED - the socket was successfully opened *
1803 * FAIL - otherwise *
1804 * *
1805 ******************************************************************************/
zbx_ipc_async_socket_open(zbx_ipc_async_socket_t * asocket,const char * service_name,int timeout,char ** error)1806 int zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
1807 {
1808 const char *__function_name = "zbx_ipc_async_socket_open";
1809 int ret = FAIL, flags;
1810
1811 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1812
1813 memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
1814 asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
1815 memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
1816
1817 if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
1818 {
1819 zbx_free(asocket->client);
1820 goto out;
1821 }
1822
1823 if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
1824 {
1825 zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
1826 exit(EXIT_FAILURE);
1827 }
1828
1829 if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
1830 {
1831 zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
1832 exit(EXIT_FAILURE);
1833 }
1834
1835 asocket->ev = event_base_new();
1836 asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
1837 asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
1838 ipc_async_socket_read_event_cb, (void *)asocket);
1839 asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
1840 ipc_async_socket_write_event_cb, (void *)asocket);
1841 event_add(asocket->client->rx_event, NULL);
1842
1843 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1844
1845 ret = SUCCEED;
1846 out:
1847 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1848 return ret;
1849 }
1850
1851 /******************************************************************************
1852 * *
1853 * Function: zbx_ipc_async_socket_close *
1854 * *
1855 * Purpose: closes asynchronous IPC socket and frees allocated resources *
1856 * *
1857 * Parameters: asocket - [IN] the asynchronous IPC socket *
1858 * *
1859 ******************************************************************************/
zbx_ipc_async_socket_close(zbx_ipc_async_socket_t * asocket)1860 void zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
1861 {
1862 const char *__function_name = "zbx_ipc_async_socket_close";
1863
1864 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1865
1866 ipc_client_free(asocket->client);
1867
1868 event_free(asocket->ev_timer);
1869 event_base_free(asocket->ev);
1870
1871 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1872 }
1873
1874 /******************************************************************************
1875 * *
1876 * Function: zbx_ipc_async_socket_send *
1877 * *
1878 * Purpose: Sends message through asynchronous IPC socket *
1879 * *
1880 * Parameters: asocket - [IN] the asynchronous IPC socket *
1881 * code - [IN] the message code *
1882 * data - [IN] the data *
1883 * size - [IN] the data size *
1884 * *
1885 * Comments: If data can't be written directly to socket (buffer full) then *
1886 * the message is queued and sent during zbx_ipc_async_socket_recv()*
1887 * or zbx_ipc_async_socket_flush() functions whenever socket becomes*
1888 * ready. *
1889 * *
1890 ******************************************************************************/
zbx_ipc_async_socket_send(zbx_ipc_async_socket_t * asocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1891 int zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
1892 zbx_uint32_t size)
1893 {
1894 const char *__function_name = "zbx_ipc_async_socket_send";
1895 int ret = FAIL;
1896
1897 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1898
1899 ret = zbx_ipc_client_send(asocket->client, code, data, size);
1900
1901 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __function_name, zbx_result_string(ret));
1902
1903 return ret;
1904 }
1905
1906 /******************************************************************************
1907 * *
1908 * Function: zbx_ipc_async_socket_recv *
1909 * *
1910 * Purpose: receives message through asynchronous IPC socket *
1911 * *
1912 * Parameters: asocket - [IN] the asynchronous IPC socket *
1913 * timeout - [IN] the timeout in seconds, 0 is used for *
1914 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1915 * used for blocking call without timeout *
1916 * message - [OUT] the received message or NULL if the client *
1917 * connection was closed. *
1918 * The message must be freed by caller with *
1919 * ipc_message_free() function. *
1920 * *
1921 * Return value: SUCCEED - the message was read successfully or timeout *
1922 * occurred *
1923 * FAIL - otherwise *
1924 * *
1925 * Comments: After socket has been closed (or connection error has occurred) *
1926 * calls to zbx_ipc_client_read() will return success with buffered *
1927 * messages, until all buffered messages are retrieved. *
1928 * *
1929 ******************************************************************************/
zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t * asocket,int timeout,zbx_ipc_message_t ** message)1930 int zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
1931 {
1932 const char *__function_name = "zbx_ipc_async_socket_recv";
1933
1934 int ret, flags;
1935
1936 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __function_name, timeout);
1937
1938 if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
1939 {
1940 if (ZBX_IPC_WAIT_FOREVER != timeout)
1941 {
1942 struct timeval tv = {timeout, 0};
1943 evtimer_add(asocket->ev_timer, &tv);
1944 }
1945 flags = EVLOOP_ONCE;
1946 }
1947 else
1948 flags = EVLOOP_NONBLOCK;
1949
1950 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1951 event_base_loop(asocket->ev, flags);
1952
1953 if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue)))
1954 {
1955 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1956 {
1957 char *data = NULL;
1958
1959 zbx_ipc_message_format(*message, &data);
1960 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __function_name, data);
1961
1962 zbx_free(data);
1963 }
1964 }
1965
1966 if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1967 ret = SUCCEED;
1968 else
1969 ret = FAIL;
1970
1971 evtimer_del(asocket->ev_timer);
1972
1973 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __function_name, ret);
1974
1975 return ret;
1976 }
1977
1978 /******************************************************************************
1979 * *
1980 * Function: zbx_ipc_async_socket_flush *
1981 * *
1982 * Purpose: flushes unsent through asynchronous IPC socket *
1983 * *
1984 * Parameters: asocket - [IN] the asynchronous IPC service socket *
1985 * timeout - [IN] the timeout in seconds, 0 is used for *
1986 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1987 * used for blocking call without timeout *
1988 * *
1989 * Return value: SUCCEED - the data was flushed successfully or timeout *
1990 * occurred. Use zbx_ipc_client_unsent_data() to *
1991 * check if all data was sent. *
1992 * FAIL - failed to send data (connection was closed or an *
1993 * error occurred). *
1994 * *
1995 ******************************************************************************/
zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t * asocket,int timeout)1996 int zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
1997 {
1998 const char *__function_name = "zbx_ipc_async_socket_flush";
1999
2000 int ret = FAIL, flags;
2001
2002 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __function_name, timeout);
2003
2004 if (0 == asocket->client->tx_bytes)
2005 {
2006 ret = SUCCEED;
2007 goto out;
2008 }
2009
2010 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
2011 goto out;
2012
2013 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
2014
2015 if (0 != timeout)
2016 {
2017 if (ZBX_IPC_WAIT_FOREVER != timeout)
2018 {
2019 struct timeval tv = {timeout, 0};
2020 evtimer_add(asocket->ev_timer, &tv);
2021 }
2022 flags = EVLOOP_ONCE;
2023 }
2024 else
2025 flags = EVLOOP_NONBLOCK;
2026
2027 do
2028 {
2029 event_base_loop(asocket->ev, flags);
2030
2031 if (SUCCEED != zbx_ipc_client_connected(asocket->client))
2032 goto out;
2033 }
2034 while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
2035
2036 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
2037 {
2038 ret = SUCCEED;
2039 asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
2040 }
2041 out:
2042 evtimer_del(asocket->ev_timer);
2043
2044 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __function_name, ret);
2045
2046 return ret;
2047 }
2048
2049 /******************************************************************************
2050 * *
2051 * Function: zbx_ipc_async_socket_check_unsent *
2052 * *
2053 * Purpose: checks if there are data to be sent *
2054 * *
2055 * Parameters: client - [IN] the IPC service client *
2056 * *
2057 * Return value: SUCCEED - there are messages queued to be sent *
2058 * FAIL - all data has been sent *
2059 * *
2060 ******************************************************************************/
zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t * asocket)2061 int zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
2062 {
2063 return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
2064 }
2065
2066 #endif
2067