1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #ifdef HAVE_CONFIG_H
8 # include "config.h"
9 #endif
10
11 #include "libstats.h"
12
13 #include <arpa/inet.h>
14 #include <sys/socket.h>
15 #include <sys/time.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <errno.h>
19 #include <assert.h>
20 #include <string.h>
21 #include <netdb.h>
22 #include <unistd.h>
23 #include <pthread.h>
24 #include <inttypes.h>
25 #include <sys/uio.h>
26
27 #include <ucs/datastruct/sglib_wrapper.h>
28 #include <ucs/sys/compiler.h>
29 #include <ucs/debug/log.h>
30
/* Magic bytes copied into, and checked against, every packet header */
#define UCS_STATS_MAGIC "UCSSTAT1"
/* Size of one UDP packet, header included; also the server receive buffer size */
#define UCS_STATS_MSG_FRAG_SIZE 1400
/* Number of buckets in the server's entity hash table */
#define ENTITY_HASH_SIZE 997
34
35
/*
 * Header found at the start of every UDP statistics packet. The fragment
 * payload follows it immediately in the same datagram.
 */
typedef struct ucs_stats_packet_hdr ucs_stats_packet_hdr_t;
struct ucs_stats_packet_hdr {
    char     magic[8];    /* UCS_STATS_MAGIC, stored without a NUL terminator */
    uint64_t timestamp;   /* Timestamp of the message this fragment belongs to */
    uint32_t total_size;  /* Size of the whole message being fragmented */
    uint32_t frag_offset; /* Offset of this fragment inside the message */
    uint32_t frag_size;   /* Number of payload bytes in this packet */
} UCS_S_PACKED;
44
45
46 /* Fragment assembly hole free-list */
47 typedef struct frag_hole {
48 ucs_list_link_t list;
49 size_t size; /* Including this struct */
50 } frag_hole_t;
51
52
53 /* An entity which reports statistics */
54 typedef struct stats_entity stats_entity_t;
55 struct stats_entity {
56 struct sockaddr_in in_addr; /* Entity address */
57 uint64_t timestamp; /* Current timestamp */
58 size_t buffer_size; /* Buffer size */
59 void *inprogress_buffer; /* Fragment assembly buffer */
60 ucs_list_link_t holes; /* List of holes in the buffer */
61 stats_entity_t *next; /* Hash link */
62
63 pthread_mutex_t lock;
64 volatile unsigned refcount;
65 void *completed_buffer; /* Completed buffer */
66 struct timeval update_time;
67 };
68
69
/* Client context: owns the UDP socket connected to the statistics server */
typedef struct ucs_stats_client ucs_stats_client_t;
struct ucs_stats_client {
    int sockfd; /* Socket used to send statistics packets */
};
74
75
76 /* Server context */
77 typedef struct ucs_stats_server {
78 int sockfd;
79 int udp_port;
80 pthread_t server_thread;
81 volatile unsigned long rcvd_packets;
82 volatile int stop;
83 ucs_list_link_t curr_stats;
84 pthread_mutex_t entities_lock;
85 stats_entity_t* entities_hash[ENTITY_HASH_SIZE];
86 } ucs_stats_server_t;
87
88
/*
 * Declare the sglib list and hashed-container functions for stats_entity_t.
 * The matching function definitions are generated at the end of this file.
 */
