1 #include "common.h"
2
3 #ifdef HAVE_IPCSERVICE
4
5 #ifdef HAVE_LIBEVENT
6 # include <event.h>
7 #endif
8
9 #include "zbxtypes.h"
10 #include "zbxalgo.h"
11 #include "log.h"
12 #include "zbxipcservice.h"
13
/* maximum socket path length - the size of sockaddr_un.sun_path */
#define ZBX_IPC_PATH_MAX	sizeof(((struct sockaddr_un *)0)->sun_path)

/* NOTE(review): not referenced in the visible part of this file; presumably caps the */
/* number of message bytes written to the log when dumping data - TODO confirm        */
#define ZBX_IPC_DATA_DUMP_SIZE		128

/* buffer used to build socket paths: the first ipc_path_root_len bytes hold the root */
/* directory, ipc_make_path() writes the socket file name right after them            */
static char	ipc_path[ZBX_IPC_PATH_MAX] = {0};
static size_t	ipc_path_root_len = 0;

/* client state: QUEUED while the client sits in the service clients_recv queue */
#define ZBX_IPC_CLIENT_STATE_NONE	0
#define ZBX_IPC_CLIENT_STATE_QUEUED	1

/* asynchronous socket state, set by the timer and read/write event callbacks */
#define ZBX_IPC_ASYNC_SOCKET_STATE_NONE		0
#define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT	1
#define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR	2

/* defined outside this file; selects the class prefix of the socket name in ipc_make_path() */
extern unsigned char	program_type;
29
/* IPC client, providing nonblocking connections through socket */
struct zbx_ipc_client
{
	/* the client socket (descriptor and receive buffer) */
	zbx_ipc_socket_t	csocket;
	/* the service this client was added to */
	zbx_ipc_service_t	*service;

	/* header of the message being received, indexed by ZBX_IPC_MESSAGE_CODE/ZBX_IPC_MESSAGE_SIZE */
	zbx_uint32_t		rx_header[2];
	/* data of the message being received */
	unsigned char		*rx_data;
	/* number of bytes received so far for the current message (including header) */
	zbx_uint32_t		rx_bytes;
	/* completely received messages (zbx_ipc_message_t) */
	zbx_queue_ptr_t		rx_queue;
	/* libevent read event; NULL after the events have been freed */
	struct event		*rx_event;

	/* header of the message being sent */
	zbx_uint32_t		tx_header[2];
	/* data of the message being sent */
	unsigned char		*tx_data;
	/* number of bytes of the current message still to be sent (including header) */
	zbx_uint32_t		tx_bytes;
	/* messages (zbx_ipc_message_t) waiting to be sent */
	zbx_queue_ptr_t		tx_queue;
	/* libevent write event; NULL after the events have been freed */
	struct event		*tx_event;

	/* identifier assigned from a counter when the client is added to a service */
	zbx_uint64_t		id;
	/* ZBX_IPC_CLIENT_STATE_* */
	unsigned char		state;

	/* NOTE(review): not used in the visible part of this file; presumably an opaque */
	/* pointer owned by the service user - TODO confirm                              */
	void			*userdata;

	/* set to 1 when the client is created; NOTE(review): presumably a reference */
	/* counter managed by code outside this chunk - TODO confirm                 */
	zbx_uint32_t		refcount;
};
55
56 /*
57 * Private API
58 */
59
/* size in bytes of the message header - two zbx_uint32_t values */
#define ZBX_IPC_HEADER_SIZE		(int)(sizeof(zbx_uint32_t) * 2)

/* indexes of the message code and the message data size in the header array */
#define ZBX_IPC_MESSAGE_CODE		0
#define ZBX_IPC_MESSAGE_SIZE		1
64
65 #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
66 typedef int evutil_socket_t;
67
/* Replacement for event_new() on libevent older than 2.0 (see the enclosing #if): */
/* allocates an event, initialises it with event_set() and binds it to the event   */
/* base. The returned event must be released with event_free().                    */
static struct event	*event_new(struct event_base *ev, evutil_socket_t fd, short what,
		void(*cb_func)(int, short, void *), void *cb_arg)
{
	struct event	*created = zbx_malloc(NULL, sizeof(struct event));

	event_set(created, fd, what, cb_func, cb_arg);
	event_base_set(ev, created);

	return created;
}
79
/* Replacement for event_free() on libevent older than 2.0 (see the enclosing #if): */
/* removes the event from its event base and releases the memory allocated by      */
/* event_new().                                                                     */
static void	event_free(struct event *event)
{
	/* the event must be unregistered before its memory is released */
	(void)event_del(event);

	zbx_free(event);
}
85
86 #endif
87
88 static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
89 static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
90
ipc_get_path(void)91 static const char *ipc_get_path(void)
92 {
93 ipc_path[ipc_path_root_len] = '\0';
94
95 return ipc_path;
96 }
97
/* fixed parts of the socket file name: <root>/zabbix_<class prefix><service name>.sock */
#define ZBX_IPC_SOCKET_PREFIX	"/zabbix_"
#define ZBX_IPC_SOCKET_SUFFIX	".sock"

