--[[

		server.lua based on lua/libevent by blastbeat

		notes:
		-- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE
		-- you can't even register a new EV_READ/EV_WRITE callback inside another one
		-- to do some of the above, use timeout events or something that will be called from outside
		-- don't let event callbacks get garbage collected as long as they are running
		-- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing

--]]
-- luacheck: ignore 212/self 431/err 211/ret

local SCRIPT_NAME           = "server_event.lua"
local SCRIPT_VERSION        = "0.05"
local SCRIPT_AUTHOR         = "blastbeat"
local LAST_MODIFIED         = "2009/11/20"

-- Tunable limits and timeouts; exported as `cfg` so callers can adjust them.
local cfg = {
	MAX_CONNECTIONS       = 100000,  -- max per server connections (use "ulimit -n" on *nix)
	MAX_HANDSHAKE_ATTEMPTS= 1000,  -- attempts to finish ssl handshake
	HANDSHAKE_TIMEOUT     = 60,  -- timeout in seconds per handshake attempt
	MAX_READ_LENGTH       = 1024 * 1024 * 1024 * 1024,  -- max bytes allowed to read from sockets
	MAX_SEND_LENGTH       = 1024 * 1024 * 1024 * 1024,  -- max bytes size of write buffer (for writing on sockets)
	ACCEPT_QUEUE          = 128,  -- might influence the length of the pending sockets queue
	ACCEPT_DELAY          = 10,  -- seconds to wait until the next attempt of a full server to accept
	READ_TIMEOUT          = 14 * 60,  -- timeout in seconds for read data from socket
	WRITE_TIMEOUT         = 180,  -- timeout in seconds for write data on socket
	CONNECT_TIMEOUT       = 20,  -- timeout in seconds for connection attempts
	CLEAR_DELAY           = 5,  -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
	READ_RETRY_DELAY      = 1e-06, -- if, after reading, there is still data in buffer, wait this long and continue reading
	DEBUG                 = true,  -- show debug messages
}

local pairs = pairs
local select = select
local require = require
local tostring = tostring
local setmetatable = setmetatable

local t_insert = table.insert
local t_concat = table.concat
local s_sub = string.sub

local coroutine_wrap = coroutine.wrap
local coroutine_yield = coroutine.yield

local has_luasec, ssl = pcall ( require , "ssl" )
local socket = require "socket"
local levent = require "luaevent.core"
local inet = require "util.net";
local inet_pton = inet.pton;

local socket_gettime = socket.gettime

local log = require ("util.logger").init("socket")

-- Logs all arguments at "debug" level, building a format string with one
-- "%s " placeholder per argument.
local function debug(...)
	return log("debug", ("%s "):rep(select('#', ...)), ...)
end
-- local vdebug = debug;

-- Pure-Lua bitwise OR for non-negative integers (no bit library needed).
local bitor = ( function( ) -- thx Rici Lake
	local hasbit = function( x, p )
		return x % ( p + p ) >= p
	end
	return function( x, y )
		local p = 1
		local z = 0
		local limit = x > y and x or y
		while p <= limit do
			if hasbit( x, p ) or hasbit( y, p ) then
				z = z + p
			end
			p = p + p
		end
		return z
	end
end )( )

local base = levent.new( )
local addevent = base.addevent
local EV_READ = levent.EV_READ
local EV_WRITE = levent.EV_WRITE
local EV_TIMEOUT = levent.EV_TIMEOUT
local EV_SIGNAL = levent.EV_SIGNAL

local EV_READWRITE = bitor( EV_READ, EV_WRITE )

-- Set of all live interfaces (client and server); keys are the interface tables.
local interfacelist = { }

-- Client interface methods
local interface_mt = {}; interface_mt.__index = interface_mt;

-- Private methods

-- Alias for _destroy(); kept as the internal "close now" entry point.
function interface_mt:_close()
	return self:_destroy();
end

