#include <uwsgi.h>
#include <errno.h>
2 
3 /*
4 
5 	Author:	Łukasz Mierzwa
6 
7 */
8 
9 extern struct uwsgi_server uwsgi;
10 
// One carbon server parsed from a --carbon "host:port" value; servers form a singly linked list.
struct carbon_server_list {
	// 1 when the server is considered reachable, 0 after a failed push attempt;
	// retry cycles only target servers with healthy == 0
	int healthy;
	// failed attempts in the current push period (reset at each regular push and on success)
	int errors;
	// host part of the --carbon value (resolved at push time when --carbon-resolve-names is set)
	char *hostname;
	// port kept as a string: it is concatenated into "host:port" before connecting
	char *port;
	struct carbon_server_list *next;
};
18 
// Configuration (filled by carbon_options) and runtime state of the carbon plugin.
struct uwsgi_carbon {
	// raw --carbon values ("host:port")
	struct uwsgi_string_list *servers;
	// valid servers parsed from `servers` by carbon_post_init()
	struct carbon_server_list *servers_data;
	// push frequency in seconds (--carbon-freq)
	int freq;
	// connection/write timeout in seconds (--carbon-timeout)
	int timeout;
	// instance identifier used as a metric key component (--carbon-id)
	char *id;
	// --carbon-no-workers: do not emit per-worker metrics
	int no_workers;
	// per-worker running_time at the previous sample (index = worker id - 1)
	unsigned long long *last_busyness_values;
	// per-worker running_time accumulated since the previous sample
	unsigned long long *current_busyness_values;
	// per-worker busy samples: non-zero if the worker was busy when sampled
	int *was_busy;
	// --carbon-max-retry
	int max_retries;
	// --carbon-retry-delay, in seconds
	int retry_delay;
	// metric key prefix, normalized to end with a dot (--carbon-root)
	char *root_node;
	// --carbon-hostname-dots: only its first character is used as the replacement
	char *hostname_dot_replacement;
	// copy of uwsgi.hostname used in metric keys
	char *hostname;
	// --carbon-resolve-names: resolve server hostnames before connecting
	int resolve_hostname;
	// --carbon-idle-avg: "last", "zero" or "none"
	char *idle_avg;
	// derived from idle_avg: push averages during idle periods at all / push them as 0
	int push_avg;
	int zero_avg;
	// total request counter at the last successful push, used to detect idle periods
	uint64_t last_requests;
	// the "carbon" stats pusher registered at load time
	struct uwsgi_stats_pusher *pusher;
	// --carbon-use-metrics: push the metrics subsystem values instead of computed stats
	int use_metrics;
} u_carbon;
42 
43 static struct uwsgi_option carbon_options[] = {
44 	{"carbon", required_argument, 0, "push statistics to the specified carbon server", uwsgi_opt_add_string_list, &u_carbon.servers, UWSGI_OPT_MASTER},
45 	{"carbon-timeout", required_argument, 0, "set carbon connection timeout in seconds (default 3)", uwsgi_opt_set_int, &u_carbon.timeout, 0},
46 	{"carbon-freq", required_argument, 0, "set carbon push frequency in seconds (default 60)", uwsgi_opt_set_int, &u_carbon.freq, 0},
47 	{"carbon-id", required_argument, 0, "set carbon id", uwsgi_opt_set_str, &u_carbon.id, 0},
48 	{"carbon-no-workers", no_argument, 0, "disable generation of single worker metrics", uwsgi_opt_true, &u_carbon.no_workers, 0},
49 	{"carbon-max-retry", required_argument, 0, "set maximum number of retries in case of connection errors (default 1)", uwsgi_opt_set_int, &u_carbon.max_retries, 0},
50 	{"carbon-retry-delay", required_argument, 0, "set connection retry delay in seconds (default 7)", uwsgi_opt_set_int, &u_carbon.retry_delay, 0},
51 	{"carbon-root", required_argument, 0, "set carbon metrics root node (default 'uwsgi')", uwsgi_opt_set_str, &u_carbon.root_node, 0},
52 	{"carbon-hostname-dots", required_argument, 0, "set char to use as a replacement for dots in hostname (dots are not replaced by default)", uwsgi_opt_set_str, &u_carbon.hostname_dot_replacement, 0},
53 	{"carbon-name-resolve", no_argument, 0, "allow using hostname as carbon server address (default disabled)", uwsgi_opt_true, &u_carbon.resolve_hostname, 0},
54 	{"carbon-resolve-names", no_argument, 0, "allow using hostname as carbon server address (default disabled)", uwsgi_opt_true, &u_carbon.resolve_hostname, 0},
55 	{"carbon-idle-avg", required_argument, 0, "average values source during idle period (no requests), can be \"last\", \"zero\", \"none\" (default is last)", uwsgi_opt_set_str, &u_carbon.idle_avg, 0},
56 	{"carbon-use-metrics", no_argument, 0, "don't compute all statistics, use metrics subsystem data instead (warning! key names will be different)", uwsgi_opt_true, &u_carbon.use_metrics, 0},
57 	{0, 0, 0, 0, 0, 0, 0},
58 
59 };
60 
carbon_post_init()61 static void carbon_post_init() {
62 
63 	int i;
64 	struct uwsgi_string_list *usl = u_carbon.servers;
65 	if (!uwsgi.sockets) return;
66 	if (!u_carbon.servers) return;
67 
68 	while(usl) {
69 		struct carbon_server_list *u_server = uwsgi_calloc(sizeof(struct carbon_server_list));
70 		u_server->healthy = 1;
71 		u_server->errors = 0;
72 
73 		char *p, *ctx = NULL;
74 		// make a copy to not clobber argv
75 		char *tmp = uwsgi_str(usl->value);
76 		uwsgi_foreach_token(tmp, ":", p, ctx) {
77 			if (!u_server->hostname) {
78 				u_server->hostname = uwsgi_str(p);
79 			}
80 			else if (!u_server->port) {
81 				u_server->port = uwsgi_str(p);
82 			}
83 			else
84 				break;
85 		}
86 		free(tmp);
87 		if (!u_server->hostname || !u_server->port) {
88 			uwsgi_log("[carbon] invalid carbon server address (%s)\n", usl->value);
89 			usl = usl->next;
90 
91 			if (u_server->hostname) free(u_server->hostname);
92 			if (u_server->port) free(u_server->port);
93 			free(u_server);
94 			continue;
95 		}
96 
97 		if (u_carbon.servers_data) {
98 			u_server->next = u_carbon.servers_data;
99 		}
100 		u_carbon.servers_data = u_server;
101 
102 		uwsgi_log("[carbon] added server %s:%s\n", u_server->hostname, u_server->port);
103 		usl = usl->next;
104 	}
105 
106 	if (!u_carbon.root_node) u_carbon.root_node = "uwsgi.";
107 	if (strlen(u_carbon.root_node) && !uwsgi_endswith(u_carbon.root_node, ".")) {
108 		u_carbon.root_node = uwsgi_concat2(u_carbon.root_node, ".");
109 	}
110 
111 	if (u_carbon.freq < 1) u_carbon.freq = 60;
112 	if (u_carbon.timeout < 1) u_carbon.timeout = 3;
113 	if (u_carbon.max_retries < 0) u_carbon.max_retries = 0;
114 	if (u_carbon.retry_delay <= 0) u_carbon.retry_delay = 7;
115 	if (!u_carbon.id) {
116 		u_carbon.id = uwsgi_str(uwsgi.sockets->name);
117 
118 		for(i=0;i<(int)strlen(u_carbon.id);i++) {
119 			if (u_carbon.id[i] == '.') u_carbon.id[i] = '_';
120 		}
121 	}
122 
123 	u_carbon.hostname = uwsgi_str(uwsgi.hostname);
124 	if (u_carbon.hostname_dot_replacement) {
125 		for(i=0;i<(int)strlen(u_carbon.hostname);i++) {
126 			if (u_carbon.hostname[i] == '.') u_carbon.hostname[i] = u_carbon.hostname_dot_replacement[0];
127 		}
128 	}
129 
130 	u_carbon.push_avg = 1;
131 	u_carbon.zero_avg = 0;
132 	if (!u_carbon.idle_avg) {
133 		u_carbon.idle_avg = "last";
134 	}
135 	else if (!strcmp(u_carbon.idle_avg, "zero")) {
136 		u_carbon.zero_avg = 1;
137 	}
138 	else if (!strcmp(u_carbon.idle_avg, "none")) {
139 		u_carbon.push_avg = 0;
140 	}
141 	else if (strcmp(u_carbon.idle_avg, "last")) {
142 		uwsgi_log("[carbon] invalid value for carbon-idle-avg: \"%s\"\n", u_carbon.idle_avg);
143 		exit(1);
144 	}
145 
146 	if (!u_carbon.last_busyness_values) {
147 		u_carbon.last_busyness_values = uwsgi_calloc(sizeof(unsigned long long) * uwsgi.numproc);
148 	}
149 
150 	if (!u_carbon.current_busyness_values) {
151 		u_carbon.current_busyness_values = uwsgi_calloc(sizeof(unsigned long long) * uwsgi.numproc);
152 	}
153 
154 	if (!u_carbon.was_busy) {
155 		u_carbon.was_busy = uwsgi_calloc(sizeof(int) * uwsgi.numproc);
156 	}
157 
158 	uwsgi_log("[carbon] carbon plugin started, %is frequency, %is timeout, max retries %i, retry delay %is\n",
159 		u_carbon.freq, u_carbon.timeout, u_carbon.max_retries, u_carbon.retry_delay);
160 
161 	struct uwsgi_stats_pusher_instance *uspi = uwsgi_stats_pusher_add(u_carbon.pusher, NULL);
162 	uspi->freq = u_carbon.freq;
163 	uspi->retry_delay = u_carbon.retry_delay;
164 	uspi->max_retries = u_carbon.max_retries;
165 	// no need to generate the json
166 	uspi->raw=1;
167 }
168 
carbon_write(int fd,char * fmt,...)169 static int carbon_write(int fd, char *fmt,...) {
170 	va_list ap;
171 	va_start(ap, fmt);
172 
173 	char ptr[4096];
174 	int rlen;
175 
176 	rlen = vsnprintf(ptr, 4096, fmt, ap);
177 	va_end(ap);
178 
179 	if (rlen < 1) return 0;
180 
181 	if (uwsgi_write_nb(fd, ptr, rlen, u_carbon.timeout)) {
182 		uwsgi_error("carbon_write()");
183 		return 0;
184 	}
185 
186 	return 1;
187 }
188 
carbon_push_stats(int retry_cycle,time_t now)189 static int carbon_push_stats(int retry_cycle, time_t now) {
190 	struct carbon_server_list *usl = u_carbon.servers_data;
191 	if (!u_carbon.servers_data) return 0;
192 	int i;
193 	int fd;
194 	int wok;
195 	char *ip;
196 	char *carbon_address = NULL;
197 	int needs_retry;
198 
199 	for (i = 0; i < uwsgi.numproc; i++) {
200 		u_carbon.current_busyness_values[i] = uwsgi.workers[i+1].running_time - u_carbon.last_busyness_values[i];
201 		u_carbon.last_busyness_values[i] = uwsgi.workers[i+1].running_time;
202 		u_carbon.was_busy[i] += uwsgi_worker_is_busy(i+1);
203 	}
204 
205 	needs_retry = 0;
206 	while(usl) {
207 		if (retry_cycle && usl->healthy)
208 			// skip healthy servers during retry cycle
209 			goto nxt;
210 
211 		if (retry_cycle && usl->healthy == 0)
212 			uwsgi_log("[carbon] Retrying failed server at %s (%d)\n", usl->hostname, usl->errors);
213 
214 		if (!retry_cycle) {
215 			usl->healthy = 1;
216 			usl->errors = 0;
217 		}
218 
219 		if (u_carbon.resolve_hostname) {
220 			ip = uwsgi_resolve_ip(usl->hostname);
221 			if (!ip) {
222 				uwsgi_log("[carbon] Could not resolve hostname %s\n", usl->hostname);
223 				goto nxt;
224 			}
225 			carbon_address = uwsgi_concat3(ip, ":", usl->port);
226 		}
227 		else {
228 			carbon_address = uwsgi_concat3(usl->hostname, ":", usl->port);
229 		}
230 		fd = uwsgi_connect(carbon_address, u_carbon.timeout, 0);
231 		if (fd < 0) {
232 			uwsgi_log("[carbon] Could not connect to carbon server at %s\n", carbon_address);
233 			needs_retry = 1;
234 			usl->healthy = 0;
235 			usl->errors++;
236 			free(carbon_address);
237 			goto nxt;
238 		}
239 		free(carbon_address);
240 		// put the socket in non-blocking mode
241 		uwsgi_socket_nb(fd);
242 
243 		if (u_carbon.use_metrics) goto metrics_loop;
244 
245 		unsigned long long total_rss = 0;
246 		unsigned long long total_vsz = 0;
247 		unsigned long long total_tx = 0;
248 		unsigned long long total_avg_rt = 0; // total avg_rt
249 		unsigned long long avg_rt = 0; // per worker avg_rt reported to carbon
250 		unsigned long long active_workers = 0; // number of workers used to calculate total avg_rt
251 		unsigned long long total_busyness = 0;
252 		unsigned long long total_avg_busyness = 0;
253 		unsigned long long worker_busyness = 0;
254 		unsigned long long total_harakiri = 0;
255 
256 		int do_avg_push;
257 
258 		wok = carbon_write(fd, "%s%s.%s.requests %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) uwsgi.workers[0].requests, (unsigned long long) now);
259 		if (!wok) goto clear;
260 
261 		for(i=1;i<=uwsgi.numproc;i++) {
262 			total_tx += uwsgi.workers[i].tx;
263 			total_harakiri += uwsgi.workers[i].harakiri_count;
264 
265 			if (uwsgi.workers[i].cheaped) {
266 				// also if worker is cheaped than we report its average response time as zero, sending last value might be confusing
267 				avg_rt = 0;
268 				worker_busyness = 0;
269 			}
270 			else {
271 				// global average response time is calculated from active/idle workers, cheaped workers are excluded, otherwise it is not accurate
272 				avg_rt = uwsgi.workers[i].avg_response_time;
273 				active_workers++;
274 				total_avg_rt += uwsgi.workers[i].avg_response_time;
275 
276 				// calculate worker busyness
277 				if (u_carbon.current_busyness_values[i-1] == 0 && u_carbon.was_busy[i-1]) {
278 					worker_busyness = 100;
279 				}
280 				else {
281 					worker_busyness = ((u_carbon.current_busyness_values[i-1]*100) / (u_carbon.freq*1000000));
282 					if (worker_busyness > 100) worker_busyness = 100;
283 				}
284 				total_busyness += worker_busyness;
285 				u_carbon.was_busy[i-1] = 0;
286 
287 				if (uwsgi.logging_options.memory_report == 1 || uwsgi.force_get_memusage) {
288 					// only running workers are counted in total memory stats and if memory-report option is enabled
289 					total_rss += uwsgi.workers[i].rss_size;
290 					total_vsz += uwsgi.workers[i].vsz_size;
291 				}
292 			}
293 
294 			//skip per worker metrics when disabled
295 			if (u_carbon.no_workers) continue;
296 
297 			wok = carbon_write(fd, "%s%s.%s.worker%d.requests %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].requests, (unsigned long long) now);
298 			if (!wok) goto clear;
299 
300 			if (uwsgi.logging_options.memory_report == 1 || uwsgi.force_get_memusage) {
301 				wok = carbon_write(fd, "%s%s.%s.worker%d.rss_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].rss_size, (unsigned long long) now);
302 				if (!wok) goto clear;
303 
304 				wok = carbon_write(fd, "%s%s.%s.worker%d.vsz_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].vsz_size, (unsigned long long) now);
305 				if (!wok) goto clear;
306 			}
307 
308 			do_avg_push = 1;
309 			if (!u_carbon.last_requests || u_carbon.last_requests == uwsgi.workers[0].requests) {
310 				if (!u_carbon.push_avg) {
311 					do_avg_push = 0;
312 				}
313 				else if (u_carbon.zero_avg) {
314 					avg_rt = 0;
315 				}
316 			}
317 			if (do_avg_push) {
318 				wok = carbon_write(fd, "%s%s.%s.worker%d.avg_rt %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) avg_rt, (unsigned long long) now);
319 				if (!wok) goto clear;
320 			}
321 
322 			wok = carbon_write(fd, "%s%s.%s.worker%d.tx %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].tx, (unsigned long long) now);
323 			if (!wok) goto clear;
324 
325 			wok = carbon_write(fd, "%s%s.%s.worker%d.busyness %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) worker_busyness, (unsigned long long) now);
326 			if (!wok) goto clear;
327 
328 			wok = carbon_write(fd, "%s%s.%s.worker%d.harakiri %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, i, (unsigned long long) uwsgi.workers[i].harakiri_count, (unsigned long long) now);
329 			if (!wok) goto clear;
330 
331 		}
332 
333 		if (uwsgi.logging_options.memory_report == 1 || uwsgi.force_get_memusage) {
334 			wok = carbon_write(fd, "%s%s.%s.rss_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_rss, (unsigned long long) now);
335 			if (!wok) goto clear;
336 
337 			wok = carbon_write(fd, "%s%s.%s.vsz_size %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_vsz, (unsigned long long) now);
338 			if (!wok) goto clear;
339 		}
340 
341 		do_avg_push = 1;
342 		uint64_t c_total_avg_rt = (active_workers ? total_avg_rt / active_workers : 0);
343 		if (!u_carbon.last_requests || u_carbon.last_requests == uwsgi.workers[0].requests) {
344 			if (!u_carbon.push_avg) {
345 				do_avg_push = 0;
346 			}
347 			else if (u_carbon.zero_avg) {
348 				c_total_avg_rt = 0;
349 			}
350 		}
351 		if (do_avg_push) {
352 			wok = carbon_write(fd, "%s%s.%s.avg_rt %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) c_total_avg_rt, (unsigned long long) now);
353 			if (!wok) goto clear;
354 		}
355 
356 		wok = carbon_write(fd, "%s%s.%s.tx %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_tx, (unsigned long long) now);
357 		if (!wok) goto clear;
358 
359 		if (active_workers > 0) {
360 			total_avg_busyness = total_busyness / active_workers;
361 			if (total_avg_busyness > 100) total_avg_busyness = 100;
362 		} else {
363 			total_avg_busyness = 0;
364 		}
365 		wok = carbon_write(fd, "%s%s.%s.busyness %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_avg_busyness, (unsigned long long) now);
366 		if (!wok) goto clear;
367 
368 		wok = carbon_write(fd, "%s%s.%s.active_workers %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) active_workers, (unsigned long long) now);
369 		if (!wok) goto clear;
370 
371 		if (uwsgi.cheaper) {
372 			wok = carbon_write(fd, "%s%s.%s.cheaped_workers %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) uwsgi.numproc - active_workers, (unsigned long long) now);
373 			if (!wok) goto clear;
374 		}
375 
376 		wok = carbon_write(fd, "%s%s.%s.harakiri %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, (unsigned long long) total_harakiri, (unsigned long long) now);
377 		if (!wok) goto clear;
378 
379 metrics_loop:
380 		if (u_carbon.use_metrics) {
381 			struct uwsgi_metric *um = uwsgi.metrics;
382 			while(um) {
383 				uwsgi_rlock(uwsgi.metrics_lock);
384 				wok = carbon_write(fd, "%s%s.%s.%.*s %llu %llu\n", u_carbon.root_node, u_carbon.hostname, u_carbon.id, um->name_len, um->name, (unsigned long long) *um->value, (unsigned long long) now);
385 				uwsgi_rwunlock(uwsgi.metrics_lock);
386 				if (um->reset_after_push){
387 					uwsgi_wlock(uwsgi.metrics_lock);
388 					*um->value = um->initial_value;
389 					uwsgi_rwunlock(uwsgi.metrics_lock);
390 				}
391 				if (!wok) goto clear;
392 				um = um->next;
393 			}
394 		}
395 
396 		usl->healthy = 1;
397 		usl->errors = 0;
398 
399 		u_carbon.last_requests = uwsgi.workers[0].requests;
400 
401 clear:
402 		close(fd);
403 nxt:
404 		usl = usl->next;
405 	}
406 
407 	return needs_retry;
408 }
409 
carbon_push(struct uwsgi_stats_pusher_instance * uspi,time_t now,char * json,size_t json_len)410 static void carbon_push(struct uwsgi_stats_pusher_instance *uspi, time_t now, char *json, size_t json_len) {
411 	uspi->needs_retry = carbon_push_stats(uspi->retries, now);
412 }
413 
// master_cleanup hook: one last regular (non-retry) push; its result is ignored.
static void carbon_cleanup() {
	time_t now = uwsgi_now();
	carbon_push_stats(0, now);
}
417 
carbon_register()418 static void carbon_register() {
419 	u_carbon.pusher = uwsgi_register_stats_pusher("carbon", carbon_push);
420 }
421 
422 struct uwsgi_plugin carbon_plugin = {
423 
424 	.name = "carbon",
425 
426 	.master_cleanup = carbon_cleanup,
427 
428 	.options = carbon_options,
429 	.on_load = carbon_register,
430 	.post_init = carbon_post_init,
431 };
432