1 #include <uwsgi.h>
2
3 /*
4
5 Author: Łukasz Mierzwa
6
7 */
8
9 extern struct uwsgi_server uwsgi;
10
/*
 * One carbon server parsed from a --carbon option value ("hostname:port").
 * Nodes are chained through `next` into the list rooted at u_carbon.servers_data.
 */
struct carbon_server_list {
	// 1 while the server is considered reachable, 0 after a failed connect
	int healthy;
	// number of failed connects; cleared at every non-retry push and after a completed push
	int errors;
	// first ':'-separated token of the option value (heap copy)
	char *hostname;
	// second ':'-separated token of the option value (heap copy)
	char *port;
	struct carbon_server_list *next;
};
18
/*
 * Plugin-wide configuration and state; the single instance is `u_carbon`.
 * Option-backed fields are filled by carbon_options, the rest by
 * carbon_post_init() and carbon_push_stats().
 */
struct uwsgi_carbon {
	// raw --carbon option values
	struct uwsgi_string_list *servers;
	// parsed servers (built in carbon_post_init)
	struct carbon_server_list *servers_data;
	// push frequency in seconds (defaults to 60 when < 1)
	int freq;
	// connect/write timeout in seconds (defaults to 3 when < 1)
	int timeout;
	// instance id used in metric paths; defaults to the first socket name with '.' turned into '_'
	char *id;
	// when set, per-worker metrics are not sent
	int no_workers;
	// per-worker running_time sampled at the previous push
	unsigned long long *last_busyness_values;
	// per-worker running_time delta since the previous push
	unsigned long long *current_busyness_values;
	// per-worker counter bumped at each push when the worker is busy; cleared once reported
	int *was_busy;
	// copied to the stats pusher instance (negative values are clamped to 0)
	int max_retries;
	// copied to the stats pusher instance, in seconds (defaults to 7 when <= 0)
	int retry_delay;
	// metric path prefix, always ending with '.' unless empty
	char *root_node;
	// only the first character is used to replace dots in the hostname
	char *hostname_dot_replacement;
	// private copy of uwsgi.hostname (after optional dot replacement)
	char *hostname;
	// when set, server hostnames are resolved with uwsgi_resolve_ip() before connecting
	int resolve_hostname;
	// "last", "zero" or "none"; decoded into push_avg / zero_avg
	char *idle_avg;
	// 0 when avg_rt must not be sent while the request counter is unchanged
	int push_avg;
	// 1 when avg_rt must be sent as 0 while the request counter is unchanged
	int zero_avg;
	// value of uwsgi.workers[0].requests at the last completed push
	uint64_t last_requests;
	// handle returned by uwsgi_register_stats_pusher()
	struct uwsgi_stats_pusher *pusher;
	// when set, only the metrics subsystem values are sent
	int use_metrics;
} u_carbon;
42
/*
 * Command line / config options of the plugin; every entry stores its value
 * into a field of u_carbon. The table ends with an all-zero sentinel entry.
 *
 * "carbon-name-resolve" and "carbon-resolve-names" are two spellings of the
 * same flag (both set u_carbon.resolve_hostname).
 *
 * NOTE(review): the help text of "carbon-max-retry" says the default is 1,
 * but carbon_post_init() only clamps negative values to 0 and nothing visible
 * in this file sets it to 1 - confirm which default is intended.
 */