-- Registers a one-shot read/write event that fires when an outgoing
-- connection attempt finishes or times out (cfg.CONNECT_TIMEOUT), then starts
-- either a TLS session (when plainssl is truthy and luasec is available) or a
-- plain session.
function interface_mt:_start_connection(plainssl) -- called from wrapclient
	local callback = function( event )
		if EV_TIMEOUT == event then  -- timeout during connection
			self.fatalerror = "connection timeout"
			self:ontimeout()  -- call timeout listener
			self:_close()
			debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
		else
			if EV_READWRITE == event then
				if self.readcallback(event) == -1 then
					-- Fatal error occurred
					return -1;
				end
			end
			if plainssl and has_luasec then  -- start ssl session
				self:starttls(self._sslctx, true)
			else  -- normal connection
				self:_start_session(true)
			end
			debug( "new connection established. id:", self.id )
		end
		self.eventconnect = nil
		return -1
	end
	self.eventconnect = addevent( base, self.conn, EV_READWRITE, callback, cfg.CONNECT_TIMEOUT )
	return true
end

-- Unlocks the interface and registers its read event.
-- For clients this is deferred through a zero-delay timeout event so that the
-- read event is not registered from inside another event callback; onconnect
-- is invoked there when call_onconnect is truthy.
function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
	if self.type == "client" then
		local callback = function( )
			self:_lock( false,  false, false )
			--vdebug( "start listening on client socket with id:", self.id )
			self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT );  -- register callback
			if call_onconnect then
				self:onconnect()
			end
			self.eventsession = nil
			return -1
		end
		self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 )
	else
		self:_lock( false )
		--vdebug( "start listening on server socket with id:", self.id )
		self.eventread = addevent( base, self.conn, EV_READ, self.readcallback )  -- register callback
	end
	return true
end

-- Wraps the socket with luasec and drives the non-blocking handshake from a
-- coroutine registered as an event callback.
-- Returns true once the handshake event is registered, false when wrapping fails.
function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
	--vdebug( "starting ssl session with client id:", self.id )
	local _
	_ = self.eventread and self.eventread:close( )  -- close events; this must be called outside of the event callbacks!
	_ = self.eventwrite and self.eventwrite:close( )
	self.eventread, self.eventwrite = nil, nil
	local err
	self.conn, err = ssl.wrap( self.conn, self._sslctx )
	if err then
		self.fatalerror = err
		self.conn = nil  -- cannot be used anymore
		if call_onconnect then
			self.ondisconnect = nil  -- don't call this when client isn't really connected
		end
		self:_close()
		debug( "fatal error while ssl wrapping:", err )
		return false
	end

	if self.conn.sni and self.servername then
		self.conn:sni(self.servername);
	end

	self.conn:settimeout( 0 )  -- set non blocking
	local handshakecallback = coroutine_wrap(function( event )
		local _, err
		local attempt = 0
		local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
		-- `<=` (not `<`) so that the iteration after the last permitted attempt
		-- reaches the "max handshake attempts exceeded" branch below; otherwise
		-- the coroutine would simply end without closing the interface.
		while attempt <= maxattempt do  -- no endless loop
			attempt = attempt + 1
			debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
			if attempt > maxattempt then
				self.fatalerror = "max handshake attempts exceeded"
			elseif EV_TIMEOUT == event then
				self.fatalerror = "timeout during handshake"
			else
				_, err = self.conn:dohandshake( )
				if not err then
					self:_lock( false, false, false )  -- unlock the interface; sending, closing etc allowed
					self.send = self.conn.send  -- caching table lookups with new client object
					self.receive = self.conn.receive
					if not call_onconnect then  -- trigger listener
						self:onstatus("ssl-handshake-complete");
					end
					self:_start_session( call_onconnect )
					debug( "ssl handshake done" )
					self.eventhandshake = nil
					return -1
				end
				if err == "wantwrite" then
					event = EV_WRITE
				elseif err == "wantread" then
					event = EV_READ
				else
					debug( "ssl handshake error:", err )
					self.fatalerror = err
				end
			end
			if self.fatalerror then
				if call_onconnect then
					self.ondisconnect = nil  -- don't call this when client isn't really connected
				end
				self:_close()
				debug( "handshake failed because:", self.fatalerror )
				self.eventhandshake = nil
				return -1
			end
			event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT )  -- yield this monster...
		end
	end
	)
	debug "starting handshake..."
	self:_lock( false, true, true )  -- unlock read/write events, but keep interface locked
	self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
	return true
