1 /*
2  * Copyright (c) 2014, 2015, 2016  Machine Zone, Inc.
3  *
4  * Original author: Lev Walkin <lwalkin@machinezone.com>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14 
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 #include <stdlib.h>
28 #include <unistd.h>
29 #include <stdio.h>
30 #include <string.h>
31 #include <math.h>
32 
33 #include "tcpkali_statsd.h"
34 
/*
 * Queue one integer-valued metric on the `statsd` handle that is in scope
 * at the expansion site.
 *
 * When the first attempt reports STATSD_BATCH_FULL the pending batch is
 * sent and the metric is queued a second time. The final status is
 * asserted to be STATSD_SUCCESS.
 *
 * The arguments are expanded twice on the batch-full path, so they must
 * not have side effects. The trailing `1` is forwarded verbatim as the
 * last argument of statsd_addToBatch() (presumably the sample rate --
 * TODO confirm against the statsd client API).
 */
#define SBATCH_INT(t, str, value)                                         \
    do {                                                                  \
        int sbatch_rc = statsd_addToBatch(statsd, t, str, value, 1);      \
        if(sbatch_rc == STATSD_BATCH_FULL) {                              \
            /* No room left: flush what is queued, then retry once. */    \
            statsd_sendBatch(statsd);                                     \
            sbatch_rc = statsd_addToBatch(statsd, t, str, value, 1);      \
        }                                                                 \
        assert(sbatch_rc == STATSD_SUCCESS);                              \
    } while(0)
44 
/*
 * Queue one metric through statsd_addToBatch_dbl() on the `statsd` handle
 * that is in scope at the expansion site.
 *
 * When the first attempt reports STATSD_BATCH_FULL the pending batch is
 * sent and the metric is queued a second time. The final status is
 * asserted to be STATSD_SUCCESS.
 *
 * The arguments are expanded twice on the batch-full path, so they must
 * not have side effects. The trailing `1` is forwarded verbatim as the
 * last argument of statsd_addToBatch_dbl() (presumably the sample rate --
 * TODO confirm against the statsd client API).
 */
#define SBATCH_DBL(t, str, value)                                           \
    do {                                                                    \
        int sbatch_rc = statsd_addToBatch_dbl(statsd, t, str, value, 1);    \
        if(sbatch_rc == STATSD_BATCH_FULL) {                                \
            /* No room left: flush what is queued, then retry once. */      \
            statsd_sendBatch(statsd);                                       \
            sbatch_rc = statsd_addToBatch_dbl(statsd, t, str, value, 1);    \
        }                                                                   \
        assert(sbatch_rc == STATSD_SUCCESS);                                \
    } while(0)
54 
55 
/*
 * Queue the latency gauges of one latency kind onto the current statsd batch.
 *
 * For each entry of `latency_percentiles` a gauge named
 * "<kind prefix><pv->value_s>" is queued with the histogram value at
 * percentile pv->value_d, followed by the "<kind>.min", "<kind>.mean" and
 * "<kind>.max" gauges. All histogram readings are divided by 10.0 before
 * being reported (the local name `latency_ms` suggests the histogram holds
 * tenths of a millisecond -- TODO confirm against the code that records
 * into it).
 *
 * statsd               Batch handle; the SBATCH_DBL macro flushes it when full.
 * ltype                Exactly one SLT_* value; used as an index into the
 *                      name table below.
 * hist                 Histogram to read. NULL or an empty histogram
 *                      (total_count == 0) makes this function a no-op.
 * latency_percentiles  Percentiles to report; must not be NULL.
 */