/* class prefixes, selected in ipc_make_path() by program_type */
#define ZBX_IPC_CLASS_PREFIX_NONE	""
#define ZBX_IPC_CLASS_PREFIX_SERVER	"server_"
#define ZBX_IPC_CLASS_PREFIX_PROXY	"proxy_"
#define ZBX_IPC_CLASS_PREFIX_AGENT	"agent_"
105
106 /******************************************************************************
107 * *
108 * Function: ipc_make_path *
109 * *
110 * Purpose: makes socket path from the service name *
111 * *
112 * Parameters: service_name - [IN] the service name *
113 * error - [OUT] the error message *
114 * *
115 * Return value: The created path or NULL if the path exceeds unix domain *
116 * socket path maximum length *
117 * *
118 ******************************************************************************/
ipc_make_path(const char * service_name,char ** error)119 static const char *ipc_make_path(const char *service_name, char **error)
120 {
121 const char *prefix;
122 size_t path_len, offset, prefix_len;
123
124 path_len = strlen(service_name);
125
126 switch (program_type)
127 {
128 case ZBX_PROGRAM_TYPE_SERVER:
129 prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
130 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
131 break;
132 case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
133 case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
134 prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
135 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
136 break;
137 case ZBX_PROGRAM_TYPE_AGENTD:
138 prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
139 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
140 break;
141 default:
142 prefix = ZBX_IPC_CLASS_PREFIX_NONE;
143 prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
144 break;
145 }
146
147 if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
148 ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + prefix_len)
149 {
150 *error = zbx_dsprintf(*error,
151 "Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
152 ipc_path, ZBX_IPC_SOCKET_PREFIX, prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
153 return NULL;
154 }
155
156 offset = ipc_path_root_len;
157 memcpy(ipc_path + offset , ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
158 offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
159 memcpy(ipc_path + offset, prefix, prefix_len);
160 offset += prefix_len;
161 memcpy(ipc_path + offset, service_name, path_len);
162 offset += path_len;
163 memcpy(ipc_path + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
164
165 return ipc_path;
166 }
167
168 /******************************************************************************
169 * *
170 * Function: ipc_write_data *
171 * *
172 * Purpose: writes data to a socket *
173 * *
174 * Parameters: fd - [IN] the socket file descriptor *
175 * data - [IN] the data *
176 * size - [IN] the data size *
177 * size_sent - [IN] the actual size written to socket *
178 * *
179 * Return value: SUCCEED - no socket errors were detected. Either the data or *
180 * a part of it was written to socket or a write to *
181 * non-blocking socket would block *
182 * FAIL - otherwise *
183 * *
184 ******************************************************************************/
ipc_write_data(int fd,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * size_sent)185 static int ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
186 {
187 zbx_uint32_t offset = 0;
188 int n, ret = SUCCEED;
189
190 while (offset != size)
191 {
192 n = write(fd, data + offset, size - offset);
193
194 if (-1 == n)
195 {
196 if (EINTR == errno)
197 continue;
198
199 if (EWOULDBLOCK == errno || EAGAIN == errno)
200 break;
201
202 zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
203 ret = FAIL;
204 break;
205 }
206 offset += n;
207 }
208
209 *size_sent = offset;
210
211 return ret;
212 }
213
214 /******************************************************************************
215 * *
216 * Function: ipc_read_data *
217 * *
218 * Purpose: reads data from a socket *
219 * *
220 * Parameters: fd - [IN] the socket file descriptor *
221 * data - [IN] the data *
222 * size - [IN] the data size *
223 * size_sent - [IN] the actual size read from socket *
224 * *
225 * Return value: SUCCEED - the data was successfully read *
226 * FAIL - otherwise *
227 * *
228 * Comments: When reading data from non-blocking sockets SUCCEED will be *
229 * returned also if there were no more data to read. *
230 * *
231 ******************************************************************************/
ipc_read_data(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)232 static int ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
233 {
234 int n;
235
236 *read_size = 0;
237
238 while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
239 {
240 if (EINTR == errno)
241 continue;
242
243 if (EWOULDBLOCK == errno || EAGAIN == errno)
244 return SUCCEED;
245
246 return FAIL;
247 }
248
249 if (0 == n)
250 return FAIL;
251
252 *read_size += n;
253
254 return SUCCEED;
255 }
256
257 /******************************************************************************
258 * *
259 * Function: ipc_read_data_full *
260 * *
261 * Purpose: reads data from a socket until the requested data has been read *
262 * *
263 * Parameters: fd - [IN] the socket file descriptor *
264 * buffer - [IN] the data *
265 * size - [IN] the data size *
266 * read_size - [IN] the actual size read from socket *
267 * *
268 * Return value: SUCCEED - the data was successfully read *
269 * FAIL - otherwise *
270 * *
271 * Comments: When reading data from non-blocking sockets this function will *
272 * return SUCCEED if there are no data to read, even if not all of *
273 * the requested data has been read. *
274 * *
275 ******************************************************************************/
ipc_read_data_full(int fd,unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)276 static int ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
277 {
278 int ret = FAIL;
279 zbx_uint32_t offset = 0, chunk_size;
280
281 *read_size = 0;
282
283 while (offset < size)
284 {
285 if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
286 goto out;
287
288 if (0 == chunk_size)
289 break;
290
291 offset += chunk_size;
292 }
293
294 ret = SUCCEED;
295 out:
296 *read_size = offset;
297
298 return ret;
299 }
300
301 /******************************************************************************
302 * *
303 * Function: ipc_socket_write_message *
304 * *
305 * Purpose: writes IPC message to socket *
306 * *
307 * Parameters: csocket - [IN] the IPC socket *
308 * code - [IN] the message code *
309 * data - [IN] the data *
310 * size - [IN] the data size *
311 * tx_size - [IN] the actual size written to socket *
312 * *
313 * Return value: SUCCEED - no socket errors were detected. Either the data or *
314 * a part of it was written to socket or a write to *
315 * non-blocking socket would block *
316 * FAIL - otherwise *
317 * *
318 * Comments: When using non-blocking sockets the tx_size parameter must be *
319 * checked in addition to return value to tell if the message was *
320 * sent successfully. *
321 * *
322 ******************************************************************************/
ipc_socket_write_message(zbx_ipc_socket_t * csocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size,zbx_uint32_t * tx_size)323 static int ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
324 zbx_uint32_t size, zbx_uint32_t *tx_size)
325 {
326 int ret;
327 zbx_uint32_t size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
328
329 buffer[0] = code;
330 buffer[1] = size;
331
332 if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
333 {
334 if (0 != size)
335 memcpy(buffer + 2, data, size);
336
337 return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
338 }
339
340 if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
341 return FAIL;
342
343 /* in the case of non-blocking sockets only a part of the header might be sent */
344 if (ZBX_IPC_HEADER_SIZE != *tx_size)
345 return SUCCEED;
346
347 ret = ipc_write_data(csocket->fd, data, size, &size_data);
348 *tx_size += size_data;
349
350 return ret;
351 }
352
353 /******************************************************************************
354 * *
355 * Function: ipc_read_buffer *
356 * *
357 * Purpose: reads message header and data from buffer *
358 * *
359 * Parameters: header - [IN/OUT] the message header *
360 * data - [OUT] the message data *
361 * rx_bytes - [IN] the number of bytes stored in message *
362 * (including header) *
363 * buffer - [IN] the buffer to parse *
364 * size - [IN] the number of bytes to parse *
365 * read_size - [OUT] the number of bytes read *
366 * *
367 * Return value: SUCCEED - message was successfully parsed *
368 * FAIL - not enough data *
369 * *
370 ******************************************************************************/
ipc_read_buffer(zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t rx_bytes,const unsigned char * buffer,zbx_uint32_t size,zbx_uint32_t * read_size)371 static int ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
372 const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
373 {
374 zbx_uint32_t copy_size, data_size, data_offset;
375
376 *read_size = 0;
377
378 if (ZBX_IPC_HEADER_SIZE > rx_bytes)
379 {
380 copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
381 memcpy((char *)header + rx_bytes, buffer, copy_size);
382 *read_size += copy_size;
383
384 if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
385 return FAIL;
386
387 data_size = header[ZBX_IPC_MESSAGE_SIZE];
388
389 if (0 == data_size)
390 {
391 *data = NULL;
392 return SUCCEED;
393 }
394
395 *data = (unsigned char *)zbx_malloc(NULL, data_size);
396 data_offset = 0;
397 }
398 else
399 {
400 data_size = header[ZBX_IPC_MESSAGE_SIZE];
401 data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
402 }
403
404 copy_size = MIN(data_size - data_offset, size - *read_size);
405 memcpy(*data + data_offset, buffer + *read_size, copy_size);
406 *read_size += copy_size;
407
408 return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
409 }
410
411 /******************************************************************************
412 * *
413 * Function: ipc_message_is_completed *
414 * *
415 * Purpose: checks if IPC message has been completed *
416 * *
417 * Parameters: header - [IN] the message header *
418 * rx_bytes - [IN] the number of bytes set in message *
419 * (including header) *
420 * *
421 * Return value: SUCCEED - message has been completed *
422 * FAIL - otherwise *
423 * *
424 ******************************************************************************/
ipc_message_is_completed(const zbx_uint32_t * header,zbx_uint32_t rx_bytes)425 static int ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
426 {
427 if (ZBX_IPC_HEADER_SIZE > rx_bytes)
428 return FAIL;
429
430 if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
431 return FAIL;
432
433 return SUCCEED;
434 }
435
436 /******************************************************************************
437 * *
438 * Function: ipc_socket_read_message *
439 * *
440 * Purpose: reads IPC message from buffered client socket *
441 * *
442 * Parameters: csocket - [IN] the source socket *
443 * header - [OUT] the header of the message *
444 * data - [OUT] the data of the message *
445 * rx_bytes - [IN/OUT] the total message size read (including *
446 * header *
447 * *
448 * Return value: SUCCEED - data was read successfully, check rx_bytes to *
449 * determine if the message was completed. *
450 * FAIL - failed to read message (socket error or connection *
451 * was closed). *
452 * *
453 ******************************************************************************/
ipc_socket_read_message(zbx_ipc_socket_t * csocket,zbx_uint32_t * header,unsigned char ** data,zbx_uint32_t * rx_bytes)454 static int ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
455 zbx_uint32_t *rx_bytes)
456 {
457 zbx_uint32_t data_size, offset, read_size = 0;
458 int ret = FAIL;
459
460 /* try to read message from socket buffer */
461 if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
462 {
463 ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
464 csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
465
466 csocket->rx_buffer_offset += read_size;
467 *rx_bytes += read_size;
468
469 if (SUCCEED == ret)
470 goto out;
471 }
472
473 /* not enough data in socket buffer, try to read more until message is completed or no data to read */
474 while (SUCCEED != ret)
475 {
476 csocket->rx_buffer_offset = 0;
477 csocket->rx_buffer_bytes = 0;
478
479 if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
480 {
481 offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
482 data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
483
484 /* long messages will be read directly into message buffer */
485 if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
486 {
487 ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
488 *rx_bytes += read_size;
489 goto out;
490 }
491 }
492
493 if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
494 goto out;
495
496 /* it's possible that nothing will be read on non-blocking sockets, return success */
497 if (0 == read_size)
498 {
499 ret = SUCCEED;
500 goto out;
501 }
502
503 csocket->rx_buffer_bytes = read_size;
504
505 ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
506 &read_size);
507
508 csocket->rx_buffer_offset += read_size;
509 *rx_bytes += read_size;
510 }
511 out:
512 return ret;
513 }
514
515 /******************************************************************************
516 * *
517 * Function: ipc_client_free_event *
518 * *
519 * Purpose: frees client's libevent event *
520 * *
521 * Parameters: client - [IN] the client *
522 * *
523 ******************************************************************************/
ipc_client_free_events(zbx_ipc_client_t * client)524 static void ipc_client_free_events(zbx_ipc_client_t *client)
525 {
526 if (NULL != client->rx_event)
527 {
528 event_free(client->rx_event);
529 client->rx_event = NULL;
530 }
531
532 if (NULL != client->tx_event)
533 {
534 event_free(client->tx_event);
535 client->tx_event = NULL;
536 }
537 }
538
539 /******************************************************************************
540 * *
541 * Function: ipc_client_free *
542 * *
543 * Purpose: frees IPC service client *
544 * *
545 * Parameters: client - [IN] the client to free *
546 * *
547 ******************************************************************************/
ipc_client_free(zbx_ipc_client_t * client)548 static void ipc_client_free(zbx_ipc_client_t *client)
549 {
550 zbx_ipc_message_t *message;
551
552 ipc_client_free_events(client);
553 zbx_ipc_socket_close(&client->csocket);
554
555 while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
556 zbx_ipc_message_free(message);
557
558 zbx_queue_ptr_destroy(&client->rx_queue);
559 zbx_free(client->rx_data);
560
561 while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
562 zbx_ipc_message_free(message);
563
564 zbx_queue_ptr_destroy(&client->tx_queue);
565 zbx_free(client->tx_data);
566
567 ipc_client_free_events(client);
568
569 zbx_free(client);
570 }
571
572 /******************************************************************************
573 * *
574 * Function: ipc_client_push_rx_message *
575 * *
576 * Purpose: adds message to received messages queue *
577 * *
578 * Parameters: client - [IN] the client to read *
579 * *
580 ******************************************************************************/
ipc_client_push_rx_message(zbx_ipc_client_t * client)581 static void ipc_client_push_rx_message(zbx_ipc_client_t *client)
582 {
583 zbx_ipc_message_t *message;
584
585 message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
586 message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
587 message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
588 message->data = client->rx_data;
589 zbx_queue_ptr_push(&client->rx_queue, message);
590
591 client->rx_data = NULL;
592 client->rx_bytes = 0;
593 }
594
595 /******************************************************************************
596 * *
597 * Function: ipc_client_pop_tx_message *
598 * *
599 * Purpose: prepares to send the next message in send queue *
600 * *
601 * Parameters: client - [IN] the client *
602 * *
603 ******************************************************************************/
ipc_client_pop_tx_message(zbx_ipc_client_t * client)604 static void ipc_client_pop_tx_message(zbx_ipc_client_t *client)
605 {
606 zbx_ipc_message_t *message;
607
608 zbx_free(client->tx_data);
609 client->tx_bytes = 0;
610
611 if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
612 return;
613
614 client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
615 client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
616 client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
617 client->tx_data = message->data;
618 zbx_free(message);
619 }
620
621 /******************************************************************************
622 * *
623 * Function: ipc_client_read *
624 * *
625 * Purpose: reads data from IPC service client *
626 * *
627 * Parameters: client - [IN] the client to read *
628 * *
629 * Return value: FAIL - read error/connection was closed *
630 * *
631 * Comments: This function reads data from socket, parses it and adds *
632 * parsed messages to received messages queue. *
633 * *
634 ******************************************************************************/
ipc_client_read(zbx_ipc_client_t * client)635 static int ipc_client_read(zbx_ipc_client_t *client)
636 {
637 int rc;
638
639 do
640 {
641 if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
642 &client->rx_bytes))
643 {
644 zbx_free(client->rx_data);
645 client->rx_bytes = 0;
646 return FAIL;
647 }
648
649 if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
650 ipc_client_push_rx_message(client);
651 }
652
653 while (SUCCEED == rc);
654
655 return SUCCEED;
656 }
657
658 /******************************************************************************
659 * *
660 * Function: ipc_client_write *
661 * *
662 * Purpose: writes queued data to IPC service client *
663 * *
664 * Parameters: client - [IN] the client *
665 * *
666 * Return value: SUCCEED - the data was sent successfully *
667 * FAIL - otherwise *
668 * *
669 ******************************************************************************/
ipc_client_write(zbx_ipc_client_t * client)670 static int ipc_client_write(zbx_ipc_client_t *client)
671 {
672 zbx_uint32_t data_size, write_size;
673
674 data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
675
676 if (data_size < client->tx_bytes)
677 {
678 zbx_uint32_t size, offset;
679
680 size = client->tx_bytes - data_size;
681 offset = ZBX_IPC_HEADER_SIZE - size;
682
683 if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
684 &write_size))
685 {
686 return FAIL;
687 }
688
689 client->tx_bytes -= write_size;
690
691 if (data_size < client->tx_bytes)
692 return SUCCEED;
693 }
694
695 while (0 < client->tx_bytes)
696 {
697 if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
698 client->tx_bytes, &write_size))
699 {
700 return FAIL;
701 }
702
703 if (0 == write_size)
704 return SUCCEED;
705
706 client->tx_bytes -= write_size;
707 }
708
709 if (0 == client->tx_bytes)
710 ipc_client_pop_tx_message(client);
711
712 return SUCCEED;
713 }
714
715 /******************************************************************************
716 * *
717 * Function: ipc_service_pop_client *
718 * *
719 * Purpose: gets the next client with messages/closed socket from recv queue *
720 * *
721 * Parameters: service - [IN] the IPC service *
722 * *
723 * Return value: The client with messages/closed socket *
724 * *
725 ******************************************************************************/
ipc_service_pop_client(zbx_ipc_service_t * service)726 static zbx_ipc_client_t *ipc_service_pop_client(zbx_ipc_service_t *service)
727 {
728 zbx_ipc_client_t *client;
729
730 if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
731 client->state = ZBX_IPC_CLIENT_STATE_NONE;
732
733 return client;
734 }
735
736 /******************************************************************************
737 * *
738 * Function: ipc_service_push_client *
739 * *
740 * Purpose: pushes client to the recv queue if needed *
741 * *
742 * Parameters: service - [IN] the IPC service *
743 * client - [IN] the IPC client *
744 * *
745 * Comments: The client is pushed to the recv queue if it isn't already there *
746 * and there is messages to return or the client connection was *
747 * closed. *
748 * *
749 ******************************************************************************/
ipc_service_push_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)750 static void ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
751 {
752 if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
753 return;
754
755 if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
756 return;
757
758 client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
759 zbx_queue_ptr_push(&service->clients_recv, client);
760 }
761
762 /******************************************************************************
763 * *
764 * Function: ipc_service_add_client *
765 * *
766 * Purpose: adds a new IPC service client *
767 * *
768 * Parameters: service - [IN] the IPC service *
769 * fd - [IN] the client socket descriptor *
770 * *
771 ******************************************************************************/
ipc_service_add_client(zbx_ipc_service_t * service,int fd)772 static void ipc_service_add_client(zbx_ipc_service_t *service, int fd)
773 {
774 static zbx_uint64_t next_clientid = 1;
775 zbx_ipc_client_t *client;
776 int flags;
777
778 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
779
780 client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
781 memset(client, 0, sizeof(zbx_ipc_client_t));
782
783 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
784 {
785 zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
786 exit(EXIT_FAILURE);
787 }
788
789 if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
790 {
791 zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
792 exit(EXIT_FAILURE);
793 }
794
795 client->csocket.fd = fd;
796 client->csocket.rx_buffer_bytes = 0;
797 client->csocket.rx_buffer_offset = 0;
798 client->id = next_clientid++;
799 client->state = ZBX_IPC_CLIENT_STATE_NONE;
800 client->refcount = 1;
801
802 zbx_queue_ptr_create(&client->rx_queue);
803 zbx_queue_ptr_create(&client->tx_queue);
804
805 client->service = service;
806 client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
807 client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
808 event_add(client->rx_event, NULL);
809
810 zbx_vector_ptr_append(&service->clients, client);
811
812 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __func__, client->id);
813 }
814
815 /******************************************************************************
816 * *
817 * Function: ipc_service_remove_client *
818 * *
819 * Purpose: removes IPC service client *
820 * *
821 * Parameters: service - [IN] the IPC service *
822 * client - [IN] the client to remove *
823 * *
824 ******************************************************************************/
ipc_service_remove_client(zbx_ipc_service_t * service,zbx_ipc_client_t * client)825 static void ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
826 {
827 int i;
828
829 for (i = 0; i < service->clients.values_num; i++)
830 {
831 if (service->clients.values[i] == client)
832 zbx_vector_ptr_remove_noorder(&service->clients, i);
833 }
834 }
835
836 /******************************************************************************
837 * *
838 * Function: zbx_ipc_client_by_id *
839 * *
840 * Purpose: to find connected client when only it's ID is known *
841 * *
842 * Parameters: service - [IN] the IPC service *
843 * id - [IN] ID of client *
844 * *
845 * Return value: address of client or NULL if client has already disconnected *
846 * *
847 ******************************************************************************/
zbx_ipc_client_by_id(const zbx_ipc_service_t * service,zbx_uint64_t id)848 zbx_ipc_client_t *zbx_ipc_client_by_id(const zbx_ipc_service_t *service, zbx_uint64_t id)
849 {
850 int i;
851 zbx_ipc_client_t *client;
852
853 for (i = 0; i < service->clients.values_num; i++)
854 {
855 client = (zbx_ipc_client_t *) service->clients.values[i];
856
857 if (id == client->id)
858 return client;
859 }
860
861 return NULL;
862 }
863
864 /******************************************************************************
865 * *
866 * Function: ipc_client_read_event_cb *
867 * *
868 * Purpose: service client read event libevent callback *
869 * *
870 ******************************************************************************/
ipc_client_read_event_cb(evutil_socket_t fd,short what,void * arg)871 static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
872 {
873 zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
874
875 ZBX_UNUSED(fd);
876 ZBX_UNUSED(what);
877
878 if (SUCCEED != ipc_client_read(client))
879 {
880 ipc_client_free_events(client);
881 ipc_service_remove_client(client->service, client);
882 }
883
884 ipc_service_push_client(client->service, client);
885 }
886
887 /******************************************************************************
888 * *
889 * Function: ipc_client_write_event_cb *
890 * *
891 * Purpose: service client write event libevent callback *
892 * *
893 ******************************************************************************/
ipc_client_write_event_cb(evutil_socket_t fd,short what,void * arg)894 static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
895 {
896 zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
897
898 ZBX_UNUSED(fd);
899 ZBX_UNUSED(what);
900
901 if (SUCCEED != ipc_client_write(client))
902 {
903 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
904 zbx_ipc_client_close(client);
905 return;
906 }
907
908 if (0 == client->tx_bytes)
909 event_del(client->tx_event);
910 }
911
912 /******************************************************************************
913 * *
914 * Function: ipc_async_socket_write_event_cb *
915 * *
916 * Purpose: asynchronous socket write event libevent callback *
917 * *
918 ******************************************************************************/
ipc_async_socket_write_event_cb(evutil_socket_t fd,short what,void * arg)919 static void ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
920 {
921 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
922
923 ZBX_UNUSED(fd);
924 ZBX_UNUSED(what);
925
926 if (SUCCEED != ipc_client_write(asocket->client))
927 {
928 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
929 ipc_client_free_events(asocket->client);
930 zbx_ipc_socket_close(&asocket->client->csocket);
931 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
932 return;
933 }
934
935 if (0 == asocket->client->tx_bytes)
936 event_del(asocket->client->tx_event);
937 }
938
939 /******************************************************************************
940 * *
941 * Function: ipc_async_socket_read_event_cb *
942 * *
943 * Purpose: asynchronous socket read event libevent callback *
944 * *
945 ******************************************************************************/
ipc_async_socket_read_event_cb(evutil_socket_t fd,short what,void * arg)946 static void ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
947 {
948 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
949
950 ZBX_UNUSED(fd);
951 ZBX_UNUSED(what);
952
953 if (SUCCEED != ipc_client_read(asocket->client))
954 {
955 ipc_client_free_events(asocket->client);
956 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
957 }
958 }
959
960 /******************************************************************************
961 * *
962 * Function: ipc_async_socket_timer_cb *
963 * *
964 * Purpose: timer callback *
965 * *
966 ******************************************************************************/
ipc_async_socket_timer_cb(evutil_socket_t fd,short what,void * arg)967 static void ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
968 {
969 zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
970
971 ZBX_UNUSED(fd);
972 ZBX_UNUSED(what);
973
974 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
975 }
976
977 /******************************************************************************
978 * *
979 * Function: ipc_service_accept *
980 * *
981 * Purpose: accepts a new client connection *
982 * *
983 * Parameters: service - [IN] the IPC service *
984 * *
985 ******************************************************************************/
static void	ipc_service_accept(zbx_ipc_service_t *service)
{
	int	client_fd;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* keep retrying accept() for as long as it is only being interrupted (EINTR) */
	for (;;)
	{
		if (-1 != (client_fd = accept(service->fd, NULL, NULL)))
			break;

		if (EINTR == errno)
			continue;

		/* If there is unaccepted connection libevent will call registered callback function over and */
		/* over again. It is better to exit straight away and cause all other processes to stop.      */
		zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
		exit(EXIT_FAILURE);
	}

	ipc_service_add_client(service, client_fd);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1007
1008 /******************************************************************************
1009 * *
1010 * Function: ipc_message_create *
1011 * *
1012 * Purpose: creates IPC message *
1013 * *
1014 * Parameters: code - [IN] the message code *
1015 * data - [IN] the data *
1016 * size - [IN] the data size *
1017 * *
1018 * Return value: The created message. *
1019 * *
1020 ******************************************************************************/
ipc_message_create(zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1021 static zbx_ipc_message_t *ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
1022 {
1023 zbx_ipc_message_t *message;
1024
1025 message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
1026
1027 message->code = code;
1028 message->size = size;
1029
1030 if (0 != size)
1031 {
1032 message->data = (unsigned char *)zbx_malloc(NULL, size);
1033 memcpy(message->data, data, size);
1034 }
1035 else
1036 message->data = NULL;
1037
1038 return message;
1039 }
1040
1041 /******************************************************************************
1042 * *
1043 * Function: ipc_service_event_log_cb *
1044 * *
1045 * Purpose: libevent logging callback *
1046 * *
1047 ******************************************************************************/
ipc_service_event_log_cb(int severity,const char * msg)1048 static void ipc_service_event_log_cb(int severity, const char *msg)
1049 {
1050 int loglevel;
1051
1052 switch (severity)
1053 {
1054 case _EVENT_LOG_DEBUG:
1055 loglevel = LOG_LEVEL_TRACE;
1056 break;
1057 case _EVENT_LOG_MSG:
1058 loglevel = LOG_LEVEL_DEBUG;
1059 break;
1060 case _EVENT_LOG_WARN:
1061 loglevel = LOG_LEVEL_WARNING;
1062 break;
1063 case _EVENT_LOG_ERR:
1064 loglevel = LOG_LEVEL_DEBUG;
1065 break;
1066 default:
1067 loglevel = LOG_LEVEL_DEBUG;
1068 break;
1069 }
1070
1071 zabbix_log(loglevel, "IPC service: %s", msg);
1072 }
1073
1074 /******************************************************************************
1075 * *
1076 * Function: ipc_service_init_libevent *
1077 * *
1078 * Purpose: initialize libevent library *
1079 * *
1080 ******************************************************************************/
ipc_service_init_libevent(void)1081 static void ipc_service_init_libevent(void)
1082 {
1083 event_set_log_callback(ipc_service_event_log_cb);
1084 }
1085
1086 /******************************************************************************
1087 * *
1088 * Function: ipc_service_free_libevent *
1089 * *
1090 * Purpose: uninitialize libevent library *
1091 * *
1092 ******************************************************************************/
static void	ipc_service_free_libevent(void)
{
	/* intentionally empty: there is nothing to release here */
}
1096
1097 /******************************************************************************
1098 * *
1099 * Function: ipc_service_client_connected_cb *
1100 * *
1101 * Purpose: libevent listener callback *
1102 * *
1103 ******************************************************************************/
static void	ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
{
	ZBX_UNUSED(fd);
	ZBX_UNUSED(what);

	/* the callback argument is the IPC service the listener event was created for */
	ipc_service_accept((zbx_ipc_service_t *)arg);
}
1113
1114 /******************************************************************************
1115 * *
1116 * Function: ipc_service_timer_cb *
1117 * *
1118 * Purpose: timer callback *
1119 * *
1120 ******************************************************************************/
static void	ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
{
	/* no work is done when the timer fires; it is armed by zbx_ipc_service_recv() */
	/* before the event loop is run                                                */
	ZBX_UNUSED(arg);
	ZBX_UNUSED(what);
	ZBX_UNUSED(fd);
}
1127
1128 /******************************************************************************
1129 * *
1130 * Function: ipc_check_running_service *
1131 * *
1132 * Purpose: checks if an IPC service is already running *
1133 * *
1134 * Parameters: service_name - [IN] *
1135 * *
1136 ******************************************************************************/
ipc_check_running_service(const char * service_name)1137 static int ipc_check_running_service(const char *service_name)
1138 {
1139 zbx_ipc_socket_t csocket;
1140 int ret;
1141 char *error = NULL;
1142
1143 if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
1144 zbx_ipc_socket_close(&csocket);
1145 else
1146 zbx_free(error);
1147
1148 return ret;
1149 }
1150
1151 /*
1152 * Public client API
1153 */
1154
1155 /******************************************************************************
1156 * *
1157 * Function: zbx_ipc_socket_open *
1158 * *
1159 * Purpose: opens socket to an IPC service listening on the specified path *
1160 * *
1161 * Parameters: csocket - [OUT] the IPC socket to the service *
1162 * service_name - [IN] the IPC service name *
1163 * timeout - [IN] the connection timeout *
1164 * error - [OUT] the error message *
1165 * *
1166 * Return value: SUCCEED - the socket was successfully opened *
1167 * FAIL - otherwise *
1168 * *
1169 ******************************************************************************/
zbx_ipc_socket_open(zbx_ipc_socket_t * csocket,const char * service_name,int timeout,char ** error)1170 int zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
1171 {
1172 struct sockaddr_un addr;
1173 time_t start;
1174 struct timespec ts = {0, 100000000};
1175 const char *socket_path;
1176 int ret = FAIL;
1177
1178 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1179
1180 if (NULL == (socket_path = ipc_make_path(service_name, error)))
1181 goto out;
1182
1183 if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1184 {
1185 *error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
1186 goto out;
1187 }
1188
1189 memset(&addr, 0, sizeof(addr));
1190 addr.sun_family = AF_UNIX;
1191 memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1192
1193 start = time(NULL);
1194
1195 while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
1196 {
1197 if (0 == timeout || time(NULL) - start > timeout)
1198 {
1199 *error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
1200 zbx_strerror(errno));
1201 close(csocket->fd);
1202 goto out;
1203 }
1204
1205 nanosleep(&ts, NULL);
1206 }
1207
1208 csocket->rx_buffer_bytes = 0;
1209 csocket->rx_buffer_offset = 0;
1210
1211 ret = SUCCEED;
1212 out:
1213 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1214 return ret;
1215 }
1216
1217 /******************************************************************************
1218 * *
1219 * Function: zbx_ipc_socket_close *
1220 * *
1221 * Purpose: closes socket to an IPC service *
1222 * *
1223 * Parameters: csocket - [IN/OUT] the IPC socket to close *
1224 * *
1225 ******************************************************************************/
void	zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
{
	int	open_fd;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* -1 marks a socket that is not open, which makes repeated calls harmless */
	open_fd = csocket->fd;
	csocket->fd = -1;

	if (-1 != open_fd)
		close(open_fd);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1238
1239 /******************************************************************************
1240 * *
1241 * Function: zbx_ipc_socket_write *
1242 * *
1243 * Purpose: writes a message to IPC service *
1244 * *
1245 * Parameters: csocket - [IN] an opened IPC socket to the service *
1246 * code - [IN] the message code *
1247 * data - [IN] the data *
1248 * size - [IN] the data size *
1249 * *
1250 * Return value: SUCCEED - the message was successfully written *
1251 * FAIL - otherwise *
1252 * *
1253 ******************************************************************************/
int	zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
{
	zbx_uint32_t	written;
	int		ret = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* success requires the whole message (header plus payload) to have been sent */
	if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &written) &&
			written == size + ZBX_IPC_HEADER_SIZE)
	{
		ret = SUCCEED;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}
1273
1274 /******************************************************************************
1275 * *
1276 * Function: zbx_ipc_socket_read *
1277 * *
1278 * Purpose: reads a message from IPC service *
1279 * *
1280 * Parameters: csocket - [IN] an opened IPC socket to the service *
1281 * message - [OUT] the received message *
1282 * *
1283 * Return value: SUCCEED - the message was successfully received *
1284 * FAIL - otherwise *
1285 * *
1286 * Comments: If this function succeeds the message must be cleaned/freed by *
1287 * the caller. *
1288 * *
1289 ******************************************************************************/
int	zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
{
	unsigned char	*payload = NULL;
	zbx_uint32_t	hdr[2], received = 0;
	int		ret = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED == ipc_socket_read_message(csocket, hdr, &payload, &received))
	{
		if (SUCCEED == ipc_message_is_completed(hdr, received))
		{
			/* hand the payload over to the message - the caller becomes its owner */
			message->code = hdr[ZBX_IPC_MESSAGE_CODE];
			message->size = hdr[ZBX_IPC_MESSAGE_SIZE];
			message->data = payload;

			/* the message dump is built only when trace level logging is enabled */
			if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
			{
				char	*dump = NULL;

				zbx_ipc_message_format(message, &dump);

				zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, dump);

				zbx_free(dump);
			}

			ret = SUCCEED;
		}
		else
		{
			/* an incomplete message is discarded */
			zbx_free(payload);
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}
1328
1329 /******************************************************************************
1330 * *
1331 * Function: zbx_ipc_message_free *
1332 * *
1333 * Purpose: frees the resources allocated to store IPC message data *
1334 * *
1335 * Parameters: message - [IN] the message to free *
1336 * *
1337 ******************************************************************************/
void	zbx_ipc_message_free(zbx_ipc_message_t *message)
{
	/* freeing a NULL message is a no-op */
	if (NULL == message)
		return;

	/* release the payload first, then the message structure itself */
	zbx_free(message->data);
	zbx_free(message);
}
1346
1347 /******************************************************************************
1348 * *
1349 * Function: zbx_ipc_message_clean *
1350 * *
1351 * Purpose: frees the resources allocated to store IPC message data *
1352 * *
1353 * Parameters: message - [IN] the message to clean *
1354 * *
1355 ******************************************************************************/
void	zbx_ipc_message_clean(zbx_ipc_message_t *message)
{
	/* only the payload is released - the message structure itself is left untouched */
	zbx_free(message->data);
}
1360
1361 /******************************************************************************
1362 * *
1363 * Function: zbx_ipc_message_init *
1364 * *
1365 * Purpose: initializes IPC message *
1366 * *
1367 * Parameters: message - [IN] the message to initialize *
1368 * *
1369 ******************************************************************************/
void	zbx_ipc_message_init(zbx_ipc_message_t *message)
{
	/* zero-fill the whole structure */
	memset(message, 0, sizeof(*message));
}
1374
1375 /******************************************************************************
1376 * *
1377 * Function: zbx_ipc_message_format *
1378 * *
1379 * Purpose: formats message to readable format for debug messages *
1380 * *
1381 * Parameters: message - [IN] the message *
1382 * data - [OUT] the formatted message *
1383 * *
1384 ******************************************************************************/
zbx_ipc_message_format(const zbx_ipc_message_t * message,char ** data)1385 void zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
1386 {
1387 size_t data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
1388 zbx_uint32_t i, data_num;
1389
1390 if (NULL == message)
1391 return;
1392
1393 data_num = message->size;
1394
1395 if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
1396 data_num = ZBX_IPC_DATA_DUMP_SIZE;
1397
1398 *data = (char *)zbx_malloc(*data, data_alloc);
1399 zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
1400
1401 for (i = 0; i < data_num; i++)
1402 {
1403 if (0 != i)
1404 zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
1405
1406 zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
1407 }
1408
1409 (*data)[data_offset] = '\0';
1410 }
1411
1412 /******************************************************************************
1413 * *
1414 * Function: zbx_ipc_message_copy *
1415 * *
1416 * Purpose: copies ipc message *
1417 * *
1418 * Parameters: dst - [OUT] the destination message *
1419 * src - [IN] the source message *
1420 * *
1421 ******************************************************************************/
void	zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
{
	dst->code = src->code;
	dst->size = src->size;

	/* An empty message carries no payload, the same convention ipc_message_create() uses. */
	/* This also avoids handing memcpy() a NULL source, which is undefined behavior even   */
	/* when zero bytes are copied.                                                         */
	if (0 == src->size)
	{
		dst->data = NULL;
		return;
	}

	/* the destination gets its own copy of the payload and must release it itself */
	dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
	memcpy(dst->data, src->data, src->size);
}
1429
1430 /*
1431 * Public service API
1432 */
1433
1434 /******************************************************************************
1435 * *
1436 * Function: zbx_ipc_service_init_env *
1437 * *
1438 * Purpose: initializes IPC service environment *
1439 * *
1440 * Parameters: path - [IN] the service root path *
1441 * error - [OUT] the error message *
1442 * *
1443 * Return value: SUCCEED - the environment was initialized successfully. *
1444 * FAIL - otherwise *
1445 * *
1446 ******************************************************************************/
zbx_ipc_service_init_env(const char * path,char ** error)1447 int zbx_ipc_service_init_env(const char *path, char **error)
1448 {
1449 struct stat fs;
1450 int ret = FAIL;
1451
1452 zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, path);
1453
1454 if (0 != ipc_path_root_len)
1455 {
1456 *error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
1457 " root directory at \"%s\".", ipc_get_path());
1458 goto out;
1459 }
1460
1461 if (0 != stat(path, &fs))
1462 {
1463 *error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
1464 zbx_strerror(errno));
1465 goto out;
1466 }
1467
1468 if (0 == S_ISDIR(fs.st_mode))
1469 {
1470 *error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
1471 goto out;
1472 }
1473
1474 if (0 != access(path, W_OK | R_OK))
1475 {
1476 *error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
1477 goto out;
1478 }
1479
1480 ipc_path_root_len = strlen(path);
1481 if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
1482 {
1483 *error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
1484 goto out;
1485 }
1486
1487 memcpy(ipc_path, path, ipc_path_root_len + 1);
1488
1489 while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
1490 ipc_path[--ipc_path_root_len] = '\0';
1491
1492 ipc_service_init_libevent();
1493
1494 ret = SUCCEED;
1495 out:
1496 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1497
1498 return ret;
1499 }
1500
1501 /******************************************************************************
1502 * *
1503 * Function: zbx_ipc_service_free_env *
1504 * *
1505 * Purpose: frees IPC service environment *
1506 * *
1507 ******************************************************************************/
void	zbx_ipc_service_free_env(void)
{
	/* counterpart of the ipc_service_init_libevent() call made by zbx_ipc_service_init_env() */
	ipc_service_free_libevent();
}
1512
1513
1514 /******************************************************************************
1515 * *
1516 * Function: zbx_ipc_service_start *
1517 * *
1518 * Purpose: starts IPC service on the specified path *
1519 * *
1520 * Parameters: service - [IN/OUT] the IPC service *
1521 * service_name - [IN] the IPC service name *
1522 * error - [OUT] the error message *
1523 * *
1524 * Return value: SUCCEED - the service was initialized successfully. *
1525 * FAIL - otherwise *
1526 * *
1527 ******************************************************************************/
zbx_ipc_service_start(zbx_ipc_service_t * service,const char * service_name,char ** error)1528 int zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
1529 {
1530 struct sockaddr_un addr;
1531 const char *socket_path;
1532 int ret = FAIL;
1533 mode_t mode;
1534
1535 zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __func__, service_name);
1536
1537 mode = umask(077);
1538
1539 if (NULL == (socket_path = ipc_make_path(service_name, error)))
1540 goto out;
1541
1542 if (0 == access(socket_path, F_OK))
1543 {
1544 if (0 != access(socket_path, W_OK))
1545 {
1546 *error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
1547 goto out;
1548 }
1549
1550 if (SUCCEED == ipc_check_running_service(service_name))
1551 {
1552 *error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
1553 goto out;
1554 }
1555
1556 unlink(socket_path);
1557 }
1558
1559 if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
1560 {
1561 *error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
1562 goto out;
1563 }
1564
1565 memset(&addr, 0, sizeof(addr));
1566 addr.sun_family = AF_UNIX;
1567 memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
1568
1569 if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
1570 {
1571 *error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
1572 goto out;
1573 }
1574
1575 if (0 != listen(service->fd, SOMAXCONN))
1576 {
1577 *error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
1578 goto out;
1579 }
1580
1581 service->path = zbx_strdup(NULL, service_name);
1582 zbx_vector_ptr_create(&service->clients);
1583 zbx_queue_ptr_create(&service->clients_recv);
1584
1585 service->ev = event_base_new();
1586 service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
1587 ipc_service_client_connected_cb, service);
1588 event_add(service->ev_listener, NULL);
1589
1590 service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
1591
1592 ret = SUCCEED;
1593 out:
1594 umask(mode);
1595
1596 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1597
1598 return ret;
1599 }
1600
1601 /******************************************************************************
1602 * *
1603 * Function: zbx_ipc_service_close *
1604 * *
1605 * Purpose: closes IPC service and frees the resources allocated by it *
1606 * *
1607 * Parameters: service - [IN/OUT] the IPC service *
1608 * *
1609 ******************************************************************************/
void	zbx_ipc_service_close(zbx_ipc_service_t *service)
{
	int	idx = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, service->path);

	/* stop listening before the connected clients are released */
	close(service->fd);

	while (idx < service->clients.values_num)
		ipc_client_free((zbx_ipc_client_t *)service->clients.values[idx++]);

	zbx_free(service->path);

	zbx_vector_ptr_destroy(&service->clients);
	zbx_queue_ptr_destroy(&service->clients_recv);

	/* the service events are freed before the event base they were created on */
	event_free(service->ev_timer);
	event_free(service->ev_listener);
	event_base_free(service->ev);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1632