end

-- Tears the interface down: locks it, closes its events and socket, calls the
-- ondisconnect listener (clients only) and removes it from interfacelist.
-- Idempotent: a second call is a no-op, so ondisconnect and the server's
-- connection counter are only touched once even when two error paths both
-- call _close() (e.g. the default onreadtimeout stub followed by readcallback).
function interface_mt:_destroy()  -- close this interface + events and call last listener
	if self._destroyed then
		return true
	end
	self._destroyed = true
	debug( "closing client with id:", self.id, self.fatalerror )
	self:_lock( true, true, true )  -- first of all, lock the interface to avoid further actions
	local _
	_ = self.eventread and self.eventread:close( )
	if self.type == "client" then
		_ = self.eventwrite and self.eventwrite:close( )
		_ = self.eventhandshake and self.eventhandshake:close( )
		_ = self.eventstarthandshake and self.eventstarthandshake:close( )
		_ = self.eventconnect and self.eventconnect:close( )
		_ = self.eventsession and self.eventsession:close( )
		_ = self.eventwritetimeout and self.eventwritetimeout:close( )
		_ = self.eventreadtimeout and self.eventreadtimeout:close( )
		-- call ondisconnect listener (won't be the case if handshake failed on connect)
		_ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror)
		_ = self.conn and self.conn:close( ) -- close connection
		_ = self._server and self._server:counter(-1);
		self.eventread, self.eventwrite = nil, nil
		self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
		self.readcallback, self.writecallback = nil, nil
	else
		self.conn:close( )
		self.eventread, self.eventclose = nil, nil
		self.interface, self.readcallback = nil, nil
	end
	interfacelist[ self ] = nil
	return true
end

-- Sets the three lock flags and returns them unchanged.
function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
	self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
	return nointerface, noreading, nowriting
end

--TODO: Deprecate
-- Compatibility wrapper: truthy switch pauses reading, falsy resumes it.
function interface_mt:lock_read(switch)
	if switch then
		return self:pause();
	else
		return self:resume();
	end
end

-- Stops reading: sets the noreading flag; readcallback drops its event the
-- next time it runs.
function interface_mt:pause()
	return self:_lock(self.nointerface, true, self.nowriting);
end

-- Clears the noreading flag and re-registers the read event if it is gone.
-- Returns true only when a new read event was registered.
function interface_mt:resume()
	self:_lock(self.nointerface, false, self.nowriting);
	if self.readcallback and not self.eventread then
		self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT );  -- register callback
		return true;
	end
end

-- Adjusts the connection counter by c (when given) and returns the current count.
function interface_mt:counter(c)
	if c then
		self._connections = self._connections + c
	end
	return self._connections
end

-- Public methods

-- Queues data (converted with tostring) in the write buffer and makes sure a
-- write event is registered.
-- Returns true on success, or nil plus "locked" / "send buffer exceeded".
function interface_mt:write(data)
	if self.nowriting then return nil, "locked" end
	--vdebug( "try to send data to client, id/data:", self.id, data )
	data = tostring( data )
	local len = #data
	local total = len + self.writebufferlen
	if total > cfg.MAX_SEND_LENGTH then  -- check buffer length
		local err = "send buffer exceeded"
		debug( "error:", err )  -- too much, check your app
		return nil, err
	end
	t_insert(self.writebuffer, data) -- new buffer
	self.writebufferlen = total
	if not self.eventwrite then  -- register new write event
		--vdebug( "register new write event" )
		self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
	end
	return true
end

-- Requests closing of this interface.
-- Clients with a pending write event are closed later by writecallback, once
-- the buffer has been flushed; in that case nil plus a message is returned.
-- Returns nil, "locked" when the interface is locked.
function interface_mt:close()
	if self.nointerface then return nil, "locked"; end
	debug( "try to close client connection with id:", self.id )
	if self.type == "client" then
		self.fatalerror = "client to close"
		if self.eventwrite then -- wait for incomplete write request
			self:_lock( true, true, false )
			debug "closing delayed until writebuffer is empty"
			return nil, "writebuffer not empty, waiting"
		else -- close now
			self:_lock( true, true, true )
			self:_close()
			return true
		end
	else
		debug( "try to close server with id:", tostring(self.id))
		self.fatalerror = "server to close"
		self:_lock( true )
		self:_close( 0 )
		return true
	end
