1/*  Part of SWI-Prolog
2
3    Author:        Jan Wielemaker
4    E-mail:        J.Wielemaker@vu.nl
5    WWW:           http://www.swi-prolog.org
6    Copyright (c)  2002-2020, University of Amsterdam
7                              VU University Amsterdam
8                              CWI, Amsterdam
9    All rights reserved.
10
11    Redistribution and use in source and binary forms, with or without
12    modification, are permitted provided that the following conditions
13    are met:
14
15    1. Redistributions of source code must retain the above copyright
16       notice, this list of conditions and the following disclaimer.
17
18    2. Redistributions in binary form must reproduce the above copyright
19       notice, this list of conditions and the following disclaimer in
20       the documentation and/or other materials provided with the
21       distribution.
22
23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34    POSSIBILITY OF SUCH DAMAGE.
35*/
36
37:- module(thread_httpd,
38          [ http_current_server/2,      % ?:Goal, ?Port
39            http_server_property/2,     % ?Port, ?Property
40            http_server/2,              % :Goal, +Options
41            http_workers/2,             % +Port, ?WorkerCount
42            http_add_worker/2,          % +Port, +Options
43            http_current_worker/2,      % ?Port, ?ThreadID
44            http_stop_server/2,         % +Port, +Options
45            http_spawn/2,               % :Goal, +Options
46
47            http_requeue/1,             % +Request
48            http_close_connection/1,    % +Request
49            http_enough_workers/3       % +Queue, +Why, +Peer
50          ]).
51:- use_module(library(debug)).
52:- use_module(library(error)).
53:- use_module(library(option)).
54:- use_module(library(socket)).
55:- use_module(library(thread_pool)).
56:- use_module(library(gensym)).
57:- use_module(http_wrapper).
58:- use_module(http_path).
59
60
% Option declarations for the option-list argument (argument 2) of the
% exported predicates http_server/2, http_spawn/2 and http_add_worker/2.
% Each entry names an option and its type; the pass_to/2 entries name
% the predicate and argument position to which options are forwarded.
:- predicate_options(http_server/2, 2,
                     [ port(any),
                       entry_page(atom),
                       tcp_socket(any),
                       workers(positive_integer),
                       timeout(number),
                       keep_alive_timeout(number),
                       silent(boolean),
                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
                       pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(http_spawn/2, 2,
                     [ pool(atom),
                       pass_to(system:thread_create/3, 3),
                       pass_to(thread_pool:thread_create_in_pool/4, 4)
                     ]).
:- predicate_options(http_add_worker/2, 2,
                     [ timeout(number),
                       keep_alive_timeout(number),
                       max_idle_time(number),
                       pass_to(system:thread_create/3, 3)
                     ]).
83
84/** <module> Threaded HTTP server
85
86Most   code   doesn't   need  to   use  this   directly;  instead   use
87library(http/http_server),  which  combines   this  library  with   the
88typical HTTP libraries that most servers need.
89
90This library defines the HTTP server  frontend of choice for SWI-Prolog.
91It is based on the multi-threading   capabilities of SWI-Prolog and thus
92exploits multiple cores  to  serve   requests  concurrently.  The server
93scales well and can cooperate with   library(thread_pool) to control the
94number of concurrent requests of a given   type.  For example, it can be
configured to handle 200 file download requests concurrently, 2 requests
that potentially use a lot of memory and 8 requests that use a lot of
CPU resources.
98
99On   Unix   systems,    this    library     can    be    combined   with
100library(http/http_unix_daemon) to realise a proper  Unix service process
101that creates a web server at  port   80,  runs under a specific account,
102optionally detaches from the controlling terminal, etc.
103
104Combined with library(http/http_ssl_plugin) from the   SSL package, this
105library   can   be   used   to    create     an    HTTPS   server.   See
106<plbase>/doc/packages/examples/ssl/https for an example   server using a
107self-signed SSL certificate.
108*/
109
% Module-sensitive arguments.  The server goal of http_server/2 and
% http_current_server/2 is a closure that is called with one additional
% argument; http_spawn/2 takes a plain goal.  The option list of
% http_server/2 is module qualified as well.
:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

% Runtime administration of servers and their worker pools.
:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

% Hooks.  The first four are tried before the default plain-TCP
% behaviour of make_socket/3, accept_server3/2, close_server_socket/1
% and open_client/6.  The http:* hooks are used for lazy thread-pool
% creation (create_pool/1) and for extending the worker pool
% (http_enough_workers/3).
:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    http:create_pool/1,
    http:schedule_workers/1.

% NOTE(review): the same declaration is repeated in the WAIT POLICIES
% section near the end of this file.
:- meta_predicate
    thread_repeat_wait(0).
130
%!  http_server(:Goal, :Options) is det.
%
%   Create a server at Port that calls Goal for each parsed request.
%   Options provide a list of options. Defined options are
%
%     * port(?Address)
%     Port to bind to.  Address is either a port or a term
%     Host:Port. The port may be a variable, causing the system
%     to select a free port.  See tcp_bind/2.
%
%     * entry_page(+URI)
%     Affects the message printed while the server is started.
%     Interpreted as a URI relative to the server root.
%
%     * tcp_socket(+Socket)
%     If provided, use this socket instead of creating one and
%     binding it to an address.  The socket must be bound to an
%     address.
%
%     * workers(+Count)
%     Determine the number of worker threads.  Default is 5.  This
%     is fine for small scale usage.  Public servers typically need
%     a higher number.
%
%     * timeout(+Seconds)
%     Maximum time of inactivity trying to read the request after a
%     connection has been opened.  Default is 60 seconds.  See
%     set_stream/2 using the _timeout_ option.
%
%     * keep_alive_timeout(+Seconds)
%     Time to keep `Keep alive' connections alive.  Default is
%     2 seconds.
%
%     * stack_limit(+Bytes)
%     Stack limit to use for the workers.  The default is inherited
%     from the `main` thread.
%     If you need to control resource usage you may consider the
%     `spawn` option of http_handler/3 and library(thread_pool).
%
%     * silent(Bool)
%     If `true` (default `false`), do not print an informational
%     message that the server was started.
%
%   A  typical  initialization  for  an    HTTP   server  that  uses
%   http_dispatch/1 to relay requests to predicates is:
%
%     ==
%     :- use_module(library(http/thread_httpd)).
%     :- use_module(library(http/http_dispatch)).
%
%     start_server(Port) :-
%         http_server(http_dispatch, [port(Port)]).
%     ==
%
%   Note that multiple servers  can  coexist   in  the  same  Prolog
%   process. A notable application of this is   to have both an HTTP
%   and HTTPS server, where the HTTP   server redirects to the HTTPS
%   server for handling sensitive requests.
%
%   @error existence_error(option, port) if Options holds no port(_)
%   option.

http_server(Goal, M:Options0) :-
    option(port(Port), Options0),
    !,
    % Order matters: the socket/queue name must exist before the
    % workers are created, and the workers are created before the
    % accept thread starts posting connections.
    make_socket(Port, M:Options0, Options),
    create_workers(Options),
    create_server(Goal, Port, Options),
    (   option(silent(true), Options0)
    ->  true
    ;   print_message(informational,
                      httpd_started_server(Port, Options0))
    ).
http_server(_Goal, _Options) :-
    % Only reached if no port(_) option was given.
    existence_error(option, port).
203
204
%!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
%
%   Create the HTTP server socket and determine the name of the
%   worker pool queue. OptionsOut is guaranteed to hold the option
%   queue(QueueId).  Three cases are distinguished, in this order:
%
%     1. The hook make_socket_hook/3 succeeds and provides OptionsOut.
%     2. OptionsIn holds tcp_socket(Socket): the caller's socket is
%        used as is and only queue(QueueId) is added.
%     3. Otherwise a new TCP socket is created, bound to Port and set
%        to listen; queue(QueueId) and tcp_socket(Socket) are added.
%
%   If configuring, binding or listening on a freshly created socket
%   raises an exception (e.g., the address is already in use), the
%   socket is closed before the exception is re-thrown such that no
%   socket handle is leaked.
%
%   @arg   OptionsIn   is   qualified   to     allow   passing   the
%   module-sensitive ssl option argument.

make_socket(Port, Options0, Options) :-
    make_socket_hook(Port, Options0, Options),
    !.
make_socket(Port, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Port, _:Options0, Options) :-
    tcp_socket(Socket),
    catch(( tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, Port),         % may bind an unbound Port
            tcp_listen(Socket, 64)
          ),
          Error,
          (   % Release the socket; errors while closing must not
              % mask the original error.
              catch(tcp_close_socket(Socket), _, true),
              throw(Error)
          )),
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
233
%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
%
%   Atom is the concatenation of Scheme, `@` and a textual rendering
%   of Address.  It is used to name the queue and threads of a server.
%   Address is an atomic value, a term Host:Port or a term
%   ip(A,B,C,D), which is rendered as a dotted quad.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), AddressParts),
    atomic_list_concat([Scheme, @ | AddressParts], Atom).

%   address_parts(+Address)// emits the atomic components that make
%   up the textual form of Address.  Fails for any other term.

address_parts(Address) -->
    (   { atomic(Address) }
    ->  [Address]
    ;   { Address = Host:Port }
    ->  address_parts(Host),
        [:],
        address_parts(Port)
    ;   { Address = ip(A,B,C,D) }
    ->  [ A, '.', B, '.', C, '.', D ]
    ).
253
%!  create_server(:Goal, +Address, +Options) is det.
%
%   Create the main server thread that runs accept_server/3 to
%   listen to new requests.  The thread gets an alias built from
%   the scheme and the port (see make_addr_atom/3).  After the
%   thread has confirmed that it is running, the server is
%   registered in current_server/6.
%
%   @arg Address is either a port or a term Host:Port.
%   @arg Options must hold queue(Queue), as added by make_socket/3.

create_server(Goal, Address, Options) :-
    get_time(StartTime),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, Alias),
    thread_self(Initiator),
    thread_create(accept_server(Goal, Initiator, Options), _,
                  [ alias(Alias)
                  ]),
    % accept_server2/3 sends `server_started` to Initiator as its first
    % action; wait for it before registering the server.
    thread_get_message(server_started),
    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
272
%   scheme(-Scheme, +Options) is det.
%
%   Determine the URL scheme of the server from Options.  An explicit
%   scheme(Scheme) option wins.  Otherwise the presence of an ssl(_)
%   or ssl_instance(_) option selects `https`; the default is `http`.

scheme(Scheme, Options) :-
    (   option(scheme(Scheme), Options)
    ->  true
    ;   (   option(ssl(_), Options)
        ;   option(ssl_instance(_), Options)
        )
    ->  Scheme = https
    ;   Scheme = http
    ).
283
%   address_port(+Address, -Port) is det.
%
%   Port is the port of Address: the part after the colon if Address
%   is a term Host:Port and Address itself otherwise.

address_port(Address, Port) :-
    (   Address = _Host:Port
    ->  true
    ;   Port = Address
    ).
286
%   autoload_https(+Scheme) is det.
%
%   For the scheme `https`, load library(http/http_ssl_plugin) if no
%   clause for accept_hook/2 is defined yet and the library source
%   exists.  In all other cases this is a no-op.

autoload_https(Scheme) :-
    (   Scheme = https,
        \+ clause(accept_hook(_Goal, _Options), _),
        exists_source(library(http/http_ssl_plugin))
    ->  use_module(library(http/http_ssl_plugin))
    ;   true
    ).
293
%!  http_current_server(:Goal, ?Port) is nondet.
%
%   True if Goal is the goal of a server at Port.  Enumerates the
%   servers registered in current_server/6 on backtracking.
%
%   @deprecated Use http_server_property(Port, goal(Goal))

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _, _, _, _).
302
303
%!  http_server_property(?Port, ?Property) is nondet.
%
%   True if Property is a property of the HTTP server running at
%   Port.  Port may also be specified as a term `Host:Port` with an
%   integer port, in which case the host part is ignored.  Defined
%   properties are:
%
%       * goal(:Goal)
%       Goal used to start the server. This is often
%       http_dispatch/1.
%       * scheme(-Scheme)
%       Scheme is one of `http` or `https`.
%       * start_time(?Time)
%       Time-stamp when the server was created.

http_server_property(Address, Property) :-
    (   Address = _Host:Port,
        integer(Port)
    ->  true
    ;   Port = Address
    ),
    server_property(Property, Port).

%   server_property(?Property, ?Port) is nondet.
%
%   Map each supported property onto the matching argument of the
%   current_server/6 registration.

server_property(goal(ServerGoal), Port) :-
    current_server(Port, ServerGoal, _Thread, _Queue, _Scheme, _Start).
server_property(scheme(ServerScheme), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, ServerScheme, _Start).
server_property(start_time(Started), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, _Scheme, Started).
330
331
%!  http_workers(+Port, -Workers) is det.
%!  http_workers(+Port, +Workers:nonneg) is det.
%
%   Query or set the number of workers  for the server at this port.
%   The number of workers is dynamically   modified. Setting it to 1
%   (one) can be used to profile the worker using tprofile/1.
%
%   When setting, Workers must be a non-negative integer.  A negative
%   value is rejected: it would make resize_pool/2 ask more workers
%   to quit than exist and subsequently wait forever for their
%   confirmation.
%
%   @error instantiation_error if Port is not ground.
%   @error existence_error(http_server, Port) if there is no server
%          at Port.
%   @error type_error(nonneg, Workers) if Workers is a negative
%          integer.

http_workers(Port, Workers) :-
    must_be(ground, Port),
    current_server(Port, _, _, Queue, _, _),
    !,
    (   integer(Workers)
    ->  must_be(nonneg, Workers),
        resize_pool(Queue, Workers)
    ;   % Query mode: count the registered workers of this queue.
        findall(W, queue_worker(Queue, W), WorkerIDs),
        length(WorkerIDs, Workers)
    ).
http_workers(Port, _) :-
    existence_error(http_server, Port).
350
351
%!  http_add_worker(+Port, +Options) is det.
%
%   Start one additional worker thread for the HTTP server at Port.
%   The worker is created from the options stored for the server's
%   queue, where Options take precedence over the stored ones. The
%   following additional options are processed:
%
%     - max_idle_time(+Seconds)
%     The created worker will automatically terminate if there is
%     no new work within Seconds.
%
%   @error existence_error(http_server, Port) if there is no server
%          at Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    (   current_server(Port, _, _, Queue, _, _)
    ->  queue_options(Queue, Defaults),
        merge_options(Options, Defaults, WorkerOptions),
        atom_concat(Queue, '_', AliasBase),
        create_workers(1, 1, Queue, AliasBase, WorkerOptions)
    ;   existence_error(http_server, Port)
    ).
372
373
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True if ThreadID is the identifier   of  a Prolog thread serving
%   Port. This predicate is  motivated  to   allow  for  the  use of
%   arbitrary interaction with the worker thread for development and
%   statistics.
%
%   Workers are found by joining current_server/6 and queue_worker/2
%   on the server's queue.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _, _, Queue, _, _),
    queue_worker(Queue, ThreadID).
384
385
%!  accept_server(:Goal, +Initiator, +Options)
%
%   The goal of a small server-thread accepting new requests and
%   posting them to the queue of workers.  The accept loop runs until
%   the exception `http_stop` is raised.  Thereafter the server is
%   removed from current_server/6 and the server socket is closed.
%
%   @arg Initiator is the thread that receives the `server_started`
%   message, see accept_server2/3.

accept_server(Goal, Initiator, Options) :-
    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
    thread_self(Thread),
    % NOTE(review): this relies on thread_self/1 returning the alias
    % that create_server/3 stored in current_server/6.
    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
    close_server_socket(Options).
396
%   accept_server2(:Goal, +Initiator, +Options)
%
%   Tell Initiator that the server thread is running and then run
%   accept_server3/2 in a failure-driven loop.  The loop only ends by
%   an exception: exceptions for which accept_rethrow_error/1 holds
%   are re-thrown, all other exceptions are printed as errors and
%   the loop continues.  Failure of accept_server3/2 is reported as
%   an internal error, after which the loop continues as well.

accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
      (   catch(accept_server3(Goal, Options), E, true)
      ->  (   var(E)                    % accepted a connection
          ->  fail                      % loop
          ;   accept_rethrow_error(E)
          ->  throw(E)                  % terminate the loop
          ;   print_message(error, E),
              fail                      % report and loop
          )
      ;   print_message(error,      % internal error
                        goal_failed(accept_server3(Goal, Options))),
          fail
      ).
412
%   accept_server3(:Goal, +Options)
%
%   Accept a single connection.  The hook accept_hook/2 is tried
%   first.  If it does not succeed, a connection is accepted on the
%   socket from the tcp_socket(Socket) option and the message
%   tcp_client(Client, Goal, Peer) is posted on the queue from the
%   queue(Queue) option.  Finally, http_enough_workers/3 is called to
%   verify that there are enough workers to handle the connection.

accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(Socket), Options),
    memberchk(queue(Queue), Options),
    debug(http(connection), 'Waiting for connection', []),
    tcp_accept(Socket, Client, Peer),
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
    http_enough_workers(Queue, accept, Peer).

%   accept_rethrow_error(+Exception)
%
%   True for exceptions that must terminate the accept loop of
%   accept_server2/3 instead of being printed.

accept_rethrow_error(http_stop).
accept_rethrow_error('$aborted').
427
428
%!  close_server_socket(+Options)
%
%   Close the server socket.  The hook close_hook/1 gets the first
%   chance.  If it does not succeed, the socket found in the
%   tcp_socket(Socket) option is closed.  Fails if the hook does not
%   succeed and Options holds no tcp_socket(_) option.

close_server_socket(Options) :-
    (   close_hook(Options)
    ->  true
    ;   memberchk(tcp_socket(Socket), Options)
    ->  tcp_close_socket(Socket)
    ).
440
441
%!  http_stop_server(+Port, +Options)
%
%   Stop the indicated  HTTP  server   gracefully.  First  stops all
%   workers, then stops the server.  Port may be specified as
%   Host:Port, in which case only the port is used.  Options is
%   currently ignored.
%
%   @error existence_error(http_server, Port) (from http_workers/2)
%          if there is no server at Port.
%   @tbd    Realise non-graceful stop

http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),                  % checks Port is ground
    current_server(Port, _, Thread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    % Ask the accept thread to terminate; accept_server/3 catches
    % http_stop.
    thread_signal(Thread, throw(http_stop)),
    % NOTE(review): presumably this dummy connection wakes up the accept
    % thread if it is blocked waiting for a client, such that the
    % signal gets processed -- confirm.  Errors are ignored.
    catch(connect(localhost:Port), _, true),
    thread_join(Thread, _),
    message_queue_destroy(Queue).

%   connect(+Address)
%
%   Open a TCP connection to Address and close the socket again.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Socket),
        tcp_connect(Socket, Address),
        tcp_close_socket(Socket)).
467
%!  http_enough_workers(+Queue, +Why, +Peer) is det.
%
%   Check that we have enough workers in our queue. If not, call the
%   hook http:schedule_workers/1 to extend  the   worker  pool. This
%   predicate can be used by accept_hook/2.
%
%   If the queue has the property waiting(_) there is nothing to
%   do.  Otherwise the number of queued messages is compared against
%   enough/2.  If there are too many, the hook is called with a dict
%   describing the situation.  Exceptions from the hook are printed;
%   failure of the hook is silently ignored.
%
%   @arg Why is one of `accept` or `keep_alive`.

http_enough_workers(Queue, _Why, _Peer) :-
    message_queue_property(Queue, waiting(_0)),
    !,
    debug(http(scheduler), '~D waiting for work; ok', [_0]).
http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Size)),
    % This is a chain: if enough -> ok; else if the server is known and
    % the hook succeeds -> true; else -> true.
    (   enough(Size, Why)
    ->  debug(http(scheduler), '~D in queue; ok', [Size])
    ;   current_server(Port, _, _, Queue, _, _),
        Data = _{ port:Port,
                  reason:Why,
                  peer:Peer,
                  waiting:Size,
                  queue:Queue
                },
        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
        catch(http:schedule_workers(Data),
              Error,
              print_message(error, Error))
    ->  true
    ;   true
    ).

%   enough(+QueueSize, +Why)
%
%   True if QueueSize pending messages do not require more workers.

enough(0, _).
enough(1, keep_alive).                  % I will be ready myself
500
501%!  http:schedule_workers(+Data:dict) is semidet.
502%
503%   Hook called if a  new  connection   or  a  keep-alive connection
504%   cannot be scheduled _immediately_ to a worker. Dict contains the
505%   following keys:
506%
507%     - port:Port
508%     Port number that identifies the server.
509%     - reason:Reason
510%     One of =accept= for a new connection or =keep_alive= if a
511%     worker tries to reschedule itself.
512%     - peer:Peer
513%     Identify the other end of the connection
514%     - waiting:Size
515%     Number of messages waiting in the queue.
516%     - queue:Queue
517%     Message queue used to dispatch accepted messages.
518%
%   Note that, when called with `reason:accept`, the hook runs in
%   the time-critical main accept loop. An implementation of this
%   hook should typically forward the event to a thread dedicated to
%   dynamic worker-pool management.
523%
524%   @see    http_add_worker/2 may be used to create (temporary) extra
525%           workers.
526
527
528                 /*******************************
529                 *    WORKER QUEUE OPERATIONS   *
530                 *******************************/
531
%!  create_workers(+Options)
%
%   Create the pool of HTTP worker-threads.  The number of workers
%   is taken from the option workers(N), default 5.  The message
%   queue named by the option queue(Queue) is created; any error
%   from creating the queue is ignored.  Each worker gets an alias
%   generated by gensym/2 from the queue name followed by `_`.
%   Finally, Options is stored in queue_options/2 such that workers
%   can be added later.

create_workers(Options) :-
    option(workers(N), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, N, Queue, AliasBase, Options),
    assert(queue_options(Queue, Options)).

%   create_workers(+I, +N, +Queue, +AliasBase, +Options)
%
%   Create workers I..N for Queue (none if I > N).  Each worker runs
%   http_worker/1 and is registered in queue_worker/2.  Options is
%   passed both to the worker and, after the alias, to
%   thread_create/3.

create_workers(I, N, _, _, _) :-
    I > N,
    !.
create_workers(I, N, Queue, AliasBase, Options) :-
    gensym(AliasBase, Alias),
    thread_create(http_worker(Options), Id,
                  [ alias(Alias)
                  | Options
                  ]),
    assertz(queue_worker(Queue, Id)),
    I2 is I + 1,
    create_workers(I2, N, Queue, AliasBase, Options).
557
558
%!  resize_pool(+Queue, +Workers) is det.
%
%   Create or destroy workers such that Queue has Workers workers.
%   New workers are created from the stored queue_options/2.  If
%   workers are destroyed, one quit(Me) message is posted on the
%   queue for each excess worker and the call waits until the same
%   number of quitted(_) replies has been received.
%
%   NOTE(review): Workers is assumed to be non-negative.  With a
%   negative value more quit messages are sent than there are workers
%   to answer them.

resize_pool(Queue, Size) :-
    findall(W, queue_worker(Queue, W), Workers),
    length(Workers, Now),
    (   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasBase),
        I0 is Now+1,
        create_workers(I0, Size, Queue, AliasBase, Options)
    ;   Now == Size
    ->  true
    ;   Now > Size
    ->  Excess is Now - Size,
        thread_self(Me),
        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
        forall(between(1, Excess, _), thread_get_message(quitted(_)))
    ).
580
581
%!  http_worker(+Options)
%
%   Run HTTP worker main loop. Workers   simply  wait until they are
%   passed an accepted socket to process  a client.
%
%   If the message quit(Sender) is read   from the queue, the worker
%   stops.  The worker detaches itself and, unless Sender is `idle`
%   (see get_work/3), unregisters from queue_worker/2 and replies
%   with quitted(Self) to Sender.
%
%   Any other message is turned into a connection using
%   open_client/7 and handled by http_process/4.  If handling raises
%   an exception or fails, the problem is printed at the level
%   determined by current_message_level/2 and the connection is
%   closed.  The loop is failure driven: all branches except for
%   quit(_) end in `fail` to fetch the next message.
%
%   done_worker/0 is registered to run when the thread exits.

http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
      % NOTE(review): at this point a message has already been
      % fetched, so 'Waiting for a job ...' is printed after the wait.
      debug(http(worker), 'Waiting for a job ...', []),
      debug(http(worker), 'Got job ~p', [Message]),
      (   Message = quit(Sender)
      ->  !,
          thread_self(Self),
          thread_detach(Self),
          (   Sender == idle
          ->  true                      % done_worker/0 unregisters
          ;   retract(queue_worker(Queue, Self)),
              thread_send_message(Sender, quitted(Self))
          )
      ;   open_client(Message, Queue, Goal, In, Out,
                      Options, ClientOptions),
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          (   var(Error)                % processed without problems
          ->  fail
          ;   current_message_level(Error, Level),
              print_message(Level, Error),
              memberchk(peer(Peer), ClientOptions),
              close_connection(Peer, In, Out),
              fail
          )
      ).
623
%   get_work(+Queue, -Message, +MaxIdle) is det.
%
%   Fetch the next Message from Queue.  If MaxIdle is `infinite`,
%   wait as long as it takes.  Otherwise wait for at most MaxIdle
%   seconds and return the message quit(idle) if nothing arrived
%   within that time.

get_work(Queue, Message, MaxIdle) :-
    (   MaxIdle = infinite
    ->  thread_get_message(Queue, Message)
    ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
    ->  true
    ;   Message = quit(idle)
    ).
632
633
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions) is semidet.
%
%   Opens the connection to the client in a worker from the message
%   sent to the queue by accept_server3/2 or http_requeue/1.
%
%   For a requeue(In, Out, Goal, ClientOptions) message the streams
%   are already open.  We wait for at most the keep_alive_timeout
%   option (default 2) seconds for the client to send a new
%   request, see check_keep_alive_connection/5.
%
%   For any other message the streams are opened using
%   open_client/6.  ClientOptions then holds
%   pool(client(Queue, Goal, In, Out)) and timeout(Timeout) (from
%   the timeout option, default 60), followed by the options
%   provided by open_client/6.  Exceptions from open_client/6 are
%   printed, after which this predicate fails.

open_client(requeue(In, Out, Goal, ClOpts),
            _, Goal, In, Out, Opts, ClOpts) :-
    !,
    memberchk(peer(Peer), ClOpts),
    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, Opts,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | Options
            ]) :-
    catch(open_client(Message, Goal, In, Out, Options, Opts),
          E, report_error(E)),
    option(timeout(Timeout), Opts, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), Options),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
659
660
%!  open_client(+Message, +Goal, -In, -Out,
%!              -ClientOptions, +Options) is det.
%
%   Open the streams In and Out for the connection described by
%   Message.  The hook open_client_hook/6 is tried first.  The
%   default handles tcp_client(Socket, Goal, Peer) by opening the
%   socket streams and returns peer(Peer) and protocol(http) as
%   ClientOptions.

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _) :-
    tcp_open_socket(Socket, In, Out).

%   report_error(+Exception)
%
%   Print Exception as an error and fail.  Used as recovery goal
%   to turn an exception into failure.

report_error(E) :-
    print_message(error, E),
    fail.
676
677
%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
%
%   Wait for the client for at most  TimeOut seconds. Succeed if the
%   client starts a new request within   this  time. Otherwise close
%   the connection and fail.
%
%   The timeout of In is temporarily set to TimeOut and restored to
%   its old value if the connection is going to be re-used.  Any
%   exception while peeking at the input is treated as a timeout.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(Old)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), E, true),
    (   var(E),                     % no exception
        Code \== -1                 % no end-of-file
    ->  set_stream(In, timeout(Old)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   % On an exception Code is unbound, so the test below fails
        % and the event is reported as a timeout.
        (   Code == -1
        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).
700
701
%!  done_worker
%
%   Called when worker is terminated  due   to  http_workers/2  or a
%   (debugging) exception. In  the   latter  case, recreate_worker/2
%   creates a new worker.  Registered by http_worker/1 to run at
%   thread exit.
%
%   The first clause applies if the worker is still registered in
%   queue_worker/2.  It unregisters the worker and tries to replace
%   it based on the exit status.  If no replacement is created (also
%   if recreate_worker/2 raises an exception), a message is printed
%   at the level given by done_status_message_level/2.  The second
%   clause applies if the worker was already unregistered by
%   http_worker/1 while handling quit(Sender).

done_worker :-
    thread_self(Self),
    thread_detach(Self),
    retract(queue_worker(Queue, Self)),
    thread_property(Self, status(Status)),
    !,
    (   catch(recreate_worker(Status, Queue), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Self))
    ;   done_status_message_level(Status, Level),
        print_message(Level,
                      httpd_stopped_worker(Self, Status))
    ).
done_worker :-                                  % received quit(Sender)
    thread_self(Self),
    thread_property(Self, status(Status)),
    done_status_message_level(Status, Level),
    print_message(Level,
                  httpd_stopped_worker(Self, Status)).

%   done_status_message_level(+Status, -Level) is det.
%
%   Message level for reporting a stopped worker: silent for normal
%   termination and abort, informational otherwise.

done_status_message_level(true, silent) :- !.
done_status_message_level(exception('$aborted'), silent) :- !.
done_status_message_level(_, informational).
731
732
%!  recreate_worker(+Status, +Queue) is semidet.
%
%   Deal with the possibility  that   threads  are,  during development,
%   killed with abort/0. We recreate the worker to avoid that eventually
%   we run out of workers. If  we  are   aborted  due  to a halt/0 call,
%   thread_create/3 will raise a permission error.
%
%   The first clause deals with the possibility  that we cannot write to
%   `user_error`. This is possible when Prolog   is started as a service
%   using some service managers. Would be  nice   if  we  could write an
%   error, but where?
%
%   A new worker is only created if the thread died from an exception
%   for which recreate_on_error/1 holds.  It is created from the stored
%   queue_options/2.  Fails for any other Status.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Error), Queue) :-
    recreate_on_error(Error),
    queue_options(Queue, Options),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, 1, Queue, AliasBase, Options).

%   recreate_on_error(+Exception)
%
%   True if a worker that died from Exception must be replaced.

recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).
755
%!  thread_httpd:message_level(+Exception, -Level)
%
%   Determine the message level used for reporting exceptions that
%   occur while a worker processes a request (see http_worker/1, which
%   consults this table through current_message_level/2). Being
%   multifile, clauses can be added by the application to refine error
%   handling.  See also message_hook/3 for further programming error
%   handling.  Exceptions not listed here are reported as errors.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).
771
%   current_message_level(+Term, -Level) is det.
%
%   Level is the level at which Term must be printed.  This is the
%   first matching level from message_level/2, or `error` if that
%   table has no match.

current_message_level(Term, Level) :-
    message_level(Term, Level),
    !.
current_message_level(_Term, error).
777
778
%!  http_requeue(+Header)
%
%   Re-queue a connection to  the  worker   pool.  This  deals  with
%   processing additional requests on keep-alive connections.
%
%   Header must hold pool(client(Queue, Goal, In, Out)) and
%   peer(Peer).  Only the fields selected by requeue_keep/1 are
%   passed on with the requeue(In, Out, Goal, ClientOptions)
%   message.  Fails, after printing a debug message on the topic
%   http(error), if the connection cannot be re-queued.

http_requeue(Header) :-
    requeue_header(Header, ClientOptions),
    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
    memberchk(peer(Peer), ClientOptions),
    http_enough_workers(Queue, keep_alive, Peer),
    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
    !.
http_requeue(Header) :-
    debug(http(error), 'Re-queue failed: ~p', [Header]),
    fail.

%   requeue_header(+Header, -ClientOptions)
%
%   ClientOptions holds the elements of Header for which
%   requeue_keep/1 is true, in the original order.

requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

%   requeue_keep(+Field)
%
%   Request fields that describe the connection rather than a
%   single request.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
806
807
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle a single client request on the stream pair In/Out.  Both
%   streams get the timeout from the option timeout(Seconds),
%   default 60.  The request is handled by http_wrapper/5, after
%   which next/2 decides what to do with the connection.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(TMO), Options, 60),
    set_stream(In, timeout(TMO)),
    set_stream(Out, timeout(TMO)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request)
                 | Options
                 ]),
    next(Connection, Request).
823
%   next(+Connection, +Request) is det.
%
%   Decide what to do with the connection after a request has been
%   handled.  Prints a warning if none of the actions of next_/2
%   succeeds.

next(Connection, Request) :-
    (   next_(Connection, Request)
    ->  true
    ;   print_message(warning, goal_failed(next(Connection,Request)))
    ).

%   next_(+Connection, +Request) is semidet.
%
%   Connection is one of
%
%     - switch_protocol(Goal, Options)
%       Hand the streams to Goal.  The connection is closed if Goal
%       fails or raises an exception, which is printed.
%     - spawned(ThreadId)
%       The request is handled by another thread: nothing to do.
%     - Any atom that equals `keep-alive`, case insensitive
%       Re-queue the connection.
%
%   In all other cases, including a failed re-queue, the connection
%   is closed.

next_(switch_protocol(Handler, _HandlerOptions), Request) :-
    !,
    memberchk(pool(client(_, _, In, Out)), Request),
    (   catch(call(Handler, In, Out),
              Error,
              (   print_message(error, Error),
                  fail
              ))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(Thread), _Request) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [Thread]).
next_(Connection, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    http_requeue(Request),
    !.
next_(_Connection, Request) :-
    http_close_connection(Request).
847
848
%!  http_close_connection(+Request)
%
%   Close connection associated to Request.  See also http_requeue/1.
%   Request must hold pool(client(_, _, In, Out)) and peer(Peer).

http_close_connection(Request) :-
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    memberchk(peer(Peer), Request),
    close_connection(Peer, In, Out).
857
%!  close_connection(+Peer, +In, +Out)
%
%   Closes the connection from the server to the client.  Errors are
%   currently silently ignored.  Both streams are closed using the
%   option force(true).  Peer is only used for the debug message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    catch(close(In, [force(true)]), _, true),
    catch(close(Out, [force(true)]), _, true).
867
%!  http_spawn(:Goal, +Options) is det.
%
%   Continue this connection on a new thread. A handler may call
%   http_spawn/2 to start a new thread that continues processing the
%   current request using Goal. The original thread returns to the
%   worker pool for processing new requests. The new thread is always
%   created detached. Options are passed to thread_create/3, except
%   for:
%
%       * pool(+Pool)
%       Interfaces to library(thread_pool), starting the thread
%       on the given pool.
%
%   If a pool does not exist, this predicate calls the multifile
%   hook http:create_pool/1 to create it. If this predicate succeeds
%   the operation is retried.
%
%   @throws http_reply(busy) if the pool has no free thread
%           (resource error `threads_in_pool`).

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Id,
                                [detached(true)|ThreadOptions]),
          Error,
          true),
    pool_spawn_result(Error, Id, Pool, Goal, Options).
http_spawn(Goal, Options) :-
    current_output(CGI),
    SpawnOptions = [detached(true)|Options],
    thread_create(wrap_spawned(CGI, Goal), Id, SpawnOptions),
    http_spawned(Id).

%   pool_spawn_result(?Error, +Id, +Pool, :Goal, +Options)
%
%   Act on the outcome of creating a thread in Pool.  Error is unbound
%   if the thread was created; otherwise it is the caught exception.

pool_spawn_result(Error, Id, _Pool, _Goal, _Options) :-
    var(Error),                         % thread was created
    !,
    http_spawned(Id).
pool_spawn_result(error(resource_error(threads_in_pool(_)), _),
                  _Id, _Pool, _Goal, _Options) :-
    !,
    throw(http_reply(busy)).
pool_spawn_result(error(existence_error(thread_pool, Pool), _),
                  _Id, Pool, Goal, Options) :-
    create_pool(Pool),                  % lazily create the pool and retry
    !,
    http_spawn(Goal, Options).
pool_spawn_result(Error, _Id, _Pool, _Goal, _Options) :-
    throw(Error).
911
%   wrap_spawned(+CGI, :Goal)
%
%   Entry point of a thread created by http_spawn/2.  Makes CGI the
%   current output of this thread, runs Goal through
%   http_wrap_spawned/3 and finally hands the resulting connection
%   state and request to next/2.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, SpawnedRequest, ConnectionState),
    next(ConnectionState, SpawnedRequest).
916
%!  create_pool(+Pool) is det.
%
%   Lazy creation of worker-pools for the HTTP server. This predicate
%   calls the hook http:create_pool/1. If the hook fails it creates a
%   default pool of size 10. This should suffice for most typical use
%   cases. Note that we get a permission error if the pool is already
%   created, for example because another thread created it in the
%   meanwhile. We can ignore this, both for the hook and for the
%   default pool.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true),
    !.                                  % hook did the job: do not fall
                                        % through to the default pool
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(thread_pool_create(Pool, 10, []), E, true).
931
932
933		 /*******************************
934		 *         WAIT POLICIES	*
935		 *******************************/
936
:- meta_predicate
    thread_repeat_wait(0).

%!  thread_repeat_wait(:Goal) is multi.
%
%   Acts as `repeat, thread_idle(Goal)`, choosing the idle policy from
%   the average time between firings. The average is maintained using
%   update_rate_mma/2 and classified by long/2. If the classification
%   is `brief`, Goal is called directly without thread_idle/2.
%   Otherwise the classification (`short` or `long`) is passed as the
%   second argument to thread_idle/2.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, RateState),
    repeat,
      update_rate_mma(RateState, AvgInterval),
      long(AvgInterval, Idle),
      (   Idle \== brief
      ->  thread_idle(Goal, Idle)
      ;   call(Goal)
      ).
954
%   long(+MMA, -Idle) is det.
%
%   Classify the average time between requests MMA (in seconds).  Idle
%   is `brief` if MMA < 0.05, `short` if MMA < 1 and `long` otherwise.
%
%   The class is selected before it is unified with Idle, so that the
%   predicate is steadfast: calling it with a bound Idle that is not
%   the class of MMA fails instead of silently matching a later class.

long(MMA, Idle) :-
    (   MMA < 0.05
    ->  Idle = brief
    ;   MMA < 1
    ->  Idle = short
    ;   Idle = long
    ).
962
%!  new_rate_mma(+N, +Resolution, -State) is det.
%!  update_rate_mma(!State, -MMA) is det.
%
%   Implement _Modified Moving  Average_  computing   the  average  time
%   between requests as an exponential moving average with alpha=1/N.
%
%   State is a term rstate(Base, Last, MaxI, Resolution, N, MMA), where
%   Base is the time stamp the integer stamps are relative to, Last is
%   the stamp of the previous update, MaxI is the value of the Prolog
%   flag `max_tagged_integer` and MMA is the current average in
%   1/Resolution seconds. update_rate_mma/2 destructively updates State
%   using nb_setarg/3 and returns the new average in seconds.
%
%   @arg Resolution is the time resolution  in 1/Resolution seconds. All
%   storage is done in integers to avoid  the need for stack freezing in
%   nb_setarg/3.
%
%   @see https://en.wikipedia.org/wiki/Moving_average

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    % Both Stamp and Last are relative to the current Base.
    Diff is Stamp-Last,
    (   Stamp > MaxI
    ->  % Stamp no longer fits a tagged integer: rebase on Now.  The
        % stamp of this update relative to the new base is 0.  It must
        % not be overwritten with the old-base Stamp or the next Diff
        % becomes hugely negative.
        nb_setarg(1, State, Now),
        nb_setarg(2, State, 0)
    ;   nb_setarg(2, State, Stamp)
    ),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).
993
994
995                 /*******************************
996                 *            MESSAGES          *
997                 *******************************/
998
:- multifile
    prolog:message/3.

%   Translate the message terms printed by this library:
%
%     - httpd_started_server(Address, Options)
%     - httpd_stopped_worker(Worker, Status)
%     - httpd_restarted_worker(Worker)
%     - httpd(created_pool(Pool))

prolog:message(httpd_started_server(Address, ServerOptions)) -->
    [ 'Started server at '-[] ],
    http_root(Address, ServerOptions).
prolog:message(httpd_stopped_worker(Worker, ExitStatus)) -->
    [ 'Stopped worker ~p: ~p'-[Worker, ExitStatus] ].
prolog:message(httpd_restarted_worker(Worker)) -->
    [ 'Replaced aborted worker ~p'-[Worker] ].
prolog:message(httpd(created_pool(PoolName))) -->
    [ 'Created thread-pool ~p of size 10'-[PoolName],
      nl,
      'Create this pool at startup-time or define the hook ',
      nl,
      'http:create_pool/1 to avoid this message and create a ',
      nl,
      'pool that fits the usage-profile.'
    ].
1015
%   http_root(+Address, +Options)//
%
%   Emit the URI of the landing page of the server at Address, as
%   computed by landing_page/3.

http_root(Address, Options) -->
    { landing_page(Address, LandingURI, Options) },
    [ '~w'-[LandingURI] ].
1019
%   landing_page(+Address, -URI, +Options) is semidet.
%
%   URI is the location of the entry page of the server at Address.
%   Address is either Host:Port or a plain Port, in which case the host
%   is `localhost`.  The scheme is obtained from http_server_property/2
%   and the port is omitted from the URI if it is the default port for
%   that scheme.  The path is determined by entry_page/3.
%
%   @error type_error(atom, Host) if Host is not an atom.

landing_page(Host:Port, URI, Options) :-
    % Commit to the Host:Port form.  Without this cut a failure below
    % falls through to the last clause, which wraps the address as
    % localhost:(Host:Port) and recurses without end.
    !,
    must_be(atom, Host),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).
1030
%   default_port(?Scheme, ?Port)
%
%   True when Port is the default port for the URI scheme Scheme.
%   landing_page/3 leaves the port out of the URI it generates when
%   this holds.

default_port(http, 80).
default_port(https, 443).
1033
%   entry_page(+Base, -URI, +Options)
%
%   URI is the entry page of the server, resolved against Base.  The
%   location is taken from the option entry_page(Entry) if present.
%   Otherwise it is the location of root(.) as computed by
%   http_absolute_location/3.

entry_page(Base, URI, Options) :-
    (   option(entry_page(Entry), Options)
    ->  true
    ;   http_absolute_location(root(.), Entry, [])
    ),
    uri_resolve(Entry, Base, URI).
1041