static void
report_latency(Statsd *statsd, statsd_report_latency_types ltype,
               struct hdr_histogram *hist,
               const struct percentile_values *latency_percentiles) {

    if(!hist || hist->total_count == 0)
        return;

#define LENGTH_PREFIXED_STR(s)  {sizeof(s)-1,(s)}
    /*
     * Metric names per latency kind. The table is indexed by SLT_* value;
     * slots that no designated initializer names are zero-initialized and
     * are rejected by the assert(pfx->mean) below.
     */
    static const struct prefixes {
        const char *min;
        const char *mean;
        const char *max;
        struct {
            size_t size;        /* strlen(str), computed at compile time */
            const char *str;    /* Common prefix for percentile gauges */
        } latency_kind;
    } prefixes[] =
        {[SLT_CONNECT] = {"latency.connect.min", "latency.connect.mean",
                          "latency.connect.max",
                          LENGTH_PREFIXED_STR("latency.connect.")},
         [SLT_FIRSTBYTE] = {"latency.firstbyte.min", "latency.firstbyte.mean",
                            "latency.firstbyte.max",
                            LENGTH_PREFIXED_STR("latency.firstbyte.")},
         [SLT_MARKER] = {"latency.message.min", "latency.message.mean",
                         "latency.message.max",
                         LENGTH_PREFIXED_STR("latency.message.")}};
    assert(ltype < sizeof(prefixes)/sizeof(prefixes[0]));
    const struct prefixes *pfx = &prefixes[ltype];
    assert(pfx->mean);

    for(size_t i = 0; i < latency_percentiles->size; i++) {
        const struct percentile_value *pv = &latency_percentiles->values[i];
        char name[64];
        const size_t kind_len = pfx->latency_kind.size;

        /* Every prefix in the table above is far shorter than name[]. */
        assert(kind_len < sizeof(name));
        memcpy(name, pfx->latency_kind.str, kind_len);

        /*
         * Append the percentile string with an explicit bound. An
         * unbounded strcpy() here would write past name[] whenever
         * value_s is longer than the space left after the prefix.
         */
        const size_t room = sizeof(name) - kind_len;
        int tail_len = snprintf(name + kind_len, room, "%s", pv->value_s);
        if(tail_len < 0 || (size_t)tail_len >= room) {
            /* The full name does not fit: skip it rather than emit a
             * truncated (and therefore misleading) metric name. */
            continue;
        }

        double latency_ms = hdr_value_at_percentile(hist, pv->value_d) / 10.0;
        SBATCH_DBL(STATSD_GAUGE, name, latency_ms);
    }

    SBATCH_DBL(STATSD_GAUGE, pfx->min, hdr_min(hist) / 10.0);
    SBATCH_DBL(STATSD_GAUGE, pfx->mean, hdr_mean(hist) / 10.0);
    SBATCH_DBL(STATSD_GAUGE, pfx->max, hdr_max(hist) / 10.0);
}
97 
98 
99 void
report_to_statsd(Statsd * statsd,statsd_feedback * sf,statsd_report_latency_types latency_types,const struct percentile_values * latency_percentiles)100 report_to_statsd(Statsd *statsd, statsd_feedback *sf, statsd_report_latency_types latency_types, const struct percentile_values *latency_percentiles) {
101     if(!statsd) return;
102     if(!sf) {
103         static statsd_feedback empty_feedback;
104         sf = &empty_feedback;
105     }
106 
107     statsd_resetBatch(statsd);
108 
109     SBATCH_INT(STATSD_COUNT, "connections.opened", sf->opened);
110     SBATCH_INT(STATSD_GAUGE, "connections.total", sf->conns_in + sf->conns_out);
111     SBATCH_INT(STATSD_GAUGE, "connections.total.in", sf->conns_in);
112     SBATCH_INT(STATSD_GAUGE, "connections.total.out", sf->conns_out);
113     SBATCH_INT(STATSD_GAUGE, "traffic.bitrate", sf->bps_in + sf->bps_out);
114     SBATCH_INT(STATSD_GAUGE, "traffic.bitrate.in", sf->bps_in);
115     SBATCH_INT(STATSD_GAUGE, "traffic.bitrate.out", sf->bps_out);
116     SBATCH_INT(STATSD_COUNT, "traffic.data",
117            sf->traffic_delta.bytes_rcvd + sf->traffic_delta.bytes_sent);
118     SBATCH_INT(STATSD_COUNT, "traffic.data.rcvd", sf->traffic_delta.bytes_rcvd);
119     SBATCH_INT(STATSD_COUNT, "traffic.data.sent", sf->traffic_delta.bytes_sent);
120     SBATCH_INT(STATSD_COUNT, "traffic.data.reads", sf->traffic_delta.num_reads);
121     SBATCH_INT(STATSD_COUNT, "traffic.data.writes", sf->traffic_delta.num_writes);
122     SBATCH_INT(STATSD_COUNT, "traffic.msgs.rcvd", sf->traffic_delta.msgs_rcvd);
123     SBATCH_INT(STATSD_COUNT, "traffic.msgs.sent", sf->traffic_delta.msgs_sent);
124 
125     if(latency_types) {
126         if(latency_types & SLT_CONNECT)
127             report_latency(statsd, SLT_CONNECT,
128                            sf->latency ? sf->latency->connect_histogram : 0,
129                            latency_percentiles);
130         if(latency_types & SLT_FIRSTBYTE)
131             report_latency(statsd, SLT_FIRSTBYTE,
132                            sf->latency ? sf->latency->firstbyte_histogram : 0,
133                            latency_percentiles);
134         if(latency_types & SLT_MARKER)
135             report_latency(statsd, SLT_MARKER,
136                            sf->latency ? sf->latency->marker_histogram : 0,
137                            latency_percentiles);
138     }
139 
140     statsd_sendBatch(statsd);
141 }
142 
143 void
report_latency_to_statsd(Statsd * statsd,struct latency_snapshot * latency,statsd_report_latency_types latency_types,const struct percentile_values * latency_percentiles)144 report_latency_to_statsd(Statsd *statsd, struct latency_snapshot *latency, statsd_report_latency_types latency_types, const struct percentile_values *latency_percentiles) {
145     if(!statsd) return;
146 
147     statsd_resetBatch(statsd);
148 
149     if(latency_types & SLT_CONNECT)
150         report_latency(statsd, SLT_CONNECT,
151                        latency->connect_histogram,
152                        latency_percentiles);
153     if(latency_types & SLT_FIRSTBYTE)
154         report_latency(statsd, SLT_FIRSTBYTE,
155                        latency->firstbyte_histogram,
156                        latency_percentiles);
157     if(latency_types & SLT_MARKER)
158         report_latency(statsd, SLT_MARKER,
159                        latency->marker_histogram,
160                        latency_percentiles);
161 
162     statsd_sendBatch(statsd);
163 }
164 
165