end

-- Returns the underlying socket object.
function interface_mt:socket()
	return self.conn
end

-- Returns the owning server interface, or self when there is none.
function interface_mt:server()
	return self._server or self;
end

function interface_mt:port()
	return self._port
end

function interface_mt:serverport()
	return self._serverport
end

function interface_mt:ip()
	return self._ip
end

-- Returns true once starttls() has been called on this interface.
function interface_mt:ssl()
	return self._usingssl
end
interface_mt.clientport = interface_mt.port -- COMPAT server_select

-- NOTE(review): interfaces built by handleclient/handleserver carry a string
-- field named `type`, which takes precedence over this method on those
-- instances — confirm whether any caller still relies on the method form.
function interface_mt:type()
	return self._type or "client"
end

function interface_mt:connections()
	return self._connections
end

function interface_mt:address()
	return self.addr
end

-- Stores the TLS context; with a nil/false context starttls is disabled for
-- this instance by shadowing the method with `false`.
function interface_mt:set_sslctx(sslctx)
	self._sslctx = sslctx;
	if sslctx then
		self.starttls = nil; -- use starttls() of interface_mt
	else
		self.starttls = false; -- prevent starttls()
	end
end

-- Optionally updates the receive pattern; always returns the current one.
function interface_mt:set_mode(pattern)
	if pattern then
		self._pattern = pattern;
	end
	return self._pattern;
end

function interface_mt:set_send(new_send) -- luacheck: ignore 212
	-- No-op, we always use the underlying connection's send
end

-- Schedules a TLS handshake on this connection.
-- The handshake is started from a zero-delay timeout event, or by
-- writecallback once the pending write buffer has drained.
-- Returns true, or nil, "ssl already active".
function interface_mt:starttls(sslctx, call_onconnect)
	debug( "try to start ssl at client id:", self.id )
	local err
	self._sslctx = sslctx;
	if self._usingssl then  -- startssl was already called
		err = "ssl already active"
	end
	if err then
		debug( "error:", err )
		return nil, err
	end
	self._usingssl = true
	self.startsslcallback = function( )  -- we have to start the handshake outside of a read/write event
		self.startsslcallback = nil
		self:_start_ssl(call_onconnect);
		self.eventstarthandshake = nil
		return -1
	end
	if not self.eventwrite then
		self:_lock( true, true, true )  -- lock the interface, to not disturb the handshake
		self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 )  -- add event to start handshake
	else
		-- wait until writebuffer is empty
		self:_lock( true, true, false )
		debug "ssl session delayed until writebuffer is empty..."
	end
	self.starttls = false;
	return true
end

-- Forwards to the socket's setoption when the socket provides one.
function interface_mt:setoption(option, value)
	if self.conn.setoption then
		return self.conn:setoption(option, value);
	end
	return false, "setoption not implemented";
end

-- Replaces all listener callbacks, calling the old listener's ondetach first
-- and the new listener's onattach(data) afterwards. Callbacks the new
-- listener lacks fall back to the stubs on interface_mt.
function interface_mt:setlistener(listener, data)
	self:ondetach(); -- Notify listener that it is no longer responsible for this connection
	self.onconnect = listener.onconnect;
	self.ondisconnect = listener.ondisconnect;
	self.onincoming = listener.onincoming;
	self.ontimeout = listener.ontimeout;
	self.onreadtimeout = listener.onreadtimeout;
	self.onstatus = listener.onstatus;
	self.ondetach = listener.ondetach;
	self.onattach = listener.onattach;
	self.ondrain = listener.ondrain;
	self:onattach(data);
end

-- Stub handlers
function interface_mt:onconnect()
end
function interface_mt:onincoming()
end
function interface_mt:ondisconnect()
end
function interface_mt:ontimeout()
end
-- Default read-timeout behaviour: treat inactivity as fatal and close.
function interface_mt:onreadtimeout()
	self.fatalerror = "timeout during receiving"
	debug( "connection failed:", self.fatalerror )
	self:_close()
	self.eventread = nil