1633 /******************************************************************************
1634 * *
1635 * Function: zbx_ipc_service_recv *
1636 * *
1637 * Purpose: receives ipc message from a connected client *
1638 * *
1639 * Parameters: service - [IN] the IPC service *
1640 * timeout - [IN] the timeout in seconds, 0 is used for *
1641 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1642 * used for blocking call without timeout *
1643 * client - [OUT] the client that sent the message or *
1644 * NULL if there are no messages and the *
1645 * specified timeout passed. *
1646 * The client must be released by caller with *
1647 * zbx_ipc_client_release() function. *
1648 * message - [OUT] the received message or NULL if the client *
1649 * connection was closed. *
1650 * The message must be freed by caller with *
1651 * zbx_ipc_message_free() function. *
1652 * *
1653 * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without *
1654 * waiting for socket events *
1655 * (pending events are processed) *
1656 * ZBX_IPC_RECV_WAIT - returned after receiving socket *
1657 * event *
1658 * ZBX_IPC_RECV_TIMEOUT - returned after timeout expired *
1659 * *
1660 ******************************************************************************/
int	zbx_ipc_service_recv(zbx_ipc_service_t *service, int timeout, zbx_ipc_client_t **client,
		zbx_ipc_message_t **message)
{
	int	ret, loop_mode = EVLOOP_NONBLOCK;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);

	/* wait in the event loop only when a wait was requested and no client is queued already */
	if (0 != timeout && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
	{
		loop_mode = EVLOOP_ONCE;

		/* a finite wait is bounded by the service timer */
		if (ZBX_IPC_WAIT_FOREVER != timeout)
		{
			struct timeval	tv_wait = {timeout, 0};

			evtimer_add(service->ev_timer, &tv_wait);
		}
	}

	event_base_loop(service->ev, loop_mode);

	*client = ipc_service_pop_client(service);

	if (NULL == *client)
	{
		/* no client has anything to report */
		ret = ZBX_IPC_RECV_TIMEOUT;
		*client = NULL;
		*message = NULL;
	}
	else
	{
		*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue);

		if (NULL != *message)
		{
			/* the message dump is built only when trace level logging is enabled */
			if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
			{
				char	*dump = NULL;

				zbx_ipc_message_format(*message, &dump);
				zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, dump);

				zbx_free(dump);
			}

			/* hand the client back to ipc_service_push_client() and take a */
			/* reference on behalf of the caller                            */
			ipc_service_push_client(service, *client);
			zbx_ipc_client_addref(*client);
		}

		ret = (EVLOOP_NONBLOCK == loop_mode ? ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT);
	}

	/* the timer is removed unconditionally, whether or not it was armed above */
	evtimer_del(service->ev_timer);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);

	return ret;
}
1714
1715 /******************************************************************************
1716 * *
1717 * Function: zbx_ipc_client_send *
1718 * *
1719 * Purpose: Sends IPC message to client *
1720 * *
1721 * Parameters: client - [IN] the IPC client *
1722 * code - [IN] the message code *
1723 * data - [IN] the data *
1724 * size - [IN] the data size *
1725 * *
1726 * Comments: If data can't be written directly to socket (buffer full) then *
1727 * the message is queued and sent during zbx_ipc_service_recv() *
1728 * messaging loop whenever socket becomes ready. *
1729 * *
1730 ******************************************************************************/
int	zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
{
	zbx_uint32_t	sent = 0;
	int		ret = FAIL;

	/* Returns SUCCEED when the message was written or queued, FAIL when writing to the socket failed. */

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, client->id);

	if (0 != client->tx_bytes)
	{
		/* an earlier message is still pending - queue this one behind it */
		zbx_queue_ptr_push(&client->tx_queue, ipc_message_create(code, data, size));
		ret = SUCCEED;
	}
	else if (FAIL != ipc_socket_write_message(&client->csocket, code, data, size, &sent))
	{
		if (sent != ZBX_IPC_HEADER_SIZE + size)
		{
			/* only a part was written: keep a copy of the message and enable the write event */
			client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
			client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
			client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
			memcpy(client->tx_data, data, size);
			client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - sent;
			event_add(client->tx_event, NULL);
		}

		ret = SUCCEED;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}
