1/* Part of SWISH 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@cs.vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (C): 2018, VU University Amsterdam 7 CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(http_dyn_workers, 37 [ 38 ]). 39:- use_module(library(http/thread_httpd)). 40:- use_module(library(debug)). 41:- use_module(library(settings)). 42:- use_module(library(aggregate)). 43 44:- setting(http:max_workers, integer, 100, 45 "Maximum number of workers to create"). 46:- setting(http:worker_idle_limit, number, 10, 47 "Terminate a dynamic worker when idle for this time"). 48:- setting(http:max_load, number, 10, 49 "Maximum load average caused by HTTP workers"). 50 51/** <module> Dynamically schedule HTTP workers. 52 53Most code doesn't need to use this directly; instead use 54library(http/http_server), which combines this library with the 55typical HTTP libraries that most servers need. 56 57This module defines hooks into the HTTP framework to dynamically 58schedule worker threads. Dynamic scheduling relieves us from finding a 59good value for the size of the HTTP worker pool. 60 61The decision to add a worker follows these rules: 62 63 - If the load average caused by the worker threads exceeds 64 http:max_load, no worker is added. 65 - Wait for some time, depending on how close we are to the 66 http:max_workers limit. 67 - If the worker is still needed, add it. 68 69The policy depends on three settings: 70 71 - http:max_workers 72 The maximum number of workers that will be created. Default is 73 100. 74 - http:worker_idle_limit 75 The number of seconds a dynamic worker waits for a new job. If 76 no job arrives in time it terminates. Default is 10 seconds. 77 - http:max_load 78 Max load average created by __the HTTP server__, i.e. the amount 79 of CPU time consumed per second. Default is 10. 80*/ 81 82%! http:schedule_workers(+Dict) 83% 84% Called if there is no immediately free worker to handle the 85% incomming request. The request is forwarded to the thread 86% =|__http_scheduler|= as the hook is called in time critical code. 87 88:- multifile 89 http:schedule_workers/1. 90 91http:schedule_workers(Dict) :- 92 get_time(Now), 93 catch(thread_send_message('__http_scheduler', no_workers(Now, Dict)), 94 error(existence_error(message_queue, _), _), 95 fail), 96 !. 97http:schedule_workers(Dict) :- 98 create_scheduler, 99 http:schedule_workers(Dict). 100 101create_scheduler :- 102 catch(thread_create(http_scheduler, _, 103 [ alias('__http_scheduler'), 104 inherit_from(main), 105 debug(false), 106 detached(true) 107 ]), 108 error(_,_), 109 fail). 110 111http_scheduler :- 112 get_time(Now), 113 http_scheduler(_{ waiting:0, 114 time:Now 115 }). 116 117http_scheduler(State) :- 118 ( thread_self(Me), 119 thread_get_message(Me, Task, [timeout(10)]) 120 -> true 121 ; Task = update_load_avg 122 ), 123 ( catch(reschedule(Task, State, State1), 124 Error, 125 ( print_message(warning, Error), 126 fail)) 127 -> !, 128 http_scheduler(State1) 129 ; http_scheduler(State) 130 ). 131 132%! reschedule(+Message, +State0, -State) is semidet. 133 134reschedule(no_workers(Reported, Dict), State0, State) :- 135 update_load_avg(Dict, State0, State, Load), 136 setting(http:max_load, MaxLoad), 137 ( Load > MaxLoad 138 -> debug(http(scheduler), 'Load ~1f > ~1f; not adding workers', 139 [ Load, MaxLoad ]) 140 ; aggregate_all(count, http_current_worker(Dict.port, _), Workers), 141 setting(http:max_workers, MaxWorkers), 142 ( Workers >= MaxWorkers 143 -> debug(http(scheduler), 144 'Reached max workers (~D); not adding workers', 145 [ MaxWorkers ]) 146 ; Wait is 0.001*(MaxWorkers/max(1, MaxWorkers-Workers)), 147 get_time(Now), 148 Sleep is max(0.001, Wait + Reported-Now), 149 debug(http(scheduler), 150 'Waiting: ~w; active: ~w; sleep: ~3f; load: ~1f', 151 [Dict.waiting, Workers, Sleep, Load]), 152 sleep(Sleep), 153 accept_queue(Dict, Queue), 154 message_queue_property(Queue, size(Newsize)), 155 ( Newsize == 0 156 -> debug(http(scheduler), 'Drained', []) 157 ; debug(http(scheduler), 'Size is ~w: adding worker', [Newsize]), 158 setting(http:worker_idle_limit, MaxIdle), 159 http_add_worker(Dict.port, 160 [ max_idle_time(MaxIdle) 161 ]) 162 ) 163 ) 164 ). 165reschedule(update_load_avg, State0, State) :- 166 update_load_avg(_{}, State0, State, _). 167 168update_load_avg(_Dict, State, State, Load) :- 169 _{stamp:Last, load:Load} :< State.get(load), 170 get_time(Now), 171 Now - Last < 10. 172update_load_avg(Dict, State0, State, Load) :- 173 server_port(Dict, State0, State1, Port), 174 !, 175 aggregate_all(sum(CPU), worker_cpu(Port, CPU), CPU1), 176 get_time(Now), 177 ( LoadDict = State1.get(load), 178 _{stamp:Last, cpu:LastCPU} :< LoadDict 179 -> Load0 is (CPU1-LastCPU)/(Now-Last), 180 smooth_load(LoadDict, Load0, Load), 181 State = State1.put(load, _{stamp:Now, cpu:CPU1, load:Load}) 182 ; State = State1.put(load, _{stamp:Now, cpu:CPU1}), 183 Load = 0 184 ). 185update_load_avg(_, _, _, 0). 186 187worker_cpu(Port, CPU) :- 188 http_current_worker(Port, Thread), 189 catch(thread_statistics(Thread, cputime, CPU), _, fail). 190 191server_port(_Dict, State, State, Port) :- 192 Port = State.get(port), 193 !. 194server_port(Dict, State0, State, Port) :- 195 Port = Dict.get(port), 196 State = State0.put(port, Port). 197 198smooth_load(LoadDict, Load0, Load) :- 199 OldLoad = LoadDict.get(load), 200 !, 201 Load is (5*OldLoad+Load0)/6. 202smooth_load(_, Load, Load). 203 204%! accept_queue(+Dict, -Queue) 205% 206% As of 7.7.16, `queue` is a member of the provided dict. For older 207% versions we need a hack. 208 209accept_queue(Dict, Queue) :- 210 Queue = Dict.get(queue), 211 !. 212accept_queue(Dict, Queue) :- 213 thread_httpd:current_server(Dict.port, _, _, Queue, _, _), 214 !. 215