end
function interface_mt:ondrain()
end
function interface_mt:ondetach()
end
function interface_mt:onattach()
end
function interface_mt:onstatus()
end

-- End of client interface methods

-- Builds a client interface around an already created socket, installs its
-- read/write event callbacks and adds it to interfacelist. No events are
-- registered here; see _start_session / _start_connection / starttls.
local function handleclient( client, ip, port, server, pattern, listener, sslctx, extra )  -- creates a client interface
	--vdebug("creating client interfacce...")
	local interface = {
		type = "client";
		conn = client;
		currenttime = socket_gettime( );  -- save the origin
		writebuffer = {};  -- writebuffer
		writebufferlen = 0;  -- length of writebuffer
		send = client.send;  -- caching table lookups
		receive = client.receive;
		onconnect = listener.onconnect;  -- will be called when client connects
		ondisconnect = listener.ondisconnect;  -- will be called when client disconnects
		onincoming = listener.onincoming;  -- will be called when client sends data
		ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
		onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
		ondrain = listener.ondrain; -- called when writebuffer is empty
		ondetach = listener.ondetach; -- called when disassociating this listener from this connection
		onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
		eventread = false, eventwrite = false, eventclose = false,
		eventhandshake = false, eventstarthandshake = false;  -- event handler
		eventconnect = false, eventsession = false;  -- more event handler...
		eventwritetimeout = false;  -- even more event handler...
		eventreadtimeout = false;
		fatalerror = false;  -- error message
		writecallback = false;  -- will be called on write events
		readcallback = false;  -- will be called on read events
		nointerface = true;  -- lock/unlock parameter of this interface
		noreading = false, nowriting = false;  -- locks of the read/writecallback
		startsslcallback = false;  -- starting handshake callback
		position = false;  -- position of client in interfacelist

		-- Properties
		_ip = ip, _port = port, _server = server, _pattern = pattern,
		_serverport = (server and server:port() or nil),
		_sslctx = sslctx; -- parameters
		_usingssl = false;  -- client is using ssl;
		extra = extra;
		servername = extra and extra.servername;
	}
	if not has_luasec then interface.starttls = false; end
	interface.id = tostring(interface):match("%x+$");
	interface.writecallback = function( event )  -- called on write events
		--vdebug( "new client write event, id/ip/port:", interface, ip, port )
		if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then  -- leave this event
			--vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror )
			interface.eventwrite = false
			return -1
		end
		if EV_TIMEOUT == event then  -- took too long to write some data to socket -> disconnect
			interface.fatalerror = "timeout during writing"
			debug( "writing failed:", interface.fatalerror )
			interface:_close()
			interface.eventwrite = false
			return -1
		else  -- can write :)
			if interface._usingssl then  -- handle luasec
				if interface.eventreadtimeout then  -- we have to read first
					local ret = interface.readcallback( )  -- call readcallback
					--vdebug( "tried to read in writecallback, result:", ret )
				end
				if interface.eventwritetimeout then  -- luasec only
					interface.eventwritetimeout:close( )  -- first we have to close timeout event which was registered after a wantread error
					interface.eventwritetimeout = false
				end
			end
			-- Collapse all queued chunks into a single string before sending.
			interface.writebuffer = { t_concat(interface.writebuffer) }
			local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
			--vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
			if succ then  -- writing successful
				interface.writebuffer[1] = nil
				interface.writebufferlen = 0
				interface:ondrain();
				if interface.fatalerror then
					debug "closing client after writing"
					interface:_close()  -- close interface if needed
				elseif interface.startsslcallback then  -- start ssl connection if needed
					debug "starting ssl handshake after writing"
					interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
				elseif interface.writebufferlen ~= 0 then
					-- data possibly written from ondrain
					return EV_WRITE, cfg.WRITE_TIMEOUT
				elseif interface.eventreadtimeout then
					return EV_WRITE, cfg.WRITE_TIMEOUT
				end
				interface.eventwrite = nil
				return -1
			elseif byte and (err == "timeout" or err == "wantwrite") then  -- want write again
				--vdebug( "writebuffer is not empty:", err )
				interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen )  -- new buffer
				interface.writebufferlen = interface.writebufferlen - byte
				-- NOTE(review): this branch cannot be taken, because the enclosing
				-- condition only admits "timeout" and "wantwrite"; a "wantread"
				-- result currently ends up in the fatal-error branch below.
				-- Left as-is pending confirmation of the intended luasec behaviour.
				if "wantread" == err then  -- happens only with luasec
					local callback = function( )
						interface:_close()
						interface.eventwritetimeout = nil
						return -1;
					end
					interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT )  -- reg a new timeout event
					debug( "wantread during write attempt, reg it in readcallback but don't know what really happens next..." )
					-- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
					return -1
				end
				return EV_WRITE, cfg.WRITE_TIMEOUT
			else  -- connection was closed during writing or fatal error
				interface.fatalerror = err or "fatal error"
				debug( "connection failed in write event:", interface.fatalerror )
				interface:_close()
				interface.eventwrite = nil
				return -1
			end
		end
	end

	interface.readcallback = function( event )  -- called on read events
		--vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
		if interface.noreading or interface.fatalerror then  -- leave this event
			--vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) )
			interface.eventread = nil
			return -1
		end
		-- A listener's onreadtimeout may return true to keep the connection open.
		if EV_TIMEOUT == event and not interface.conn:dirty() and interface:onreadtimeout() ~= true then
			interface.fatalerror = "timeout during receiving"
			debug( "connection failed:", interface.fatalerror )
			interface:_close()
			interface.eventread = nil
			return -1  -- took too long to get some data from client -> disconnect
		end
		if interface._usingssl then  -- handle luasec
			if interface.eventwritetimeout then  -- ok, in the past writecallback was regged
				local ret = interface.writecallback( )  -- call it
				--vdebug( "tried to write in readcallback, result:", tostring(ret) )
			end
			if interface.eventreadtimeout then
				interface.eventreadtimeout:close( )
				interface.eventreadtimeout = nil
			end
		end
		local buffer, err, part = interface.conn:receive( interface._pattern )  -- receive buffer with "pattern"
		--vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
		buffer = buffer or part
		if buffer and #buffer > cfg.MAX_READ_LENGTH then  -- check buffer length
			interface.fatalerror = "receive buffer exceeded"
			debug( "fatal error:", interface.fatalerror )
			interface:_close()
			interface.eventread = nil
			return -1
		end
		if err and ( err ~= "timeout" and err ~= "wantread" ) then
			if "wantwrite" == err then -- need to read on write event
				if not interface.eventwrite then  -- register new write event if needed
					interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
				end
				interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
					function( ) interface:_close() end, cfg.READ_TIMEOUT)
				debug( "wantwrite during read attempt, reg it in writecallback but don't know what really happens next..." )
				-- to be honest i don't know what happens next, if it is allowed to first read, the write etc...
			else  -- connection was closed or fatal error
				interface.fatalerror = err
				debug( "connection failed in read event:", interface.fatalerror )
				interface:_close()
				interface.eventread = nil
				return -1
			end
		else
			interface.onincoming( interface, buffer, err )  -- send new data to listener
		end
		if interface.noreading then
			interface.eventread = nil;
			return -1;
		end
		if interface.conn:dirty() then -- still data left in buffer
			return EV_TIMEOUT, cfg.READ_RETRY_DELAY;
		end
		return EV_READ, cfg.READ_TIMEOUT
	end

	client:settimeout( 0 )  -- set non blocking
	setmetatable(interface, interface_mt)
	interfacelist[ interface ] = true  -- add to interfacelist
	return interface