SGLIB_DEFINE_LIST_PROTOTYPES(stats_entity_t, stats_entity_cmp, next)
SGLIB_DEFINE_HASHED_CONTAINER_PROTOTYPES(stats_entity_t, ENTITY_HASH_SIZE, stats_entity_hash)
91
92
93 ucs_status_t ucs_stats_client_init(const char *server_addr, int port, ucs_stats_client_h *p_client)
94 {
95 ucs_stats_client_h client;
96 struct sockaddr_in saddr;
97 struct hostent *he;
98 ucs_status_t status;
99 int ret;
100
101 client = malloc(sizeof *client);
102 if (client == NULL) {
103 status = UCS_ERR_NO_MEMORY;
104 goto err;
105 }
106
107 he = gethostbyname(server_addr);
108 if (he == NULL || he->h_addr_list == NULL) {
109 ucs_error("failed to resolve address of '%s'", server_addr);
110 status = UCS_ERR_INVALID_ADDR;
111 goto err_free;
112 }
113
114 saddr.sin_family = he->h_addrtype;
115 saddr.sin_port = htons(port);
116 assert(he->h_length == sizeof(saddr.sin_addr));
117 memcpy(&saddr.sin_addr, he->h_addr_list[0], he->h_length);
118 memset(saddr.sin_zero, 0, sizeof(saddr.sin_zero));
119
120 client->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
121 if (client->sockfd < 0) {
122 ucs_error("socket() failed: %m");
123 status = UCS_ERR_IO_ERROR;
124 goto err_free;
125 }
126
127 ret = connect(client->sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
128 if (ret < 0) {
129 ucs_error("connect(%d) failed: %m", client->sockfd);
130 status = UCS_ERR_IO_ERROR;
131 goto err_close;
132 }
133
134 *p_client = client;
135 return UCS_OK;
136
137 err_close:
138 close(client->sockfd);
139 err_free:
140 free(client);
141 err:
142 return status;
143 }
144
/**
 * Destroy a statistics client: close its socket and release the handle.
 *
 * @param client  Client handle; must not be used after this call.
 */
void ucs_stats_client_cleanup(ucs_stats_client_h client)
{
    int fd = client->sockfd;

    close(fd);
    free(client);
}
150
151 static ucs_status_t
ucs_stats_sock_send_frags(int sockfd,uint64_t timestamp,void * buffer,size_t size)152 ucs_stats_sock_send_frags(int sockfd, uint64_t timestamp, void *buffer, size_t size)
153 {
154 struct iovec iov[2];
155 ucs_stats_packet_hdr_t hdr;
156 size_t frag_size, offset;
157 ssize_t nsent;
158 size_t max_frag = UCS_STATS_MSG_FRAG_SIZE - sizeof(hdr);
159
160 offset = 0;
161
162 memcpy(hdr.magic, UCS_STATS_MAGIC, sizeof(hdr.magic));
163 hdr.total_size = size;
164 hdr.timestamp = timestamp;
165
166 while (offset < size) {
167 frag_size = size - offset;
168 if (frag_size > max_frag) {
169 frag_size = max_frag;
170 }
171
172 hdr.frag_offset = offset;
173 hdr.frag_size = frag_size;
174
175 iov[0].iov_base = &hdr;
176 iov[0].iov_len = sizeof(hdr);
177 iov[1].iov_base = UCS_PTR_BYTE_OFFSET(buffer, offset);
178 iov[1].iov_len = hdr.frag_size;
179
180 nsent = writev(sockfd, iov, 2);
181 if (nsent == -1) {
182 if (errno == ECONNREFUSED) {
183 ucs_trace("stats server is down");
184 return UCS_OK;
185 } else {
186 ucs_error("writev() failed: %m");
187 return UCS_ERR_IO_ERROR;
188 }
189 }
190
191 assert(nsent == sizeof(hdr) + frag_size);
192 offset += frag_size;
193 }
194
195 return UCS_OK;
196 }
197
198 ucs_status_t
ucs_stats_client_send(ucs_stats_client_h client,ucs_stats_node_t * root,uint64_t timestamp)199 ucs_stats_client_send(ucs_stats_client_h client, ucs_stats_node_t *root,
200 uint64_t timestamp)
201 {
202 ucs_status_t status;
203 FILE *stream;
204 char *buffer;
205 size_t size;
206
207 /* TODO use GLIBC custom stream */
208 stream = open_memstream(&buffer, &size);
209 if (stream == NULL) {
210 status = UCS_ERR_NO_MEMORY;
211 goto out;
212 }
213
214 status = ucs_stats_serialize(stream, root, UCS_STATS_SERIALIZE_BINARY);
215 fclose(stream);
216
217 if (status != UCS_OK) {
218 goto out_free;
219 }
220
221 /* send */
222 status = ucs_stats_sock_send_frags(client->sockfd, timestamp, buffer, size);
223
224 out_free:
225 free(buffer);
226 out:
227 return status;
228 }
229
/**
 * Restart fragment assembly for an entity, for a message of 'new_size'
 * bytes. Both buffers are resized when the size changes; they are allocated
 * with sizeof(frag_hole_t) extra bytes. On return the hole list contains a
 * single hole covering the whole in-progress buffer.
 *
 * If the buffers cannot be allocated, both are released, 'buffer_size' is
 * set back to SIZE_MAX (the value entity_alloc uses for "no buffer", so the
 * next reset allocates again) and the hole list is left empty, so no
 * fragment can be placed until a later reset succeeds.
 *
 * @param entity    Entity to reset.
 * @param new_size  Size of the message about to be assembled.
 */
static void ucs_stats_server_entity_reset_buffer(stats_entity_t *entity,
                                                 size_t new_size)
{
    void *inprogress, *completed;
    frag_hole_t *hole;

    /* Start with no holes, so a failure below leaves nothing to fill */
    ucs_list_head_init(&entity->holes);

    if (new_size != entity->buffer_size) {
        inprogress = NULL;
        completed  = NULL;

        pthread_mutex_lock(&entity->lock);

        /* Skip allocation if adding the hole header would wrap around */
        if (new_size <= SIZE_MAX - sizeof(frag_hole_t)) {
            /* Keep the old pointer until realloc() is known to succeed */
            inprogress = realloc(entity->inprogress_buffer,
                                 new_size + sizeof(frag_hole_t));
            if (inprogress != NULL) {
                entity->inprogress_buffer = inprogress;
            }

            completed = realloc(entity->completed_buffer,
                                new_size + sizeof(frag_hole_t));
            if (completed != NULL) {
                entity->completed_buffer = completed;
            }
        }

        if ((inprogress != NULL) && (completed != NULL)) {
            entity->buffer_size = new_size;
        } else {
            /* Drop both buffers so their sizes can never disagree */
            free(entity->inprogress_buffer);
            free(entity->completed_buffer);
            entity->inprogress_buffer = NULL;
            entity->completed_buffer  = NULL;
            entity->buffer_size       = SIZE_MAX;
        }

        pthread_mutex_unlock(&entity->lock);
    }

    if (entity->inprogress_buffer == NULL) {
        ucs_error("failed to allocate statistics buffers of %zu bytes", new_size);
        return;
    }

    hole       = entity->inprogress_buffer;
    hole->size = entity->buffer_size;
    ucs_list_add_tail(&entity->holes, &hole->list);
}
250
ucs_stats_server_entity_alloc(struct sockaddr_in * addr)251 static stats_entity_t *ucs_stats_server_entity_alloc(struct sockaddr_in *addr)
252 {
253 stats_entity_t *entity;
254
255 entity = malloc(sizeof *entity);
256 if (entity == NULL) {
257 return NULL;
258 }
259
260 entity->in_addr = *addr;
261 entity->timestamp = 0;
262 entity->buffer_size = SIZE_MAX;
263 entity->inprogress_buffer = NULL;
264 entity->completed_buffer = NULL;
265 entity->refcount = 1;
266 ucs_list_head_init(&entity->holes);
267 pthread_mutex_init(&entity->lock, NULL);
268
269 ucs_stats_server_entity_reset_buffer(entity, 0);
270 return entity;
271 }
272
/**
 * Release an entity: its two buffers, its mutex and the entity itself.
 * Called when the last reference is dropped, so the mutex is not held.
 *
 * @param entity  Entity to release; must not be used after this call.
 */
static void ucs_stats_server_entity_free(stats_entity_t *entity)
{
    free(entity->inprogress_buffer);
    free(entity->completed_buffer);
    /* Pairs with the pthread_mutex_init() done when the entity is allocated */
    pthread_mutex_destroy(&entity->lock);
    free(entity);
}
279
280 static stats_entity_t*
ucs_stats_server_entity_get(ucs_stats_server_h server,struct sockaddr_in * addr)281 ucs_stats_server_entity_get(ucs_stats_server_h server, struct sockaddr_in *addr)
282 {
283 stats_entity_t *entity, search;
284
285 pthread_mutex_lock(&server->entities_lock);
286 search.in_addr = *addr;
287
288 entity = sglib_hashed_stats_entity_t_find_member(server->entities_hash, &search);
289 if (entity == NULL) {
290 entity = ucs_stats_server_entity_alloc(addr);
291 gettimeofday(&entity->update_time, NULL);
292 sglib_hashed_stats_entity_t_add(server->entities_hash, entity);
293 }
294
295 __sync_fetch_and_add(&entity->refcount, 1);
296 pthread_mutex_unlock(&server->entities_lock);
297
298 return entity;
299 }
300
/**
 * Drop one reference to an entity and free it when no references remain.
 *
 * @param entity  Entity to release.
 */
static void ucs_stats_server_entity_put(stats_entity_t *entity)
{
    /* Atomic decrement; a result of 0 means this was the last reference */
    unsigned remaining = __sync_sub_and_fetch(&entity->refcount, 1);

    if (remaining == 0) {
        ucs_stats_server_entity_free(entity);
    }
}
307
308 /**
309 * Find a hole to contain the given fragment.
310 */
311 static frag_hole_t *
find_frag_hole(stats_entity_t * entity,size_t frag_size,size_t frag_offset)312 find_frag_hole(stats_entity_t *entity, size_t frag_size, size_t frag_offset)
313 {
314 void *frag_start = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer, frag_offset);
315 void *frag_end = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer,
316 frag_offset + frag_size);
317 frag_hole_t *hole;
318
319 ucs_list_for_each(hole, &entity->holes, list) {
320 if ((frag_start >= (void*)hole) &&
321 (frag_end <= UCS_PTR_BYTE_OFFSET(hole, hole->size))) {
322 return hole;
323 }
324 }
325 return NULL;
326 }
327
328 /**
329 * Update statistics with new arrived fragment.
330 */
331 static ucs_status_t
ucs_stats_server_entity_update(ucs_stats_server_h server,stats_entity_t * entity,uint64_t timestamp,size_t total_size,void * frag,size_t frag_size,size_t frag_offset)332 ucs_stats_server_entity_update(ucs_stats_server_h server, stats_entity_t *entity,
333 uint64_t timestamp, size_t total_size, void *frag,
334 size_t frag_size, size_t frag_offset)
335 {
336 frag_hole_t *hole, *new_hole;
337 void *frag_start, *frag_end, *hole_end;
338
339 ucs_debug("From %s:%d - timestamp %"PRIu64", %zu..%zu / %zu",
340 inet_ntoa(entity->in_addr.sin_addr), ntohs(entity->in_addr.sin_port),
341 timestamp, frag_offset, frag_offset + frag_size, total_size);
342
343 if (timestamp < entity->timestamp) {
344 ucs_debug("Dropping - old timestamp");
345 return UCS_OK;
346 } else if (timestamp > entity->timestamp) {
347 ucs_debug("New timestamp, resetting buffer with size %zu", total_size);
348 entity->timestamp = timestamp;
349 ucs_stats_server_entity_reset_buffer(entity, total_size);
350 } else {
351 /* Make sure all packets in this timestamp have the same 'total_size' */
352 if (entity->buffer_size != total_size) {
353 ucs_error("Total size in the packet is %zu, but expected is %zu",
354 total_size, entity->buffer_size);
355 }
356 }
357
358 hole = find_frag_hole(entity, frag_size, frag_offset);
359 if (hole == NULL) {
360 ucs_error("cannot fill fragment (offset %zu size %zu)", frag_offset, frag_size);
361 return UCS_ERR_MESSAGE_TRUNCATED;
362 }
363
364 frag_start = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer, frag_offset);
365 frag_end = UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer,
366 frag_offset + frag_size);
367 hole_end = UCS_PTR_BYTE_OFFSET(hole, hole->size);
368
369 ucs_debug("inserting into a hole of %zu..%zu",
370 UCS_PTR_BYTE_DIFF(entity->inprogress_buffer, hole),
371 UCS_PTR_BYTE_DIFF(entity->inprogress_buffer, hole_end));
372
373 /* If the fragment does not reach the end of the hole, create a new hole
374 * in this space.
375 */
376 if (frag_end < hole_end) {
377 /* Make sure we don't create a hole which is too small for a free-list
378 * pointer to fit in. An exception is the last fragment.
379 */
380 assert((UCS_PTR_BYTE_DIFF(frag_end, hole_end) >= sizeof(*new_hole)) ||
381 (hole_end == UCS_PTR_BYTE_OFFSET(entity->inprogress_buffer,
382 entity->buffer_size)));
383 new_hole = frag_end;
384 new_hole->size = UCS_PTR_BYTE_DIFF(frag_end, hole_end);
385 ucs_list_insert_after(&hole->list, &new_hole->list);
386 }
387
388 /* If we have room before the fragment, resize the hole. Otherwise, delete it */
389 if (frag_start > (void*)hole) {
390 assert(UCS_PTR_BYTE_DIFF(hole, frag_start) >= sizeof(*hole));
391 hole->size = UCS_PTR_BYTE_DIFF(hole, frag_start);
392 } else {
393 ucs_list_del(&hole->list);
394 }
395
396 /* Copy the fragment */
397 memcpy(frag_start, frag, frag_size);
398
399 /* Completed? */
400 if (ucs_list_is_empty(&entity->holes)) {
401 ucs_debug("timestamp %"PRIu64" fully assembled", entity->timestamp);
402 pthread_mutex_lock(&entity->lock);
403 memcpy(entity->completed_buffer, entity->inprogress_buffer, entity->buffer_size);
404 pthread_mutex_unlock(&entity->lock);
405 }
406
407 return UCS_OK;
408 }
409
410 /**
411 * Update context with new arrived packet.
412 */
413 static ucs_status_t
ucs_stats_server_update_context(ucs_stats_server_h server,struct sockaddr_in * sender,ucs_stats_packet_hdr_t * pkt,size_t pkt_len)414 ucs_stats_server_update_context(ucs_stats_server_h server, struct sockaddr_in *sender,
415 ucs_stats_packet_hdr_t *pkt, size_t pkt_len)
416 {
417 stats_entity_t *entity;
418 ucs_status_t status;
419
420 /* Validate fragment size */
421 if (pkt_len != pkt->frag_size + sizeof(ucs_stats_packet_hdr_t)) {
422 ucs_error("Invalid receive size: expected %zu, got %zu",
423 pkt->frag_size + sizeof(ucs_stats_packet_hdr_t), pkt_len);
424 return UCS_ERR_MESSAGE_TRUNCATED;
425 }
426
427 /* Validate magic */
428 if (memcmp(pkt->magic, UCS_STATS_MAGIC, sizeof(pkt->magic)) != 0) {
429 ucs_error("Invalid magic in packet header");
430 return UCS_ERR_INVALID_PARAM;
431 }
432
433 /* Find or create the entity */
434 entity = ucs_stats_server_entity_get(server, sender);
435
436 pthread_mutex_lock(&entity->lock);
437 gettimeofday(&entity->update_time, NULL);
438 pthread_mutex_unlock(&entity->lock);
439
440 /* Update the entity */
441 status = ucs_stats_server_entity_update(server, entity, pkt->timestamp,
442 pkt->total_size, pkt + 1,
443 pkt->frag_size, pkt->frag_offset);
444
445 ucs_stats_server_entity_put(entity);
446 ++server->rcvd_packets;
447 return status;
448 }
449
ucs_stats_server_create_socket(int udp_port,int * p_sockfd,int * p_udp_port)450 static ucs_status_t ucs_stats_server_create_socket(int udp_port, int *p_sockfd,
451 int *p_udp_port)
452 {
453 struct sockaddr_in saddr;
454 socklen_t socklen;
455 int sockfd;
456 int ret;
457
458 sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
459 if (sockfd < 0) {
460 ucs_error("socked() failed: %m");
461 return UCS_ERR_IO_ERROR;
462 }
463
464 saddr.sin_family = AF_INET;
465 saddr.sin_addr.s_addr = INADDR_ANY;
466 saddr.sin_port = htons(udp_port);
467 memset(saddr.sin_zero, 0, sizeof(saddr.sin_zero));
468
469 ret = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
470 if (ret < 0) {
471 ucs_error("Failed to bind socket to port %u: %m", udp_port);
472 goto err_close_sock;
473 }
474
475 socklen = sizeof(saddr);
476 ret = getsockname(sockfd, (struct sockaddr*)&saddr, &socklen);
477 if (ret < 0) {
478 ucs_error("getsockname(%d) failed: %m", sockfd);
479 goto err_close_sock;
480 }
481
482 *p_sockfd = sockfd;
483 *p_udp_port = ntohs(saddr.sin_port);
484 return UCS_OK;
485
486 err_close_sock:
487 close(sockfd);
488 return UCS_ERR_INVALID_ADDR;
489 }
490
/**
 * Remove from the hash every entity whose last update is old, and drop the
 * hash's reference to it. An entity is old when the whole-seconds part of
 * its age exceeds 5.
 *
 * @param server  Server whose entities are scanned; the entities lock is
 *                held for the whole scan.
 */