static struct uwsgi_option carbon_options[] = {
	{"carbon", required_argument, 0, "push statistics to the specified carbon server", uwsgi_opt_add_string_list, &u_carbon.servers, UWSGI_OPT_MASTER},
	{"carbon-timeout", required_argument, 0, "set carbon connection timeout in seconds (default 3)", uwsgi_opt_set_int, &u_carbon.timeout, 0},
	{"carbon-freq", required_argument, 0, "set carbon push frequency in seconds (default 60)", uwsgi_opt_set_int, &u_carbon.freq, 0},
	{"carbon-id", required_argument, 0, "set carbon id", uwsgi_opt_set_str, &u_carbon.id, 0},
	{"carbon-no-workers", no_argument, 0, "disable generation of single worker metrics", uwsgi_opt_true, &u_carbon.no_workers, 0},
	{"carbon-max-retry", required_argument, 0, "set maximum number of retries in case of connection errors (default 1)", uwsgi_opt_set_int, &u_carbon.max_retries, 0},
	{"carbon-retry-delay", required_argument, 0, "set connection retry delay in seconds (default 7)", uwsgi_opt_set_int, &u_carbon.retry_delay, 0},
	{"carbon-root", required_argument, 0, "set carbon metrics root node (default 'uwsgi')", uwsgi_opt_set_str, &u_carbon.root_node, 0},
	{"carbon-hostname-dots", required_argument, 0, "set char to use as a replacement for dots in hostname (dots are not replaced by default)", uwsgi_opt_set_str, &u_carbon.hostname_dot_replacement, 0},
	{"carbon-name-resolve", no_argument, 0, "allow using hostname as carbon server address (default disabled)", uwsgi_opt_true, &u_carbon.resolve_hostname, 0},
	{"carbon-resolve-names", no_argument, 0, "allow using hostname as carbon server address (default disabled)", uwsgi_opt_true, &u_carbon.resolve_hostname, 0},
	{"carbon-idle-avg", required_argument, 0, "average values source during idle period (no requests), can be \"last\", \"zero\", \"none\" (default is last)", uwsgi_opt_set_str, &u_carbon.idle_avg, 0},
	{"carbon-use-metrics", no_argument, 0, "don't compute all statistics, use metrics subsystem data instead (warning! key names will be different)", uwsgi_opt_true, &u_carbon.use_metrics, 0},
	{0, 0, 0, 0, 0, 0, 0},

};
60
carbon_post_init()61 static void carbon_post_init() {
62
63 int i;
64 struct uwsgi_string_list *usl = u_carbon.servers;
65 if (!uwsgi.sockets) return;
66 if (!u_carbon.servers) return;
67
68 while(usl) {
69 struct carbon_server_list *u_server = uwsgi_calloc(sizeof(struct carbon_server_list));
70 u_server->healthy = 1;
71 u_server->errors = 0;
72
73 char *p, *ctx = NULL;
74 // make a copy to not clobber argv
75 char *tmp = uwsgi_str(usl->value);
76 uwsgi_foreach_token(tmp, ":", p, ctx) {
77 if (!u_server->hostname) {
78 u_server->hostname = uwsgi_str(p);
79 }
80 else if (!u_server->port) {
81 u_server->port = uwsgi_str(p);
82 }
83 else
84 break;
85 }
86 free(tmp);
87 if (!u_server->hostname || !u_server->port) {
88 uwsgi_log("[carbon] invalid carbon server address (%s)\n", usl->value);
89 usl = usl->next;
90
91 if (u_server->hostname) free(u_server->hostname);
92 if (u_server->port) free(u_server->port);
93 free(u_server);
94 continue;
95 }
96
97 if (u_carbon.servers_data) {
98 u_server->next = u_carbon.servers_data;
99 }
100 u_carbon.servers_data = u_server;
101
102 uwsgi_log("[carbon] added server %s:%s\n", u_server->hostname, u_server->port);
103 usl = usl->next;
104 }
105
106 if (!u_carbon.root_node) u_carbon.root_node = "uwsgi.";
107 if (strlen(u_carbon.root_node) && !uwsgi_endswith(u_carbon.root_node, ".")) {
108 u_carbon.root_node = uwsgi_concat2(u_carbon.root_node, ".");
109 }
110
111 if (u_carbon.freq < 1) u_carbon.freq = 60;
112 if (u_carbon.timeout < 1) u_carbon.timeout = 3;
113 if (u_carbon.max_retries < 0) u_carbon.max_retries = 0;
114 if (u_carbon.retry_delay <= 0) u_carbon.retry_delay = 7;
115 if (!u_carbon.id) {
116 u_carbon.id = uwsgi_str(uwsgi.sockets->name);
117
118 for(i=0;i<(int)strlen(u_carbon.id);i++) {
119 if (u_carbon.id[i] == '.') u_carbon.id[i] = '_';
120 }
121 }
122
123 u_carbon.hostname = uwsgi_str(uwsgi.hostname);
124 if (u_carbon.hostname_dot_replacement) {
125 for(i=0;i<(int)strlen(u_carbon.hostname);i++) {
126 if (u_carbon.hostname[i] == '.') u_carbon.hostname[i] = u_carbon.hostname_dot_replacement[0];
127 }
128 }
129
130 u_carbon.push_avg = 1;
131 u_carbon.zero_avg = 0;
132 if (!u_carbon.idle_avg) {
133 u_carbon.idle_avg = "last";
134 }
135 else if (!strcmp(u_carbon.idle_avg, "zero")) {
136 u_carbon.zero_avg = 1;
137 }
138 else if (!strcmp(u_carbon.idle_avg, "none")) {
139 u_carbon.push_avg = 0;
140 }
141 else if (strcmp(u_carbon.idle_avg, "last")) {
142 uwsgi_log("[carbon] invalid value for carbon-idle-avg: \"%s\"\n", u_carbon.idle_avg);
143 exit(1);
144 }
145
146 if (!u_carbon.last_busyness_values) {
147 u_carbon.last_busyness_values = uwsgi_calloc(sizeof(unsigned long long) * uwsgi.numproc);
148 }
149
150 if (!u_carbon.current_busyness_values) {
151 u_carbon.current_busyness_values = uwsgi_calloc(sizeof(unsigned long long) * uwsgi.numproc);
152 }
153
154 if (!u_carbon.was_busy) {
155 u_carbon.was_busy = uwsgi_calloc(sizeof(int) * uwsgi.numproc);
156 }
157
158 uwsgi_log("[carbon] carbon plugin started, %is frequency, %is timeout, max retries %i, retry delay %is\n",
159 u_carbon.freq, u_carbon.timeout, u_carbon.max_retries, u_carbon.retry_delay);
160
161 struct uwsgi_stats_pusher_instance *uspi = uwsgi_stats_pusher_add(u_carbon.pusher, NULL);
162 uspi->freq = u_carbon.freq;
163 uspi->retry_delay = u_carbon.retry_delay;
164 uspi->max_retries = u_carbon.max_retries;
165 // no need to generate the json
166 uspi->raw=1;
167 }
168
carbon_write(int fd,char * fmt,...)169 static int carbon_write(int fd, char *fmt,...) {
170 va_list ap;
171 va_start(ap, fmt);
172
173 char ptr[4096];
174 int rlen;
175
176 rlen = vsnprintf(ptr, 4096, fmt, ap);
177 va_end(ap);
178
179 if (rlen < 1) return 0;
180
181 if (uwsgi_write_nb(fd, ptr, rlen, u_carbon.timeout)) {
182 uwsgi_error("carbon_write()");
183 return 0;
184 }
185
186 return 1;
187 }
188
carbon_push_stats(int retry_cycle,time_t now)189 static int carbon_push_stats(int retry_cycle, time_t now) {
190 struct carbon_server_list *usl = u_carbon.servers_data;
191 if (!u_carbon.servers_data) return 0;
192 int i;
193 int fd;
194 int wok;
195 char *ip;
196 char *carbon_address = NULL;
197 int needs_retry;
198
199 for (i = 0; i < uwsgi.numproc; i++) {
200 u_carbon.current_busyness_values[i] = uwsgi.workers[i+1].running_time - u_carbon.last_busyness_values[i];
201 u_carbon.last_busyness_values[i] = uwsgi.workers[i+1].running_time;
202 u_carbon.was_busy[i] += uwsgi_worker_is_busy(i+1);
203 }
204
205 needs_retry = 0;
206 while(usl) {
207 if (retry_cycle && usl->healthy)
208 // skip healthy servers during retry cycle
209 goto nxt;
210
211 if (retry_cycle && usl->healthy == 0)
212 uwsgi_log("[carbon] Retrying failed server at %s (%d)\n", usl->hostname, usl->errors);
213
214 if (!retry_cycle) {
215 usl->healthy = 1;
216 usl->errors = 0;
217 }
218
219 if (u_carbon.resolve_hostname) {
220 ip = uwsgi_resolve_ip(usl->hostname);
221 if (!ip) {
222 uwsgi_log("[carbon] Could not resolve hostname %s\n", usl->hostname);
223 goto nxt;
224 }
225 carbon_address = uwsgi_concat3(ip, ":", usl->port);
226 }
227 else {
228 carbon_address = uwsgi_concat3(usl->hostname, ":", usl->port);
229 }
230 fd = uwsgi_connect(carbon_address, u_carbon.timeout, 0);
231 if (fd < 0) {
232 uwsgi_log("[carbon] Could not connect to carbon server at %s\n", carbon_address);
233 needs_retry = 1;
234 usl->healthy = 0;
235 usl->errors++;
236 free(carbon_address);
237 goto nxt;
238 }
239 free(carbon_address);
240 // put the socket in non-blocking mode
241 uwsgi_socket_nb(fd);
242
243 if (u_carbon.use_metrics) goto metrics_loop;
244
245 unsigned long long total_rss = 0;
246 unsigned long long total_vsz = 0;
247 unsigned long long total_tx = 0;
248 unsigned long long total_avg_rt = 0; // total avg_rt
249 unsigned long long avg_rt = 0; // per worker avg_rt reported to carbon
250 unsigned long long active_workers = 0; // number of workers used to calculate total avg_rt
251 unsigned long long total_busyness = 0;
252 unsigned long long total_avg_busyness = 0;
253 unsigned long long worker_busyness = 0;
254 unsigned long long total_harakiri = 0;
255
256 int do_avg_push;
257
258 wok = carbon_write(fd, "%s%s.%s.requests %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) uwsgi.workers[0].requests, (unsigned long long) now);
259 if (!wok) goto clear;
260
261 for(i=1;i<=uwsgi.numproc;i++) {
262 total_tx += uwsgi.workers[i].tx;
263 total_harakiri += uwsgi.workers[i].harakiri_count;
264
265 if (uwsgi.workers[i].cheaped) {
266 // also if worker is cheaped than we report its average response time as zero, sending last value might be confusing
267 avg_rt = 0;
268 worker_busyness = 0;
269 }
270 else {
271 // global average response time is calculated from active/idle workers, cheaped workers are excluded, otherwise it is not accurate
272 avg_rt = uwsgi.workers[i].avg_response_time;
273 active_workers++;
274 total_avg_rt += uwsgi.workers[i].avg_response_time;
275
276 // calculate worker busyness
277 if (u_carbon.current_busyness_values[i-1] == 0 && u_carbon.was_busy[i-1]) {
278 worker_busyness = 100;
279 }
280 else {
281 worker_busyness = ((u_carbon.current_busyness_values[i-1]*100) / (u_carbon.freq*1000000));
282 if (worker_busyness > 100) worker_busyness = 100;
283 }
284 total_busyness += worker_busyness;
285 u_carbon.was_busy[i-1] = 0;
286
287 if (uwsgi.logging_options.memory_report == 1 || uwsgi.force_get_memusage) {
288 // only running workers are counted in total memory stats and if memory-report option is enabled
289 total_rss += uwsgi.workers[i].rss_size;
290 total_vsz += uwsgi.workers[i].vsz_size;
291 }
292 }
293
294 //skip per worker metrics when disabled
295 if (u_carbon.no_workers) continue;
296
297 wok = carbon_write(fd, "%s%s.%s.worker%d.requests %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].requests, (unsigned long long) now);
298 if (!wok) goto clear;
299
300 if (uwsgi.logging_options.memory_report == 1 || uwsgi.force_get_memusage) {
301 wok = carbon_write(fd, "%s%s.%s.worker%d.rss_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].rss_size, (unsigned long long) now);
302 if (!wok) goto clear;
303
304 wok = carbon_write(fd, "%s%s.%s.worker%d.vsz_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].vsz_size, (unsigned long long) now);
305 if (!wok) goto clear;
306 }
307
308 do_avg_push = 1;
309 if (!u_carbon.last_requests || u_carbon.last_requests == uwsgi.workers[0].requests) {
310 if (!u_carbon.push_avg) {
311 do_avg_push = 0;
312 }
313 else if (u_carbon.zero_avg) {
314 avg_rt = 0;
315 }
316 }
317 if (do_avg_push) {
318 wok = carbon_write(fd, "%s%s.%s.worker%d.avg_rt %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) avg_rt, (unsigned long long) now);
319 if (!wok) goto clear;
320 }
321
322 wok = carbon_write(fd, "%s%s.%s.worker%d.tx %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].tx, (unsigned long long) now);
323 if (!wok) goto clear;
324
325 wok = carbon_write(fd, "%s%s.%s.worker%d.busyness %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) worker_busyness, (unsigned long long) now);
326 if (!wok) goto clear;
327
328 wok = carbon_write(fd, "%s%s.%s.worker%d.harakiri %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].harakiri_count, (unsigned long long) now);
329 if (!wok) goto clear;
330
331 }
332
333 if (uwsgi.logging_options.memory_report == 1 || uwsgi.force_get_memusage) {
334 wok = carbon_write(fd, "%s%s.%s.rss_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_rss, (unsigned long long) now);
335 if (!wok) goto clear;
336
337 wok = carbon_write(fd, "%s%s.%s.vsz_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_vsz, (unsigned long long) now);
338 if (!wok) goto clear;
339 }
340
341 do_avg_push = 1;
342 uint64_t c_total_avg_rt = (active_workers ? total_avg_rt / active_workers : 0);
343 if (!u_carbon.last_requests || u_carbon.last_requests == uwsgi.workers[0].requests) {
344 if (!u_carbon.push_avg) {
345 do_avg_push = 0;
346 }
347 else if (u_carbon.zero_avg) {
348 c_total_avg_rt = 0;
349 }
350 }
351 if (do_avg_push) {
352 wok = carbon_write(fd, "%s%s.%s.avg_rt %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) c_total_avg_rt, (unsigned long long) now);
353 if (!wok) goto clear;
354 }
355
356 wok = carbon_write(fd, "%s%s.%s.tx %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_tx, (unsigned long long) now);
357 if (!wok) goto clear;
358
359 if (active_workers > 0) {
360 total_avg_busyness = total_busyness / active_workers;
361 if (total_avg_busyness > 100) total_avg_busyness = 100;
362 } else {
363 total_avg_busyness = 0;
364 }
365 wok = carbon_write(fd, "%s%s.%s.busyness %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_avg_busyness, (unsigned long long) now);
366 if (!wok) goto clear;
367
368 wok = carbon_write(fd, "%s%s.%s.active_workers %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) active_workers, (unsigned long long) now);
369 if (!wok) goto clear;
370
371 if (uwsgi.cheaper) {
372 wok = carbon_write(fd, "%s%s.%s.cheaped_workers %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) uwsgi.numproc - active_workers, (unsigned long long) now);
373 if (!wok) goto clear;
374 }
375
376 wok = carbon_write(fd, "%s%s.%s.harakiri %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_harakiri, (unsigned long long) now);
377 if (!wok) goto clear;
378
379 metrics_loop:
380 if (u_carbon.use_metrics) {
381 struct uwsgi_metric *um = uwsgi.metrics;
382 while(um) {
383 uwsgi_rlock(uwsgi.metrics_lock);
384 wok = carbon_write(fd, "%s%s.%s.%.*s %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, um->name_len, um->name, (unsigned long long) *um->value, (unsigned long long) now);
385 uwsgi_rwunlock(uwsgi.metrics_lock);
386 if (um->reset_after_push){
387 uwsgi_wlock(uwsgi.metrics_lock);
388 *um->value = um->initial_value;
389 uwsgi_rwunlock(uwsgi.metrics_lock);
390 }
391 if (!wok) goto clear;
392 um = um->next;
393 }
394 }
395
396 usl->healthy = 1;
397 usl->errors = 0;
398
399 u_carbon.last_requests = uwsgi.workers[0].requests;
400
401 clear:
402 close(fd);
403 nxt:
404 usl = usl->next;
405 }
406
407 return needs_retry;
408 }
409
carbon_push(struct uwsgi_stats_pusher_instance * uspi,time_t now,char * json,size_t json_len)410 static void carbon_push(struct uwsgi_stats_pusher_instance *uspi, time_t now, char *json, size_t json_len) {
411 uspi->needs_retry = carbon_push_stats(uspi->retries, now);
412 }
413
/*
 * master_cleanup hook: runs one last regular (non-retry) push stamped with
 * the current time; the retry indication is ignored.
 */
static void carbon_cleanup() {
	time_t now = uwsgi_now();
	(void) carbon_push_stats(0, now);
}
417
carbon_register()418 static void carbon_register() {
419 u_carbon.pusher = uwsgi_register_stats_pusher("carbon", carbon_push);
420 }
421
/*
 * Plugin descriptor exported to uWSGI: wires the option table and the
 * load / post-init / master-cleanup hooks defined in this file.
 */
struct uwsgi_plugin carbon_plugin = {

	.name = "carbon",

	// sends a final round of statistics
	.master_cleanup = carbon_cleanup,

	.options = carbon_options,
	// registers the "carbon" stats pusher
	.on_load = carbon_register,
	// parses the servers, applies defaults and attaches the pusher instance
	.post_init = carbon_post_init,
};
432