end

-- Builds a server interface around a listening socket, installs the accept
-- callback, adds it to interfacelist and starts listening.
local function handleserver( server, addr, port, pattern, listener, sslctx )  -- creates a server interface
	debug "creating server interface..."
	local interface = {
		_connections = 0;

		type = "server";
		conn = server;
		onconnect = listener.onconnect;  -- will be called when new client connected
		eventread = false;  -- read event handler
		eventclose = false; -- close event handler
		readcallback = false; -- read event callback
		fatalerror = false; -- error message
		nointerface = true;  -- lock/unlock parameter

		_ip = addr, _port = port, _pattern = pattern,
		_sslctx = sslctx;
	}
	interface.id = tostring(interface):match("%x+$");
	interface.readcallback = function( event )  -- server handler, called on incoming connections
		--vdebug( "server can accept, id/addr/port:", interface, addr, port )
		if interface.fatalerror then
			--vdebug( "leaving this event because:", self.fatalerror )
			interface.eventread = nil
			return -1
		end
		local delay = cfg.ACCEPT_DELAY
		if EV_TIMEOUT == event then
			if interface._connections >= cfg.MAX_CONNECTIONS then  -- check connection count
				debug( "to many connections, seconds to wait for next accept:", delay )
				return EV_TIMEOUT, delay  -- timeout...
			else
				return EV_READ  -- accept again
			end
		end
		--vdebug("max connection check ok, accepting...")
		local client, err = server:accept()    -- try to accept; TODO: check err
		-- Drain the accept queue: keep accepting until accept() returns nothing.
		while client do
			if interface._connections >= cfg.MAX_CONNECTIONS then
				client:close( )  -- refuse connection
				debug( "maximal connections reached, refuse client connection; accept delay:", delay )
				return EV_TIMEOUT, delay  -- delay for next accept attempt
			end
			local client_ip, client_port = client:getpeername( )
			interface._connections = interface._connections + 1  -- increase connection count
			local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
			--vdebug( "client id:", clientinterface, "startssl:", startssl )
			if has_luasec and sslctx then
				clientinterface:starttls(sslctx, true)
			else
				clientinterface:_start_session( true )
			end
			debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");

			client, err = server:accept()    -- try to accept again
		end
		return EV_READ
	end

	server:settimeout( 0 )
	setmetatable(interface, interface_mt)
	interfacelist[ interface ] = true
	interface:_start_session()
	return interface
