1 #include "uwsgi.h"
2 /*
3 utility functions for fast generating json output for the stats subsystem
4 */
5
6 extern struct uwsgi_server uwsgi;
7
uwsgi_stats_new(size_t chunk_size)8 struct uwsgi_stats *uwsgi_stats_new(size_t chunk_size) {
9 struct uwsgi_stats *us = uwsgi_malloc(sizeof(struct uwsgi_stats));
10 us->base = uwsgi_malloc(chunk_size);
11 us->base[0] = '{';
12 us->pos = 1;
13 us->chunk = chunk_size;
14 us->size = chunk_size;
15 us->tabs = 1;
16 us->dirty = 0;
17 us->minified = uwsgi.stats_minified;
18 if (!us->minified) {
19 us->base[1] = '\n';
20 us->pos++;
21 }
22 return us;
23 }
24
uwsgi_stats_symbol(struct uwsgi_stats * us,char sym)25 int uwsgi_stats_symbol(struct uwsgi_stats *us, char sym) {
26 char *ptr = us->base + us->pos;
27 char *watermark = us->base + us->size;
28
29 if (ptr + 1 > watermark) {
30 char *new_base = realloc(us->base, us->size + us->chunk);
31 if (!new_base)
32 return -1;
33 us->base = new_base;
34 us->size += us->chunk;
35 ptr = us->base + us->pos;
36 }
37
38 *ptr = sym;
39 us->pos++;
40 return 0;
41 }
42
uwsgi_stats_symbol_nl(struct uwsgi_stats * us,char sym)43 int uwsgi_stats_symbol_nl(struct uwsgi_stats *us, char sym) {
44 if (uwsgi_stats_symbol(us, sym)) {
45 return -1;
46 }
47 if (us->minified)
48 return 0;
49 return uwsgi_stats_symbol(us, '\n');
50 }
51
52
uwsgi_stats_comma(struct uwsgi_stats * us)53 int uwsgi_stats_comma(struct uwsgi_stats *us) {
54 return uwsgi_stats_symbol_nl(us, ',');
55 }
56
uwsgi_stats_apply_tabs(struct uwsgi_stats * us)57 int uwsgi_stats_apply_tabs(struct uwsgi_stats *us) {
58 if (us->minified)
59 return 0;
60 size_t i;
61 for (i = 0; i < us->tabs; i++) {
62 if (uwsgi_stats_symbol(us, '\t'))
63 return -1;
64 };
65 return 0;
66 }
67
68
uwsgi_stats_object_open(struct uwsgi_stats * us)69 int uwsgi_stats_object_open(struct uwsgi_stats *us) {
70 if (uwsgi_stats_apply_tabs(us))
71 return -1;
72 if (!us->minified)
73 us->tabs++;
74 return uwsgi_stats_symbol_nl(us, '{');
75 }
76
uwsgi_stats_object_close(struct uwsgi_stats * us)77 int uwsgi_stats_object_close(struct uwsgi_stats *us) {
78 if (!us->minified) {
79 if (uwsgi_stats_symbol(us, '\n'))
80 return -1;
81 us->tabs--;
82 if (uwsgi_stats_apply_tabs(us))
83 return -1;
84 }
85 return uwsgi_stats_symbol(us, '}');
86 }
87
uwsgi_stats_list_open(struct uwsgi_stats * us)88 int uwsgi_stats_list_open(struct uwsgi_stats *us) {
89 us->tabs++;
90 return uwsgi_stats_symbol_nl(us, '[');
91 }
92
uwsgi_stats_list_close(struct uwsgi_stats * us)93 int uwsgi_stats_list_close(struct uwsgi_stats *us) {
94 if (!us->minified) {
95 if (uwsgi_stats_symbol(us, '\n'))
96 return -1;
97 us->tabs--;
98 if (uwsgi_stats_apply_tabs(us))
99 return -1;
100 }
101 return uwsgi_stats_symbol(us, ']');
102 }
103
uwsgi_stats_keyval(struct uwsgi_stats * us,char * key,char * value)104 int uwsgi_stats_keyval(struct uwsgi_stats *us, char *key, char *value) {
105
106 if (uwsgi_stats_apply_tabs(us))
107 return -1;
108
109 char *ptr = us->base + us->pos;
110 char *watermark = us->base + us->size;
111 size_t available = watermark - ptr;
112
113 int ret = snprintf(ptr, available, "\"%s\":\"%s\"", key, value);
114 if (ret <= 0)
115 return -1;
116 while (ret >= (int) available) {
117 char *new_base = realloc(us->base, us->size + us->chunk);
118 if (!new_base)
119 return -1;
120 us->base = new_base;
121 us->size += us->chunk;
122 ptr = us->base + us->pos;
123 watermark = us->base + us->size;
124 available = watermark - ptr;
125 ret = snprintf(ptr, available, "\"%s\":\"%s\"", key, value);
126 if (ret <= 0)
127 return -1;
128 }
129
130 us->pos += ret;
131 return 0;
132
133 }
134
uwsgi_stats_keyval_comma(struct uwsgi_stats * us,char * key,char * value)135 int uwsgi_stats_keyval_comma(struct uwsgi_stats *us, char *key, char *value) {
136 int ret = uwsgi_stats_keyval(us, key, value);
137 if (ret)
138 return -1;
139 return uwsgi_stats_comma(us);
140 }
141
uwsgi_stats_keyvalnum(struct uwsgi_stats * us,char * key,char * value,unsigned long long num)142 int uwsgi_stats_keyvalnum(struct uwsgi_stats *us, char *key, char *value, unsigned long long num) {
143
144 if (uwsgi_stats_apply_tabs(us))
145 return -1;
146
147 char *ptr = us->base + us->pos;
148 char *watermark = us->base + us->size;
149 size_t available = watermark - ptr;
150
151 int ret = snprintf(ptr, available, "\"%s\":\"%s%llu\"", key, value, num);
152 if (ret <= 0)
153 return -1;
154 while (ret >= (int) available) {
155 char *new_base = realloc(us->base, us->size + us->chunk);
156 if (!new_base)
157 return -1;
158 us->base = new_base;
159 us->size += us->chunk;
160 ptr = us->base + us->pos;
161 watermark = us->base + us->size;
162 available = watermark - ptr;
163 ret = snprintf(ptr, available, "\"%s\":\"%s%llu\"", key, value, num);
164 if (ret <= 0)
165 return -1;
166 }
167
168 us->pos += ret;
169 return 0;
170
171 }
172
uwsgi_stats_keyvalnum_comma(struct uwsgi_stats * us,char * key,char * value,unsigned long long num)173 int uwsgi_stats_keyvalnum_comma(struct uwsgi_stats *us, char *key, char *value, unsigned long long num) {
174 int ret = uwsgi_stats_keyvalnum(us, key, value, num);
175 if (ret)
176 return -1;
177 return uwsgi_stats_comma(us);
178 }
179
180
uwsgi_stats_keyvaln(struct uwsgi_stats * us,char * key,char * value,int vallen)181 int uwsgi_stats_keyvaln(struct uwsgi_stats *us, char *key, char *value, int vallen) {
182
183 if (uwsgi_stats_apply_tabs(us))
184 return -1;
185
186 char *ptr = us->base + us->pos;
187 char *watermark = us->base + us->size;
188 size_t available = watermark - ptr;
189
190 int ret = snprintf(ptr, available, "\"%s\":\"%.*s\"", key, vallen, value);
191 if (ret <= 0)
192 return -1;
193 while (ret >= (int) available) {
194 char *new_base = realloc(us->base, us->size + us->chunk);
195 if (!new_base)
196 return -1;
197 us->base = new_base;
198 us->size += us->chunk;
199 ptr = us->base + us->pos;
200 watermark = us->base + us->size;
201 available = watermark - ptr;
202 ret = snprintf(ptr, available, "\"%s\":\"%.*s\"", key, vallen, value);
203 if (ret <= 0)
204 return -1;
205 }
206
207 us->pos += ret;
208 return 0;
209
210 }
211
uwsgi_stats_keyvaln_comma(struct uwsgi_stats * us,char * key,char * value,int vallen)212 int uwsgi_stats_keyvaln_comma(struct uwsgi_stats *us, char *key, char *value, int vallen) {
213 int ret = uwsgi_stats_keyvaln(us, key, value, vallen);
214 if (ret)
215 return -1;
216 return uwsgi_stats_comma(us);
217 }
218
219
uwsgi_stats_key(struct uwsgi_stats * us,char * key)220 int uwsgi_stats_key(struct uwsgi_stats *us, char *key) {
221
222 if (uwsgi_stats_apply_tabs(us))
223 return -1;
224
225 char *ptr = us->base + us->pos;
226 char *watermark = us->base + us->size;
227 size_t available = watermark - ptr;
228
229 int ret = snprintf(ptr, available, "\"%s\":", key);
230 if (ret <= 0)
231 return -1;
232 while (ret >= (int) available) {
233 char *new_base = realloc(us->base, us->size + us->chunk);
234 if (!new_base)
235 return -1;
236 us->base = new_base;
237 us->size += us->chunk;
238 ptr = us->base + us->pos;
239 watermark = us->base + us->size;
240 available = watermark - ptr;
241 ret = snprintf(ptr, available, "\"%s\":", key);
242 if (ret <= 0)
243 return -1;
244 }
245
246 us->pos += ret;
247 return 0;
248 }
249
uwsgi_stats_str(struct uwsgi_stats * us,char * str)250 int uwsgi_stats_str(struct uwsgi_stats *us, char *str) {
251
252 char *ptr = us->base + us->pos;
253 char *watermark = us->base + us->size;
254 size_t available = watermark - ptr;
255
256 int ret = snprintf(ptr, available, "\"%s\"", str);
257 if (ret <= 0)
258 return -1;
259 while (ret >= (int) available) {
260 char *new_base = realloc(us->base, us->size + us->chunk);
261 if (!new_base)
262 return -1;
263 us->base = new_base;
264 us->size += us->chunk;
265 ptr = us->base + us->pos;
266 watermark = us->base + us->size;
267 available = watermark - ptr;
268 ret = snprintf(ptr, available, "\"%s\"", str);
269 if (ret <= 0)
270 return -1;
271 }
272
273 us->pos += ret;
274 return 0;
275 }
276
277
278
279
uwsgi_stats_keylong(struct uwsgi_stats * us,char * key,unsigned long long num)280 int uwsgi_stats_keylong(struct uwsgi_stats *us, char *key, unsigned long long num) {
281
282 if (uwsgi_stats_apply_tabs(us))
283 return -1;
284
285 char *ptr = us->base + us->pos;
286 char *watermark = us->base + us->size;
287 size_t available = watermark - ptr;
288
289 int ret = snprintf(ptr, available, "\"%s\":%llu", key, num);
290 if (ret <= 0)
291 return -1;
292 while (ret >= (int) available) {
293 char *new_base = realloc(us->base, us->size + us->chunk);
294 if (!new_base)
295 return -1;
296 us->base = new_base;
297 us->size += us->chunk;
298 ptr = us->base + us->pos;
299 watermark = us->base + us->size;
300 available = watermark - ptr;
301 ret = snprintf(ptr, available, "\"%s\":%llu", key, num);
302 if (ret <= 0)
303 return -1;
304 }
305
306 us->pos += ret;
307 return 0;
308 }
309
310
uwsgi_stats_keylong_comma(struct uwsgi_stats * us,char * key,unsigned long long num)311 int uwsgi_stats_keylong_comma(struct uwsgi_stats *us, char *key, unsigned long long num) {
312 int ret = uwsgi_stats_keylong(us, key, num);
313 if (ret)
314 return -1;
315 return uwsgi_stats_comma(us);
316 }
317
uwsgi_stats_keyslong(struct uwsgi_stats * us,char * key,long long num)318 int uwsgi_stats_keyslong(struct uwsgi_stats *us, char *key, long long num) {
319
320 if (uwsgi_stats_apply_tabs(us))
321 return -1;
322
323 char *ptr = us->base + us->pos;
324 char *watermark = us->base + us->size;
325 size_t available = watermark - ptr;
326
327 int ret = snprintf(ptr, available, "\"%s\":%lld", key, num);
328 if (ret <= 0)
329 return -1;
330 while (ret >= (int) available) {
331 char *new_base = realloc(us->base, us->size + us->chunk);
332 if (!new_base)
333 return -1;
334 us->base = new_base;
335 us->size += us->chunk;
336 ptr = us->base + us->pos;
337 watermark = us->base + us->size;
338 available = watermark - ptr;
339 ret = snprintf(ptr, available, "\"%s\":%lld", key, num);
340 if (ret <= 0)
341 return -1;
342 }
343
344 us->pos += ret;
345 return 0;
346 }
347
348
uwsgi_stats_keyslong_comma(struct uwsgi_stats * us,char * key,long long num)349 int uwsgi_stats_keyslong_comma(struct uwsgi_stats *us, char *key, long long num) {
350 int ret = uwsgi_stats_keyslong(us, key, num);
351 if (ret)
352 return -1;
353 return uwsgi_stats_comma(us);
354 }
355
356
uwsgi_send_stats(int fd,struct uwsgi_stats * (* func)(void))357 void uwsgi_send_stats(int fd, struct uwsgi_stats *(*func) (void)) {
358
359 struct sockaddr_un client_src;
360 socklen_t client_src_len = 0;
361
362 int client_fd = accept(fd, (struct sockaddr *) &client_src, &client_src_len);
363 if (client_fd < 0) {
364 uwsgi_error("accept()");
365 return;
366 }
367
368 if (uwsgi.stats_http) {
369 if (uwsgi_send_http_stats(client_fd)) {
370 close(client_fd);
371 return;
372 }
373 }
374
375 struct uwsgi_stats *us = func();
376 if (!us)
377 goto end;
378
379 size_t remains = us->pos;
380 off_t pos = 0;
381 while (remains > 0) {
382 int ret = uwsgi_waitfd_write(client_fd, uwsgi.socket_timeout);
383 if (ret <= 0) {
384 goto end0;
385 }
386 ssize_t res = write(client_fd, us->base + pos, remains);
387 if (res <= 0) {
388 if (res < 0) {
389 uwsgi_error("write()");
390 }
391 goto end0;
392 }
393 pos += res;
394 remains -= res;
395 }
396
397 end0:
398 free(us->base);
399 free(us);
400
401 end:
402 close(client_fd);
403 }
404
uwsgi_stats_pusher_get(char * name)405 struct uwsgi_stats_pusher *uwsgi_stats_pusher_get(char *name) {
406 struct uwsgi_stats_pusher *usp = uwsgi.stats_pushers;
407 while (usp) {
408 if (!strcmp(usp->name, name)) {
409 return usp;
410 }
411 usp = usp->next;
412 }
413 return usp;
414 }
415
uwsgi_stats_pusher_add(struct uwsgi_stats_pusher * pusher,char * arg)416 struct uwsgi_stats_pusher_instance *uwsgi_stats_pusher_add(struct uwsgi_stats_pusher *pusher, char *arg) {
417 struct uwsgi_stats_pusher_instance *old_uspi = NULL, *uspi = uwsgi.stats_pusher_instances;
418 while (uspi) {
419 old_uspi = uspi;
420 uspi = uspi->next;
421 }
422
423 uspi = uwsgi_calloc(sizeof(struct uwsgi_stats_pusher_instance));
424 uspi->pusher = pusher;
425 if (arg) {
426 uspi->arg = uwsgi_str(arg);
427 }
428 uspi->raw = pusher->raw;
429 if (old_uspi) {
430 old_uspi->next = uspi;
431 }
432 else {
433 uwsgi.stats_pusher_instances = uspi;
434 }
435
436 return uspi;
437 }
438
uwsgi_stats_pusher_loop(struct uwsgi_thread * ut)439 void uwsgi_stats_pusher_loop(struct uwsgi_thread *ut) {
440 void *events = event_queue_alloc(1);
441 for (;;) {
442 int nevents = event_queue_wait_multi(ut->queue, 1, events, 1);
443 if (nevents < 0) {
444 if (errno == EINTR) continue;
445 uwsgi_log_verbose("ending the stats pusher thread...\n");
446 return;
447 }
448 if (nevents > 0) {
449 int interesting_fd = event_queue_interesting_fd(events, 0);
450 char buf[4096];
451 ssize_t len = read(interesting_fd, buf, 4096);
452 if (len <= 0) {
453 uwsgi_log("[uwsgi-stats-pusher] goodbye...\n");
454 return;
455 }
456 uwsgi_log("[uwsgi-stats-pusher] message received from master: %.*s\n", (int) len, buf);
457 continue;
458 }
459
460 time_t now = uwsgi_now();
461 struct uwsgi_stats_pusher_instance *uspi = uwsgi.stats_pusher_instances;
462 struct uwsgi_stats *us = NULL;
463 while (uspi) {
464 int delta = uspi->freq ? uspi->freq : uwsgi.stats_pusher_default_freq;
465 if (((uspi->last_run + delta) <= now) || (uspi->needs_retry && (uspi->next_retry <= now))) {
466 if (uspi->needs_retry) uspi->retries++;
467 if (uspi->raw) {
468 uspi->pusher->func(uspi, now, NULL, 0);
469 }
470 else {
471 if (!us) {
472 us = uwsgi_master_generate_stats();
473 if (!us)
474 goto next;
475 }
476 uspi->pusher->func(uspi, now, us->base, us->pos);
477 }
478 uspi->last_run = now;
479 if (uspi->needs_retry && uspi->max_retries > 0 && uspi->retries < uspi->max_retries) {
480 uwsgi_log("[uwsgi-stats-pusher] %s failed (%d), retry in %ds\n", uspi->pusher->name, uspi->retries, uspi->retry_delay);
481 uspi->next_retry = now + uspi->retry_delay;
482 } else if (uspi->needs_retry && uspi->retries >= uspi->max_retries) {
483 uwsgi_log("[uwsgi-stats-pusher] %s failed and maximum number of retries was reached (%d)\n", uspi->pusher->name, uspi->retries);
484 uspi->needs_retry = 0;
485 uspi->retries = 0;
486 } else if (uspi->retries) {
487 uwsgi_log("[uwsgi-stats-pusher] retry succeeded for %s\n", uspi->pusher->name);
488 uspi->retries = 0;
489 }
490 }
491 next:
492 uspi = uspi->next;
493 }
494
495 if (us) {
496 free(us->base);
497 free(us);
498 }
499 }
500 }
501
uwsgi_stats_pusher_setup()502 void uwsgi_stats_pusher_setup() {
503 struct uwsgi_string_list *usl = uwsgi.requested_stats_pushers;
504 while (usl) {
505 char *ssp = uwsgi_str(usl->value);
506 struct uwsgi_stats_pusher *pusher = NULL;
507 char *colon = strchr(ssp, ':');
508 if (colon) {
509 *colon = 0;
510 }
511 pusher = uwsgi_stats_pusher_get(ssp);
512 if (!pusher) {
513 uwsgi_log("unable to find \"%s\" stats_pusher\n", ssp);
514 free(ssp);
515 exit(1);
516 }
517 char *arg = NULL;
518 if (colon) {
519 arg = colon + 1;
520 *colon = ':';
521 }
522 uwsgi_stats_pusher_add(pusher, arg);
523 usl = usl->next;
524 free(ssp);
525 }
526 }
527
uwsgi_register_stats_pusher(char * name,void (* func)(struct uwsgi_stats_pusher_instance *,time_t,char *,size_t))528 struct uwsgi_stats_pusher *uwsgi_register_stats_pusher(char *name, void (*func) (struct uwsgi_stats_pusher_instance *, time_t, char *, size_t)) {
529
530 struct uwsgi_stats_pusher *pusher = uwsgi.stats_pushers, *old_pusher = NULL;
531
532 while (pusher) {
533 old_pusher = pusher;
534 pusher = pusher->next;
535 }
536
537 pusher = uwsgi_calloc(sizeof(struct uwsgi_stats_pusher));
538 pusher->name = name;
539 pusher->func = func;
540
541 if (old_pusher) {
542 old_pusher->next = pusher;
543 }
544 else {
545 uwsgi.stats_pushers = pusher;
546 }
547
548 return pusher;
549 }
550
stats_dump_var(char * k,uint16_t kl,char * v,uint16_t vl,void * data)551 static void stats_dump_var(char *k, uint16_t kl, char *v, uint16_t vl, void *data) {
552 struct uwsgi_stats *us = (struct uwsgi_stats *) data;
553 if (us->dirty) return;
554 char *var = uwsgi_concat3n(k, kl, "=", 1, v,vl);
555 char *escaped_var = uwsgi_malloc(((kl+vl+1)*2)+1);
556 escape_json(var, strlen(var), escaped_var);
557 free(var);
558 if (uwsgi_stats_str(us, escaped_var)) {
559 us->dirty = 1;
560 free(escaped_var);
561 return;
562 }
563 free(escaped_var);
564 if (uwsgi_stats_comma(us)) {
565 us->dirty = 1;
566 }
567 return;
568 }
569
uwsgi_stats_dump_vars(struct uwsgi_stats * us,struct uwsgi_core * uc)570 int uwsgi_stats_dump_vars(struct uwsgi_stats *us, struct uwsgi_core *uc) {
571 if (!uc->in_request) return 0;
572 struct uwsgi_header *uh = (struct uwsgi_header *) uc->buffer;
573 uint16_t pktsize = uh->pktsize;
574 if (!pktsize) return 0;
575 char *dst = uwsgi.workers[0].cores[0].buffer;
576 memcpy(dst, uc->buffer+4, uwsgi.buffer_size);
577 // ok now check if something changed...
578 if (!uc->in_request) return 0;
579 if (uh->pktsize != pktsize) return 0;
580 if (memcmp(dst, uc->buffer+4, uwsgi.buffer_size)) return 0;
581 // nothing changed let's dump vars
582 int ret = uwsgi_hooked_parse(dst, pktsize, stats_dump_var, us);
583 if (ret) return -1;
584 if (us->dirty) return -1;
585 if (uwsgi_stats_str(us, "")) return -1;
586 return 0;
587 }
588
uwsgi_stats_dump_request(struct uwsgi_stats * us,struct uwsgi_core * uc)589 int uwsgi_stats_dump_request(struct uwsgi_stats *us, struct uwsgi_core *uc) {
590 if (!uc->in_request) return 0;
591 struct wsgi_request req = uc->req;
592 uwsgi_stats_keylong(us, "request_start", req.start_of_request_in_sec);
593
594 return 0;
595 }
596