1 /* Icecast
2  *
3  * This program is distributed under the GNU General Public License, version 2.
4  * A copy of this license is included with this source.
5  *
6  * Copyright 2000-2013, Jack Moffitt <jack@xiph.org>,
7  *                      Michael Smith <msmith@xiph.org>,
8  *                      oddsock <oddsock@xiph.org>,
9  *                      Karl Heyes <karl@xiph.org>
10  *                      and others (see AUTHORS for details).
11  *
12  * Copyright 2000-2020, Karl Heyes <karl@kheyes.plus.com>
13  *
14  */
15 
16 /* client.c
17 **
18 ** client interface implementation
19 **
20 */
21 
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 
26 #include <stdlib.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <errno.h>
30 
31 #include "thread/thread.h"
32 #include "avl/avl.h"
33 #include "httpp/httpp.h"
34 #include "timing/timing.h"
35 
36 #include "client.h"
37 #include "cfgfile.h"
38 #include "connection.h"
39 #include "refbuf.h"
40 #include "format.h"
41 #include "stats.h"
42 #include "fserve.h"
43 
44 #include "client.h"
45 #include "logging.h"
46 #include "slave.h"
47 #include "global.h"
48 #include "util.h"
49 
50 #undef CATMODULE
51 #define CATMODULE "client"
52 
53 int worker_count = 0, worker_min_count;
54 worker_t *worker_balance_to_check, *worker_least_used, *worker_incoming = NULL;
55 
56 FD_t logger_fd[2];
57 
58 static void logger_commits (int id);
59 
60 
client_register(client_t * client)61 void client_register (client_t *client)
62 {
63     if (client)
64         global.clients++;
65 }
66 
67 
client_keepalive_header(client_t * client)68 const char *client_keepalive_header (client_t *client)
69 {
70     return (client->flags & CLIENT_KEEPALIVE) ?  "Connection: Keep-Alive" : "Connection: Close";
71 }
72 
73 
74 /* verify that the socket is still connected. */
client_connected(client_t * client)75 int client_connected (client_t *client)
76 {
77     int ret = 1;
78     if (client)
79     {
80         if (sock_active (client->connection.sock) == 0)
81             ret = 0;
82     }
83     return ret;
84 }
85 
86 
client_destroy(client_t * client)87 void client_destroy(client_t *client)
88 {
89     if (client == NULL)
90         return;
91 
92     if (client->worker)
93     {
94         WARN0 ("client still on worker thread");
95         return;
96     }
97     /* release the buffer now, as the buffer could be on the source queue
98      * and may of disappeared after auth completes */
99     if (client->refbuf)
100     {
101         refbuf_release (client->refbuf);
102         client->refbuf = NULL;
103     }
104 
105     if (client->flags & CLIENT_AUTHENTICATED)
106         DEBUG1 ("client still in auth \"%s\"", httpp_getvar (client->parser, HTTPP_VAR_URI));
107 
108     /* write log entry if ip is set (some things don't set it, like outgoing
109      * slave requests
110      */
111     if (client->respcode > 0 && client->parser)
112         logging_access(client);
113 
114     if (client->flags & CLIENT_IP_BAN_LIFT)
115     {
116         INFO1 ("lifting IP ban on client at %s", client->connection.ip);
117         connection_release_banned_ip (client->connection.ip);
118         client->flags &= ~CLIENT_IP_BAN_LIFT;
119     }
120 
121     if (client->parser)
122         httpp_destroy (client->parser);
123 
124     /* we need to free client specific format data (if any) */
125     if (client->free_client_data)
126         client->free_client_data (client);
127 
128     free(client->username);
129     free(client->password);
130     client->username = NULL;
131     client->password = NULL;
132     client->parser = NULL;
133     client->respcode = 0;
134     client->free_client_data = NULL;
135 
136     if (not_ssl_connection (&client->connection))
137         sock_set_cork (client->connection.sock, 0); // ensure any corked data is actually sent.
138 
139     global_lock ();
140     if (global.running != ICE_RUNNING || client->connection.error ||
141             (client->flags & CLIENT_KEEPALIVE) == 0 || client_connected (client) == 0)
142     {
143         global.clients--;
144         config_clear_listener (client->server_conn);
145         global_unlock ();
146         connection_close (&client->connection);
147 
148         free(client);
149         return;
150     }
151     global_unlock ();
152     DEBUG1 ("keepalive detected on %s, placing back onto worker", client->connection.ip);
153     if (not_ssl_connection (&client->connection))
154         sock_set_cork (client->connection.sock, 1);    // reenable cork for the next go around
155     client->counter = client->schedule_ms = timing_get_time();
156     connection_reset (&client->connection, client->schedule_ms);
157 
158     client->ops = &http_request_ops;
159     client->flags = CLIENT_ACTIVE;
160     client->shared_data = NULL;
161     client->refbuf = NULL;
162     client->pos = 0;
163     client->intro_offset = 0;
164     client_add_incoming (client);
165 }
166 
167 
client_compare(void * compare_arg,void * a,void * b)168 int client_compare (void *compare_arg, void *a, void *b)
169 {
170     client_t *ca = a, *cb = b;
171 
172     if (ca->connection.id < cb->connection.id) return -1;
173     if (ca->connection.id > cb->connection.id) return 1;
174 
175     return 0;
176 }
177 
178 
179 /* helper function for reading data from a client */
client_read_bytes(client_t * client,void * buf,unsigned len)180 int client_read_bytes (client_t *client, void *buf, unsigned len)
181 {
182     int (*con_read)(struct connection_tag *handle, void *buf, size_t len) = connection_read;
183     int bytes;
184 
185     if (len == 0)
186         return 0;
187     if (client->refbuf && client->pos < client->refbuf->len)
188     {
189         unsigned remaining = client->refbuf->len - client->pos;
190         if (remaining > len)
191             remaining = len;
192         memcpy (buf, client->refbuf->data + client->pos, remaining);
193         client->pos += remaining;
194         if (client->pos >= client->refbuf->len)
195             client_set_queue (client, NULL);
196         return remaining;
197     }
198 #ifdef HAVE_OPENSSL
199     if (client->connection.ssl)
200         con_read = connection_read_ssl;
201 #endif
202     bytes = con_read (&client->connection, buf, len);
203 
204     if (bytes == -1 && client->connection.error)
205         DEBUG2 ("reading from connection %"PRIu64 " from %s has failed", client->connection.id, &client->connection.ip[0]);
206 
207     return bytes;
208 }
209 
210 
client_send_302(client_t * client,const char * location)211 int client_send_302(client_t *client, const char *location)
212 {
213     int len;
214     char body [4096];
215 
216     client_set_queue (client, NULL);
217     client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
218     len = snprintf (body, sizeof body, "Moved <a href=\"%s\">here</a>\r\n", location);
219     len = snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
220             "HTTP/1.0 302 Temporarily Moved\r\n"
221             "Content-Type: text/html\r\n"
222             "Content-Length: %d\r\n"
223             "Location: %s\r\n\r\n%s",
224             len, location, body);
225     client->respcode = 302;
226     client->flags &= ~CLIENT_KEEPALIVE;
227     client->refbuf->len = len;
228     return fserve_setup_client (client);
229 }
230 
231 
client_send_400(client_t * client,const char * message)232 int client_send_400(client_t *client, const char *message)
233 {
234     client_set_queue (client, NULL);
235     client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
236     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
237             "HTTP/1.0 400 Bad Request\r\n"
238             "Content-Type: text/html\r\n\r\n"
239             "<b>%s</b>\r\n", message?message:"");
240     client->respcode = 400;
241     client->flags &= ~CLIENT_KEEPALIVE;
242     client->refbuf->len = strlen (client->refbuf->data);
243     return fserve_setup_client (client);
244 }
245 
246 
client_send_401(client_t * client,const char * realm)247 int client_send_401 (client_t *client, const char *realm)
248 {
249     ice_config_t *config = config_get_config ();
250 
251     if (realm == NULL)
252         realm = config->server_id;
253 
254     client_set_queue (client, NULL);
255     client->refbuf = refbuf_new (500);
256     snprintf (client->refbuf->data, 500,
257             "HTTP/1.0 401 Authentication Required\r\n"
258             "WWW-Authenticate: Basic realm=\"%s\"\r\n"
259             "\r\n"
260             "You need to authenticate\r\n", realm);
261     config_release_config();
262     client->respcode = 401;
263     client->flags &= ~CLIENT_KEEPALIVE;
264     client->refbuf->len = strlen (client->refbuf->data);
265     return fserve_setup_client (client);
266 }
267 
268 
client_send_403(client_t * client,const char * reason)269 int client_send_403(client_t *client, const char *reason)
270 {
271     if (reason == NULL)
272         reason = "Forbidden";
273     client_set_queue (client, NULL);
274     client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
275     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
276             "HTTP/1.0 403 %s\r\n"
277             "Content-Type: text/html\r\n\r\n", reason);
278     client->respcode = 403;
279     client->flags &= ~CLIENT_KEEPALIVE;
280     client->refbuf->len = strlen (client->refbuf->data);
281     return fserve_setup_client (client);
282 }
283 
284 
client_send_403redirect(client_t * client,const char * mount,const char * reason)285 int client_send_403redirect (client_t *client, const char *mount, const char *reason)
286 {
287     if (redirect_client (mount, client))
288         return 0;
289     return client_send_403 (client, reason);
290 }
291 
292 
client_send_404(client_t * client,const char * message)293 int client_send_404 (client_t *client, const char *message)
294 {
295     int ret = -1;
296 
297     if (client->worker == NULL)   /* client is not on any worker now */
298     {
299         client_destroy (client);
300         return 0;
301     }
302     client_set_queue (client, NULL);
303     if (client->respcode || client->connection.error)
304     {
305         worker_t *worker = client->worker;
306         if (client->respcode >= 300)
307             client->flags = client->flags & ~CLIENT_AUTHENTICATED;
308         client->flags |= CLIENT_ACTIVE;
309         worker_wakeup (worker);
310     }
311     else
312     {
313         if (client->parser->req_type == httpp_req_head || message == NULL)
314             message = "Not Available";
315         ret = strlen (message);
316         client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
317         snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
318                 "HTTP/1.0 404 Not Available\r\n"
319                 "%s\r\nContent-Length: %d\r\nContent-Type: text/html\r\n\r\n"
320                 "%s", client_keepalive_header (client), ret,
321                 message ? message: "");
322         client->respcode = 404;
323         client->refbuf->len = strlen (client->refbuf->data);
324         ret = fserve_setup_client (client);
325     }
326     return ret;
327 }
328 
329 
client_send_416(client_t * client)330 int client_send_416(client_t *client)
331 {
332     client_set_queue (client, NULL);
333     client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
334     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
335             "HTTP/1.0 416 Request Range Not Satisfiable\r\n\r\n");
336     client->respcode = 416;
337     client->refbuf->len = strlen (client->refbuf->data);
338     return fserve_setup_client (client);
339 }
340 
341 
client_send_501(client_t * client)342 int client_send_501(client_t *client)
343 {
344     client_set_queue (client, NULL);
345     client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
346     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
347             "HTTP/1.0 501 Not Implemented\r\n\r\n");
348     client->respcode = 501;
349     client->refbuf->len = strlen (client->refbuf->data);
350     return fserve_setup_client (client);
351 }
352 
353 
client_add_cors(client_t * client,char * buf,int remain)354 int client_add_cors (client_t *client, char *buf, int remain)
355 {
356     int bytes = 0;
357     const char *cred = "", *origin = httpp_getvar (client->parser, "origin");
358     if (origin)
359         cred = "Access-Control-Allow-Credentials: true\r\n";
360     else
361         origin = "*";
362 
363     bytes = snprintf (buf, remain,
364             "Access-Control-Allow-Origin: %s\r\n%s"
365             "Access-Control-Allow-Headers: Origin, Accept, X-Requested-With, Content-Type, Icy-MetaData\r\n"
366             "Access-Control-Allow-Methods: GET, OPTIONS, SOURCE, PUT, HEAD, STATS\r\n\r\n",
367             origin, cred);
368     return bytes;
369 }
370 
371 
client_send_options(client_t * client)372 int client_send_options(client_t *client)
373 {
374     client_set_queue (client, NULL);
375     client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
376     char *ptr = client->refbuf->data;
377     int bytes = snprintf (ptr, PER_CLIENT_REFBUF_SIZE,
378             "HTTP/1.1 200 OK\r\n"
379             "Connection: Keep-alive\r\n");
380     client_add_cors (client, ptr+bytes, PER_CLIENT_REFBUF_SIZE-bytes);
381     client->respcode = 200;
382     client->refbuf->len = strlen (client->refbuf->data);
383     return fserve_setup_client (client);
384 }
385 
386 
387 /* helper function for sending the data to a client */
client_send_bytes(client_t * client,const void * buf,unsigned len)388 int client_send_bytes (client_t *client, const void *buf, unsigned len)
389 {
390     int (*con_send)(struct connection_tag *handle, const void *buf, size_t len) = connection_send;
391     int ret;
392 #ifdef HAVE_OPENSSL
393     if (client->connection.ssl)
394         con_send = connection_send_ssl;
395 #endif
396     ret = con_send (&client->connection, buf, len);
397 
398     if (client->connection.error)
399         DEBUG3 ("Client %"PRIu64 " connection on %s from %s died", client->connection.id, (client->mount ? client->mount:"unknown"), &client->connection.ip[0]);
400 
401     return ret;
402 }
403 
404 
client_send_buffer(client_t * client)405 static int client_send_buffer (client_t *client)
406 {
407     const char *buf = client->refbuf->data + client->pos;
408     int len = client->refbuf->len - client->pos;
409     int ret = client_send_bytes (client, buf, len);
410 
411     if (ret > 0)
412         client->pos += ret;
413     if (client->connection.error == 0 && client->pos >= client->refbuf->len)
414     {
415         int (*callback)(client_t *) = client->format_data;
416         return callback (client);
417     }
418     return ret;
419 }
420 
421 
422 struct _client_functions client_buffer_ops =
423 {
424     client_send_buffer,
425     client_destroy
426 };
427 
428 
client_send_buffer_callback(client_t * client,int (* callback)(client_t *))429 int client_send_buffer_callback (client_t *client, int(*callback)(client_t*))
430 {
431     client->format_data = callback;
432     client->ops = &client_buffer_ops;
433     return 0;
434 }
435 
436 
client_set_queue(client_t * client,refbuf_t * refbuf)437 void client_set_queue (client_t *client, refbuf_t *refbuf)
438 {
439     refbuf_t *next, *to_release = client->refbuf;
440 
441     while (to_release)
442     {
443         if (to_release->flags & REFBUF_SHARED)
444         {
445             ERROR1 ("content has a shared flag status for %s", client->connection.ip);
446             break;
447         }
448         next = to_release->next;
449         to_release->next = NULL;
450         refbuf_release (to_release);
451         to_release = next;
452     }
453 
454     client->refbuf = refbuf;
455     if (refbuf)
456         refbuf_addref (client->refbuf);
457 
458     client->pos = 0;
459 }
460 
is_worker_incoming(worker_t * w)461 int is_worker_incoming (worker_t *w)
462 {
463     return (w == worker_incoming) ? 1 : 0;
464 }
465 
worker_check_time_ms(worker_t * worker)466 static uint64_t worker_check_time_ms (worker_t *worker)
467 {
468     uint64_t tm = timing_get_time();
469     if (tm - worker->time_ms > 1000 && worker->time_ms)
470         WARN2 ("worker %p has been stuck for %" PRIu64 " ms", worker, (tm - worker->time_ms));
471     return tm;
472 }
473 
474 
find_least_busy_handler(int log)475 static worker_t *find_least_busy_handler (int log)
476 {
477     worker_t *min = workers;
478     int min_count = INT_MAX;
479 
480     if (workers && workers->next)
481     {
482         worker_t *handler = workers;
483 
484         while (handler)
485         {
486             thread_spin_lock (&handler->lock);
487             int cur_count = handler->count + handler->pending_count;
488             thread_spin_unlock (&handler->lock);
489 
490             if (log) DEBUG2 ("handler %p has %d clients", handler, cur_count);
491             if (cur_count < min_count)
492             {
493                 min = handler;
494                 min_count = cur_count;
495             }
496             handler = handler->next;
497         }
498         worker_min_count = min_count;
499     }
500     return min;
501 }
502 
503 
worker_selected(void)504 worker_t *worker_selected (void)
505 {
506     return worker_least_used;
507 }
508 
509 
510 /* worker mutex should be already locked */
worker_add_client(worker_t * worker,client_t * client)511 static void worker_add_client (worker_t *worker, client_t *client)
512 {
513     ++worker->pending_count;
514     client->next_on_worker = NULL;
515     *worker->pending_clients_tail = client;
516     worker->pending_clients_tail = &client->next_on_worker;
517     client->worker = worker;
518 }
519 
520 
client_change_worker(client_t * client,worker_t * dest_worker)521 int client_change_worker (client_t *client, worker_t *dest_worker)
522 {
523     if (dest_worker->running == 0)
524         return 0;
525     client->next_on_worker = NULL;
526 
527     thread_spin_lock (&dest_worker->lock);
528     worker_add_client (dest_worker, client);
529     thread_spin_unlock (&dest_worker->lock);
530     worker_wakeup (dest_worker);
531 
532     return 1;
533 }
534 
535 
client_add_worker(client_t * client)536 void client_add_worker (client_t *client)
537 {
538     worker_t *handler;
539 
540     thread_rwlock_rlock (&workers_lock);
541     /* add client to the handler with the least number of clients */
542     handler = worker_selected();
543     thread_spin_lock (&handler->lock);
544     thread_rwlock_unlock (&workers_lock);
545 
546     worker_add_client (handler, client);
547     thread_spin_unlock (&handler->lock);
548     worker_wakeup (handler);
549 }
550 
client_add_incoming(client_t * client)551 void client_add_incoming (client_t *client)
552 {
553     worker_t *handler;
554 
555     thread_rwlock_rlock (&workers_lock);
556     handler = worker_incoming;
557     thread_spin_lock (&handler->lock);
558     thread_rwlock_unlock (&workers_lock);
559 
560     worker_add_client (handler, client);
561     thread_spin_unlock (&handler->lock);
562     worker_wakeup (handler);
563 }
564 
565 
566 #ifdef _WIN32
567 #define pipe_create         sock_create_pipe_emulation
568 #define pipe_write(A, B, C) send(A, B, C, 0)
569 #define pipe_read(A,B,C)    recv(A, B, C, 0)
570 #else
571  #ifdef HAVE_PIPE2
572  #define pipe_create(x)      pipe2(x,O_CLOEXEC)
573  #elif defined(FD_CLOEXEC)
pipe_create(FD_t x[])574   int pipe_create(FD_t x[]) {
575     int r = pipe(x); if (r==0) {fcntl(x[0], F_SETFD,FD_CLOEXEC); \
576     fcntl(x[1], F_SETFD,FD_CLOEXEC); } return r;
577   }
578  #else
579   #define pipe_create pipe
580  #endif
581  #define pipe_write write
582  #define pipe_read read
583 #endif
584 
585 
worker_control_create(FD_t wakeup_fd[])586 void worker_control_create (FD_t wakeup_fd[])
587 {
588     if (pipe_create (&wakeup_fd[0]) < 0)
589     {
590         ERROR0 ("pipe failed, descriptor limit?");
591         abort();
592     }
593     sock_set_blocking (wakeup_fd[0], 0);
594     sock_set_blocking (wakeup_fd[1], 0);
595 }
596 
597 
worker_add_pending_clients(worker_t * worker)598 static client_t **worker_add_pending_clients (worker_t *worker)
599 {
600     thread_spin_lock (&worker->lock);
601     if (worker->pending_clients)
602     {
603         unsigned count;
604         client_t **p;
605 
606         p = worker->last_p;
607         *worker->last_p = worker->pending_clients;
608         worker->last_p = worker->pending_clients_tail;
609         worker->count += worker->pending_count;
610         count = worker->pending_count;
611         worker->pending_clients = NULL;
612         worker->pending_clients_tail = &worker->pending_clients;
613         worker->pending_count = 0;
614         thread_spin_unlock (&worker->lock);
615         DEBUG2 ("Added %d pending clients to %p", count, worker);
616         return p;  /* only these new ones scheduled so process from here */
617     }
618     thread_spin_unlock (&worker->lock);
619     worker->wakeup_ms = worker->time_ms + 60000;
620     return &worker->clients;
621 }
622 
623 
624 // enter with spin lock enabled, exit without
625 //
worker_wait(worker_t * worker)626 static client_t **worker_wait (worker_t *worker)
627 {
628     int ret, duration = 2;
629 
630     if (worker->running)
631     {
632         uint64_t tm = worker_check_time_ms (worker);
633         if (worker->wakeup_ms > tm)
634             duration = (int)(worker->wakeup_ms - tm);
635         if (duration > 60000) /* make duration at most 60s */
636             duration = 60000;
637     }
638     thread_spin_unlock (&worker->lock);
639 
640     ret = util_timed_wait_for_fd (worker->wakeup_fd[0], duration);
641     if (ret > 0) /* may of been several wakeup attempts */
642     {
643         char ca[100];
644         do
645         {
646             ret = pipe_read (worker->wakeup_fd[0], ca, sizeof ca);
647             if (ret > 0)
648                 break;
649             if (ret < 0 && sock_recoverable (sock_error()))
650                 break;
651             sock_close (worker->wakeup_fd[1]);
652             sock_close (worker->wakeup_fd[0]);
653             worker_control_create (&worker->wakeup_fd[0]);
654             worker_wakeup (worker);
655             WARN0 ("Had to recreate worker control feed");
656         } while (1);
657     }
658 
659     worker->time_ms = timing_get_time();
660     worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
661 
662     return worker_add_pending_clients (worker);
663 }
664 
665 
worker_relocate_clients(worker_t * worker)666 static void worker_relocate_clients (worker_t *worker)
667 {
668     if (workers == NULL)
669         return;
670     while (worker->count || worker->pending_count)
671     {
672         client_t *client = worker->clients, **prevp = &worker->clients;
673 
674         worker->wakeup_ms = worker->time_ms + 150;
675         worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
676         while (client)
677         {
678             if (client->flags & CLIENT_ACTIVE)
679             {
680                 client->worker = workers;
681                 prevp = &client->next_on_worker;
682             }
683             else
684             {
685                 *prevp = client->next_on_worker;
686                 worker_add_client (worker, client);
687                 worker->count--;
688             }
689             client = *prevp;
690         }
691         if (worker->clients)
692         {
693             thread_spin_lock (&workers->lock);
694             *workers->pending_clients_tail = worker->clients;
695             workers->pending_clients_tail = prevp;
696             workers->pending_count += worker->count;
697             thread_spin_unlock (&workers->lock);
698             worker_wakeup (workers);
699             worker->clients = NULL;
700             worker->last_p = &worker->clients;
701             worker->count = 0;
702         }
703         thread_spin_lock (&worker->lock);
704         worker_wait (worker);
705     }
706 }
707 
worker(void * arg)708 void *worker (void *arg)
709 {
710     worker_t *worker = arg;
711     long prev_count = -1;
712     client_t **prevp = &worker->clients;
713     uint64_t c = 0;
714 
715     thread_rwlock_rlock (&global.workers_rw);
716     worker->running = 1;
717     worker->wakeup_ms = (int64_t)0;
718     worker->time_ms = timing_get_time();
719 
720     while (1)
721     {
722         client_t *client = *prevp;
723         uint64_t sched_ms = worker->time_ms + 12;
724 
725         c = 0;
726         while (client)
727         {
728             if (client->worker != worker) abort();
729             /* process client details but skip those that are not ready yet */
730             if (client->flags & CLIENT_ACTIVE)
731             {
732                 int ret = 0;
733                 client_t *nx = client->next_on_worker;
734 
735                 int process = 1;
736                 if (worker->running)  // force all active clients to run on worker shutdown
737                 {
738                     if (client->schedule_ms <= sched_ms)
739                     {
740                         if (c > 9000 && client->wakeup == NULL)
741                             process = 0;
742                     }
743                     else if (client->wakeup == NULL || *client->wakeup == 0)
744                     {
745                         process = 0;
746                     }
747                 }
748 
749                 if (process)
750                 {
751                     if ((c & 511) == 0)
752                     {
753                         // update these periodically to keep in sync
754                         worker->time_ms = worker_check_time_ms (worker);
755                         worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
756                     }
757                     c++;
758                     errno = 0;
759                     ret = client->ops->process (client);
760                     if (ret < 0)
761                     {
762                         client->worker = NULL;
763                         if (client->ops->release)
764                             client->ops->release (client);
765                     }
766                     if (ret)
767                     {
768                         thread_spin_lock (&worker->lock);
769                         worker->count--;
770                         if (nx == NULL) /* is this the last client */
771                             worker->last_p = prevp;
772                         client = *prevp = nx;
773                         thread_spin_unlock (&worker->lock);
774                         continue;
775                     }
776                 }
777                 if ((client->flags & CLIENT_ACTIVE) && client->schedule_ms < worker->wakeup_ms)
778                     worker->wakeup_ms = client->schedule_ms;
779             }
780             prevp = &client->next_on_worker;
781             client = *prevp;
782         }
783         if (prev_count != worker->count)
784         {
785             DEBUG2 ("%p now has %d clients", worker, worker->count);
786             prev_count = worker->count;
787         }
788         thread_spin_lock (&worker->lock);
789         if (worker->running == 0)
790         {
791             if (worker->count == 0 && worker->pending_count == 0)
792                 break;
793         }
794         prevp = worker_wait (worker);
795     }
796     thread_spin_unlock (&worker->lock);
797     worker_relocate_clients (worker);
798     INFO0 ("shutting down");
799     thread_rwlock_unlock (&global.workers_rw);
800     return NULL;
801 }
802 
803 
804 // We pick a worker (consequetive) and set a max number of clients to move if needed
worker_balance_trigger(time_t now)805 void worker_balance_trigger (time_t now)
806 {
807     thread_rwlock_wlock (&workers_lock);
808     if (worker_count > 1)
809     {
810         int log_counts = (now & 15) == 0 ? 1 : 0;
811 
812         worker_least_used = find_least_busy_handler (log_counts);
813         if (worker_balance_to_check)
814         {
815             worker_t *w = worker_balance_to_check;
816             // DEBUG2 ("Worker allocations reset on %p, least is %p", w, worker_least_used);
817             thread_spin_lock (&w->lock);
818             w->move_allocations = 200;
819             worker_balance_to_check = w->next;
820             thread_spin_unlock (&w->lock);
821         }
822         if (worker_balance_to_check == NULL)
823             worker_balance_to_check = workers;
824     }
825     thread_rwlock_unlock (&workers_lock);
826 }
827 
828 
worker_start(void)829 static void worker_start (void)
830 {
831     worker_t *handler = calloc (1, sizeof(worker_t));
832 
833     worker_control_create (&handler->wakeup_fd[0]);
834 
835     handler->pending_clients_tail = &handler->pending_clients;
836     thread_spin_create (&handler->lock);
837     handler->last_p = &handler->clients;
838 
839     thread_rwlock_wlock (&workers_lock);
840     if (worker_incoming == NULL)
841     {
842         worker_incoming = handler;
843         handler->move_allocations = 1000000;    // should stay fixed for this one
844         handler->thread = thread_create ("worker", worker, handler, THREAD_ATTACHED);
845         thread_rwlock_unlock (&workers_lock);
846         INFO1 ("starting incoming worker thread %p", worker_incoming);
847         worker_start();  // single level recursion, just get a special worker thread set up
848         return;
849     }
850     handler->next = workers;
851     workers = handler;
852     worker_count++;
853     worker_least_used = worker_balance_to_check = workers;
854     thread_rwlock_unlock (&workers_lock);
855 
856     handler->thread = thread_create ("worker", worker, handler, THREAD_ATTACHED);
857 }
858 
859 
worker_stop(void)860 static void worker_stop (void)
861 {
862     worker_t *handler;
863 
864     thread_rwlock_wlock (&workers_lock);
865     do
866     {
867         if (worker_count > 0)
868         {
869             handler = workers;
870             workers = handler->next;
871             worker_least_used = worker_balance_to_check = workers;
872             if (workers)
873                 workers->move_allocations = 1000;
874             worker_count--;
875         }
876         else
877         {
878             handler = worker_incoming;
879             worker_incoming = NULL;
880             INFO0 ("stopping incoming worker thread");
881         }
882 
883         if (handler)
884         {
885             thread_spin_lock (&handler->lock);
886             handler->running = 0;
887             thread_spin_unlock (&handler->lock);
888 
889             worker_wakeup (handler);
890             thread_rwlock_unlock (&workers_lock);
891 
892             thread_join (handler->thread);
893             thread_spin_destroy (&handler->lock);
894 
895             sock_close (handler->wakeup_fd[1]);
896             sock_close (handler->wakeup_fd[0]);
897             free (handler);
898             thread_rwlock_wlock (&workers_lock);
899         }
900     } while (workers == NULL && worker_incoming);
901     thread_rwlock_unlock (&workers_lock);
902 }
903 
904 
workers_adjust(int new_count)905 void workers_adjust (int new_count)
906 {
907     INFO1 ("requested worker count %d", new_count);
908     while (worker_count != new_count)
909     {
910         if (worker_count < new_count)
911             worker_start ();
912         else if (worker_count > new_count)
913             worker_stop ();
914     }
915 }
916 
917 
worker_wakeup(worker_t * worker)918 void worker_wakeup (worker_t *worker)
919 {
920     pipe_write (worker->wakeup_fd[1], "W", 1);
921 }
922 
923 
logger_commits(int id)924 static void logger_commits (int id)
925 {
926     pipe_write (logger_fd[1], "L", 1);
927 }
928 
log_commit_thread(void * arg)929 static void *log_commit_thread (void *arg)
930 {
931     INFO0 ("started");
932     thread_rwlock_rlock (&global.workers_rw);
933     while (1)
934     {
935         int ret = util_timed_wait_for_fd (logger_fd[0], 5000);
936         if (ret == 0)
937         {
938             global_lock();
939             int loop = (global.running == ICE_RUNNING);
940             global_unlock();
941             if (loop) continue;
942         }
943         if (ret > 0)
944         {
945             char cm[80];
946             ret = pipe_read (logger_fd[0], cm, sizeof cm);
947             if (ret > 0)
948             {
949                 // fprintf (stderr, "logger woken with %d\n", ret);
950                 log_commit_entries ();
951                 continue;
952             }
953         }
954         int err = 0;
955         if (ret < 0 && sock_recoverable ((err = sock_error())) && global.running == ICE_RUNNING)
956             continue;
957         sock_close (logger_fd[0]);
958         if (worker_count)
959         {
960             worker_control_create (logger_fd);
961             ERROR1 ("logger received code %d", err);
962             continue;
963         }
964         log_commit_entries ();
965         // fprintf (stderr, "logger closed with zero workers\n");
966         break;
967     }
968     thread_rwlock_unlock (&global.workers_rw);
969     return NULL;
970 }
971 
972 
worker_logger_init(void)973 void worker_logger_init (void)
974 {
975     worker_control_create (logger_fd);
976     log_set_commit_callback (logger_commits);
977 }
978 
worker_logger(int stop)979 void worker_logger (int stop)
980 {
981     if (stop)
982     {
983        logger_commits(0);
984        sock_close (logger_fd[1]);
985        logger_fd[1] = -1;
986        return;
987     }
988     thread_create ("Log Thread", log_commit_thread, NULL, THREAD_DETACHED);
989 }
990 
991