end

-- Binds a listening TCP socket on addr:port and wraps it in a server interface.
-- Returns the interface, or nil plus an error message.
local function addserver( addr, port, listener, pattern, sslctx, startssl )  -- TODO: check arguments
	--vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil")
	if sslctx and not has_luasec then
		debug "fatal error: luasec not found"
		return nil, "luasec not found"
	end
	local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE )  -- create server socket
	if not server then
		-- tostring() so that logging cannot itself raise when addr/port is nil
		debug( "creating server socket on "..tostring(addr).." port "..tostring(port).." failed:", err )
		return nil, err
	end
	local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl )  -- new server handler
	debug( "new server created with id:", tostring(interface))
	return interface
end

-- Wraps an existing (connecting) client socket in an interface and waits for
-- the connection to complete. Returns the interface and the socket.
local function wrapclient( client, ip, port, listeners, pattern, sslctx, extra )
	local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx, extra )
	interface:_start_connection(sslctx)
	return interface, client
	--function handleclient( client, ip, port, server, pattern, listener, _, sslctx )  -- creates an client interface
end

-- Creates a non-blocking outgoing connection to addr:serverport.
-- When typ is not given it is derived from the address family reported by
-- inet_pton ("tcp4" or "tcp6").
-- Returns the interface (plus "timeout" while the connect is still in
-- progress), or nil plus an error message.
local function addclient( addr, serverport, listener, pattern, sslctx, typ, extra )
	if sslctx and not has_luasec then
		debug "need luasec, but not available"
		return nil, "luasec not found"
	end
	if not typ then
		local n = inet_pton(addr);
		if not n then return nil, "invalid-ip"; end
		if #n == 16 then
			typ = "tcp6";
		elseif #n == 4 then
			typ = "tcp4";
		end
	end
	local create = socket[typ];
	if type( create ) ~= "function" then
		return nil, "invalid socket type"
	end
	local client, err = create()  -- creating new socket
	if not client then
		debug( "cannot create socket:", err )
		return nil, err
	end
	client:settimeout( 0 )  -- set nonblocking
	local res, err = client:setpeername( addr, serverport )  -- connect
	if res or ( err == "timeout" ) then
		local ip, port = client:getsockname( )
		local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, extra )
		debug( "new connection id:", interface.id )
		return interface, err
	else
		debug( "new connection failed:", err )
		client:close( )  -- release the socket instead of leaving it to the garbage collector
		return nil, err
	end