static void ucs_stats_server_clear_old_enitities(ucs_stats_server_h server)
{
    struct sglib_hashed_stats_entity_t_iterator iter;
    struct timeval now, age;
    stats_entity_t *ent;

    gettimeofday(&now, NULL);

    pthread_mutex_lock(&server->entities_lock);

    /* NOTE(review): the iterator is advanced after the current entity may
     * have been removed and freed; presumably sglib iterators allow this -
     * confirm against the sglib documentation.
     */
    for (ent = sglib_hashed_stats_entity_t_it_init(&iter, server->entities_hash);
         ent != NULL;
         ent = sglib_hashed_stats_entity_t_it_next(&iter))
    {
        pthread_mutex_lock(&ent->lock);
        timersub(&now, &ent->update_time, &age);
        pthread_mutex_unlock(&ent->lock);

        if (age.tv_sec > 5.0) {
            sglib_hashed_stats_entity_t_delete(server->entities_hash, ent);
            ucs_stats_server_entity_put(ent);
        }
    }

    pthread_mutex_unlock(&server->entities_lock);
}
515
ucs_stats_server_thread_func(void * arg)516 static void* ucs_stats_server_thread_func(void *arg)
517 {
518 ucs_stats_server_h server = arg;
519 struct sockaddr_in recv_addr;
520 socklen_t recv_addr_len;
521 char recv_buf[UCS_STATS_MSG_FRAG_SIZE];
522 ssize_t recv_len;
523 ucs_status_t status;
524
525 ucs_debug("starting server thread");
526 while (!server->stop) {
527 recv_addr_len = sizeof(recv_addr);
528 recv_len = recvfrom(server->sockfd, recv_buf, UCS_STATS_MSG_FRAG_SIZE, 0,
529 (struct sockaddr*)&recv_addr, &recv_addr_len);
530 if (recv_len < 0) {
531 ucs_error("recvfrom() failed: %s (return value: %ld)", strerror(errno),
532 recv_len);
533 break;
534 } else if (recv_len == 0) {
535 ucs_debug("Empty receive - ignoring");
536 continue;
537 }
538
539 if (recv_addr.sin_family != AF_INET) {
540 ucs_error("invalid address family from recvfrom()");
541 break;
542 }
543
544 /* Update with new data */
545 /* coverity[tainted_data] */
546 status = ucs_stats_server_update_context(server, &recv_addr, (void*)recv_buf, recv_len);
547 if (status != UCS_OK) {
548 break;
549 }
550
551 ucs_stats_server_clear_old_enitities(server);
552 }
553
554 ucs_debug("terminating server thread");
555 return NULL;
556 }
557
ucs_stats_server_start(int port,ucs_stats_server_h * p_server)558 ucs_status_t ucs_stats_server_start(int port, ucs_stats_server_h *p_server)
559 {
560 ucs_stats_server_h server;
561 ucs_status_t status;
562
563 server = malloc(sizeof *server);
564 if (server == NULL) {
565 ucs_error("Failed to allocate stats context");
566 return UCS_ERR_NO_MEMORY;
567 }
568
569 pthread_mutex_init(&server->entities_lock, NULL);
570 ucs_list_head_init(&server->curr_stats);
571 sglib_hashed_stats_entity_t_init(server->entities_hash);
572
573 status = ucs_stats_server_create_socket(port, &server->sockfd, &server->udp_port);
574 if (status != UCS_OK) {
575 free(server);
576 return status;
577 }
578
579 server->rcvd_packets = 0;
580 server->stop = 0;
581 pthread_create(&server->server_thread, NULL, ucs_stats_server_thread_func,
582 server);
583
584 *p_server = server;
585 return UCS_OK;
586 }
587
/**
 * Stop and destroy a statistics server: ask the thread to stop, shut the
 * socket down so the thread leaves its receive call, join the thread, then
 * release the collected statistics, all entities and the context.
 *
 * @param server  Server handle; must not be used after this call.
 */
