/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2002-2020, University of Amsterdam
                              VU University Amsterdam
                              CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(thread_httpd,
          [ http_current_server/2,      % ?:Goal, ?Port
            http_server_property/2,     % ?Port, ?Property
            http_server/2,              % :Goal, +Options
            http_workers/2,             % +Port, ?WorkerCount
            http_add_worker/2,          % +Port, +Options
            http_current_worker/2,      % ?Port, ?ThreadID
            http_stop_server/2,         % +Port, +Options
            http_spawn/2,               % :Goal, +Options

            http_requeue/1,             % +Request
            http_close_connection/1,    % +Request
            http_enough_workers/3       % +Queue, +Why, +Peer
          ]).
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(option)).
:- use_module(library(socket)).
:- use_module(library(thread_pool)).
:- use_module(library(gensym)).
:- use_module(http_wrapper).
:- use_module(http_path).


% Declare the options accepted by the exported predicates that take an
% option list.
:- predicate_options(http_server/2, 2,
                     [ port(any),
                       entry_page(atom),
                       tcp_socket(any),
                       workers(positive_integer),
                       timeout(number),
                       keep_alive_timeout(number),
                       silent(boolean),
                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
                       pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(http_spawn/2, 2,
                     [ pool(atom),
                       pass_to(system:thread_create/3, 3),
                       pass_to(thread_pool:thread_create_in_pool/4, 4)
                     ]).
:- predicate_options(http_add_worker/2, 2,
                     [ timeout(number),
                       keep_alive_timeout(number),
                       max_idle_time(number),
                       pass_to(system:thread_create/3, 3)
                     ]).

/** <module> Threaded HTTP server

Most code doesn't need to use this directly; instead use
library(http/http_server), which combines this library with the
typical HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog.
It is based on the multi-threading capabilities of SWI-Prolog and thus
exploits multiple cores to serve requests concurrently. The server
scales well and can cooperate with library(thread_pool) to control the
number of concurrent requests of a given type.
For example, it can be
configured to handle 200 file download requests concurrently, 2 requests
that potentially use a lot of memory and 8 requests that use a lot of
CPU resources.

On Unix systems, this library can be combined with
library(http/http_unix_daemon) to realise a proper Unix service process
that creates a web server at port 80, runs under a specific account,
optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this
library can be used to create an HTTPS server. See
<plbase>/doc/packages/examples/ssl/https for an example server using a
self-signed SSL certificate.
*/

:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

% Bookkeeping of running servers, their workers and the options used to
% create (additional) workers.
:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

% Hooks.  The first four are consulted by make_socket/3,
% accept_server3/2, close_server_socket/1 and open_client/6 before
% these fall back to plain TCP sockets.
:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    http:create_pool/1,
    http:schedule_workers/1.

:- meta_predicate
    thread_repeat_wait(0).

%!  http_server(:Goal, :Options) is det.
%
%   Create a server at Port that calls Goal for each parsed request.
%   Options provide a list of options. Defined options are
%
%     * port(?Address)
%     Port to bind to.  Address is either a port or a term
%     Host:Port. The port may be a variable, causing the system
%     to select a free port.  See tcp_bind/2.
%
%     * entry_page(+URI)
%     Affects the message printed while the server is started.
%     Interpreted as a URI relative to the server root.
%
%     * tcp_socket(+Socket)
%     If provided, use this socket instead of creating one and
%     binding it to an address.  The socket must be bound to an
%     address.
%
%     * workers(+Count)
%     Determine the number of worker threads.  Default is 5.  This
%     is fine for small scale usage.
%     Public servers typically need
%     a higher number.
%
%     * timeout(+Seconds)
%     Maximum time of inactivity trying to read the request after a
%     connection has been opened.  Default is 60 seconds.  See
%     set_stream/2 using the _timeout_ option.
%
%     * keep_alive_timeout(+Seconds)
%     Time to keep `Keep alive' connections alive.  Default is
%     2 seconds.
%
%     * stack_limit(+Bytes)
%     Stack limit to use for the workers.  The default is inherited
%     from the `main` thread.
%     If you need to control resource usage you may consider the
%     `spawn` option of http_handler/3 and library(thread_pool).
%
%     * silent(Bool)
%     If `true` (default `false`), do not print an informational
%     message that the server was started.
%
%   A typical initialization for an HTTP server that uses
%   http_dispatch/1 to relay requests to predicates is:
%
%     ==
%     :- use_module(library(http/thread_httpd)).
%     :- use_module(library(http/http_dispatch)).
%
%     start_server(Port) :-
%         http_server(http_dispatch, [port(Port)]).
%     ==
%
%   Note that multiple servers can coexist in the same Prolog
%   process. A notable application of this is to have both an HTTP
%   and HTTPS server, where the HTTP server redirects to the HTTPS
%   server for handling sensitive requests.
%
%   @error existence_error(option, port) if Options holds no port(_).

http_server(Goal, M:Options0) :-
    option(port(Port), Options0),
    !,
    % The socket and queue come first, then the workers and finally
    % the accept thread that feeds the queue.
    make_socket(Port, M:Options0, Options),
    create_workers(Options),
    create_server(Goal, Port, Options),
    (   option(silent(true), Options0)
    ->  true
    ;   print_message(informational,
                      httpd_started_server(Port, Options0))
    ).
http_server(_Goal, _Options) :-
    existence_error(option, port).


%!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
%
%   Create the HTTP server socket and worker pool queue. OptionsOut
%   is guaranteed to hold the option queue(QueueId).
%
%   @arg OptionsIn is qualified to allow passing the
%   module-sensitive ssl option argument.
%
%   Three cases are handled in order: the hook make_socket_hook/3
%   takes over completely; a tcp_socket(_) option means the caller
%   supplied the socket and only queue(Queue) is added; otherwise a
%   new socket is created, bound to Port and put in listen mode. In
%   the last case the socket is closed again if setting it up raises
%   an exception, after which the exception is re-thrown.

make_socket(Port, Options0, Options) :-
    make_socket_hook(Port, Options0, Options),
    !.
make_socket(Port, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Port, _:Options0, Options) :-
    tcp_socket(Socket),
    % Do not leak the socket if, e.g., the address is already in use.
    catch(( tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, Port),
            tcp_listen(Socket, 64)
          ),
          Error,
          ( catch(tcp_close_socket(Socket), _, true),
            throw(Error)
          )),
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].

%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
%
%   Create an atom that identifies the server's queue and thread
%   resources. Atom is the concatenation of Scheme, `@` and the
%   parts of Address, e.g. `'httpd@localhost:8080'`.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), Parts),
    atomic_list_concat([Scheme,@|Parts], Atom).

%!  address_parts(+Address)// is semidet.
%
%   Describe Address as a list of atomic parts. Address is atomic, a
%   term Host:Port or a term ip(A,B,C,D). Other terms fail.

address_parts(Atomic) -->
    { atomic(Atomic) },
    !,
    [Atomic].
address_parts(Host:Port) -->
    !,
    address_parts(Host), [:], address_parts(Port).
address_parts(ip(A,B,C,D)) -->
    !,
    [ A, '.', B, '.', C, '.', D ].

%!  create_server(:Goal, +Address, +Options) is det.
%
%   Create the main server thread that runs accept_server/3 to
%   listen to new requests. The thread gets an alias built from the
%   scheme and port. We wait for the thread to report
%   `server_started` before registering it in current_server/6.

create_server(Goal, Address, Options) :-
    get_time(StartTime),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, Alias),
    thread_self(Initiator),
    thread_create(accept_server(Goal, Initiator, Options), _,
                  [ alias(Alias)
                  ]),
    thread_get_message(server_started),
    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).

% scheme(-Scheme, +Options): an explicit scheme(Scheme) option wins.
scheme(Scheme, Options) :-
    option(scheme(Scheme), Options),
    !.
% Without an explicit scheme, an ssl(_) or ssl_instance(_) option
% selects https; everything else is http.
scheme(Scheme, Options) :-
    (   option(ssl(_), Options)
    ;   option(ssl_instance(_), Options)
    ),
    !,
    Scheme = https.
scheme(http, _).

% address_port(+Address, -Port): strip the host from Host:Port.
address_port(_Host:Port, Port) :- !.
address_port(Port, Port).

% Load the SSL plugin on demand: only for the https scheme, when no
% accept_hook/2 clause exists and the plugin source can be found.
autoload_https(https) :-
    \+ clause(accept_hook(_Goal, _Options), _),
    exists_source(library(http/http_ssl_plugin)),
    !,
    use_module(library(http/http_ssl_plugin)).
autoload_https(_).

%!  http_current_server(:Goal, ?Port) is nondet.
%
%   True if Goal is the goal of a server at Port.
%
%   @deprecated Use http_server_property(Port, goal(Goal))

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _, _, _, _).


%!  http_server_property(?Port, ?Property) is nondet.
%
%   True if Property is a property of the HTTP server running at
%   Port.  Defined properties are:
%
%     * goal(:Goal)
%     Goal used to start the server. This is often
%     http_dispatch/1.
%     * scheme(-Scheme)
%     Scheme is one of `http` or `https`.
%     * start_time(?Time)
%     Time-stamp when the server was created.

% A term _:Port with integer Port is reduced to the plain port.
http_server_property(_:Port, Property) :-
    integer(Port),
    !,
    server_property(Property, Port).
http_server_property(Port, Property) :-
    server_property(Property, Port).

server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _, _, _, _).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _, _, _, Scheme, _).
server_property(start_time(Time), Port) :-
    current_server(Port, _, _, _, _, Time).


%!  http_workers(+Port, -Workers) is det.
%!  http_workers(+Port, +Workers:int) is det.
%
%   Query or set the number of workers for the server at this port.
%   The number of workers is dynamically modified. Setting it to 1
%   (one) can be used to profile the worker using tprofile/1.
%
%   @error type_error(nonneg, Workers) if Workers is a negative
%          integer.
%   @error existence_error(http_server, Port) if there is no server
%          at Port.

http_workers(Port, Workers) :-
    must_be(ground, Port),
    current_server(Port, _, _, Queue, _, _),
    !,
    (   integer(Workers)
    ->  % A negative count would make resize_pool/2 send more quit/1
        % messages than there are workers and wait forever for the
        % replies.
        must_be(nonneg, Workers),
        resize_pool(Queue, Workers)
    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
        length(WorkerIDs, Workers)
    ).
http_workers(Port, _) :-
    existence_error(http_server, Port).


%!  http_add_worker(+Port, +Options) is det.
%
%   Add a new worker to the HTTP server for port Port. Options
%   overrule the default queue options. The following additional
%   options are processed:
%
%     - max_idle_time(+Seconds)
%     The created worker will automatically terminate if there is
%     no new work within Seconds.
%
%   @error existence_error(http_server, Port) if there is no server
%          at Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    current_server(Port, _, _, Queue, _, _),
    !,
    queue_options(Queue, QueueOptions),
    merge_options(Options, QueueOptions, WorkerOptions),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
http_add_worker(Port, _) :-
    existence_error(http_server, Port).


%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True if ThreadID is the identifier of a Prolog thread serving
%   Port. This predicate is motivated to allow for the use of
%   arbitrary interaction with the worker thread for development and
%   statistics.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _, _, Queue, _, _),
    queue_worker(Queue, ThreadID).


%!  accept_server(:Goal, +Initiator, +Options)
%
%   The goal of a small server-thread accepting new requests and
%   posting them to the queue of workers. When the accept loop is
%   ended by throwing `http_stop`, the server is removed from
%   current_server/6 and the server socket is closed.

accept_server(Goal, Initiator, Options) :-
    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
    thread_self(Thread),
    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
    close_server_socket(Options).
%!  accept_server2(:Goal, +Initiator, +Options)
%
%   Tell Initiator that the accept thread is running and then run
%   accept_server3/2 in a failure-driven loop. The loop is only left
%   by re-throwing `http_stop` or `'$aborted'`. Other exceptions and
%   failure of accept_server3/2 are printed as errors, after which
%   the loop continues.

accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
      (   catch(accept_server3(Goal, Options), E, true)
      ->  (   var(E)
          ->  fail
          ;   accept_rethrow_error(E)
          ->  throw(E)
          ;   print_message(error, E),
              fail
          )
      ;   print_message(error,      % internal error
                        goal_failed(accept_server3(Goal, Options))),
          fail
      ).

%!  accept_server3(:Goal, +Options)
%
%   Accept a single connection. The hook accept_hook/2 is tried
%   first. Otherwise we accept on the tcp_socket(Socket) from
%   Options, post tcp_client(Client, Goal, Peer) to the worker queue
%   and call http_enough_workers/3 with reason `accept`.

accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(Socket), Options),
    memberchk(queue(Queue), Options),
    debug(http(connection), 'Waiting for connection', []),
    tcp_accept(Socket, Client, Peer),
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
    http_enough_workers(Queue, accept, Peer).

% Exceptions that terminate the accept loop instead of being printed.
accept_rethrow_error(http_stop).
accept_rethrow_error('$aborted').


%!  close_server_socket(+Options)
%
%   Close the server socket. The hook close_hook/1 is tried first;
%   otherwise the tcp_socket(Socket) from Options is closed.

close_server_socket(Options) :-
    close_hook(Options),
    !.
close_server_socket(Options) :-
    memberchk(tcp_socket(Socket), Options),
    !,
    tcp_close_socket(Socket).


%!  http_stop_server(+Port, +Options)
%
%   Stop the indicated HTTP server gracefully. First stops all
%   workers, then stops the server.
%
%   @tbd    Realise non-graceful stop

http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),                  % checks Port is ground
    current_server(Port, _, Thread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    thread_signal(Thread, throw(http_stop)),
    % NOTE(review): presumably this connection wakes the accept thread
    % if it is blocked in tcp_accept/3 so the signal gets processed;
    % errors from connecting are ignored.
    catch(connect(localhost:Port), _, true),
    thread_join(Thread, _),
    message_queue_destroy(Queue).
%!  connect(+Address)
%
%   Open a TCP connection to Address. The socket is closed again by
%   the cleanup handler of setup_call_cleanup/3.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Socket),
        tcp_connect(Socket, Address),
        tcp_close_socket(Socket)).

%!  http_enough_workers(+Queue, +Why, +Peer) is det.
%
%   Check that we have enough workers in our queue. If not, call the
%   hook http:schedule_workers/1 to extend the worker pool. This
%   predicate can be used by accept_hook/2.

% Some thread is already waiting on the queue: nothing to do.
http_enough_workers(Queue, _Why, _Peer) :-
    message_queue_property(Queue, waiting(_0)),
    !,
    debug(http(scheduler), '~D waiting for work; ok', [_0]).
http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Size)),
    (   enough(Size, Why)
    ->  debug(http(scheduler), '~D in queue; ok', [Size])
    ;   current_server(Port, _, _, Queue, _, _),
        Data = _{ port:Port,
                  reason:Why,
                  peer:Peer,
                  waiting:Size,
                  queue:Queue
                },
        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
        % Exceptions from the hook are printed; failure of the hook
        % is ignored by the final `; true`.
        catch(http:schedule_workers(Data),
              Error,
              print_message(error, Error))
    ->  true
    ;   true
    ).

% enough(+Size, +Why): Size queued messages are acceptable for Why.
enough(0, _).
enough(1, keep_alive).          % I will be ready myself


%!  http:schedule_workers(+Data:dict) is semidet.
%
%   Hook called if a new connection or a keep-alive connection
%   cannot be scheduled _immediately_ to a worker. Dict contains the
%   following keys:
%
%     - port:Port
%     Port number that identifies the server.
%     - reason:Reason
%     One of =accept= for a new connection or =keep_alive= if a
%     worker tries to reschedule itself.
%     - peer:Peer
%     Identify the other end of the connection
%     - waiting:Size
%     Number of messages waiting in the queue.
%     - queue:Queue
%     Message queue used to dispatch accepted messages.
%
%   Note that, when called with `reason:accept`, we are called in
%   the time critical main accept loop. An implementation of this
%   hook shall typically send the event to a thread dedicated to
%   dynamic worker-pool management.
%
%   @see    http_add_worker/2 may be used to create (temporary) extra
%           workers.


                 /*******************************
                 *    WORKER QUEUE OPERATIONS   *
                 *******************************/

%!  create_workers(+Options)
%
%   Create the pool of HTTP worker-threads. The number of workers is
%   taken from workers(N) (default 5) and the message queue from
%   queue(Queue). Each worker gets an alias generated by gensym/2
%   from the queue name followed by `_`. Errors from creating the
%   message queue are ignored. Options is remembered in
%   queue_options/2 for creating workers later.

create_workers(Options) :-
    option(workers(N), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, N, Queue, AliasBase, Options),
    assert(queue_options(Queue, Options)).

%!  create_workers(+I, +N, +Queue, +AliasBase, +Options)
%
%   Create one worker thread for each index from I up to and
%   including N and register it in queue_worker/2.

create_workers(I, N, _, _, _) :-
    I > N,
    !.
create_workers(I, N, Queue, AliasBase, Options) :-
    gensym(AliasBase, Alias),
    thread_create(http_worker(Options), Id,
                  [ alias(Alias)
                  | Options
                  ]),
    assertz(queue_worker(Queue, Id)),
    I2 is I + 1,
    create_workers(I2, N, Queue, AliasBase, Options).


%!  resize_pool(+Queue, +Workers) is det.
%
%   Create or destroy workers. If workers are destroyed, the call
%   waits until the desired number of workers is reached: it sends
%   one quit(Me) message per excess worker and then waits for the
%   same number of quitted(_) replies.

resize_pool(Queue, Size) :-
    findall(W, queue_worker(Queue, W), Workers),
    length(Workers, Now),
    (   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasBase),
        I0 is Now+1,
        create_workers(I0, Size, Queue, AliasBase, Options)
    ;   Now == Size
    ->  true
    ;   Now > Size
    ->  Excess is Now - Size,
        thread_self(Me),
        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
        forall(between(1, Excess, _), thread_get_message(quitted(_)))
    ).


%!  http_worker(+Options)
%
%   Run HTTP worker main loop. Workers simply wait until they are
%   passed an accepted socket to process a client.
%
%   If the message quit(Sender) is read from the queue, the worker
%   stops.
http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    % done_worker/0 is registered to run when this thread exits.
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    % Failure-driven loop: every branch below except quit(Sender)
    % ends in `fail`, which backtracks into thread_repeat_wait/1 to
    % fetch the next message.
    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
      debug(http(worker), 'Waiting for a job ...', []),
      debug(http(worker), 'Got job ~p', [Message]),
      (   Message = quit(Sender)
      ->  !,
          thread_self(Self),
          thread_detach(Self),
          (   Sender == idle            % idle timeout from get_work/3
          ->  true
          ;   retract(queue_worker(Queue, Self)),
              thread_send_message(Sender, quitted(Self))
          )
      ;   open_client(Message, Queue, Goal, In, Out,
                      Options, ClientOptions),
          % Turn failure of http_process/4 into an error term so it is
          % reported like an exception.
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          (   var(Error)
          ->  fail
          ;   current_message_level(Error, Level),
              print_message(Level, Error),
              memberchk(peer(Peer), ClientOptions),
              close_connection(Peer, In, Out),
              fail
          )
      ).

%!  get_work(+Queue, -Message, +MaxIdle)
%
%   Get the next Message from Queue. If MaxIdle is `infinite` we wait
%   without limit. Otherwise Message is quit(idle) if nothing arrives
%   within MaxIdle seconds.

get_work(Queue, Message, infinite) :-
    !,
    thread_get_message(Queue, Message).
get_work(Queue, Message, MaxIdle) :-
    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
    ->  true
    ;   Message = quit(idle)
    ).


%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions) is semidet.
%
%   Opens the connection to the client in a worker from the message
%   sent to the queue by accept_server/3.

% A re-queued keep-alive connection: wait for the next request, using
% keep_alive_timeout (default 2 seconds).
open_client(requeue(In, Out, Goal, ClOpts),
            _, Goal, In, Out, Opts, ClOpts) :-
    !,
    memberchk(peer(Peer), ClOpts),
    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
% A new connection: open it using open_client/6 and extend the
% resulting client options with pool(_) and timeout(_) (default 60).
% Exceptions while opening are printed, after which we fail.
open_client(Message, Queue, Goal, In, Out, Opts,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | Options
            ]) :-
    catch(open_client(Message, Goal, In, Out, Options, Opts),
          E, report_error(E)),
    option(timeout(Timeout), Opts, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), Options),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).


%!  open_client(+Message, +Goal, -In, -Out,
%!              -ClientOptions, +Options) is det.
%
%   Open the streams for Message. The hook open_client_hook/6 is
%   tried first. Otherwise Message must be
%   tcp_client(Socket, Goal, Peer) and the streams are opened on
%   Socket, with ClientOptions set to `[peer(Peer), protocol(http)]`.

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _) :-
    tcp_open_socket(Socket, In, Out).

% Print E as an error and fail.
report_error(E) :-
    print_message(error, E),
    fail.


%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
%
%   Wait for the client for at most TimeOut seconds. Succeed if the
%   client starts a new request within this time. Otherwise close
%   the connection and fail.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(Old)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), E, true),
    (   var(E),                     % no exception
        Code \== -1                 % no end-of-file
    ->  set_stream(In, timeout(Old)),   % restore the original timeout
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Code == -1
        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).


%!  done_worker
%
%   Called when worker is terminated due to http_workers/2 or a
%   (debugging) exception. In the latter case, recreate_worker/2
%   creates a new worker.
% The worker is still registered in queue_worker/2: unregister it and
% try to replace it. If recreate_worker/2 fails or raises, only report
% that the worker stopped.
done_worker :-
    thread_self(Self),
    thread_detach(Self),
    retract(queue_worker(Queue, Self)),
    thread_property(Self, status(Status)),
    !,
    (   catch(recreate_worker(Status, Queue), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Self))
    ;   done_status_message_level(Status, Level),
        print_message(Level,
                      httpd_stopped_worker(Self, Status))
    ).
done_worker :-                                  % received quit(Sender)
    thread_self(Self),
    thread_property(Self, status(Status)),
    done_status_message_level(Status, Level),
    print_message(Level,
                  httpd_stopped_worker(Self, Status)).

% done_status_message_level(+Status, -Level): normal completion and
% abort are reported silently; anything else as informational.
done_status_message_level(true, silent) :- !.
done_status_message_level(exception('$aborted'), silent) :- !.
done_status_message_level(_, informational).


%!  recreate_worker(+Status, +Queue) is semidet.
%
%   Deal with the possibility that threads are, during development,
%   killed with abort/0. We recreate the worker to avoid that eventually
%   we run out of workers. If we are aborted due to a halt/0 call,
%   thread_create/3 will raise a permission error.
%
%   The first clause deals with the possibility that we cannot write to
%   `user_error`. This is possible when Prolog is started as a service
%   using some service managers. Would be nice if we could write an
%   error, but where?

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Error), Queue) :-
    recreate_on_error(Error),
    queue_options(Queue, Options),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, 1, Queue, AliasBase, Options).

% Exceptions after which a terminated worker is replaced.
recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).

%!  thread_httpd:message_level(+Exception, -Level)
%
%   Determine the message level used for exceptions that may occur
%   while a worker runs http_process/4. Being multifile, clauses can
%   be added by the application to refine error handling.
%   See also message_hook/3 for
%   further programming error handling.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).

% current_message_level(+Term, -Level): use message_level/2 if it has
% a matching clause; otherwise report as `error`.
current_message_level(Term, Level) :-
    (   message_level(Term, Level)
    ->  true
    ;   Level = error
    ).


%!  http_requeue(+Header)
%
%   Re-queue a connection to the  worker   pool.  This  deals with
%   processing additional requests on keep-alive connections. Only
%   the pool(_), peer(_) and protocol(_) members of Header are
%   passed on with the re-queued connection. Fails, after printing
%   a debug message on the topic http(error), if this is not
%   possible.

http_requeue(Header) :-
    requeue_header(Header, ClientOptions),
    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
    memberchk(peer(Peer), ClientOptions),
    http_enough_workers(Queue, keep_alive, Peer),
    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
    !.
http_requeue(Header) :-
    debug(http(error), 'Re-queue failed: ~p', [Header]),
    fail.

% requeue_header(+Header, -Kept): Kept holds the elements of Header
% that satisfy requeue_keep/1, in their original order.
requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).


%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle a single client message on the given streams. Both
%   streams get the timeout from timeout(TMO) in Options (default
%   60). The request is handled by http_wrapper/5, after which
%   next/2 decides what happens to the connection.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(TMO), Options, 60),
    set_stream(In, timeout(TMO)),
    set_stream(Out, timeout(TMO)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request)
                 | Options
                 ]),
    next(Connection, Request).

% next(+Connection, +Request): act on the connection status using
% next_/2.
next(Connection, Request) :-
    next_(Connection, Request), !.
% Reached if next_/2 failed: print a warning and succeed.
next(Connection, Request) :-
    print_message(warning, goal_failed(next(Connection,Request))).

% next_(+Connection, +Request): decide what to do with the connection
% after the request has been handled.
%
%   - switch_protocol(Goal, _): call Goal on the connection streams;
%     close the connection if Goal fails or raises (the exception is
%     printed).
%   - spawned(ThreadId): only a debug message is printed here.
%   - Connection is `keep-alive` (case insensitive): re-queue the
%     connection.
%   - Otherwise, or if re-queueing fails: close the connection.
next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), E,
              (   print_message(error, E),
                  fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(ThreadId), _) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next_(Connection, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    http_requeue(Request),
    !.
next_(_, Request) :-
    http_close_connection(Request).


%!  http_close_connection(+Request)
%
%   Close connection associated to Request.  See also http_requeue/1.

http_close_connection(Request) :-
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    memberchk(peer(Peer), Request),
    close_connection(Peer, In, Out).

%!  close_connection(+Peer, +In, +Out)
%
%   Closes the connection from the server to the client.  Errors are
%   currently silently ignored.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    catch(close(In, [force(true)]), _, true),
    catch(close(Out, [force(true)]), _, true).

%!  http_spawn(:Goal, +Options) is det.
%
%   Continue this connection on a  new   thread.  A handler may call
%   http_spawn/2 to start a new thread that continues processing the
%   current request using Goal. The original   thread returns to the
%   worker pool for processing new requests.   Options are passed to
%   thread_create/3, except for:
%
%     * pool(+Pool)
%     Interfaces to library(thread_pool), starting the thread
%     on the given pool.
%
%   If a pool does not exist, this predicate calls the multifile
%   hook http:create_pool/1 to create it. If this predicate succeeds
%   the operation is retried.
% With a pool(Pool) option: create the thread in Pool.
http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Id,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Error)
    ->  http_spawned(Id)
    ;   Error = error(resource_error(threads_in_pool(_)), _)
    ->  throw(http_reply(busy))         % pool is full
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool(Pool)               % create the missing pool, retry
    ->  http_spawn(Goal, Options)
    ;   throw(Error)
    ).
% Without a pool: create a plain detached thread.
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), Id,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(Id).

% Goal of the spawned thread: make CGI the current output, run Goal
% using http_wrap_spawned/3 and deal with the connection like
% http_process/4 does.
wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, Request).

%!  create_pool(+Pool)
%
%   Lazy  creation  of  worker-pools  for   the  HTTP  server.  This
%   predicate calls the hook http:create_pool/1.   If the hook fails
%   it creates a default pool of size   10. This should suffice for
%   most typical use cases. Note that we get a permission error if
%   the pool is already created.  We can ignore this.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).


                 /*******************************
                 *         WAIT POLICIES        *
                 *******************************/

:- meta_predicate
    thread_repeat_wait(0).

%!  thread_repeat_wait(:Goal) is multi.
%
%   Acts as `repeat, thread_idle(Goal)`, choosing whether to use a
%   `long` or `short` idle time based on the average firing rate.
thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, State),
    repeat,
      update_rate_mma(State, MMA),
      long(MMA, IsLong),
      (   IsLong == brief
      ->  call(Goal)                    % busy: skip thread_idle/2
      ;   thread_idle(Goal, IsLong)
      ).

% long(+MMA, -Policy): map the average time between calls (seconds)
% to `brief` (below 0.05), `short` (below 1) or `long`.
long(MMA, brief) :-
    MMA < 0.05,
    !.
long(MMA, short) :-
    MMA < 1,
    !.
long(_, long).

%!  new_rate_mma(+N, +Resolution, -State) is det.
%!  update_rate_mma(!State, -MMA) is det.
%
%   Implement _Modified Moving Average_ computing the average time
%   between requests as an exponential moving average with alpha=1/N.
%
%   State is a term rstate(Base, Last, MaxI, Resolution, N, MMA) that
%   is updated destructively. Base is the time stamp relative to which
%   Last, the integer stamp of the previous call, is expressed. If
%   the stamp exceeds MaxI, Base is set to the current time and Last
%   to 0, so later stamps are measured from the new base.
%
%   @arg Resolution is the time resolution in 1/Resolution seconds.  All
%   storage is done in integers to avoid the need for stack freezing in
%   nb_setarg/3.
%   @arg MMA is the updated average in seconds (float).
%
%   @see https://en.wikipedia.org/wiki/Moving_average

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    Diff is Stamp-Last,                 % interval uses the old base
    (   Stamp > MaxI
    ->  % Rebase.  Last must stay 0: storing Stamp here as well would
        % undo the reset and corrupt the next interval.
        nb_setarg(1, State, Now),
        nb_setarg(2, State, 0)
    ;   nb_setarg(2, State, Stamp)
    ),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3.

prolog:message(httpd_started_server(Port, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Port, Options).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].

%!  http_root(+Address, +Options)//
%
%   Emit the URI of the entry page of the server at Address.

http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ '~w'-[URI] ].

%!  landing_page(+Address, -URI, +Options) is semidet.
%
%   URI is the entry page of the server at Address. Address is
%   either Host:Port or a plain port, in which case `localhost` is
%   used as host. The port is omitted from URI if it is the default
%   port for the scheme. Fails if no server is registered at the
%   port.
%
%   @error type_error(atom, Host) if Host is not an atom.

landing_page(Host:Port, URI, Options) :-
    % Commit: without this cut, failure below falls through to the
    % second clause, which wraps the address in localhost:_ again
    % and recurses without end.
    !,
    must_be(atom, Host),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).

default_port(http, 80).
default_port(https, 443).

% entry_page(+Base, -URI, +Options): resolve the entry_page(Entry)
% option, or else the location of root(.), relative to Base.
entry_page(Base, URI, Options) :-
    option(entry_page(Entry), Options),
    !,
    uri_resolve(Entry, Base, URI).
entry_page(Base, URI, _) :-
    http_absolute_location(root(.), Entry, []),
    uri_resolve(Entry, Base, URI).