1766
1767 /******************************************************************************
1768 * *
1769 * Function: zbx_ipc_client_close *
1770 * *
1771 * Purpose: closes client socket and frees resources allocated for client *
1772 * *
1773 * Parameters: client - [IN] the IPC client *
1774 * *
1775 ******************************************************************************/
void	zbx_ipc_client_close(zbx_ipc_client_t *client)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* stop event processing first, then close the connection itself */
	ipc_client_free_events(client);
	zbx_ipc_socket_close(&client->csocket);

	/* detach the client from its service: the client list and the receive queue */
	ipc_service_remove_client(client->service, client);
	zbx_queue_ptr_remove_value(&client->service->clients_recv, client);

	/* drop one reference; the client is freed once the reference count reaches zero */
	zbx_ipc_client_release(client);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1789
/* takes one more reference on the client; pair it with zbx_ipc_client_release() */
void	zbx_ipc_client_addref(zbx_ipc_client_t *client)
{
	client->refcount += 1;
}
1794
/* drops one reference on the client and frees it when no references remain */
void	zbx_ipc_client_release(zbx_ipc_client_t *client)
{
	client->refcount--;

	if (0 == client->refcount)
		ipc_client_free(client);
}
1800
/* returns SUCCEED while the client still has its read event, FAIL otherwise */
int	zbx_ipc_client_connected(zbx_ipc_client_t *client)
{
	if (NULL != client->rx_event)
		return SUCCEED;

	return FAIL;
}
1805
zbx_ipc_client_id(const zbx_ipc_client_t * client)1806 zbx_uint64_t zbx_ipc_client_id(const zbx_ipc_client_t *client)
1807 {
1808 return client->id;
1809 }
1810
/* attaches a caller-defined pointer to the client; the pointer is only stored here */
void	zbx_ipc_client_set_userdata(zbx_ipc_client_t *client, void *userdata)
{
	client->userdata = userdata;
}
1815
/* returns the pointer previously stored with zbx_ipc_client_set_userdata() */
void	*zbx_ipc_client_get_userdata(zbx_ipc_client_t *client)
{
	return client->userdata;
}
1820
1821 /******************************************************************************
1822 * *
1823 * Function: zbx_ipc_async_socket_open *
1824 * *
1825 * Purpose: opens asynchronous socket to IPC service client *
1826 * *
1827 * Parameters: asocket - [OUT] the asynchronous IPC socket *
1828 * service_name - [IN] the IPC service name *
1829 * timeout - [IN] the connection timeout *
1830 * error - [OUT] the error message *
1831 * *
1832 * Return value: SUCCEED - the socket was successfully opened *
1833 * FAIL - otherwise *
1834 * *
1835 ******************************************************************************/
zbx_ipc_async_socket_open(zbx_ipc_async_socket_t * asocket,const char * service_name,int timeout,char ** error)1836 int zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
1837 {
1838 int ret = FAIL, flags;
1839
1840 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1841
1842 memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
1843 asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
1844 memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
1845
1846 if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
1847 {
1848 zbx_free(asocket->client);
1849 goto out;
1850 }
1851
1852 if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
1853 {
1854 zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
1855 exit(EXIT_FAILURE);
1856 }
1857
1858 if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
1859 {
1860 zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
1861 exit(EXIT_FAILURE);
1862 }
1863
1864 asocket->ev = event_base_new();
1865 asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
1866 asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
1867 ipc_async_socket_read_event_cb, (void *)asocket);
1868 asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
1869 ipc_async_socket_write_event_cb, (void *)asocket);
1870 event_add(asocket->client->rx_event, NULL);
1871
1872 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1873
1874 ret = SUCCEED;
1875 out:
1876 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1877 return ret;
1878 }
1879
1880 /******************************************************************************
1881 * *
1882 * Function: zbx_ipc_async_socket_close *
1883 * *
1884 * Purpose: closes asynchronous IPC socket and frees allocated resources *
1885 * *
1886 * Parameters: asocket - [IN] the asynchronous IPC socket *
1887 * *
1888 ******************************************************************************/
void	zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* release the client first and forget the pointer */
	ipc_client_free(asocket->client);
	asocket->client = NULL;

	/* then free the timer event and the event base it was created on */
	event_free(asocket->ev_timer);
	event_base_free(asocket->ev);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1901