void ucs_stats_server_destroy(ucs_stats_server_h server)
{
    struct sglib_hashed_stats_entity_t_iterator it;
    stats_entity_t *entity;
    void *retval;

    server->stop = 1;
    shutdown(server->sockfd, SHUT_RDWR);
    pthread_join(server->server_thread, &retval);
    close(server->sockfd);

    ucs_stats_server_purge_stats(server);

    /* Drop the reference that the hash holds on each entity */
    entity = sglib_hashed_stats_entity_t_it_init(&it, server->entities_hash);
    while (entity != NULL) {
        ucs_stats_server_entity_put(entity);
        entity = sglib_hashed_stats_entity_t_it_next(&it);
    }

    /* The thread has been joined, so nobody can hold the lock any more */
    pthread_mutex_destroy(&server->entities_lock);
    free(server);
}
608
/**
 * @param server  Server handle.
 *
 * @return The UDP port stored in the server context when its socket was
 *         created.
 */
int ucs_stats_server_get_port(ucs_stats_server_h server)
{
    int bound_port = server->udp_port;

    return bound_port;
}
613
ucs_stats_server_get_stats(ucs_stats_server_h server)614 ucs_list_link_t *ucs_stats_server_get_stats(ucs_stats_server_h server)
615 {
616 struct sglib_hashed_stats_entity_t_iterator it;
617 stats_entity_t *entity;
618 ucs_stats_node_t *node;
619 ucs_status_t status;
620 FILE *stream;
621
622 ucs_stats_server_purge_stats(server);
623
624 pthread_mutex_lock(&server->entities_lock);
625 for (entity = sglib_hashed_stats_entity_t_it_init(&it, server->entities_hash);
626 entity != NULL; entity = sglib_hashed_stats_entity_t_it_next(&it))
627 {
628 /* Parse the statistics data */
629 pthread_mutex_lock(&entity->lock);
630 stream = fmemopen(entity->completed_buffer, entity->buffer_size, "rb");
631 status = ucs_stats_deserialize(stream, &node);
632 fclose(stream);
633 pthread_mutex_unlock(&entity->lock);
634
635 if (status == UCS_OK) {
636 ucs_list_add_tail(&server->curr_stats, &node->list);
637 }
638 }
639 pthread_mutex_unlock(&server->entities_lock);
640
641 return &server->curr_stats;
642 }
643
/**
 * Empty the server's list of current statistics, freeing every node in it.
 *
 * @param server  Server handle.
 */
