1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6 
7 #ifdef HAVE_CONFIG_H
8 #  include "config.h"
9 #endif
10 
11 #include "libstats.h"
12 
13 #include <arpa/inet.h>
14 #include <sys/socket.h>
15 #include <sys/time.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <errno.h>
19 #include <assert.h>
20 #include <string.h>
21 #include <netdb.h>
22 #include <unistd.h>
23 #include <pthread.h>
24 #include <inttypes.h>
25 #include <sys/uio.h>
26 
27 #include <ucs/datastruct/sglib_wrapper.h>
28 #include <ucs/sys/compiler.h>
29 #include <ucs/debug/log.h>
30 
31 #define UCS_STATS_MAGIC            "UCSSTAT1"
32 #define UCS_STATS_MSG_FRAG_SIZE    1400
33 #define ENTITY_HASH_SIZE           997
34 
35 
36 /* UDP packet header */
37 typedef struct ucs_stats_packet_hdr {
38     char                magic[8];
39     uint64_t            timestamp;
40     uint32_t            total_size;
41     uint32_t            frag_offset;
42     uint32_t            frag_size;
43 } UCS_S_PACKED ucs_stats_packet_hdr_t;
44 
45 
46 /* Fragment assembly hole free-list */
47 typedef struct frag_hole {
48     ucs_list_link_t     list;
49     size_t              size; /* Including this struct */
50 } frag_hole_t;
51 
52 
53 /* An entity which reports statistics */
54 typedef struct stats_entity stats_entity_t;
55 struct stats_entity {
56     struct sockaddr_in  in_addr;        /* Entity address */
57     uint64_t            timestamp;      /* Current timestamp */
58     size_t              buffer_size;    /* Buffer size */
59     void                *inprogress_buffer;    /* Fragment assembly buffer */
60     ucs_list_link_t     holes;          /* List of holes in the buffer */
61     stats_entity_t      *next;          /* Hash link */
62 
63     pthread_mutex_t     lock;
64     volatile unsigned   refcount;
65     void                *completed_buffer;  /* Completed buffer */
66     struct timeval      update_time;
67 };
68 
69 
70 /* Client context */
71 typedef struct ucs_stats_client {
72     int              sockfd;
73 } ucs_stats_client_t;
74 
75 
76 /* Server context */
77 typedef struct ucs_stats_server {
78     int                     sockfd;
79     int                     udp_port;
80     pthread_t               server_thread;
81     volatile unsigned long  rcvd_packets;
82     volatile int            stop;
83     ucs_list_link_t         curr_stats;
84     pthread_mutex_t         entities_lock;
85     stats_entity_t*         entities_hash[ENTITY_HASH_SIZE];
86 } ucs_stats_server_t;
87 
88 
SGLIB_DEFINE_LIST_PROTOTYPES(stats_entity_t,stats_entity_cmp,next)89 SGLIB_DEFINE_LIST_PROTOTYPES(stats_entity_t, stats_entity_cmp, next)
90 SGLIB_DEFINE_HASHED_CONTAINER_PROTOTYPES(stats_entity_t, ENTITY_HASH_SIZE, stats_entity_hash)
91 
92 
93 ucs_status_t ucs_stats_client_init(const char *server_addr, int port, ucs_stats_client_h *p_client)
94 {
95     ucs_stats_client_h client;
96     struct sockaddr_in saddr;
97     struct hostent *he;
98     ucs_status_t status;
99     int ret;
100 
101     client = malloc(sizeof *client);
102     if (client == NULL) {
103         status = UCS_ERR_NO_MEMORY;
104         goto err;
105     }
106 
107     he = gethostbyname(server_addr);
108     if (he == NULL || he->h_addr_list == NULL) {
109         ucs_error("failed to resolve address of '%s'", server_addr);
110         status = UCS_ERR_INVALID_ADDR;
111         goto err_free;
112     }
113 
114     saddr.sin_family = he->h_addrtype;
115     saddr.sin_port   = htons(port);
116     assert(he->h_length == sizeof(saddr.sin_addr));
117     memcpy(&saddr.sin_addr, he->h_addr_list[0], he->h_length);
118     memset(saddr.sin_zero, 0, sizeof(saddr.sin_zero));
119 
120     client->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
121     if (client->sockfd < 0) {
122         ucs_error("socket() failed: %m");
123         status = UCS_ERR_IO_ERROR;
124         goto err_free;
125     }
126 
127     ret = connect(client->sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
128     if (ret < 0) {
129         ucs_error("connect(%d) failed: %m", client->sockfd);
130         status = UCS_ERR_IO_ERROR;
131         goto err_close;
132     }
133 
134     *p_client = client;
135     return UCS_OK;
136 
137 err_close:
138     close(client->sockfd);
139 err_free:
140     free(client);
141 err:
142     return status;
143 }
144 
ucs_stats_client_cleanup(ucs_stats_client_h client)145 void ucs_stats_client_cleanup(ucs_stats_client_h client)
146 {
147     close(client->sockfd);
148     free(client);
149 }
150 
151 static ucs_status_t
ucs_stats_sock_send_frags(int sockfd,uint64_t timestamp,void * buffer,size_t size)152 ucs_stats_sock_send_frags(int sockfd, uint64_t timestamp, void *buffer, size_t size)
153 {
154     struct iovec iov[2];
155     ucs_stats_packet_hdr_t hdr;
156     size_t frag_size, offset;
157     ssize_t nsent;
158     size_t max_frag = UCS_STATS_MSG_FRAG_SIZE - sizeof(hdr);
159 
160     offset = 0;
161 
162     memcpy(hdr.magic, UCS_STATS_MAGIC, sizeof(hdr.magic));
163     hdr.total_size  = size;
164     hdr.timestamp   = timestamp;
165 
166     while (offset < size) {
167         frag_size = size - offset;
168         if (frag_size > max_frag) {
169             frag_size = max_frag;
170         }
171 
172         hdr.frag_offset = offset;
173         hdr.frag_size   = frag_size;
174 
175         iov[0].iov_base = &hdr;
176         iov[0].iov_len  = sizeof(hdr);
177         iov[1].iov_base = UCS_PTR_BYTE_OFFSET(buffer, offset);
178         iov[1].iov_len  = hdr.frag_size;
179 
180         nsent = writev(sockfd, iov, 2);
181         if (nsent == -1) {
182             if (errno == ECONNREFUSED) {
183                 ucs_trace("stats server is down");
184                 return UCS_OK;
185             } else {
186                 ucs_error("writev() failed: %m");
187                 return UCS_ERR_IO_ERROR;
188             }
189         }
190 
191         assert(nsent == sizeof(hdr) + frag_size);
192         offset += frag_size;
193     }
194 
195     return UCS_OK;
196 }
197 
198 ucs_status_t
ucs_stats_client_send(ucs_stats_client_h client,ucs_stats_node_t * root,uint64_t timestamp)199 ucs_stats_client_send(ucs_stats_client_h client, ucs_stats_node_t *root,
200                       uint64_t timestamp)
201 {
202     ucs_status_t status;
203     FILE *stream;
204     char *buffer;
205     size_t size;
206 
207     /* TODO use GLIBC custom stream */
208     stream = open_memstream(&buffer, &size);
209     if (stream == NULL) {
210         status = UCS_ERR_NO_MEMORY;
211         goto out;
212     }
213 
214     status = ucs_stats_serialize(stream, root, UCS_STATS_SERIALIZE_BINARY);
215     fclose(stream);
216 
217     if (status != UCS_OK) {
218         goto out_free;
219     }
220 
221     /* send */
222     status = ucs_stats_sock_send_frags(client->sockfd, timestamp, buffer, size);
223 
224 out_free:
225     free(buffer);
226 out:
227     return status;
228 }
229 
ucs_stats_server_entity_reset_buffer(stats_entity_t * entity,size_t new_size)230 static void ucs_stats_server_entity_reset_buffer(stats_entity_t * entity,
231                                                  size_t new_size)
232 {
233     frag_hole_t *hole;
234 
235     if (new_size != entity->buffer_size) {
236         pthread_mutex_lock(&entity->lock);
237         entity->buffer_size = new_size;
238         entity->inprogress_buffer = realloc(entity->inprogress_buffer,
239                                             new_size + sizeof(frag_hole_t));
240         entity->completed_buffer  = realloc(entity->completed_buffer,
241                                             new_size + sizeof(frag_hole_t));
242         pthread_mutex_unlock(&entity->lock);
243     }
244 
245     hole = entity->inprogress_buffer;
246     hole->size = entity->buffer_size;
247     ucs_list_head_init(&entity->holes);
248     ucs_list_add_tail(&entity->holes, &hole->list);
249 }
250 
ucs_stats_server_entity_alloc(struct sockaddr_in * addr)251 static stats_entity_t *ucs_stats_server_entity_alloc(struct sockaddr_in *addr)
252 {
253     stats_entity_t *entity;
254 
255     entity = malloc(sizeof *entity);
256     if (entity == NULL) {
257         return NULL;
258     }
259 
260     entity->in_addr           = *addr;
261     entity->timestamp         = 0;
262     entity->buffer_size       = SIZE_MAX;
263     entity->inprogress_buffer = NULL;
264     entity->completed_buffer  = NULL;
265     entity->refcount          = 1;
266     ucs_list_head_init(&entity->holes);
267     pthread_mutex_init(&entity->lock, NULL);
268 
269     ucs_stats_server_entity_reset_buffer(entity, 0);
270     return entity;
271 }
272 
ucs_stats_server_entity_free(stats_entity_t * entity)273 static void ucs_stats_server_entity_free(stats_entity_t * entity)
274 {
275     free(entity->inprogress_buffer);
276     free(entity->completed_buffer);
277     free(entity);
278 }
279 
280 static stats_entity_t*
ucs_stats_server_entity_get(ucs_stats_server_h server,struct sockaddr_in * addr)281 ucs_stats_server_entity_get(ucs_stats_server_h server, struct sockaddr_in *addr)
282 {
283     stats_entity_t *entity, search;
284 
285     pthread_mutex_lock(&server->entities_lock);
286     search.in_addr = *addr;
287 
288     entity = sglib_hashed_stats_entity_t_find_member(server->entities_hash, &search);
289     if (entity == NULL) {
290         entity = ucs_stats_server_entity_alloc(addr);
291         gettimeofday(&entity->update_time, NULL);
292         sglib_hashed_stats_entity_t_add(server->entities_hash, entity);
293     }
294 
295     __sync_fetch_and_add(&entity->refcount, 1);
296     pthread_mutex_unlock(&server->entities_lock);
297 
298     return entity;
299 }
300 
ucs_stats_server_entity_put(stats_entity_t * entity)301 static void ucs_stats_server_entity_put(stats_entity_t * entity)
302 {
303     if (__sync_fetch_and_sub(&entity->refcount, 1) == 1) {
304         ucs_stats_server_entity_free(entity);
305     }
306 }
307 
308 /**
309  * Find a hole to contain the given fragment.
310  */
311 static frag_hole_t *
find_frag_hole(stats_entity_t * entity,size_t frag_size,size_t frag_offset)312 find_frag_hole(stats_entity_t *entity, size_t frag_size, size_t frag_offset)
313 {
314     void *frag_start = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer, frag_offset);
315     void *frag_end   = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer,
316                                            frag_offset + frag_size);
317     frag_hole_t *hole;
318 
319     ucs_list_for_each(hole, &entity->holes, list) {
320         if ((frag_start >= (void*)hole) &&
321             (frag_end <= UCS_PTR_BYTE_OFFSET(hole, hole->size))) {
322             return hole;
323         }
324     }
325     return NULL;
326 }
327 
328 /**
329  * Update statistics with new arrived fragment.
330  */
331 static ucs_status_t
ucs_stats_server_entity_update(ucs_stats_server_h server,stats_entity_t * entity,uint64_t timestamp,size_t total_size,void * frag,size_t frag_size,size_t frag_offset)332 ucs_stats_server_entity_update(ucs_stats_server_h server, stats_entity_t *entity,
333                                uint64_t timestamp, size_t total_size, void *frag,
334                                size_t frag_size, size_t frag_offset)
335 {
336     frag_hole_t *hole, *new_hole;
337     void *frag_start, *frag_end, *hole_end;
338 
339     ucs_debug("From %s:%d - timestamp %"PRIu64", %zu..%zu / %zu",
340               inet_ntoa(entity->in_addr.sin_addr), ntohs(entity->in_addr.sin_port),
341               timestamp, frag_offset, frag_offset + frag_size, total_size);
342 
343     if (timestamp < entity->timestamp) {
344         ucs_debug("Dropping - old timestamp");
345         return UCS_OK;
346     } else if (timestamp > entity->timestamp) {
347         ucs_debug("New timestamp, resetting buffer with size %zu", total_size);
348         entity->timestamp = timestamp;
349         ucs_stats_server_entity_reset_buffer(entity, total_size);
350     } else {
351         /* Make sure all packets in this timestamp have the same 'total_size' */
352         if (entity->buffer_size != total_size) {
353             ucs_error("Total size in the packet is %zu, but expected is %zu",
354                       total_size, entity->buffer_size);
355         }
356     }
357 
358     hole = find_frag_hole(entity, frag_size, frag_offset);
359     if (hole == NULL) {
360         ucs_error("cannot fill fragment (offset %zu size %zu)", frag_offset, frag_size);
361         return UCS_ERR_MESSAGE_TRUNCATED;
362     }
363 
364     frag_start = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer, frag_offset);
365     frag_end   = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer,
366                                      frag_offset + frag_size);
367     hole_end   = UCS_PTR_BYTE_OFFSET(hole, hole->size);
368 
369     ucs_debug("inserting into a hole of %zu..%zu",
370               UCS_PTR_BYTE_DIFF(entity->inprogress_buffer, hole),
371               UCS_PTR_BYTE_DIFF(entity->inprogress_buffer, hole_end));
372 
373     /* If the fragment does not reach the end of the hole, create a new hole
374      * in this space.
375      */
376     if (frag_end < hole_end) {
377         /* Make sure we don't create a hole which is too small for a free-list
378          * pointer to fit in. An exception is the last fragment.
379          */
380         assert((UCS_PTR_BYTE_DIFF(frag_end, hole_end) >= sizeof(*new_hole)) ||
381                (hole_end == UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer,
382                                                 entity->buffer_size)));
383         new_hole       = frag_end;
384         new_hole->size = UCS_PTR_BYTE_DIFF(frag_end, hole_end);
385         ucs_list_insert_after(&hole->list, &new_hole->list);
386     }
387 
388     /* If we have room before the fragment, resize the hole. Otherwise, delete it */
389     if (frag_start > (void*)hole) {
390         assert(UCS_PTR_BYTE_DIFF(hole, frag_start) >= sizeof(*hole));
391         hole->size = UCS_PTR_BYTE_DIFF(hole, frag_start);
392     } else {
393         ucs_list_del(&hole->list);
394     }
395 
396     /* Copy the fragment */
397     memcpy(frag_start, frag, frag_size);
398 
399     /* Completed? */
400     if (ucs_list_is_empty(&entity->holes)) {
401         ucs_debug("timestamp %"PRIu64" fully assembled", entity->timestamp);
402         pthread_mutex_lock(&entity->lock);
403         memcpy(entity->completed_buffer, entity->inprogress_buffer, entity->buffer_size);
404         pthread_mutex_unlock(&entity->lock);
405     }
406 
407     return UCS_OK;
408 }
409 
410 /**
411  * Update context with new arrived packet.
412  */
413 static ucs_status_t
ucs_stats_server_update_context(ucs_stats_server_h server,struct sockaddr_in * sender,ucs_stats_packet_hdr_t * pkt,size_t pkt_len)414 ucs_stats_server_update_context(ucs_stats_server_h server, struct sockaddr_in *sender,
415                                 ucs_stats_packet_hdr_t *pkt, size_t pkt_len)
416 {
417     stats_entity_t *entity;
418     ucs_status_t status;
419 
420     /* Validate fragment size */
421     if (pkt_len != pkt->frag_size + sizeof(ucs_stats_packet_hdr_t)) {
422         ucs_error("Invalid receive size: expected %zu, got %zu",
423                   pkt->frag_size + sizeof(ucs_stats_packet_hdr_t), pkt_len);
424         return UCS_ERR_MESSAGE_TRUNCATED;
425     }
426 
427     /* Validate magic */
428     if (memcmp(pkt->magic, UCS_STATS_MAGIC, sizeof(pkt->magic)) != 0) {
429         ucs_error("Invalid magic in packet header");
430         return UCS_ERR_INVALID_PARAM;
431     }
432 
433     /* Find or create the entity */
434     entity = ucs_stats_server_entity_get(server, sender);
435 
436     pthread_mutex_lock(&entity->lock);
437     gettimeofday(&entity->update_time, NULL);
438     pthread_mutex_unlock(&entity->lock);
439 
440     /* Update the entity */
441     status = ucs_stats_server_entity_update(server, entity, pkt->timestamp,
442                                            pkt->total_size, pkt + 1,
443                                            pkt->frag_size, pkt->frag_offset);
444 
445     ucs_stats_server_entity_put(entity);
446     ++server->rcvd_packets;
447     return status;
448 }
449 
ucs_stats_server_create_socket(int udp_port,int * p_sockfd,int * p_udp_port)450 static ucs_status_t ucs_stats_server_create_socket(int udp_port, int *p_sockfd,
451                                                   int *p_udp_port)
452 {
453     struct sockaddr_in saddr;
454     socklen_t socklen;
455     int sockfd;
456     int ret;
457 
458     sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
459     if (sockfd < 0) {
460         ucs_error("socked() failed: %m");
461         return UCS_ERR_IO_ERROR;
462     }
463 
464     saddr.sin_family      = AF_INET;
465     saddr.sin_addr.s_addr = INADDR_ANY;
466     saddr.sin_port        = htons(udp_port);
467     memset(saddr.sin_zero, 0, sizeof(saddr.sin_zero));
468 
469     ret = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
470     if (ret < 0) {
471         ucs_error("Failed to bind socket to port %u: %m", udp_port);
472         goto err_close_sock;
473     }
474 
475     socklen = sizeof(saddr);
476     ret = getsockname(sockfd, (struct sockaddr*)&saddr, &socklen);
477     if (ret < 0) {
478         ucs_error("getsockname(%d) failed: %m", sockfd);
479         goto err_close_sock;
480     }
481 
482     *p_sockfd   = sockfd;
483     *p_udp_port = ntohs(saddr.sin_port);
484     return UCS_OK;
485 
486 err_close_sock:
487     close(sockfd);
488     return UCS_ERR_INVALID_ADDR;
489 }
490 
ucs_stats_server_clear_old_enitities(ucs_stats_server_h server)491 static void ucs_stats_server_clear_old_enitities(ucs_stats_server_h server)
492 {
493     struct sglib_hashed_stats_entity_t_iterator it;
494     stats_entity_t *entity;
495     struct timeval current, diff;
496 
497     gettimeofday(&current, NULL);
498 
499     pthread_mutex_lock(&server->entities_lock);
500     entity = sglib_hashed_stats_entity_t_it_init(&it,server->entities_hash);
501     while (entity != NULL) {
502         pthread_mutex_lock(&entity->lock);
503         timersub(&current, &entity->update_time, &diff);
504         pthread_mutex_unlock(&entity->lock);
505 
506         if (diff.tv_sec > 5.0) {
507             sglib_hashed_stats_entity_t_delete(server->entities_hash, entity);
508             ucs_stats_server_entity_put(entity);
509         }
510         entity = sglib_hashed_stats_entity_t_it_next(&it);
511      }
512 
513      pthread_mutex_unlock(&server->entities_lock);
514 }
515 
ucs_stats_server_thread_func(void * arg)516 static void* ucs_stats_server_thread_func(void *arg)
517 {
518     ucs_stats_server_h server = arg;
519     struct sockaddr_in recv_addr;
520     socklen_t recv_addr_len;
521     char recv_buf[UCS_STATS_MSG_FRAG_SIZE];
522     ssize_t recv_len;
523     ucs_status_t status;
524 
525     ucs_debug("starting server thread");
526     while (!server->stop) {
527         recv_addr_len = sizeof(recv_addr);
528         recv_len = recvfrom(server->sockfd, recv_buf, UCS_STATS_MSG_FRAG_SIZE, 0,
529                             (struct sockaddr*)&recv_addr, &recv_addr_len);
530         if (recv_len < 0) {
531             ucs_error("recvfrom() failed: %s (return value: %ld)", strerror(errno),
532                       recv_len);
533             break;
534         } else if (recv_len == 0) {
535             ucs_debug("Empty receive - ignoring");
536             continue;
537         }
538 
539         if (recv_addr.sin_family != AF_INET) {
540             ucs_error("invalid address family from recvfrom()");
541             break;
542         }
543 
544         /* Update with new data */
545         /* coverity[tainted_data] */
546         status = ucs_stats_server_update_context(server, &recv_addr, (void*)recv_buf, recv_len);
547         if (status != UCS_OK) {
548             break;
549         }
550 
551         ucs_stats_server_clear_old_enitities(server);
552     }
553 
554     ucs_debug("terminating server thread");
555     return NULL;
556 }
557 
ucs_stats_server_start(int port,ucs_stats_server_h * p_server)558 ucs_status_t ucs_stats_server_start(int port, ucs_stats_server_h *p_server)
559 {
560     ucs_stats_server_h server;
561     ucs_status_t status;
562 
563     server = malloc(sizeof *server);
564     if (server == NULL) {
565         ucs_error("Failed to allocate stats context");
566         return UCS_ERR_NO_MEMORY;
567     }
568 
569     pthread_mutex_init(&server->entities_lock, NULL);
570     ucs_list_head_init(&server->curr_stats);
571     sglib_hashed_stats_entity_t_init(server->entities_hash);
572 
573     status = ucs_stats_server_create_socket(port, &server->sockfd, &server->udp_port);
574     if (status != UCS_OK) {
575         free(server);
576         return status;
577     }
578 
579     server->rcvd_packets = 0;
580     server->stop         = 0;
581     pthread_create(&server->server_thread, NULL, ucs_stats_server_thread_func,
582                    server);
583 
584     *p_server = server;
585     return UCS_OK;
586 }
587 
ucs_stats_server_destroy(ucs_stats_server_h server)588 void ucs_stats_server_destroy(ucs_stats_server_h server)
589 {
590     struct sglib_hashed_stats_entity_t_iterator it;
591     stats_entity_t *entity;
592     void *retval;
593 
594     server->stop = 1;
595     shutdown(server->sockfd, SHUT_RDWR);
596     pthread_join(server->server_thread, &retval);
597     close(server->sockfd);
598 
599     ucs_stats_server_purge_stats(server);
600 
601     entity = sglib_hashed_stats_entity_t_it_init(&it,server->entities_hash);
602     while (entity != NULL) {
603         ucs_stats_server_entity_put(entity);
604         entity = sglib_hashed_stats_entity_t_it_next(&it);
605     }
606     free(server);
607 }
608 
ucs_stats_server_get_port(ucs_stats_server_h server)609 int ucs_stats_server_get_port(ucs_stats_server_h server)
610 {
611    return server->udp_port;
612 }
613 
ucs_stats_server_get_stats(ucs_stats_server_h server)614 ucs_list_link_t *ucs_stats_server_get_stats(ucs_stats_server_h server)
615 {
616     struct sglib_hashed_stats_entity_t_iterator it;
617     stats_entity_t *entity;
618     ucs_stats_node_t *node;
619     ucs_status_t status;
620     FILE *stream;
621 
622     ucs_stats_server_purge_stats(server);
623 
624     pthread_mutex_lock(&server->entities_lock);
625     for (entity = sglib_hashed_stats_entity_t_it_init(&it, server->entities_hash);
626          entity != NULL; entity = sglib_hashed_stats_entity_t_it_next(&it))
627     {
628         /* Parse the statistics data */
629         pthread_mutex_lock(&entity->lock);
630         stream = fmemopen(entity->completed_buffer, entity->buffer_size, "rb");
631         status = ucs_stats_deserialize(stream, &node);
632         fclose(stream);
633         pthread_mutex_unlock(&entity->lock);
634 
635         if (status == UCS_OK) {
636             ucs_list_add_tail(&server->curr_stats, &node->list);
637         }
638     }
639     pthread_mutex_unlock(&server->entities_lock);
640 
641     return &server->curr_stats;
642 }
643 
ucs_stats_server_purge_stats(ucs_stats_server_h server)644 void ucs_stats_server_purge_stats(ucs_stats_server_h server)
645 {
646     ucs_stats_node_t *node, *tmp;
647 
648     ucs_list_for_each_safe(node, tmp, &server->curr_stats, list) {
649         ucs_list_del(&node->list);
650         ucs_stats_free(node);
651     }
652 }
653 
ucs_stats_server_rcvd_packets(ucs_stats_server_h server)654 unsigned long ucs_stats_server_rcvd_packets(ucs_stats_server_h server)
655 {
656    return server->rcvd_packets;
657 }
658 
stats_entity_cmp(stats_entity_t * e1,stats_entity_t * e2)659 static inline int stats_entity_cmp(stats_entity_t *e1, stats_entity_t *e2)
660 {
661     int addr_diff = e1->in_addr.sin_addr.s_addr < e2->in_addr.sin_addr.s_addr;
662     if (addr_diff != 0) {
663         return addr_diff;
664     } else {
665         return ntohs(e1->in_addr.sin_port) - ntohs(e1->in_addr.sin_port);
666     }
667 }
668 
stats_entity_hash(stats_entity_t * e)669 static inline int stats_entity_hash(stats_entity_t *e)
670 {
671     return (((uint64_t)e->in_addr.sin_addr.s_addr << 16) + (uint64_t)ntohs(e->in_addr.sin_port)) % ENTITY_HASH_SIZE;
672 }
673 
674 SGLIB_DEFINE_LIST_FUNCTIONS(stats_entity_t, stats_entity_cmp, next)
675 SGLIB_DEFINE_HASHED_CONTAINER_FUNCTIONS(stats_entity_t, ENTITY_HASH_SIZE, stats_entity_hash)
676