1902 /******************************************************************************
1903 * *
1904 * Function: zbx_ipc_async_socket_send *
1905 * *
1906 * Purpose: Sends message through asynchronous IPC socket *
1907 * *
1908 * Parameters: asocket - [IN] the asynchronous IPC socket *
1909 * code - [IN] the message code *
1910 * data - [IN] the data *
1911 * size - [IN] the data size *
1912 * *
1913 * Comments: If data can't be written directly to socket (buffer full) then *
1914 * the message is queued and sent during zbx_ipc_async_socket_recv()*
1915 * or zbx_ipc_async_socket_flush() functions whenever socket becomes*
1916 * ready. *
1917 * *
1918 ******************************************************************************/
zbx_ipc_async_socket_send(zbx_ipc_async_socket_t * asocket,zbx_uint32_t code,const unsigned char * data,zbx_uint32_t size)1919 int zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
1920 zbx_uint32_t size)
1921 {
1922 int ret = FAIL;
1923
1924 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1925
1926 ret = zbx_ipc_client_send(asocket->client, code, data, size);
1927
1928 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1929
1930 return ret;
1931 }
1932
1933 /******************************************************************************
1934 * *
1935 * Function: zbx_ipc_async_socket_recv *
1936 * *
1937 * Purpose: receives message through asynchronous IPC socket *
1938 * *
1939 * Parameters: asocket - [IN] the asynchronous IPC socket *
1940 * timeout - [IN] the timeout in seconds, 0 is used for *
1941 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
1942 * used for blocking call without timeout *
1943 * message - [OUT] the received message or NULL if the client *
1944 * connection was closed. *
1945 * The message must be freed by caller with *
1946 * ipc_message_free() function. *
1947 * *
1948 * Return value: SUCCEED - the message was read successfully or timeout *
1949 * occurred *
1950 * FAIL - otherwise *
1951 * *
1952 * Comments: After socket has been closed (or connection error has occurred) *
1953 * calls to zbx_ipc_client_read() will return success with buffered *
1954 * messages, until all buffered messages are retrieved. *
1955 * *
1956 ******************************************************************************/
zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t * asocket,int timeout,zbx_ipc_message_t ** message)1957 int zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
1958 {
1959 int ret, flags;
1960
1961 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
1962
1963 if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
1964 {
1965 if (ZBX_IPC_WAIT_FOREVER != timeout)
1966 {
1967 struct timeval tv = {timeout, 0};
1968 evtimer_add(asocket->ev_timer, &tv);
1969 }
1970 flags = EVLOOP_ONCE;
1971 }
1972 else
1973 flags = EVLOOP_NONBLOCK;
1974
1975 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
1976
1977 do
1978 {
1979 event_base_loop(asocket->ev, flags);
1980 *message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue);
1981 }
1982 while (NULL == *message && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
1983
1984 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && NULL != *message)
1985 {
1986 char *data = NULL;
1987
1988 zbx_ipc_message_format(*message, &data);
1989 zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
1990
1991 zbx_free(data);
1992 }
1993
1994 if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
1995 ret = SUCCEED;
1996 else
1997 ret = FAIL;
1998
1999 evtimer_del(asocket->ev_timer);
2000
2001 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
2002
2003 return ret;
2004 }
2005
2006 /******************************************************************************
2007 * *
2008 * Function: zbx_ipc_async_socket_flush *
2009 * *
2010 * Purpose: flushes unsent through asynchronous IPC socket *
2011 * *
2012 * Parameters: asocket - [IN] the asynchronous IPC service socket *
2013 * timeout - [IN] the timeout in seconds, 0 is used for *
2014 * nonblocking call and ZBX_IPC_WAIT_FOREVER is *
2015 * used for blocking call without timeout *
2016 * *
2017 * Return value: SUCCEED - the data was flushed successfully or timeout *
2018 * occurred. Use zbx_ipc_client_unsent_data() to *
2019 * check if all data was sent. *
2020 * FAIL - failed to send data (connection was closed or an *
2021 * error occurred). *
2022 * *
2023 ******************************************************************************/
zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t * asocket,int timeout)2024 int zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
2025 {
2026 int ret = FAIL, flags;
2027
2028 zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
2029
2030 if (0 == asocket->client->tx_bytes)
2031 {
2032 ret = SUCCEED;
2033 goto out;
2034 }
2035
2036 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
2037 goto out;
2038
2039 asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
2040
2041 if (0 != timeout)
2042 {
2043 if (ZBX_IPC_WAIT_FOREVER != timeout)
2044 {
2045 struct timeval tv = {timeout, 0};
2046 evtimer_add(asocket->ev_timer, &tv);
2047 }
2048 flags = EVLOOP_ONCE;
2049 }
2050 else
2051 flags = EVLOOP_NONBLOCK;
2052
2053 do
2054 {
2055 event_base_loop(asocket->ev, flags);
2056
2057 if (SUCCEED != zbx_ipc_client_connected(asocket->client))
2058 goto out;
2059 }
2060 while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
2061
2062 if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
2063 {
2064 ret = SUCCEED;
2065 asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
2066 }
2067 out:
2068 evtimer_del(asocket->ev_timer);
2069
2070 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
2071
2072 return ret;
2073 }
2074
2075 /******************************************************************************
2076 * *
2077 * Function: zbx_ipc_async_socket_check_unsent *
2078 * *
2079 * Purpose: check if there are data to be sent *
2080 * *
2081 * Parameters: asocket - [IN] the asynchronous IPC service socket *
2082 * *
2083 * Return value: SUCCEED - there are messages queued to be sent *
2084 * FAIL - all data has been sent *
2085 * *
2086 ******************************************************************************/
zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t * asocket)2087 int zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
2088 {
2089 return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
2090 }
2091
2092 /******************************************************************************
2093 * *
2094 * Function: zbx_ipc_async_socket_connected *
2095 * *
2096 * Purpose: check if socket is connected *
2097 * *
2098 * Parameters: asocket - [IN] the asynchronous IPC service socket *
2099 * *
2100 * Return value: SUCCEED - socket is connected *
2101 * FAIL - otherwise *
2102 * *
2103 ******************************************************************************/
zbx_ipc_async_socket_connected(zbx_ipc_async_socket_t * asocket)2104 int zbx_ipc_async_socket_connected(zbx_ipc_async_socket_t *asocket)
2105 {
2106 if (NULL == asocket->client)
2107 return FAIL;
2108
2109 return zbx_ipc_client_connected(asocket->client);
2110 }
2111
2112 /******************************************************************************
2113 * *
2114 * Function: zbx_ipc_async_exchange *
2115 * *
2116 * Purpose: connect, send message and receive response in a given timeout *
2117 * *
2118 * Parameters: service_name - [IN] the IPC service name *
2119 * code - [IN] the message code *
2120 * timeout - [IN] time allowed to be spent on receive, note *
2121 * that this does not include open, send and *
2122 * flush that have their own timeouts *
2123 * data - [IN] the data *
2124 * size - [IN] the data size *
2125 * out - [OUT] the received message or NULL on error *
2126 * The message must be freed by zbx_free() *
2127 * error - [OUT] the error message *
2128 * *
2129 * Return value: SUCCEED - successfully sent message and received response *
2130 * FAIL - error occurred *
2131 * *
2132 ******************************************************************************/
zbx_ipc_async_exchange(const char * service_name,zbx_uint32_t code,int timeout,const unsigned char * data,zbx_uint32_t size,unsigned char ** out,char ** error)2133 int zbx_ipc_async_exchange(const char *service_name, zbx_uint32_t code, int timeout, const unsigned char *data,
2134 zbx_uint32_t size, unsigned char **out, char **error)
2135 {
2136 zbx_ipc_message_t *message;
2137 zbx_ipc_async_socket_t asocket;
2138 int ret = FAIL;
2139
2140 zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:'%s' code:%u timeout:%d", __func__, service_name, code, timeout);
2141
2142 if (FAIL == zbx_ipc_async_socket_open(&asocket, service_name, timeout, error))
2143 goto out;
2144
2145 if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size))
2146 {
2147 *error = zbx_strdup(NULL, "Cannot send request");
2148 goto fail;
2149 }
2150
2151 if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout))
2152 {
2153 *error = zbx_strdup(NULL, "Cannot flush request");
2154 goto fail;
2155 }
2156
2157 if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message))
2158 {
2159 *error = zbx_strdup(NULL, "Cannot receive response");
2160 goto fail;
2161 }
2162
2163 if (NULL == message)
2164 {
2165 *error = zbx_strdup(NULL, "Timeout while waiting for response");
2166 goto fail;
2167 }
2168
2169 *out = message->data;
2170 message->data = NULL;
2171
2172 zbx_ipc_message_free(message);
2173 ret = SUCCEED;
2174 fail:
2175 zbx_ipc_async_socket_close(&asocket);
2176 out:
2177 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
2178 return ret;
2179 }
2180
2181 #endif
2182