void ucs_stats_server_purge_stats(ucs_stats_server_h server)
{
    ucs_stats_node_t *cur, *next_node;

    /* The "safe" iteration allows unlinking the current node */
    ucs_list_for_each_safe(cur, next_node, &server->curr_stats, list) {
        ucs_list_del(&cur->list);
        ucs_stats_free(cur);
    }
}
653
/**
 * @param server  Server handle.
 *
 * @return The current value of the server's received-packets counter.
 */
unsigned long ucs_stats_server_rcvd_packets(ucs_stats_server_h server)
{
    unsigned long count = server->rcvd_packets;

    return count;
}
658
/**
 * Compare two entities by address, then by port. Used by the sglib
 * containers to tell entities apart.
 *
 * @param e1  First entity.
 * @param e2  Second entity.
 *
 * @return A negative value, zero or a positive value if e1 is respectively
 *         less than, equal to or greater than e2. Zero is returned only when
 *         both the IPv4 address and the port are equal.
 */
static inline int stats_entity_cmp(stats_entity_t *e1, stats_entity_t *e2)
{
    uint32_t addr1 = e1->in_addr.sin_addr.s_addr;
    uint32_t addr2 = e2->in_addr.sin_addr.s_addr;

    if (addr1 != addr2) {
        return (addr1 < addr2) ? -1 : 1;
    }

    /* Ports are 16-bit, so the difference always fits in an int */
    return (int)ntohs(e1->in_addr.sin_port) - (int)ntohs(e2->in_addr.sin_port);
}
668
/**
 * Hash an entity by its address and port.
 *
 * @param e  Entity to hash.
 *
 * @return A bucket index in the range 0..ENTITY_HASH_SIZE-1.
 */
static inline int stats_entity_hash(stats_entity_t *e)
{
    uint64_t addr_part = (uint64_t)e->in_addr.sin_addr.s_addr << 16;
    uint64_t port_part = (uint64_t)ntohs(e->in_addr.sin_port);

    return (addr_part + port_part) % ENTITY_HASH_SIZE;
}
673
/*
 * Generate the sglib list and hashed-container functions for stats_entity_t,
 * using stats_entity_cmp to compare elements, stats_entity_hash to select
 * one of the ENTITY_HASH_SIZE buckets and the 'next' field as the link.
 */
SGLIB_DEFINE_LIST_FUNCTIONS(stats_entity_t, stats_entity_cmp, next)
SGLIB_DEFINE_HASHED_CONTAINER_FUNCTIONS(stats_entity_t, ENTITY_HASH_SIZE, stats_entity_hash)
676