end

local function loop( )  -- starts the event loop
	base:loop( )
	return "quitting";
end

-- Registers an arbitrary event on the shared event base.
local function newevent( ... )
	return addevent( base, ... )
end

-- Closes every server interface currently in interfacelist.
local function closeallservers ( arg )
	for item in pairs( interfacelist ) do
		if item.type == "server" then
			item:close( arg )
		end
	end
end

-- Leaves the event loop; unless yes == "once", all servers are closed first.
local function setquitting(yes)
	if yes then
		-- Quit now
		if yes ~= "once" then
			closeallservers();
		end
		base:loopexit();
	end
end

local function get_backend()
	return "libevent " .. base:method();
end

-- We need to hold onto the events to stop them
-- being garbage-collected
local signal_events = {}; -- [signal_num] -> event object

-- Installs handler for signal_num; the hook stays active until the handler
-- returns false.
local function hook_signal(signal_num, handler)
	local function _handler()
		local ret = handler();
		if ret ~= false then -- Continue handling this signal?
			return EV_SIGNAL; -- Yes
		end
		return -1; -- Close this event
	end
	signal_events[signal_num] = base:addevent(signal_num, EV_SIGNAL, _handler);
	return signal_events[signal_num];
end

-- Pipes everything sender receives into receiver, pausing sender while
-- receiver's write buffer holds at least buffersize bytes.
local function link(sender, receiver, buffersize)
	local sender_locked;

	function receiver:ondrain()
		if sender_locked then
			sender:resume();
			sender_locked = nil;
		end
	end

	function sender:onincoming(data)
		receiver:write(data);
		if receiver.writebufferlen >= buffersize then
			sender_locked = true;
			sender:pause();
		end
	end
	sender:set_mode("*a");
end

-- Runs callback(current_time) after delay seconds. A truthy return value from
-- the callback is used as the delay until the next run.
local function add_task(delay, callback)
	local event_handle;
	event_handle = base:addevent(nil, 0, function ()
		local ret = callback(socket_gettime());
		if ret then
			return 0, ret;
		elseif event_handle then
			return -1;
		end
	end
	, delay);
	return event_handle;
end

-- Watches a raw file descriptor. Returns a handle whose setflags(r, w)
-- enables (truthy), disables (false) or leaves untouched (nil) the read and
-- write watchers independently.
local function watchfd(fd, onreadable, onwriteable)
	local handle = {};
	function handle:setflags(r,w)
		if r ~= nil then
			if r and not self.wantread then
				self.wantread = base:addevent(fd, EV_READ, function ()
					onreadable(self);
				end);
			elseif not r and self.wantread then
				self.wantread:close();
				self.wantread = nil;
			end
		end
		if w ~= nil then
			if w and not self.wantwrite then
				self.wantwrite = base:addevent(fd, EV_WRITE, function ()
					onwriteable(self);
				end);
			-- Must test the write flag and write watcher here (not r / wantread),
			-- otherwise disabling write interest never closes the write event.
			elseif not w and self.wantwrite then
				self.wantwrite:close();
				self.wantwrite = nil;
			end
		end
	end
	handle:setflags(onreadable, onwriteable);
	return handle;
end

return {
	cfg = cfg,
	base = base,
	loop = loop,
	link = link,
	event = levent,
	event_base = base,
	addevent = newevent,
	addserver = addserver,
	addclient = addclient,
	wrapclient = wrapclient,
	setquitting = setquitting,
	closeall = closeallservers,
	get_backend = get_backend,
	hook_signal = hook_signal,
	add_task = add_task,
	watchfd = watchfd,

	__NAME = SCRIPT_NAME,
	__DATE = LAST_MODIFIED,
	__AUTHOR = SCRIPT_AUTHOR,
	__VERSION = SCRIPT_VERSION,

}