1 #include "common.h"
2
3 #ifdef HAVE_IPCSERVICE
4
5 #ifdef HAVE_LIBEVENT
6 # include <event.h>
7 #endif
8
9 #include "zbxtypes.h"
10 #include "zbxalgo.h"
11 #include "log.h"
12 #include "zbxipcservice.h"
13
14 #define ZBX_IPC_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
15
16 #define ZBX_IPC_DATA_DUMP_SIZE 128
17
18 static char ipc_path[ZBX_IPC_PATH_MAX] = {0};
19 static size_t ipc_path_root_len = 0;
20
21 #define ZBX_IPC_CLIENT_STATE_NONE 0
22 #define ZBX_IPC_CLIENT_STATE_QUEUED 1
23
24 #define ZBX_IPC_ASYNC_SOCKET_STATE_NONE 0
25 #define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT 1
26 #define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR 2
27
28 extern unsigned char program_type;
29
30 /* IPC client, providing nonblocking connections through socket */
31 struct zbx_ipc_client
32 {
33 zbx_ipc_socket_t csocket;
34 zbx_ipc_service_t *service;
35
36 zbx_uint32_t rx_header[2];
37 unsigned char *rx_data;
38 zbx_uint32_t rx_bytes;
39 zbx_queue_ptr_t rx_queue;
40 struct event *rx_event;
41
42 zbx_uint32_t tx_header[2];
43 unsigned char *tx_data;
44 zbx_uint32_t tx_bytes;
45 zbx_queue_ptr_t tx_queue;
46 struct event *tx_event;
47
48 zbx_uint64_t id;
49 unsigned char state;
50
51 zbx_uint32_t refcount;
52 };
53
54 /*
55 * Private API
56 */
57
58 #define ZBX_IPC_HEADER_SIZE (int)(sizeof(zbx_uint32_t) * 2)
59
60 #define ZBX_IPC_MESSAGE_CODE 0
61 #define ZBX_IPC_MESSAGE_SIZE 1
62
63 #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
64 typedef int evutil_socket_t;
65
event_new(struct event_base * ev,evutil_socket_t fd,short what,void (* cb_func)(int,short,void *),void * cb_arg)66 static struct event *event_new(struct event_base *ev, evutil_socket_t fd, short what,
67 void(*cb_func)(int, short, void *), void *cb_arg)
68 {
69 struct event *event;
70
71 event = zbx_malloc(NULL, sizeof(struct event));
72 event_set(event, fd, what, cb_func, cb_arg);
73 event_base_set(ev, event);
74
75 return event;
76 }
77
event_free(struct event * event)78 static void event_free(struct event *event)
79 {
80 event_del(event);
81 zbx_free(event);
82 }
83
84 #endif
85
86 static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
87 static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
88
ipc_get_path(void)89 static const char *ipc_get_path(void)
90 {
91 ipc_path[ipc_path_root_len] = '\0';
92
93 return ipc_path;
94 }
95
96 #define ZBX_IPC_SOCKET_PREFIX "/zabbix_"
97 #define ZBX_IPC_SOCKET_SUFFIX ".sock"
98
99 #define ZBX_IPC_CLASS_PREFIX_NONE ""
100 #define ZBX_IPC_CLASS_PREFIX_SERVER "server_"
101 #define ZBX_IPC_CLASS_PREFIX_PROXY "proxy_"
102 #define ZBX_IPC_CLASS_PREFIX_AGENT "agent_"
103
104 /******************************************************************************
105 * *
106 * Function: ipc_make_path *
107 * *
108 * Purpose: makes socket path from the service name *
109 * *
110 * Parameters: service_name - [IN] the service name *
111 * error - [OUT] the error message *
112 * *
113 * Return value: The created path or NULL if the path exceeds unix domain *
114 * socket path maximum length *
115 * *
116 ******************************************************************************/
ipc_make_path(const char * service_name,char ** error)117 static const char *ipc_make_path(const char *service_name, char **error)
118 {
119 const char *prefix;
120 size_t path_len, offset, prefix_len;
121
122 path_len = strlen(service_name);
123
124 switch (program_type)
125 {
126 case ZBX_PROGRAM_TYPE_SERVER:
127 prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
128 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
129 break;
130 case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
131 case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
132 prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
133 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
134 break;
135 case ZBX_PROGRAM_TYPE_AGENTD:
136 prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
137 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
138 break;
139 default:
140 prefix = ZBX_IPC_CLASS_PREFIX_NONE;
141 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
142 break;
143 }
144
145 if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
146 ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + prefix_len)
147 {
148 *error = zbx_dsprintf(*error,
149 "Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
150 ipc_path, ZBX_IPC_SOCKET_PREFIX, prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
151 return NULL;
152 }
153
154 offset = ipc_path_root_len;
155 memcpy(ipc_path + offset , ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
156 offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
157 memcpy(ipc_path + offset, prefix, prefix_len);
158 offset += prefix_len;
159 memcpy(ipc_path + offset, service_name, path_len);
160 offset += path_len;
161 memcpy(ipc_path + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
162
163 return ipc_path;
164 }
165
166 /******************************************************************************
167 * *
168 * Function: ipc_write_data *
169 * *
170 * Purpose: writes data to a socket *
171 * *
172 * Parameters: fd - [IN] the socket file descriptor *
173 * data - [IN] the data *
174 * size - [IN] the data size *
175 * size_sent - [IN] the actual size written to socket *
176 * *
177 * Return value: SUCCEED - no socket errors were detected. Either the data or *
178 * a part of it was written to socket or a write to *
179 * non-blocking socket would block *
180 * FAIL - otherwise *
181 * *
182 ******************************************************************************/
ipc_write_data(int fd,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * size_sent)183 static int ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
184 {
185 zbx_uint32_t offset = 0;
186 int n, ret = SUCCEED;
187
188 while (offset != size)
189 {
190 n = write(fd, data + offset, size - offset);
191
192 if (-1 == n)
193 {
194 if (EINTR == errno)
195 continue;
196
197 if (EWOULDBLOCK == errno || EAGAIN == errno)
198 break;
199
200 zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
201 ret = FAIL;
202 break;
203 }
204 offset += n;
205 }
206
207 *size_sent = offset;
208
209 return ret;
210 }
211
212 /******************************************************************************
213 * *
214 * Function: ipc_read_data *
215 * *
216 * Purpose: reads data from a socket *
217 * *
218 * Parameters: fd - [IN] the socket file descriptor *
219 * data - [IN] the data *
220 * size - [IN] the data size *
221 * size_sent - [IN] the actual size read from socket *
222 * *
223 * Return value: SUCCEED - the data was successfully read *
224 * FAIL - otherwise *
225 * *
226 * Comments: When reading data from non-blocking sockets SUCCEED will be *
227 * returned also if there were no more data to read. *
228 * *
229 ******************************************************************************/
ipc_read_data(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)230 static int ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
231 {
232 int n;
233
234 *read_size = 0;
235
236 while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
237 {
238 if (EINTR == errno)
239 continue;
240
241 if (EWOULDBLOCK == errno || EAGAIN == errno)
242 return SUCCEED;
243
244 return FAIL;
245 }
246
247 if (0 == n)
248 return FAIL;
249
250 *read_size += n;
251
252 return SUCCEED;
253 }
254
255 /******************************************************************************
256 * *
257 * Function: ipc_read_data_full *
258 * *
259 * Purpose: reads data from a socket until the requested data has been read *
260 * *
261 * Parameters: fd - [IN] the socket file descriptor *
262 * buffer - [IN] the data *
263 * size - [IN] the data size *
264 * read_size - [IN] the actual size read from socket *
265 * *
266 * Return value: SUCCEED - the data was successfully read *
267 * FAIL - otherwise *
268 * *
269 * Comments: When reading data from non-blocking sockets this function will *
270 * return SUCCEED if there are no data to read, even if not all of *
271 * the requested data has been read. *
272 * *
273 ******************************************************************************/
ipc_read_data_full(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)274 static int ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
275 {
276 int ret = FAIL;
277 zbx_uint32_t offset = 0, chunk_size;
278
279 *read_size = 0;
280
281 while (offset < size)
282 {
283 if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
284 goto out;
285
286 if (0 == chunk_size)
287 break;
288
289 offset += chunk_size;
290 }
291
292 ret = SUCCEED;
293 out:
294 *read_size = offset;
295
296 return ret;
297 }
298
299 /******************************************************************************
300 * *
301 * Function: ipc_socket_write_message *
302 * *
303 * Purpose: writes IPC message to socket *
304 * *
305 * Parameters: csocket - [IN] the IPC socket *
306 * code - [IN] the message code *
307 * data - [IN] the data *
308 * size - [IN] the data size *
309 * tx_size - [IN] the actual size written to socket *
310 * *
311 * Return value: SUCCEED - no socket errors were detected. Either the data or *
312 * a part of it was written to socket or a write to *
313 * non-blocking socket would block *
314 * FAIL - otherwise *
315 * *
316 * Comments: When using non-blocking sockets the tx_size parameter must be *
317 * checked in addition to return value to tell if the message was *
318 * sent successfully. *
319 * *
320 ******************************************************************************/
ipc_socket_write_message(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * tx_size)321 static int ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
322 zbx_uint32_t size, zbx_uint32_t *tx_size)
323 {
324 int ret;
325 zbx_uint32_t size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
326
327 buffer[0] = code;
328 buffer[1] = size;
329
330 if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
331 {
332 if (0 != size)
333 memcpy(buffer + 2, data, size);
334
335 return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
336 }
337
338 if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
339 return FAIL;
340
341 /* in the case of non-blocking sockets only a part of the header might be sent */
342 if (ZBX_IPC_HEADER_SIZE != *tx_size)
343 return SUCCEED;
344
345 ret = ipc_write_data(csocket->fd, data, size, &size_data);
346 *tx_size += size_data;
347
348 return ret;
349 }
350
351 /******************************************************************************
352 * *
353 * Function: ipc_read_buffer *
354 * *
355 * Purpose: reads message header and data from buffer *
356 * *
357 * Parameters: header - [IN/OUT] the message header *
358 * data - [OUT] the message data *
359 * rx_bytes - [IN] the number of bytes stored in message *
360 * (including header) *
361 * buffer - [IN] the buffer to parse *
362 * size - [IN] the number of bytes to parse *
363 * read_size - [OUT] the number of bytes read *
364 * *
365 * Return value: SUCCEED - message was successfully parsed *
366 * FAIL - not enough data *
367 * *
368 ******************************************************************************/
ipc_read_buffer(zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t rx_bytes,const unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)369 static int ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
370 const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
371 {
372 zbx_uint32_t copy_size, data_size, data_offset;
373
374 *read_size = 0;
375
376 if (ZBX_IPC_HEADER_SIZE > rx_bytes)
377 {
378 copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
379 memcpy((char *)header + rx_bytes, buffer, copy_size);
380 *read_size += copy_size;
381
382 if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
383 return FAIL;
384
385 data_size = header[ZBX_IPC_MESSAGE_SIZE];
386
387 if (0 == data_size)
388 {
389 *data = NULL;
390 return SUCCEED;
391 }
392
393 *data = (unsigned char *)zbx_malloc(NULL, data_size);
394 data_offset = 0;
395 }
396 else
397 {
398 data_size = header[ZBX_IPC_MESSAGE_SIZE];
399 data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
400 }
401
402 copy_size = MIN(data_size - data_offset, size - *read_size);
403 memcpy(*data + data_offset, buffer + *read_size, copy_size);
404 *read_size += copy_size;
405
406 return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
407 }
408
409 /******************************************************************************
410 * *
411 * Function: ipc_message_is_completed *
412 * *
413 * Purpose: checks if IPC message has been completed *
414 * *
415 * Parameters: header - [IN] the message header *
416 * rx_bytes - [IN] the number of bytes set in message *
417 * (including header) *
418 * *
419 * Return value: SUCCEED - message has been completed *
420 * FAIL - otherwise *
421 * *
422 ******************************************************************************/
ipc_message_is_completed(const zbx_uint32_t * header,zbx_uint32_t rx_bytes)423 static int ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
424 {
425 if (ZBX_IPC_HEADER_SIZE > rx_bytes)
426 return FAIL;
427
428 if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
429 return FAIL;
430
431 return SUCCEED;
432 }
433
434 /******************************************************************************
435 * *
436 * Function: ipc_socket_read_message *
437 * *
438 * Purpose: reads IPC message from buffered client socket *
439 * *
440 * Parameters: csocket - [IN] the source socket *
441 * header - [OUT] the header of the message *
442 * data - [OUT] the data of the message *
443 * rx_bytes - [IN/OUT] the total message size read (including *
444 * header *
445 * *
446 * Return value: SUCCEED - data was read successfully, check rx_bytes to *
447 * determine if the message was completed. *
448 * FAIL - failed to read message (socket error or connection *
449 * was closed). *
450 * *
451 ******************************************************************************/
ipc_socket_read_message(zbx_ipc_socket_t * csocket,zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t * rx_bytes)452 static int ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
453 zbx_uint32_t *rx_bytes)
454 {
455 zbx_uint32_t data_size, offset, read_size = 0;
456 int ret = FAIL;
457
458 /* try to read message from socket buffer */
459 if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
460 {
461 ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
462 csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
463
464 csocket->rx_buffer_offset += read_size;
465 *rx_bytes += read_size;
466
467 if (SUCCEED == ret)
468 goto out;
469 }
470
471 /* not enough data in socket buffer, try to read more until message is completed or no data to read */
472 while (SUCCEED != ret)
473 {
474 csocket->rx_buffer_offset = 0;
475 csocket->rx_buffer_bytes = 0;
476
477 if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
478 {
479 offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
480 data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
481
482 /* long messages will be read directly into message buffer */
483 if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
484 {
485 ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
486 *rx_bytes += read_size;
487 goto out;
488 }
489 }
490
491 if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
492 goto out;
493
494 /* it's possible that nothing will be read on non-blocking sockets, return success */
495 if (0 == read_size)
496 {
497 ret = SUCCEED;
498 goto out;
499 }
500
501 csocket->rx_buffer_bytes = read_size;
502
503 ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
504 &read_size);
505
506 csocket->rx_buffer_offset += read_size;
507 *rx_bytes += read_size;
508 }
509 out:
510 return ret;
511 }
512
513 /******************************************************************************
514 * *
515 * Function: ipc_client_free_event *
516 * *
517 * Purpose: frees client's libevent event *
518 * *
519 * Parameters: client - [IN] the client *
520 * *
521 ******************************************************************************/
ipc_client_free_events(zbx_ipc_client_t * client)522 static void ipc_client_free_events(zbx_ipc_client_t *client)
523 {
524 if (NULL != client->rx_event)
525 {
526 event_free(client->rx_event);
527 client->rx_event = NULL;
528 }
529
530 if (NULL != client->tx_event)
531 {
532 event_free(client->tx_event);
533 client->tx_event = NULL;
534 }
535 }
536
537 /******************************************************************************
538 * *
539 * Function: ipc_client_free *
540 * *
541 * Purpose: frees IPC service client *
542 * *
543 * Parameters: client - [IN] the client to free *
544 * *
545 ******************************************************************************/
ipc_client_free(zbx_ipc_client_t * client)546 static void ipc_client_free(zbx_ipc_client_t *client)
547 {
548 zbx_ipc_message_t *message;
549
550 ipc_client_free_events(client);
551 zbx_ipc_socket_close(&client->csocket);
552
553 while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
554 zbx_ipc_message_free(message);
555
556 zbx_queue_ptr_destroy(&client->rx_queue);
557 zbx_free(client->rx_data);
558
559 while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
560 zbx_ipc_message_free(message);
561
562 zbx_queue_ptr_destroy(&client->tx_queue);
563 zbx_free(client->tx_data);
564
565 ipc_client_free_events(client);
566
567 zbx_free(client);
568 }
569
570 /******************************************************************************
571 * *
572 * Function: ipc_client_push_rx_message *
573 * *
574 * Purpose: adds message to received messages queue *
575 * *
576 * Parameters: client - [IN] the client to read *
577 * *
578 ******************************************************************************/
ipc_client_push_rx_message(zbx_ipc_client_t * client)579 static void ipc_client_push_rx_message(zbx_ipc_client_t *client)
580 {
581 zbx_ipc_message_t *message;
582
583 message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
584 message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
585 message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
586 message->data = client->rx_data;
587 zbx_queue_ptr_push(&client->rx_queue, message);
588
589 client->rx_data = NULL;
590 client->rx_bytes = 0;
591 }
592
593 /******************************************************************************
594 * *
595 * Function: ipc_client_pop_tx_message *
596 * *
597 * Purpose: prepares to send the next message in send queue *
598 * *
599 * Parameters: client - [IN] the client *
600 * *
601 ******************************************************************************/
ipc_client_pop_tx_message(zbx_ipc_client_t * client)602 static void ipc_client_pop_tx_message(zbx_ipc_client_t *client)
603 {
604 zbx_ipc_message_t *message;
605
606 zbx_free(client->tx_data);
607 client->tx_bytes = 0;
608
609 if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
610 return;
611
612 client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
613 client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
614 client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
615 client->tx_data = message->data;
616 zbx_free(message);
617 }
618
619 /******************************************************************************
620 * *
621 * Function: ipc_client_read *
622 * *
623 * Purpose: reads data from IPC service client *
624 * *
625 * Parameters: client - [IN] the client to read *
626 * *
627 * Return value: FAIL - read error/connection was closed *
628 * *
629 * Comments: This function reads data from socket, parses it and adds *
630 * parsed messages to received messages queue. *
631 * *
632 ******************************************************************************/
ipc_client_read(zbx_ipc_client_t * client)633 static int ipc_client_read(zbx_ipc_client_t *client)
634 {
635 int rc;
636
637 do
638 {
639 if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
640 &client->rx_bytes))
641 {
642 zbx_free(client->rx_data);
643 client->rx_bytes = 0;
644 return FAIL;
645 }
646
647 if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
648 ipc_client_push_rx_message(client);
649 }
650
651 while (SUCCEED == rc);
652
653 return SUCCEED;
654 }
655
656 /******************************************************************************
657 * *
658 * Function: ipc_client_write *
659 * *
660 * Purpose: writes queued data to IPC service client *
661 * *
662 * Parameters: client - [IN] the client *
663 * *
664 * Return value: SUCCEED - the data was sent successfully *
665 * FAIL - otherwise *
666 * *
667 ******************************************************************************/
ipc_client_write(zbx_ipc_client_t * client)668 static int ipc_client_write(zbx_ipc_client_t *client)
669 {
670 zbx_uint32_t data_size, write_size;
671
672 data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
673
674 if (data_size < client->tx_bytes)
675 {
676 zbx_uint32_t size, offset;
677
678 size = client->tx_bytes - data_size;
679 offset = ZBX_IPC_HEADER_SIZE - size;
680
681 if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
682 &write_size))
683 {
684 return FAIL;
685 }
686
687 client->tx_bytes -= write_size;
688
689 if (data_size < client->tx_bytes)
690 return SUCCEED;
691 }
692
693 while (0 < client->tx_bytes)
694 {
695 if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
696 client->tx_bytes, &write_size))
697 {
698 return FAIL;
699 }
700
701 if (0 == write_size)
702 return SUCCEED;
703
704 client->tx_bytes -= write_size;
705 }
706
707 if (0 == client->tx_bytes)
708 ipc_client_pop_tx_message(client);
709
710 return SUCCEED;
711 }
712
713 /******************************************************************************
714 * *
715 * Function: ipc_service_pop_client *
716 * *
717 * Purpose: gets the next client with messages/closed socket from recv queue *
718 * *
719 * Parameters: service - [IN] the IPC service *
720 * *
721 * Return value: The client with messages/closed socket *
722 * *
723 ******************************************************************************/
ipc_service_pop_client(zbx_ipc_service_t * service)724 static zbx_ipc_client_t *ipc_service_pop_client(zbx_ipc_service_t *service)
725 {
726 zbx_ipc_client_t *client;
727
728 if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
729 client->state = ZBX_IPC_CLIENT_STATE_NONE;
730
731 return client;
732 }
733
734 /******************************************************************************
735 * *
736 * Function: ipc_service_push_client *
737 * *
738 * Purpose: pushes client to the recv queue if needed *
739 * *
740 * Parameters: service - [IN] the IPC service *
741 * client - [IN] the IPC client *
742 * *
743 * Comments: The client is pushed to the recv queue if it isn't already there *
744 * and there is messages to return or the client connection was *
745 * closed. *
746 * *
747 ******************************************************************************/
ipc_service_push_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)748 static void ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
749 {
750 if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
751 return;
752
753 if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
754 return;
755
756 client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
757 zbx_queue_ptr_push(&service->clients_recv, client);
758 }
759
760 /******************************************************************************
761 * *
762 * Function: ipc_service_add_client *
763 * *
764 * Purpose: adds a new IPC service client *
765 * *
766 * Parameters: service - [IN] the IPC service *
767 * fd - [IN] the client socket descriptor *
768 * *
769 ******************************************************************************/
ipc_service_add_client(zbx_ipc_service_t * service,int fd)770 static void ipc_service_add_client(zbx_ipc_service_t *service, int fd)
771 {
772 static zbx_uint64_t next_clientid = 1;
773 zbx_ipc_client_t *client;
774 int flags;
775
776 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
777
778 client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
779 memset(client, 0, sizeof(zbx_ipc_client_t));
780
781 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
782 {
783 zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
784 exit(EXIT_FAILURE);
785 }
786
787 if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
788 {
789 zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
790 exit(EXIT_FAILURE);
791 }
792
793 client->csocket.fd = fd;
794 client->csocket.rx_buffer_bytes = 0;
795 client->csocket.rx_buffer_offset = 0;
796 client->id = next_clientid++;
797 client->state = ZBX_IPC_CLIENT_STATE_NONE;
798 client->refcount = 1;
799
800 zbx_queue_ptr_create(&client->rx_queue);
801 zbx_queue_ptr_create(&client->tx_queue);
802
803 client->service = service;
804 client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
805 client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
806 event_add(client->rx_event, NULL);
807
808 zbx_vector_ptr_append(&service->clients, client);
809
810 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __func__, client->id);
811 }
812
813 /******************************************************************************
814 * *
815 * Function: ipc_service_remove_client *
816 * *
817 * Purpose: removes IPC service client *
818 * *
819 * Parameters: service - [IN] the IPC service *
820 * client - [IN] the client to remove *
821 * *
822 ******************************************************************************/
ipc_service_remove_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)823 static void ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
824 {
825 int i;
826
827 for (i = 0; i < service->clients.values_num; i++)
828 {
829 if (service->clients.values[i] == client)
830 zbx_vector_ptr_remove_noorder(&service->clients, i);
831 }
832 }
833
834 /******************************************************************************
835 * *
836 * Function: zbx_ipc_client_by_id *
837 * *
838 * Purpose: to find connected client when only it's ID is known *
839 * *
840 * Parameters: service - [IN] the IPC service *
841 * id - [IN] ID of client *
842 * *
843 * Return value: address of client or NULL if client has already disconnected *
844 * *
845 ******************************************************************************/
zbx_ipc_client_by_id(const zbx_ipc_service_t * service,zbx_uint64_t id)846 zbx_ipc_client_t *zbx_ipc_client_by_id(const zbx_ipc_service_t *service, zbx_uint64_t id)
847 {
848 int i;
849 zbx_ipc_client_t *client;
850
851 for (i = 0; i < service->clients.values_num; i++)
852 {
853 client = (zbx_ipc_client_t *) service->clients.values[i];
854
855 if (id == client->id)
856 return client;
857 }
858
859 return NULL;
860 }
861
862 /******************************************************************************
863 * *
864 * Function: ipc_client_read_event_cb *
865 * *
866 * Purpose: service client read event libevent callback *
867 * *
868 ******************************************************************************/
ipc_client_read_event_cb(evutil_socket_t fd,short what,void * arg)869 static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
870 {
871 zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
872
873 ZBX_UNUSED(fd);
874 ZBX_UNUSED(what);
875
876 if (SUCCEED != ipc_client_read(client))
877 {
878 ipc_client_free_events(client);
879 ipc_service_remove_client(client->service, client);
880 }
881
882 ipc_service_push_client(client->service, client);
883 }
884
885 /******************************************************************************
886 * *
887 * Function: ipc_client_write_event_cb *
888 * *
889 * Purpose: service client write event libevent callback *
890 * *
891 ******************************************************************************/
ipc_client_write_event_cb(evutil_socket_t fd,short what,void * arg)892 static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
893 {
894 zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
895
896 ZBX_UNUSED(fd);
897 ZBX_UNUSED(what);
898
899 if (SUCCEED != ipc_client_write(client))
900 {
901 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
902 zbx_ipc_client_close(client);
903 return;
904 }
905
906 if (0 == client->tx_bytes)
907 event_del(client->tx_event);
908 }
909
910 /******************************************************************************
911 * *
912 * Function: ipc_async_socket_write_event_cb *
913 * *
914 * Purpose: asynchronous socket write event libevent callback *
915 * *
916 ******************************************************************************/
ipc_async_socket_write_event_cb(evutil_socket_t fd,short what,void * arg)917 static void ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
918 {
919 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
920
921 ZBX_UNUSED(fd);
922 ZBX_UNUSED(what);
923
924 if (SUCCEED != ipc_client_write(asocket->client))
925 {
926 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
927 ipc_client_free_events(asocket->client);
928 zbx_ipc_socket_close(&asocket->client->csocket);
929 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
930 return;
931 }
932
933 if (0 == asocket->client->tx_bytes)
934 event_del(asocket->client->tx_event);
935 }
936
937 /******************************************************************************
938 * *
939 * Function: ipc_async_socket_read_event_cb *
940 * *
941 * Purpose: asynchronous socket read event libevent callback *
942 * *
943 ******************************************************************************/
ipc_async_socket_read_event_cb(evutil_socket_t fd,short what,void * arg)944 static void ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
945 {
946 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
947
948 ZBX_UNUSED(fd);
949 ZBX_UNUSED(what);
950
951 if (SUCCEED != ipc_client_read(asocket->client))
952 {
953 ipc_client_free_events(asocket->client);
954 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
955 }
956 }
957
958 /******************************************************************************
959 * *
960 * Function: ipc_async_socket_timer_cb *
961 * *
962 * Purpose: timer callback *
963 * *
964 ******************************************************************************/
ipc_async_socket_timer_cb(evutil_socket_t fd,short what,void * arg)965 static void ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
966 {
967 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
968
969 ZBX_UNUSED(fd);
970 ZBX_UNUSED(what);
971
972 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
973 }
974
975 /******************************************************************************
976 * *
977 * Function: ipc_service_accept *
978 * *
979 * Purpose: accepts a new client connection *
980 * *
981 * Parameters: service - [IN] the IPC service *
982 * *
983 ******************************************************************************/
ipc_service_accept(zbx_ipc_service_t * service)984 static void ipc_service_accept(zbx_ipc_service_t *service)
985 {
986 int fd;
987
988 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
989
990 while (-1 == (fd = accept(service->fd, NULL, NULL)))
991 {
992 if (EINTR != errno)
993 {
994 /* If there is unaccepted connection libevent will call registered callback function over and */
995 /* over again. It is better to exit straight away and cause all other processes to stop. */
996 zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
997 exit(EXIT_FAILURE);
998 }
999 }
1000
1001 ipc_service_add_client(service, fd);
1002
1003 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1004 }
1005
1006 /******************************************************************************
1007 * *
1008 * Function: ipc_message_create *
1009 * *
1010 * Purpose: creates IPC message *
1011 * *
1012 * Parameters: code - [IN] the message code *
1013 * data - [IN] the data *
1014 * size - [IN] the data size *
1015 * *
1016 * Return value: The created message. *
1017 * *
1018 ******************************************************************************/
ipc_message_create(zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1019 static zbx_ipc_message_t *ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1020 {
1021 zbx_ipc_message_t *message;
1022
1023 message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
1024
1025 message->code = code;
1026 message->size = size;
1027
1028 if (0 != size)
1029 {
1030 message->data = (unsigned char *)zbx_malloc(NULL, size);
1031 memcpy(message->data, data, size);
1032 }
1033 else
1034 message->data = NULL;
1035
1036 return message;
1037 }
1038
1039 /******************************************************************************
1040 * *
1041 * Function: ipc_service_event_log *
1042 * *
1043 * Purpose: libevent logging callback *
1044 * *
1045 ******************************************************************************/
ipc_service_event_log_cb(int severity,const char * msg)1046 static void ipc_service_event_log_cb(int severity, const char *msg)
1047 {
1048 int loglevel;
1049
1050 switch (severity)
1051 {
1052 case _EVENT_LOG_DEBUG:
1053 loglevel = LOG_LEVEL_TRACE;
1054 break;
1055 case _EVENT_LOG_MSG:
1056 loglevel = LOG_LEVEL_DEBUG;
1057 break;
1058 case _EVENT_LOG_WARN:
1059 loglevel = LOG_LEVEL_WARNING;
1060 break;
1061 case _EVENT_LOG_ERR:
1062 loglevel = LOG_LEVEL_DEBUG;
1063 break;
1064 default:
1065 loglevel = LOG_LEVEL_DEBUG;
1066 break;
1067 }
1068
1069 zabbix_log(loglevel, "IPC service: %s", msg);
1070 }
1071
1072 /******************************************************************************
1073 * *
1074 * Function: ipc_service_init_libevent *
1075 * *
1076 * Purpose: initialize libevent library *
1077 * *
1078 ******************************************************************************/
ipc_service_init_libevent(void)1079 static void ipc_service_init_libevent(void)
1080 {
1081 event_set_log_callback(ipc_service_event_log_cb);
1082 }
1083
1084 /******************************************************************************
1085 * *
1086 * Function: ipc_service_free_libevent *
1087 * *
1088 * Purpose: uninitialize libevent library *
1089 * *
1090 ******************************************************************************/
ipc_service_free_libevent(void)1091 static void ipc_service_free_libevent(void)
1092 {
1093 }
1094
1095 /******************************************************************************
1096 * *
1097 * Function: ipc_service_client_connected_cb *
1098 * *
1099 * Purpose: libevent listener callback *
1100 * *
1101 ******************************************************************************/
ipc_service_client_connected_cb(evutil_socket_t fd,short what,void * arg)1102 static void ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
1103 {
1104 zbx_ipc_service_t *service = (zbx_ipc_service_t *)arg;
1105
1106 ZBX_UNUSED(fd);
1107 ZBX_UNUSED(what);
1108
1109 ipc_service_accept(service);
1110 }
1111
1112 /******************************************************************************
1113 * *
1114 * Function: ipc_service_timer_cb *
1115 * *
1116 * Purpose: timer callback *
1117 * *
1118 ******************************************************************************/
ipc_service_timer_cb(evutil_socket_t fd,short what,void * arg)1119 static void ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
1120 {
1121 ZBX_UNUSED(fd);
1122 ZBX_UNUSED(what);
1123 ZBX_UNUSED(arg);
1124 }
1125
1126 /******************************************************************************
1127 * *
1128 * Function: ipc_check_running_service *
1129 * *
1130 * Purpose: checks if an IPC service is already running *
1131 * *
1132 * Parameters: service_name - [IN] *
1133 * *
1134 ******************************************************************************/
ipc_check_running_service(const char * service_name)1135 static int ipc_check_running_service(const char *service_name)
1136 {
1137 zbx_ipc_socket_t csocket;
1138 int ret;
1139 char *error = NULL;
1140
1141 if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
1142 zbx_ipc_socket_close(&csocket);
1143 else
1144 zbx_free(error);
1145
1146 return ret;
1147 }
1148
1149 /*
1150 * Public client API
1151 */
1152
1153 /******************************************************************************
1154 * *
1155 * Function: zbx_ipc_socket_open *
1156 * *
1157 * Purpose: opens socket to an IPC service listening on the specified path *
1158 * *
1159 * Parameters: csocket - [OUT] the IPC socket to the service *
1160 * service_name - [IN] the IPC service name *
1161 * timeout - [IN] the connection timeout *
1162 * error - [OUT] the error message *
1163 * *
1164 * Return value: SUCCEED - the socket was successfully opened *
1165 * FAIL - otherwise *
1166 * *
1167 ******************************************************************************/
zbx_ipc_socket_open(zbx_ipc_socket_t * csocket,const char * service_name,int timeout,char ** error)1168 int zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
1169 {
1170 struct sockaddr_un addr;
1171 time_t start;
1172 struct timespec ts = {0, 100000000};
1173 const char *socket_path;
1174 int ret = FAIL;
1175
1176 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1177
1178 if (NULL == (socket_path = ipc_make_path(service_name, error)))
1179 goto out;
1180
1181 if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1182 {
1183 *error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
1184 goto out;
1185 }
1186
1187 memset(&addr, 0, sizeof(addr));
1188 addr.sun_family = AF_UNIX;
1189 memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1190
1191 start = time(NULL);
1192
1193 while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
1194 {
1195 if (0 == timeout || time(NULL) - start > timeout)
1196 {
1197 *error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
1198 zbx_strerror(errno));
1199 close(csocket->fd);
1200 goto out;
1201 }
1202
1203 nanosleep(&ts, NULL);
1204 }
1205
1206 csocket->rx_buffer_bytes = 0;
1207 csocket->rx_buffer_offset = 0;
1208
1209 ret = SUCCEED;
1210 out:
1211 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1212 return ret;
1213 }
1214
1215 /******************************************************************************
1216 * *
1217 * Function: zbx_ipc_socket_close *
1218 * *
1219 * Purpose: closes socket to an IPC service *
1220 * *
1221 * Parameters: csocket - [IN/OUT] the IPC socket to close *
1222 * *
1223 ******************************************************************************/
zbx_ipc_socket_close(zbx_ipc_socket_t * csocket)1224 void zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
1225 {
1226 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1227
1228 if (-1 != csocket->fd)
1229 {
1230 close(csocket->fd);
1231 csocket->fd = -1;
1232 }
1233
1234 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1235 }
1236
1237 /******************************************************************************
1238 * *
1239 * Function: zbx_ipc_socket_write *
1240 * *
1241 * Purpose: writes a message to IPC service *
1242 * *
1243 * Parameters: csocket - [IN] an opened IPC socket to the service *
1244 * code - [IN] the message code *
1245 * data - [IN] the data *
1246 * size - [IN] the data size *
1247 * *
1248 * Return value: SUCCEED - the message was successfully written *
1249 * FAIL - otherwise *
1250 * *
1251 ******************************************************************************/
zbx_ipc_socket_write(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1252 int zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1253 {
1254 int ret;
1255 zbx_uint32_t size_sent;
1256
1257 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1258
1259 if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) &&
1260 size_sent == size + ZBX_IPC_HEADER_SIZE)
1261 {
1262 ret = SUCCEED;
1263 }
1264 else
1265 ret = FAIL;
1266
1267 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1268
1269 return ret;
1270 }
1271
1272 /******************************************************************************
1273 * *
1274 * Function: zbx_ipc_socket_read *
1275 * *
1276 * Purpose: reads a message from IPC service *
1277 * *
1278 * Parameters: csocket - [IN] an opened IPC socket to the service *
1279 * message - [OUT] the received message *
1280 * *
1281 * Return value: SUCCEED - the message was successfully received *
1282 * FAIL - otherwise *
1283 * *
1284 * Comments: If this function succeeds the message must be cleaned/freed by *
1285 * the caller. *
1286 * *
1287 ******************************************************************************/
zbx_ipc_socket_read(zbx_ipc_socket_t * csocket,zbx_ipc_message_t * message)1288 int zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
1289 {
1290 int ret = FAIL;
1291 zbx_uint32_t rx_bytes = 0, header[2];
1292 unsigned char *data = NULL;
1293
1294 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1295
1296 if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes))
1297 goto out;
1298
1299 if (SUCCEED != ipc_message_is_completed(header, rx_bytes))
1300 {
1301 zbx_free(data);
1302 goto out;
1303 }
1304
1305 message->code = header[ZBX_IPC_MESSAGE_CODE];
1306 message->size = header[ZBX_IPC_MESSAGE_SIZE];
1307 message->data = data;
1308
1309 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1310 {
1311 char *msg = NULL;
1312
1313 zbx_ipc_message_format(message, &msg);
1314
1315 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, msg);
1316
1317 zbx_free(msg);
1318 }
1319
1320 ret = SUCCEED;
1321 out:
1322 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1323
1324 return ret;
1325 }
1326
1327 /******************************************************************************
1328 * *
1329 * Function: zbx_ipc_message_free *
1330 * *
1331 * Purpose: frees the resources allocated to store IPC message data *
1332 * *
1333 * Parameters: message - [IN] the message to free *
1334 * *
1335 ******************************************************************************/
zbx_ipc_message_free(zbx_ipc_message_t * message)1336 void zbx_ipc_message_free(zbx_ipc_message_t *message)
1337 {
1338 if (NULL != message)
1339 {
1340 zbx_free(message->data);
1341 zbx_free(message);
1342 }
1343 }
1344
1345 /******************************************************************************
1346 * *
1347 * Function: zbx_ipc_message_clean *
1348 * *
1349 * Purpose: frees the resources allocated to store IPC message data *
1350 * *
1351 * Parameters: message - [IN] the message to clean *
1352 * *
1353 ******************************************************************************/
zbx_ipc_message_clean(zbx_ipc_message_t * message)1354 void zbx_ipc_message_clean(zbx_ipc_message_t *message)
1355 {
1356 zbx_free(message->data);
1357 }
1358
1359 /******************************************************************************
1360 * *
1361 * Function: zbx_ipc_message_init *
1362 * *
1363 * Purpose: initializes IPC message *
1364 * *
1365 * Parameters: message - [IN] the message to initialize *
1366 * *
1367 ******************************************************************************/
zbx_ipc_message_init(zbx_ipc_message_t * message)1368 void zbx_ipc_message_init(zbx_ipc_message_t *message)
1369 {
1370 memset(message, 0, sizeof(zbx_ipc_message_t));
1371 }
1372
1373 /******************************************************************************
1374 * *
1375 * Function: zbx_ipc_message_format *
1376 * *
1377 * Purpose: formats message to readable format for debug messages *
1378 * *
1379 * Parameters: message - [IN] the message *
1380 * data - [OUT] the formatted message *
1381 * *
1382 ******************************************************************************/
zbx_ipc_message_format(const zbx_ipc_message_t * message,char ** data)1383 void zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
1384 {
1385 size_t data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
1386 zbx_uint32_t i, data_num;
1387
1388 if (NULL == message)
1389 return;
1390
1391 data_num = message->size;
1392
1393 if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
1394 data_num = ZBX_IPC_DATA_DUMP_SIZE;
1395
1396 *data = (char *)zbx_malloc(*data, data_alloc);
1397 zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
1398
1399 for (i = 0; i < data_num; i++)
1400 {
1401 if (0 != i)
1402 zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
1403
1404 zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
1405 }
1406
1407 (*data)[data_offset] = '\0';
1408 }
1409
1410 /******************************************************************************
1411 * *
1412 * Function: zbx_ipc_message_copy *
1413 * *
1414 * Purpose: copies ipc message *
1415 * *
1416 * Parameters: dst - [IN] the destination message *
1417 * src - [IN] the source message *
1418 * *
1419 ******************************************************************************/
zbx_ipc_message_copy(zbx_ipc_message_t * dst,const zbx_ipc_message_t * src)1420 void zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
1421 {
1422 dst->code = src->code;
1423 dst->size = src->size;
1424 dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
1425 memcpy(dst->data, src->data, src->size);
1426 }
1427
1428 /*
1429 * Public service API
1430 */
1431
1432 /******************************************************************************
1433 * *
1434 * Function: zbx_ipc_service_init_env *
1435 * *
1436 * Purpose: initializes IPC service environment *
1437 * *
1438 * Parameters: path - [IN] the service root path *
1439 * error - [OUT] the error message *
1440 * *
1441 * Return value: SUCCEED - the environment was initialized successfully. *
1442 * FAIL - otherwise *
1443 * *
1444 ******************************************************************************/
zbx_ipc_service_init_env(const char * path,char ** error)1445 int zbx_ipc_service_init_env(const char *path, char **error)
1446 {
1447 struct stat fs;
1448 int ret = FAIL;
1449
1450 zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, path);
1451
1452 if (0 != ipc_path_root_len)
1453 {
1454 *error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
1455 " root directory at \"%s\".", ipc_get_path());
1456 goto out;
1457 }
1458
1459 if (0 != stat(path, &fs))
1460 {
1461 *error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
1462 zbx_strerror(errno));
1463 goto out;
1464 }
1465
1466 if (0 == S_ISDIR(fs.st_mode))
1467 {
1468 *error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
1469 goto out;
1470 }
1471
1472 if (0 != access(path, W_OK | R_OK))
1473 {
1474 *error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
1475 goto out;
1476 }
1477
1478 ipc_path_root_len = strlen(path);
1479 if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
1480 {
1481 *error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
1482 goto out;
1483 }
1484
1485 memcpy(ipc_path, path, ipc_path_root_len + 1);
1486
1487 while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
1488 ipc_path[--ipc_path_root_len] = '\0';
1489
1490 ipc_service_init_libevent();
1491
1492 ret = SUCCEED;
1493 out:
1494 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1495
1496 return ret;
1497 }
1498
1499 /******************************************************************************
1500 * *
1501 * Function: zbx_ipc_service_free_env *
1502 * *
1503 * Purpose: frees IPC service environment *
1504 * *
1505 ******************************************************************************/
zbx_ipc_service_free_env(void)1506 void zbx_ipc_service_free_env(void)
1507 {
1508 ipc_service_free_libevent();
1509 }
1510
1511
1512 /******************************************************************************
1513 * *
1514 * Function: zbx_ipc_service_start *
1515 * *
1516 * Purpose: starts IPC service on the specified path *
1517 * *
1518 * Parameters: service - [IN/OUT] the IPC service *
1519 * service_name - [IN] the unix domain socket path *
1520 * error - [OUT] the error message *
1521 * *
1522 * Return value: SUCCEED - the service was initialized successfully. *
1523 * FAIL - otherwise *
1524 * *
1525 ******************************************************************************/
zbx_ipc_service_start(zbx_ipc_service_t * service,const char * service_name,char ** error)1526 int zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
1527 {
1528 struct sockaddr_un addr;
1529 const char *socket_path;
1530 int ret = FAIL;
1531 mode_t mode;
1532
1533 zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __func__, service_name);
1534
1535 mode = umask(077);
1536
1537 if (NULL == (socket_path = ipc_make_path(service_name, error)))
1538 goto out;
1539
1540 if (0 == access(socket_path, F_OK))
1541 {
1542 if (0 != access(socket_path, W_OK))
1543 {
1544 *error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
1545 goto out;
1546 }
1547
1548 if (SUCCEED == ipc_check_running_service(service_name))
1549 {
1550 *error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
1551 goto out;
1552 }
1553
1554 unlink(socket_path);
1555 }
1556
1557 if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1558 {
1559 *error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
1560 goto out;
1561 }
1562
1563 memset(&addr, 0, sizeof(addr));
1564 addr.sun_family = AF_UNIX;
1565 memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1566
1567 if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
1568 {
1569 *error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
1570 goto out;
1571 }
1572
1573 if (0 != listen(service->fd, SOMAXCONN))
1574 {
1575 *error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
1576 goto out;
1577 }
1578
1579 service->path = zbx_strdup(NULL, service_name);
1580 zbx_vector_ptr_create(&service->clients);
1581 zbx_queue_ptr_create(&service->clients_recv);
1582
1583 service->ev = event_base_new();
1584 service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
1585 ipc_service_client_connected_cb, service);
1586 event_add(service->ev_listener, NULL);
1587
1588 service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
1589
1590 ret = SUCCEED;
1591 out:
1592 umask(mode);
1593
1594 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1595
1596 return ret;
1597 }
1598
1599 /******************************************************************************
1600 * *
1601 * Function: zbx_ipc_service_close *
1602 * *
1603 * Purpose: closes IPC service and frees the resources allocated by it *
1604 * *
1605 * Parameters: service - [IN/OUT] the IPC service *
1606 * *
1607 ******************************************************************************/
zbx_ipc_service_close(zbx_ipc_service_t * service)1608 void zbx_ipc_service_close(zbx_ipc_service_t *service)
1609 {
1610 int i;
1611
1612 zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, service->path);
1613
1614 close(service->fd);
1615
1616 for (i = 0; i < service->clients.values_num; i++)
1617 ipc_client_free((zbx_ipc_client_t *)service->clients.values[i]);
1618
1619 zbx_free(service->path);
1620
1621 zbx_vector_ptr_destroy(&service->clients);
1622 zbx_queue_ptr_destroy(&service->clients_recv);
1623
1624 event_free(service->ev_timer);
1625 event_free(service->ev_listener);
1626 event_base_free(service->ev);
1627
1628 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1629 }
1630
1631 /******************************************************************************
1632 * *
1633 * Function: zbx_ipc_service_recv *
1634 * *
1635 * Purpose: receives ipc message from a connected client *
1636 * *
1637 * Parameters: service - [IN] the IPC service *
1638 * timeout - [IN] the timeout in seconds, 0 is used for *
1639 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1640 * used for blocking call without timeout *
1641 * client - [OUT] the client that sent the message or *
1642 * NULL if there are no messages and the *
1643 * specified timeout passed. *
1644 * The client must be released by caller with *
1645 * zbx_ipc_client_release() function. *
1646 * message - [OUT] the received message or NULL if the client *
1647 * connection was closed. *
1648 * The message must be freed by caller with *
1649 * ipc_message_free() function. *
1650 * *
1651 * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without *
1652 * waiting for socket events *
1653 * (pending events are processed) *
1654 * ZBX_IPC_RECV_WAIT - returned after receiving socket *
1655 * event *
1656 * ZBX_IPC_RECV_TIMEOUT - returned after timeout expired *
1657 * *
1658 ******************************************************************************/
zbx_ipc_service_recv(zbx_ipc_service_t * service,int timeout,zbx_ipc_client_t ** client,zbx_ipc_message_t ** message)1659 int zbx_ipc_service_recv(zbx_ipc_service_t *service, int timeout, zbx_ipc_client_t **client,
1660 zbx_ipc_message_t **message)
1661 {
1662 int ret, flags;
1663
1664 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
1665
1666 if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
1667 {
1668 if (ZBX_IPC_WAIT_FOREVER != timeout)
1669 {
1670 struct timeval tv = {timeout, 0};
1671 evtimer_add(service->ev_timer, &tv);
1672 }
1673 flags = EVLOOP_ONCE;
1674 }
1675 else
1676 flags = EVLOOP_NONBLOCK;
1677
1678 event_base_loop(service->ev, flags);
1679
1680 if (NULL != (*client = ipc_service_pop_client(service)))
1681 {
1682 if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue)))
1683 {
1684 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
1685 {
1686 char *data = NULL;
1687
1688 zbx_ipc_message_format(*message, &data);
1689 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
1690
1691 zbx_free(data);
1692 }
1693
1694 ipc_service_push_client(service, *client);
1695 zbx_ipc_client_addref(*client);
1696 }
1697
1698 ret = (EVLOOP_NONBLOCK == flags ? ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT);
1699 }
1700 else
1701 { ret = ZBX_IPC_RECV_TIMEOUT;
1702 *client = NULL;
1703 *message = NULL;
1704 }
1705
1706 evtimer_del(service->ev_timer);
1707
1708 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
1709
1710 return ret;
1711 }
1712
1713 /******************************************************************************
1714 * *
1715 * Function: zbx_ipc_client_send *
1716 * *
1717 * Purpose: Sends IPC message to client *
1718 * *
1719 * Parameters: client - [IN] the IPC client *
1720 * code - [IN] the message code *
1721 * data - [IN] the data *
1722 * size - [IN] the data size *
1723 * *
1724 * Comments: If data can't be written directly to socket (buffer full) then *
1725 * the message is queued and sent during zbx_ipc_service_recv() *
1726 * messaging loop whenever socket becomes ready. *
1727 * *
1728 ******************************************************************************/
zbx_ipc_client_send(zbx_ipc_client_t * client,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1729 int zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1730 {
1731 zbx_uint32_t tx_size = 0;
1732 zbx_ipc_message_t *message;
1733 int ret = FAIL;
1734
1735 zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, client->id);
1736
1737 if (0 != client->tx_bytes)
1738 {
1739 message = ipc_message_create(code, data, size);
1740 zbx_queue_ptr_push(&client->tx_queue, message);
1741 ret = SUCCEED;
1742 goto out;
1743 }
1744
1745 if (FAIL == ipc_socket_write_message(&client->csocket, code, data, size, &tx_size))
1746 goto out;
1747
1748 if (tx_size != ZBX_IPC_HEADER_SIZE + size)
1749 {
1750 client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
1751 client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
1752 client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
1753 memcpy(client->tx_data, data, size);
1754 client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - tx_size;
1755 event_add(client->tx_event, NULL);
1756 }
1757
1758 ret = SUCCEED;
1759 out:
1760 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1761
1762 return ret;
1763 }
1764
1765 /******************************************************************************
1766 * *
1767 * Function: zbx_ipc_client_close *
1768 * *
1769 * Purpose: closes client socket and frees resources allocated for client *
1770 * *
1771 * Parameters: client - [IN] the IPC client *
1772 * *
1773 ******************************************************************************/
zbx_ipc_client_close(zbx_ipc_client_t * client)1774 void zbx_ipc_client_close(zbx_ipc_client_t *client)
1775 {
1776 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1777
1778 ipc_client_free_events(client);
1779 zbx_ipc_socket_close(&client->csocket);
1780
1781 ipc_service_remove_client(client->service, client);
1782 zbx_queue_ptr_remove_value(&client->service->clients_recv, client);
1783 zbx_ipc_client_release(client);
1784
1785 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1786 }
1787
zbx_ipc_client_addref(zbx_ipc_client_t * client)1788 void zbx_ipc_client_addref(zbx_ipc_client_t *client)
1789 {
1790 client->refcount++;
1791 }
1792
zbx_ipc_client_release(zbx_ipc_client_t * client)1793 void zbx_ipc_client_release(zbx_ipc_client_t *client)
1794 {
1795 if (0 == --client->refcount)
1796 ipc_client_free(client);
1797 }
1798
zbx_ipc_client_connected(zbx_ipc_client_t * client)1799 int zbx_ipc_client_connected(zbx_ipc_client_t *client)
1800 {
1801 return (NULL == client->rx_event ? FAIL : SUCCEED);
1802 }
1803
zbx_ipc_client_id(const zbx_ipc_client_t * client)1804 zbx_uint64_t zbx_ipc_client_id(const zbx_ipc_client_t *client)
1805 {
1806 return client->id;
1807 }
1808
1809 /******************************************************************************
1810 * *
1811 * Function: zbx_ipc_async_socket_open *
1812 * *
1813 * Purpose: opens asynchronous socket to IPC service client *
1814 * *
1815 * Parameters: client - [OUT] the IPC service client *
1816 * service_name - [IN] the IPC service name *
1817 * timeout - [IN] the connection timeout *
1818 * error - [OUT] the error message *
1819 * *
1820 * Return value: SUCCEED - the socket was successfully opened *
1821 * FAIL - otherwise *
1822 * *
1823 ******************************************************************************/
zbx_ipc_async_socket_open(zbx_ipc_async_socket_t * asocket,const char * service_name,int timeout,char ** error)1824 int zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
1825 {
1826 int ret = FAIL, flags;
1827
1828 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1829
1830 memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
1831 asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
1832 memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
1833
1834 if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
1835 {
1836 zbx_free(asocket->client);
1837 goto out;
1838 }
1839
1840 if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
1841 {
1842 zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
1843 exit(EXIT_FAILURE);
1844 }
1845
1846 if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
1847 {
1848 zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
1849 exit(EXIT_FAILURE);
1850 }
1851
1852 asocket->ev = event_base_new();
1853 asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
1854 asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
1855 ipc_async_socket_read_event_cb, (void *)asocket);
1856 asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
1857 ipc_async_socket_write_event_cb, (void *)asocket);
1858 event_add(asocket->client->rx_event, NULL);
1859
1860 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1861
1862 ret = SUCCEED;
1863 out:
1864 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1865 return ret;
1866 }
1867
1868 /******************************************************************************
1869 * *
1870 * Function: zbx_ipc_async_socket_close *
1871 * *
1872 * Purpose: closes asynchronous IPC socket and frees allocated resources *
1873 * *
1874 * Parameters: asocket - [IN] the asynchronous IPC socket *
1875 * *
1876 ******************************************************************************/
zbx_ipc_async_socket_close(zbx_ipc_async_socket_t * asocket)1877 void zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
1878 {
1879 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1880
1881 ipc_client_free(asocket->client);
1882
1883 event_free(asocket->ev_timer);
1884 event_base_free(asocket->ev);
1885
1886 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1887 }
1888
1889 /******************************************************************************
1890 * *
1891 * Function: zbx_ipc_async_socket_send *
1892 * *
1893 * Purpose: Sends message through asynchronous IPC socket *
1894 * *
1895 * Parameters: asocket - [IN] the asynchronous IPC socket *
1896 * code - [IN] the message code *
1897 * data - [IN] the data *
1898 * size - [IN] the data size *
1899 * *
1900 * Comments: If data can't be written directly to socket (buffer full) then *
1901 * the message is queued and sent during zbx_ipc_async_socket_recv()*
1902 * or zbx_ipc_async_socket_flush() functions whenever socket becomes*
1903 * ready. *
1904 * *
1905 ******************************************************************************/
zbx_ipc_async_socket_send(zbx_ipc_async_socket_t * asocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1906 int zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
1907 zbx_uint32_t size)
1908 {
1909 int ret = FAIL;
1910
1911 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1912
1913 ret = zbx_ipc_client_send(asocket->client, code, data, size);
1914
1915 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1916
1917 return ret;
1918 }
1919
1920 /******************************************************************************
1921 * *
1922 * Function: zbx_ipc_async_socket_recv *
1923 * *
1924 * Purpose: receives message through asynchronous IPC socket *
1925 * *
1926 * Parameters: asocket - [IN] the asynchronous IPC socket *
1927 * timeout - [IN] the timeout in seconds, 0 is used for *
1928 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1929 * used for blocking call without timeout *
1930 * message - [OUT] the received message or NULL if the client *
1931 * connection was closed. *
1932 * The message must be freed by caller with *
1933 * ipc_message_free() function. *
1934 * *
1935 * Return value: SUCCEED - the message was read successfully or timeout *
1936 * occurred *
1937 * FAIL - otherwise *
1938 * *
1939 * Comments: After socket has been closed (or connection error has occurred) *
1940 * calls to zbx_ipc_client_read() will return success with buffered *
1941 * messages, until all buffered messages are retrieved. *
1942 * *
1943 ******************************************************************************/
zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t * asocket,int timeout,zbx_ipc_message_t ** message)1944 int zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
1945 {
1946 int ret, flags;
1947
1948 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
1949
1950 if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
1951 {
1952 if (ZBX_IPC_WAIT_FOREVER != timeout)
1953 {
1954 struct timeval tv = {timeout, 0};
1955 evtimer_add(asocket->ev_timer, &tv);
1956 }
1957 flags = EVLOOP_ONCE;
1958 }
1959 else
1960 flags = EVLOOP_NONBLOCK;
1961
1962 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1963
1964 do
1965 {
1966 event_base_loop(asocket->ev, flags);
1967 *message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue);
1968 }
1969 while (NULL == *message && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
1970
1971 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && NULL != *message)
1972 {
1973 char *data = NULL;
1974
1975 zbx_ipc_message_format(*message, &data);
1976 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
1977
1978 zbx_free(data);
1979 }
1980
1981 if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1982 ret = SUCCEED;
1983 else
1984 ret = FAIL;
1985
1986 evtimer_del(asocket->ev_timer);
1987
1988 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
1989
1990 return ret;
1991 }
1992
1993 /******************************************************************************
1994 * *
1995 * Function: zbx_ipc_async_socket_flush *
1996 * *
1997 * Purpose: flushes unsent through asynchronous IPC socket *
1998 * *
1999 * Parameters: asocket - [IN] the asynchronous IPC service socket *
2000 * timeout - [IN] the timeout in seconds, 0 is used for *
2001 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
2002 * used for blocking call without timeout *
2003 * *
2004 * Return value: SUCCEED - the data was flushed successfully or timeout *
2005 * occurred. Use zbx_ipc_client_unsent_data() to *
2006 * check if all data was sent. *
2007 * FAIL - failed to send data (connection was closed or an *
2008 * error occurred). *
2009 * *
2010 ******************************************************************************/
zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t * asocket,int timeout)2011 int zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
2012 {
2013 int ret = FAIL, flags;
2014
2015 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
2016
2017 if (0 == asocket->client->tx_bytes)
2018 {
2019 ret = SUCCEED;
2020 goto out;
2021 }
2022
2023 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
2024 goto out;
2025
2026 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
2027
2028 if (0 != timeout)
2029 {
2030 if (ZBX_IPC_WAIT_FOREVER != timeout)
2031 {
2032 struct timeval tv = {timeout, 0};
2033 evtimer_add(asocket->ev_timer, &tv);
2034 }
2035 flags = EVLOOP_ONCE;
2036 }
2037 else
2038 flags = EVLOOP_NONBLOCK;
2039
2040 do
2041 {
2042 event_base_loop(asocket->ev, flags);
2043
2044 if (SUCCEED != zbx_ipc_client_connected(asocket->client))
2045 goto out;
2046 }
2047 while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
2048
2049 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
2050 {
2051 ret = SUCCEED;
2052 asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
2053 }
2054 out:
2055 evtimer_del(asocket->ev_timer);
2056
2057 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
2058
2059 return ret;
2060 }
2061
2062 /******************************************************************************
2063 * *
2064 * Function: zbx_ipc_async_socket_check_unsent *
2065 * *
2066 * Purpose: checks if there are data to be sent *
2067 * *
2068 * Parameters: client - [IN] the IPC service client *
2069 * *
2070 * Return value: SUCCEED - there are messages queued to be sent *
2071 * FAIL - all data has been sent *
2072 * *
2073 ******************************************************************************/
zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t * asocket)2074 int zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
2075 {
2076 return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
2077 }
2078
2079 /******************************************************************************
2080 * *
2081 * Function: zbx_ipc_async_exchange *
2082 * *
2083 * Purpose: connect, send message and receive response in a given timeout *
2084 * *
2085 * Parameters: service_name - [IN] the IPC service name *
2086 * code - [IN] the message code *
2087 * timeout - [IN] time allowed to be spent on receive, note *
2088 * that this does not include open, send and *
2089 * flush that have their own timeouts *
2090 * data - [IN] the data *
2091 * size - [IN] the data size *
2092 * out - [OUT] the received message or NULL on error *
2093 * The message must be freed by zbx_free() *
2094 * error - [OUT] the error message *
2095 * *
2096 * Return value: SUCCEED - successfully sent message and received response *
2097 * FAIL - error occurred *
2098 * *
2099 ******************************************************************************/
zbx_ipc_async_exchange(const char * service_name,zbx_uint32_t code,int timeout,const unsigned char * data,zbx_uint32_t size,unsigned char ** out,char ** error)2100 int zbx_ipc_async_exchange(const char *service_name, zbx_uint32_t code, int timeout, const unsigned char *data,
2101 zbx_uint32_t size, unsigned char **out, char **error)
2102 {
2103 zbx_ipc_message_t *message;
2104 zbx_ipc_async_socket_t asocket;
2105 int ret = FAIL;
2106
2107 zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:'%s' code:%u timeout:%d", __func__, service_name, code, timeout);
2108
2109 if (FAIL == zbx_ipc_async_socket_open(&asocket, service_name, timeout, error))
2110 goto out;
2111
2112 if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size))
2113 {
2114 *error = zbx_strdup(NULL, "Cannot send request");
2115 goto fail;
2116 }
2117
2118 if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout))
2119 {
2120 *error = zbx_strdup(NULL, "Cannot flush request");
2121 goto fail;
2122 }
2123
2124 if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message))
2125 {
2126 *error = zbx_strdup(NULL, "Cannot receive response");
2127 goto fail;
2128 }
2129
2130 if (NULL == message)
2131 {
2132 *error = zbx_strdup(NULL, "Timeout while waiting for response");
2133 goto fail;
2134 }
2135
2136 *out = message->data;
2137 message->data = NULL;
2138
2139 zbx_ipc_message_free(message);
2140 ret = SUCCEED;
2141 fail:
2142 zbx_ipc_async_socket_close(&asocket);
2143 out:
2144 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
2145 return ret;
2146 }
2147
2148 #endif
2149