1 /* Icecast
2 *
3 * This program is distributed under the GNU General Public License, version 2.
4 * A copy of this license is included with this source.
5 *
6 * Copyright 2000-2013, Jack Moffitt <jack@xiph.org>,
7 * Michael Smith <msmith@xiph.org>,
8 * oddsock <oddsock@xiph.org>,
9 * Karl Heyes <karl@xiph.org>
10 * and others (see AUTHORS for details).
11 *
12 * Copyright 2000-2020, Karl Heyes <karl@kheyes.plus.com>
13 *
14 */
15
16 /* client.c
17 **
18 ** client interface implementation
19 **
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <errno.h>
30
31 #include "thread/thread.h"
32 #include "avl/avl.h"
33 #include "httpp/httpp.h"
34 #include "timing/timing.h"
35
36 #include "client.h"
37 #include "cfgfile.h"
38 #include "connection.h"
39 #include "refbuf.h"
40 #include "format.h"
41 #include "stats.h"
42 #include "fserve.h"
43
44 #include "client.h"
45 #include "logging.h"
46 #include "slave.h"
47 #include "global.h"
48 #include "util.h"
49
50 #undef CATMODULE
51 #define CATMODULE "client"
52
53 int worker_count = 0, worker_min_count;
54 worker_t *worker_balance_to_check, *worker_least_used, *worker_incoming = NULL;
55
56 FD_t logger_fd[2];
57
58 static void logger_commits (int id);
59
60
client_register(client_t * client)61 void client_register (client_t *client)
62 {
63 if (client)
64 global.clients++;
65 }
66
67
client_keepalive_header(client_t * client)68 const char *client_keepalive_header (client_t *client)
69 {
70 return (client->flags & CLIENT_KEEPALIVE) ? "Connection: Keep-Alive" : "Connection: Close";
71 }
72
73
74 /* verify that the socket is still connected. */
client_connected(client_t * client)75 int client_connected (client_t *client)
76 {
77 int ret = 1;
78 if (client)
79 {
80 if (sock_active (client->connection.sock) == 0)
81 ret = 0;
82 }
83 return ret;
84 }
85
86
client_destroy(client_t * client)87 void client_destroy(client_t *client)
88 {
89 if (client == NULL)
90 return;
91
92 if (client->worker)
93 {
94 WARN0 ("client still on worker thread");
95 return;
96 }
97 /* release the buffer now, as the buffer could be on the source queue
98 * and may of disappeared after auth completes */
99 if (client->refbuf)
100 {
101 refbuf_release (client->refbuf);
102 client->refbuf = NULL;
103 }
104
105 if (client->flags & CLIENT_AUTHENTICATED)
106 DEBUG1 ("client still in auth \"%s\"", httpp_getvar (client->parser, HTTPP_VAR_URI));
107
108 /* write log entry if ip is set (some things don't set it, like outgoing
109 * slave requests
110 */
111 if (client->respcode > 0 && client->parser)
112 logging_access(client);
113
114 if (client->flags & CLIENT_IP_BAN_LIFT)
115 {
116 INFO1 ("lifting IP ban on client at %s", client->connection.ip);
117 connection_release_banned_ip (client->connection.ip);
118 client->flags &= ~CLIENT_IP_BAN_LIFT;
119 }
120
121 if (client->parser)
122 httpp_destroy (client->parser);
123
124 /* we need to free client specific format data (if any) */
125 if (client->free_client_data)
126 client->free_client_data (client);
127
128 free(client->username);
129 free(client->password);
130 client->username = NULL;
131 client->password = NULL;
132 client->parser = NULL;
133 client->respcode = 0;
134 client->free_client_data = NULL;
135
136 if (not_ssl_connection (&client->connection))
137 sock_set_cork (client->connection.sock, 0); // ensure any corked data is actually sent.
138
139 global_lock ();
140 if (global.running != ICE_RUNNING || client->connection.error ||
141 (client->flags & CLIENT_KEEPALIVE) == 0 || client_connected (client) == 0)
142 {
143 global.clients--;
144 config_clear_listener (client->server_conn);
145 global_unlock ();
146 connection_close (&client->connection);
147
148 free(client);
149 return;
150 }
151 global_unlock ();
152 DEBUG1 ("keepalive detected on %s, placing back onto worker", client->connection.ip);
153 if (not_ssl_connection (&client->connection))
154 sock_set_cork (client->connection.sock, 1); // reenable cork for the next go around
155 client->counter = client->schedule_ms = timing_get_time();
156 connection_reset (&client->connection, client->schedule_ms);
157
158 client->ops = &http_request_ops;
159 client->flags = CLIENT_ACTIVE;
160 client->shared_data = NULL;
161 client->refbuf = NULL;
162 client->pos = 0;
163 client->intro_offset = 0;
164 client_add_incoming (client);
165 }
166
167
client_compare(void * compare_arg,void * a,void * b)168 int client_compare (void *compare_arg, void *a, void *b)
169 {
170 client_t *ca = a, *cb = b;
171
172 if (ca->connection.id < cb->connection.id) return -1;
173 if (ca->connection.id > cb->connection.id) return 1;
174
175 return 0;
176 }
177
178
179 /* helper function for reading data from a client */
client_read_bytes(client_t * client,void * buf,unsigned len)180 int client_read_bytes (client_t *client, void *buf, unsigned len)
181 {
182 int (*con_read)(struct connection_tag *handle, void *buf, size_t len) = connection_read;
183 int bytes;
184
185 if (len == 0)
186 return 0;
187 if (client->refbuf && client->pos < client->refbuf->len)
188 {
189 unsigned remaining = client->refbuf->len - client->pos;
190 if (remaining > len)
191 remaining = len;
192 memcpy (buf, client->refbuf->data + client->pos, remaining);
193 client->pos += remaining;
194 if (client->pos >= client->refbuf->len)
195 client_set_queue (client, NULL);
196 return remaining;
197 }
198 #ifdef HAVE_OPENSSL
199 if (client->connection.ssl)
200 con_read = connection_read_ssl;
201 #endif
202 bytes = con_read (&client->connection, buf, len);
203
204 if (bytes == -1 && client->connection.error)
205 DEBUG2 ("reading from connection %"PRIu64 " from %s has failed", client->connection.id, &client->connection.ip[0]);
206
207 return bytes;
208 }
209
210
client_send_302(client_t * client,const char * location)211 int client_send_302(client_t *client, const char *location)
212 {
213 int len;
214 char body [4096];
215
216 client_set_queue (client, NULL);
217 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
218 len = snprintf (body, sizeof body, "Moved <a href=\"%s\">here</a>\r\n", location);
219 len = snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
220 "HTTP/1.0 302 Temporarily Moved\r\n"
221 "Content-Type: text/html\r\n"
222 "Content-Length: %d\r\n"
223 "Location: %s\r\n\r\n%s",
224 len, location, body);
225 client->respcode = 302;
226 client->flags &= ~CLIENT_KEEPALIVE;
227 client->refbuf->len = len;
228 return fserve_setup_client (client);
229 }
230
231
client_send_400(client_t * client,const char * message)232 int client_send_400(client_t *client, const char *message)
233 {
234 client_set_queue (client, NULL);
235 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
236 snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
237 "HTTP/1.0 400 Bad Request\r\n"
238 "Content-Type: text/html\r\n\r\n"
239 "<b>%s</b>\r\n", message?message:"");
240 client->respcode = 400;
241 client->flags &= ~CLIENT_KEEPALIVE;
242 client->refbuf->len = strlen (client->refbuf->data);
243 return fserve_setup_client (client);
244 }
245
246
client_send_401(client_t * client,const char * realm)247 int client_send_401 (client_t *client, const char *realm)
248 {
249 ice_config_t *config = config_get_config ();
250
251 if (realm == NULL)
252 realm = config->server_id;
253
254 client_set_queue (client, NULL);
255 client->refbuf = refbuf_new (500);
256 snprintf (client->refbuf->data, 500,
257 "HTTP/1.0 401 Authentication Required\r\n"
258 "WWW-Authenticate: Basic realm=\"%s\"\r\n"
259 "\r\n"
260 "You need to authenticate\r\n", realm);
261 config_release_config();
262 client->respcode = 401;
263 client->flags &= ~CLIENT_KEEPALIVE;
264 client->refbuf->len = strlen (client->refbuf->data);
265 return fserve_setup_client (client);
266 }
267
268
client_send_403(client_t * client,const char * reason)269 int client_send_403(client_t *client, const char *reason)
270 {
271 if (reason == NULL)
272 reason = "Forbidden";
273 client_set_queue (client, NULL);
274 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
275 snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
276 "HTTP/1.0 403 %s\r\n"
277 "Content-Type: text/html\r\n\r\n", reason);
278 client->respcode = 403;
279 client->flags &= ~CLIENT_KEEPALIVE;
280 client->refbuf->len = strlen (client->refbuf->data);
281 return fserve_setup_client (client);
282 }
283
284
client_send_403redirect(client_t * client,const char * mount,const char * reason)285 int client_send_403redirect (client_t *client, const char *mount, const char *reason)
286 {
287 if (redirect_client (mount, client))
288 return 0;
289 return client_send_403 (client, reason);
290 }
291
292
client_send_404(client_t * client,const char * message)293 int client_send_404 (client_t *client, const char *message)
294 {
295 int ret = -1;
296
297 if (client->worker == NULL) /* client is not on any worker now */
298 {
299 client_destroy (client);
300 return 0;
301 }
302 client_set_queue (client, NULL);
303 if (client->respcode || client->connection.error)
304 {
305 worker_t *worker = client->worker;
306 if (client->respcode >= 300)
307 client->flags = client->flags & ~CLIENT_AUTHENTICATED;
308 client->flags |= CLIENT_ACTIVE;
309 worker_wakeup (worker);
310 }
311 else
312 {
313 if (client->parser->req_type == httpp_req_head || message == NULL)
314 message = "Not Available";
315 ret = strlen (message);
316 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
317 snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
318 "HTTP/1.0 404 Not Available\r\n"
319 "%s\r\nContent-Length: %d\r\nContent-Type: text/html\r\n\r\n"
320 "%s", client_keepalive_header (client), ret,
321 message ? message: "");
322 client->respcode = 404;
323 client->refbuf->len = strlen (client->refbuf->data);
324 ret = fserve_setup_client (client);
325 }
326 return ret;
327 }
328
329
client_send_416(client_t * client)330 int client_send_416(client_t *client)
331 {
332 client_set_queue (client, NULL);
333 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
334 snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
335 "HTTP/1.0 416 Request Range Not Satisfiable\r\n\r\n");
336 client->respcode = 416;
337 client->refbuf->len = strlen (client->refbuf->data);
338 return fserve_setup_client (client);
339 }
340
341
client_send_501(client_t * client)342 int client_send_501(client_t *client)
343 {
344 client_set_queue (client, NULL);
345 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
346 snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
347 "HTTP/1.0 501 Not Implemented\r\n\r\n");
348 client->respcode = 501;
349 client->refbuf->len = strlen (client->refbuf->data);
350 return fserve_setup_client (client);
351 }
352
353
client_add_cors(client_t * client,char * buf,int remain)354 int client_add_cors (client_t *client, char *buf, int remain)
355 {
356 int bytes = 0;
357 const char *cred = "", *origin = httpp_getvar (client->parser, "origin");
358 if (origin)
359 cred = "Access-Control-Allow-Credentials: true\r\n";
360 else
361 origin = "*";
362
363 bytes = snprintf (buf, remain,
364 "Access-Control-Allow-Origin: %s\r\n%s"
365 "Access-Control-Allow-Headers: Origin, Accept, X-Requested-With, Content-Type, Icy-MetaData\r\n"
366 "Access-Control-Allow-Methods: GET, OPTIONS, SOURCE, PUT, HEAD, STATS\r\n\r\n",
367 origin, cred);
368 return bytes;
369 }
370
371
client_send_options(client_t * client)372 int client_send_options(client_t *client)
373 {
374 client_set_queue (client, NULL);
375 client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
376 char *ptr = client->refbuf->data;
377 int bytes = snprintf (ptr, PER_CLIENT_REFBUF_SIZE,
378 "HTTP/1.1 200 OK\r\n"
379 "Connection: Keep-alive\r\n");
380 client_add_cors (client, ptr+bytes, PER_CLIENT_REFBUF_SIZE-bytes);
381 client->respcode = 200;
382 client->refbuf->len = strlen (client->refbuf->data);
383 return fserve_setup_client (client);
384 }
385
386
387 /* helper function for sending the data to a client */
client_send_bytes(client_t * client,const void * buf,unsigned len)388 int client_send_bytes (client_t *client, const void *buf, unsigned len)
389 {
390 int (*con_send)(struct connection_tag *handle, const void *buf, size_t len) = connection_send;
391 int ret;
392 #ifdef HAVE_OPENSSL
393 if (client->connection.ssl)
394 con_send = connection_send_ssl;
395 #endif
396 ret = con_send (&client->connection, buf, len);
397
398 if (client->connection.error)
399 DEBUG3 ("Client %"PRIu64 " connection on %s from %s died", client->connection.id, (client->mount ? client->mount:"unknown"), &client->connection.ip[0]);
400
401 return ret;
402 }
403
404
client_send_buffer(client_t * client)405 static int client_send_buffer (client_t *client)
406 {
407 const char *buf = client->refbuf->data + client->pos;
408 int len = client->refbuf->len - client->pos;
409 int ret = client_send_bytes (client, buf, len);
410
411 if (ret > 0)
412 client->pos += ret;
413 if (client->connection.error == 0 && client->pos >= client->refbuf->len)
414 {
415 int (*callback)(client_t *) = client->format_data;
416 return callback (client);
417 }
418 return ret;
419 }
420
421
422 struct _client_functions client_buffer_ops =
423 {
424 client_send_buffer,
425 client_destroy
426 };
427
428
client_send_buffer_callback(client_t * client,int (* callback)(client_t *))429 int client_send_buffer_callback (client_t *client, int(*callback)(client_t*))
430 {
431 client->format_data = callback;
432 client->ops = &client_buffer_ops;
433 return 0;
434 }
435
436
client_set_queue(client_t * client,refbuf_t * refbuf)437 void client_set_queue (client_t *client, refbuf_t *refbuf)
438 {
439 refbuf_t *next, *to_release = client->refbuf;
440
441 while (to_release)
442 {
443 if (to_release->flags & REFBUF_SHARED)
444 {
445 ERROR1 ("content has a shared flag status for %s", client->connection.ip);
446 break;
447 }
448 next = to_release->next;
449 to_release->next = NULL;
450 refbuf_release (to_release);
451 to_release = next;
452 }
453
454 client->refbuf = refbuf;
455 if (refbuf)
456 refbuf_addref (client->refbuf);
457
458 client->pos = 0;
459 }
460
is_worker_incoming(worker_t * w)461 int is_worker_incoming (worker_t *w)
462 {
463 return (w == worker_incoming) ? 1 : 0;
464 }
465
worker_check_time_ms(worker_t * worker)466 static uint64_t worker_check_time_ms (worker_t *worker)
467 {
468 uint64_t tm = timing_get_time();
469 if (tm - worker->time_ms > 1000 && worker->time_ms)
470 WARN2 ("worker %p has been stuck for %" PRIu64 " ms", worker, (tm - worker->time_ms));
471 return tm;
472 }
473
474
find_least_busy_handler(int log)475 static worker_t *find_least_busy_handler (int log)
476 {
477 worker_t *min = workers;
478 int min_count = INT_MAX;
479
480 if (workers && workers->next)
481 {
482 worker_t *handler = workers;
483
484 while (handler)
485 {
486 thread_spin_lock (&handler->lock);
487 int cur_count = handler->count + handler->pending_count;
488 thread_spin_unlock (&handler->lock);
489
490 if (log) DEBUG2 ("handler %p has %d clients", handler, cur_count);
491 if (cur_count < min_count)
492 {
493 min = handler;
494 min_count = cur_count;
495 }
496 handler = handler->next;
497 }
498 worker_min_count = min_count;
499 }
500 return min;
501 }
502
503
worker_selected(void)504 worker_t *worker_selected (void)
505 {
506 return worker_least_used;
507 }
508
509
510 /* worker mutex should be already locked */
worker_add_client(worker_t * worker,client_t * client)511 static void worker_add_client (worker_t *worker, client_t *client)
512 {
513 ++worker->pending_count;
514 client->next_on_worker = NULL;
515 *worker->pending_clients_tail = client;
516 worker->pending_clients_tail = &client->next_on_worker;
517 client->worker = worker;
518 }
519
520
client_change_worker(client_t * client,worker_t * dest_worker)521 int client_change_worker (client_t *client, worker_t *dest_worker)
522 {
523 if (dest_worker->running == 0)
524 return 0;
525 client->next_on_worker = NULL;
526
527 thread_spin_lock (&dest_worker->lock);
528 worker_add_client (dest_worker, client);
529 thread_spin_unlock (&dest_worker->lock);
530 worker_wakeup (dest_worker);
531
532 return 1;
533 }
534
535
client_add_worker(client_t * client)536 void client_add_worker (client_t *client)
537 {
538 worker_t *handler;
539
540 thread_rwlock_rlock (&workers_lock);
541 /* add client to the handler with the least number of clients */
542 handler = worker_selected();
543 thread_spin_lock (&handler->lock);
544 thread_rwlock_unlock (&workers_lock);
545
546 worker_add_client (handler, client);
547 thread_spin_unlock (&handler->lock);
548 worker_wakeup (handler);
549 }
550
client_add_incoming(client_t * client)551 void client_add_incoming (client_t *client)
552 {
553 worker_t *handler;
554
555 thread_rwlock_rlock (&workers_lock);
556 handler = worker_incoming;
557 thread_spin_lock (&handler->lock);
558 thread_rwlock_unlock (&workers_lock);
559
560 worker_add_client (handler, client);
561 thread_spin_unlock (&handler->lock);
562 worker_wakeup (handler);
563 }
564
565
566 #ifdef _WIN32
567 #define pipe_create sock_create_pipe_emulation
568 #define pipe_write(A, B, C) send(A, B, C, 0)
569 #define pipe_read(A,B,C) recv(A, B, C, 0)
570 #else
571 #ifdef HAVE_PIPE2
572 #define pipe_create(x) pipe2(x,O_CLOEXEC)
573 #elif defined(FD_CLOEXEC)
pipe_create(FD_t x[])574 int pipe_create(FD_t x[]) {
575 int r = pipe(x); if (r==0) {fcntl(x[0], F_SETFD,FD_CLOEXEC); \
576 fcntl(x[1], F_SETFD,FD_CLOEXEC); } return r;
577 }
578 #else
579 #define pipe_create pipe
580 #endif
581 #define pipe_write write
582 #define pipe_read read
583 #endif
584
585
worker_control_create(FD_t wakeup_fd[])586 void worker_control_create (FD_t wakeup_fd[])
587 {
588 if (pipe_create (&wakeup_fd[0]) < 0)
589 {
590 ERROR0 ("pipe failed, descriptor limit?");
591 abort();
592 }
593 sock_set_blocking (wakeup_fd[0], 0);
594 sock_set_blocking (wakeup_fd[1], 0);
595 }
596
597
worker_add_pending_clients(worker_t * worker)598 static client_t **worker_add_pending_clients (worker_t *worker)
599 {
600 thread_spin_lock (&worker->lock);
601 if (worker->pending_clients)
602 {
603 unsigned count;
604 client_t **p;
605
606 p = worker->last_p;
607 *worker->last_p = worker->pending_clients;
608 worker->last_p = worker->pending_clients_tail;
609 worker->count += worker->pending_count;
610 count = worker->pending_count;
611 worker->pending_clients = NULL;
612 worker->pending_clients_tail = &worker->pending_clients;
613 worker->pending_count = 0;
614 thread_spin_unlock (&worker->lock);
615 DEBUG2 ("Added %d pending clients to %p", count, worker);
616 return p; /* only these new ones scheduled so process from here */
617 }
618 thread_spin_unlock (&worker->lock);
619 worker->wakeup_ms = worker->time_ms + 60000;
620 return &worker->clients;
621 }
622
623
624 // enter with spin lock enabled, exit without
625 //
worker_wait(worker_t * worker)626 static client_t **worker_wait (worker_t *worker)
627 {
628 int ret, duration = 2;
629
630 if (worker->running)
631 {
632 uint64_t tm = worker_check_time_ms (worker);
633 if (worker->wakeup_ms > tm)
634 duration = (int)(worker->wakeup_ms - tm);
635 if (duration > 60000) /* make duration at most 60s */
636 duration = 60000;
637 }
638 thread_spin_unlock (&worker->lock);
639
640 ret = util_timed_wait_for_fd (worker->wakeup_fd[0], duration);
641 if (ret > 0) /* may of been several wakeup attempts */
642 {
643 char ca[100];
644 do
645 {
646 ret = pipe_read (worker->wakeup_fd[0], ca, sizeof ca);
647 if (ret > 0)
648 break;
649 if (ret < 0 && sock_recoverable (sock_error()))
650 break;
651 sock_close (worker->wakeup_fd[1]);
652 sock_close (worker->wakeup_fd[0]);
653 worker_control_create (&worker->wakeup_fd[0]);
654 worker_wakeup (worker);
655 WARN0 ("Had to recreate worker control feed");
656 } while (1);
657 }
658
659 worker->time_ms = timing_get_time();
660 worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
661
662 return worker_add_pending_clients (worker);
663 }
664
665
worker_relocate_clients(worker_t * worker)666 static void worker_relocate_clients (worker_t *worker)
667 {
668 if (workers == NULL)
669 return;
670 while (worker->count || worker->pending_count)
671 {
672 client_t *client = worker->clients, **prevp = &worker->clients;
673
674 worker->wakeup_ms = worker->time_ms + 150;
675 worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
676 while (client)
677 {
678 if (client->flags & CLIENT_ACTIVE)
679 {
680 client->worker = workers;
681 prevp = &client->next_on_worker;
682 }
683 else
684 {
685 *prevp = client->next_on_worker;
686 worker_add_client (worker, client);
687 worker->count--;
688 }
689 client = *prevp;
690 }
691 if (worker->clients)
692 {
693 thread_spin_lock (&workers->lock);
694 *workers->pending_clients_tail = worker->clients;
695 workers->pending_clients_tail = prevp;
696 workers->pending_count += worker->count;
697 thread_spin_unlock (&workers->lock);
698 worker_wakeup (workers);
699 worker->clients = NULL;
700 worker->last_p = &worker->clients;
701 worker->count = 0;
702 }
703 thread_spin_lock (&worker->lock);
704 worker_wait (worker);
705 }
706 }
707
worker(void * arg)708 void *worker (void *arg)
709 {
710 worker_t *worker = arg;
711 long prev_count = -1;
712 client_t **prevp = &worker->clients;
713 uint64_t c = 0;
714
715 thread_rwlock_rlock (&global.workers_rw);
716 worker->running = 1;
717 worker->wakeup_ms = (int64_t)0;
718 worker->time_ms = timing_get_time();
719
720 while (1)
721 {
722 client_t *client = *prevp;
723 uint64_t sched_ms = worker->time_ms + 12;
724
725 c = 0;
726 while (client)
727 {
728 if (client->worker != worker) abort();
729 /* process client details but skip those that are not ready yet */
730 if (client->flags & CLIENT_ACTIVE)
731 {
732 int ret = 0;
733 client_t *nx = client->next_on_worker;
734
735 int process = 1;
736 if (worker->running) // force all active clients to run on worker shutdown
737 {
738 if (client->schedule_ms <= sched_ms)
739 {
740 if (c > 9000 && client->wakeup == NULL)
741 process = 0;
742 }
743 else if (client->wakeup == NULL || *client->wakeup == 0)
744 {
745 process = 0;
746 }
747 }
748
749 if (process)
750 {
751 if ((c & 511) == 0)
752 {
753 // update these periodically to keep in sync
754 worker->time_ms = worker_check_time_ms (worker);
755 worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
756 }
757 c++;
758 errno = 0;
759 ret = client->ops->process (client);
760 if (ret < 0)
761 {
762 client->worker = NULL;
763 if (client->ops->release)
764 client->ops->release (client);
765 }
766 if (ret)
767 {
768 thread_spin_lock (&worker->lock);
769 worker->count--;
770 if (nx == NULL) /* is this the last client */
771 worker->last_p = prevp;
772 client = *prevp = nx;
773 thread_spin_unlock (&worker->lock);
774 continue;
775 }
776 }
777 if ((client->flags & CLIENT_ACTIVE) && client->schedule_ms < worker->wakeup_ms)
778 worker->wakeup_ms = client->schedule_ms;
779 }
780 prevp = &client->next_on_worker;
781 client = *prevp;
782 }
783 if (prev_count != worker->count)
784 {
785 DEBUG2 ("%p now has %d clients", worker, worker->count);
786 prev_count = worker->count;
787 }
788 thread_spin_lock (&worker->lock);
789 if (worker->running == 0)
790 {
791 if (worker->count == 0 && worker->pending_count == 0)
792 break;
793 }
794 prevp = worker_wait (worker);
795 }
796 thread_spin_unlock (&worker->lock);
797 worker_relocate_clients (worker);
798 INFO0 ("shutting down");
799 thread_rwlock_unlock (&global.workers_rw);
800 return NULL;
801 }
802
803
804 // We pick a worker (consequetive) and set a max number of clients to move if needed
worker_balance_trigger(time_t now)805 void worker_balance_trigger (time_t now)
806 {
807 thread_rwlock_wlock (&workers_lock);
808 if (worker_count > 1)
809 {
810 int log_counts = (now & 15) == 0 ? 1 : 0;
811
812 worker_least_used = find_least_busy_handler (log_counts);
813 if (worker_balance_to_check)
814 {
815 worker_t *w = worker_balance_to_check;
816 // DEBUG2 ("Worker allocations reset on %p, least is %p", w, worker_least_used);
817 thread_spin_lock (&w->lock);
818 w->move_allocations = 200;
819 worker_balance_to_check = w->next;
820 thread_spin_unlock (&w->lock);
821 }
822 if (worker_balance_to_check == NULL)
823 worker_balance_to_check = workers;
824 }
825 thread_rwlock_unlock (&workers_lock);
826 }
827
828
worker_start(void)829 static void worker_start (void)
830 {
831 worker_t *handler = calloc (1, sizeof(worker_t));
832
833 worker_control_create (&handler->wakeup_fd[0]);
834
835 handler->pending_clients_tail = &handler->pending_clients;
836 thread_spin_create (&handler->lock);
837 handler->last_p = &handler->clients;
838
839 thread_rwlock_wlock (&workers_lock);
840 if (worker_incoming == NULL)
841 {
842 worker_incoming = handler;
843 handler->move_allocations = 1000000; // should stay fixed for this one
844 handler->thread = thread_create ("worker", worker, handler, THREAD_ATTACHED);
845 thread_rwlock_unlock (&workers_lock);
846 INFO1 ("starting incoming worker thread %p", worker_incoming);
847 worker_start(); // single level recursion, just get a special worker thread set up
848 return;
849 }
850 handler->next = workers;
851 workers = handler;
852 worker_count++;
853 worker_least_used = worker_balance_to_check = workers;
854 thread_rwlock_unlock (&workers_lock);
855
856 handler->thread = thread_create ("worker", worker, handler, THREAD_ATTACHED);
857 }
858
859
worker_stop(void)860 static void worker_stop (void)
861 {
862 worker_t *handler;
863
864 thread_rwlock_wlock (&workers_lock);
865 do
866 {
867 if (worker_count > 0)
868 {
869 handler = workers;
870 workers = handler->next;
871 worker_least_used = worker_balance_to_check = workers;
872 if (workers)
873 workers->move_allocations = 1000;
874 worker_count--;
875 }
876 else
877 {
878 handler = worker_incoming;
879 worker_incoming = NULL;
880 INFO0 ("stopping incoming worker thread");
881 }
882
883 if (handler)
884 {
885 thread_spin_lock (&handler->lock);
886 handler->running = 0;
887 thread_spin_unlock (&handler->lock);
888
889 worker_wakeup (handler);
890 thread_rwlock_unlock (&workers_lock);
891
892 thread_join (handler->thread);
893 thread_spin_destroy (&handler->lock);
894
895 sock_close (handler->wakeup_fd[1]);
896 sock_close (handler->wakeup_fd[0]);
897 free (handler);
898 thread_rwlock_wlock (&workers_lock);
899 }
900 } while (workers == NULL && worker_incoming);
901 thread_rwlock_unlock (&workers_lock);
902 }
903
904
workers_adjust(int new_count)905 void workers_adjust (int new_count)
906 {
907 INFO1 ("requested worker count %d", new_count);
908 while (worker_count != new_count)
909 {
910 if (worker_count < new_count)
911 worker_start ();
912 else if (worker_count > new_count)
913 worker_stop ();
914 }
915 }
916
917
worker_wakeup(worker_t * worker)918 void worker_wakeup (worker_t *worker)
919 {
920 pipe_write (worker->wakeup_fd[1], "W", 1);
921 }
922
923
logger_commits(int id)924 static void logger_commits (int id)
925 {
926 pipe_write (logger_fd[1], "L", 1);
927 }
928
log_commit_thread(void * arg)929 static void *log_commit_thread (void *arg)
930 {
931 INFO0 ("started");
932 thread_rwlock_rlock (&global.workers_rw);
933 while (1)
934 {
935 int ret = util_timed_wait_for_fd (logger_fd[0], 5000);
936 if (ret == 0)
937 {
938 global_lock();
939 int loop = (global.running == ICE_RUNNING);
940 global_unlock();
941 if (loop) continue;
942 }
943 if (ret > 0)
944 {
945 char cm[80];
946 ret = pipe_read (logger_fd[0], cm, sizeof cm);
947 if (ret > 0)
948 {
949 // fprintf (stderr, "logger woken with %d\n", ret);
950 log_commit_entries ();
951 continue;
952 }
953 }
954 int err = 0;
955 if (ret < 0 && sock_recoverable ((err = sock_error())) && global.running == ICE_RUNNING)
956 continue;
957 sock_close (logger_fd[0]);
958 if (worker_count)
959 {
960 worker_control_create (logger_fd);
961 ERROR1 ("logger received code %d", err);
962 continue;
963 }
964 log_commit_entries ();
965 // fprintf (stderr, "logger closed with zero workers\n");
966 break;
967 }
968 thread_rwlock_unlock (&global.workers_rw);
969 return NULL;
970 }
971
972
worker_logger_init(void)973 void worker_logger_init (void)
974 {
975 worker_control_create (logger_fd);
976 log_set_commit_callback (logger_commits);
977 }
978
worker_logger(int stop)979 void worker_logger (int stop)
980 {
981 if (stop)
982 {
983 logger_commits(0);
984 sock_close (logger_fd[1]);
985 logger_fd[1] = -1;
986 return;
987 }
988 thread_create ("Log Thread", log_commit_thread, NULL, THREAD_DETACHED);
989 }
990
991