1# 2# 3# Nim's Runtime Library 4# (c) Copyright 2016 Eugene Kabanov 5# 6# See the file "copying.txt", included in this 7# distribution, for details about the copyright. 8# 9 10# This module implements Linux epoll(). 11 12import posix, times, epoll 13 14# Maximum number of events that can be returned 15const MAX_EPOLL_EVENTS = 64 16 17when not defined(android): 18 type 19 SignalFdInfo* {.importc: "struct signalfd_siginfo", 20 header: "<sys/signalfd.h>", pure, final.} = object 21 ssi_signo*: uint32 22 ssi_errno*: int32 23 ssi_code*: int32 24 ssi_pid*: uint32 25 ssi_uid*: uint32 26 ssi_fd*: int32 27 ssi_tid*: uint32 28 ssi_band*: uint32 29 ssi_overrun*: uint32 30 ssi_trapno*: uint32 31 ssi_status*: int32 32 ssi_int*: int32 33 ssi_ptr*: uint64 34 ssi_utime*: uint64 35 ssi_stime*: uint64 36 ssi_addr*: uint64 37 pad* {.importc: "__pad".}: array[0..47, uint8] 38 39proc timerfd_create(clock_id: ClockId, flags: cint): cint 40 {.cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".} 41proc timerfd_settime(ufd: cint, flags: cint, 42 utmr: var Itimerspec, otmr: var Itimerspec): cint 43 {.cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".} 44proc eventfd(count: cuint, flags: cint): cint 45 {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".} 46 47when not defined(android): 48 proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint 49 {.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".} 50 51when hasThreadSupport: 52 type 53 SelectorImpl[T] = object 54 epollFD: cint 55 maxFD: int 56 numFD: int 57 fds: ptr SharedArray[SelectorKey[T]] 58 count*: int 59 Selector*[T] = ptr SelectorImpl[T] 60else: 61 type 62 SelectorImpl[T] = object 63 epollFD: cint 64 maxFD: int 65 numFD: int 66 fds: seq[SelectorKey[T]] 67 count*: int 68 Selector*[T] = ref SelectorImpl[T] 69type 70 SelectEventImpl = object 71 efd: cint 72 SelectEvent* = ptr SelectEventImpl 73 74proc newSelector*[T](): Selector[T] = 75 # Retrieve the maximum fd count (for current OS) via getrlimit() 76 var a = RLimit() 77 if getrlimit(posix.RLIMIT_NOFILE, a) != 0: 78 raiseOSError(osLastError()) 79 var maxFD = int(a.rlim_max) 80 doAssert(maxFD > 0) 81 # Start with a reasonable size, checkFd() will grow this on demand 82 const numFD = 1024 83 84 var epollFD = epoll_create1(O_CLOEXEC) 85 if epollFD < 0: 86 raiseOSError(osLastError()) 87 88 when hasThreadSupport: 89 result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) 90 result.epollFD = epollFD 91 result.maxFD = maxFD 92 result.numFD = numFD 93 result.fds = allocSharedArray[SelectorKey[T]](numFD) 94 else: 95 result = Selector[T]() 96 result.epollFD = epollFD 97 result.maxFD = maxFD 98 result.numFD = numFD 99 result.fds = newSeq[SelectorKey[T]](numFD) 100 101 for i in 0 ..< numFD: 102 result.fds[i].ident = InvalidIdent 103 104proc close*[T](s: Selector[T]) = 105 let res = posix.close(s.epollFD) 106 when hasThreadSupport: 107 deallocSharedArray(s.fds) 108 deallocShared(cast[pointer](s)) 109 if res != 0: 110 raiseIOSelectorsError(osLastError()) 111 112proc newSelectEvent*(): SelectEvent = 113 let fdci = eventfd(0, O_CLOEXEC or O_NONBLOCK) 114 if fdci == -1: 115 raiseIOSelectorsError(osLastError()) 116 result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) 117 result.efd = fdci 118 119proc trigger*(ev: SelectEvent) = 120 var data: uint64 = 1 121 if posix.write(ev.efd, addr data, sizeof(uint64)) == -1: 122 raiseIOSelectorsError(osLastError()) 123 124proc close*(ev: SelectEvent) = 125 let res = posix.close(ev.efd) 126 deallocShared(cast[pointer](ev)) 127 if res != 0: 128 raiseIOSelectorsError(osLastError()) 129 130template checkFd(s, f) = 131 # TODO: I don't see how this can ever happen. You won't be able to create an 132 # FD if there is too many. -- DP 133 if f >= s.maxFD: 134 raiseIOSelectorsError("Maximum number of descriptors is exhausted!") 135 if f >= s.numFD: 136 var numFD = s.numFD 137 while numFD <= f: numFD *= 2 138 when hasThreadSupport: 139 s.fds = reallocSharedArray(s.fds, numFD) 140 else: 141 s.fds.setLen(numFD) 142 for i in s.numFD ..< numFD: 143 s.fds[i].ident = InvalidIdent 144 s.numFD = numFD 145 146proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, 147 events: set[Event], data: T) = 148 let fdi = int(fd) 149 s.checkFd(fdi) 150 doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor $# already registered" % $fdi) 151 s.setKey(fdi, events, 0, data) 152 if events != {}: 153 var epv = EpollEvent(events: EPOLLRDHUP) 154 epv.data.u64 = fdi.uint 155 if Event.Read in events: epv.events = epv.events or EPOLLIN 156 if Event.Write in events: epv.events = epv.events or EPOLLOUT 157 if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: 158 raiseIOSelectorsError(osLastError()) 159 inc(s.count) 160 161proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) = 162 let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, 163 Event.User, Event.Oneshot, Event.Error} 164 let fdi = int(fd) 165 s.checkFd(fdi) 166 var pkey = addr(s.fds[fdi]) 167 doAssert(pkey.ident != InvalidIdent, 168 "Descriptor $# is not registered in the selector!" % $fdi) 169 doAssert(pkey.events * maskEvents == {}) 170 if pkey.events != events: 171 var epv = EpollEvent(events: EPOLLRDHUP) 172 epv.data.u64 = fdi.uint 173 174 if Event.Read in events: epv.events = epv.events or EPOLLIN 175 if Event.Write in events: epv.events = epv.events or EPOLLOUT 176 177 if pkey.events == {}: 178 if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: 179 raiseIOSelectorsError(osLastError()) 180 inc(s.count) 181 else: 182 if events != {}: 183 if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0: 184 raiseIOSelectorsError(osLastError()) 185 else: 186 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 187 raiseIOSelectorsError(osLastError()) 188 dec(s.count) 189 pkey.events = events 190 191proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = 192 let fdi = int(fd) 193 s.checkFd(fdi) 194 var pkey = addr(s.fds[fdi]) 195 doAssert(pkey.ident != InvalidIdent, 196 "Descriptor $# is not registered in the selector!" % $fdi) 197 if pkey.events != {}: 198 when not defined(android): 199 if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events: 200 var epv = EpollEvent() 201 # TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc. 202 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 203 raiseIOSelectorsError(osLastError()) 204 dec(s.count) 205 elif Event.Timer in pkey.events: 206 if Event.Finished notin pkey.events: 207 var epv = EpollEvent() 208 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 209 raiseIOSelectorsError(osLastError()) 210 dec(s.count) 211 if posix.close(cint(fdi)) != 0: 212 raiseIOSelectorsError(osLastError()) 213 elif Event.Signal in pkey.events: 214 var epv = EpollEvent() 215 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 216 raiseIOSelectorsError(osLastError()) 217 var nmask, omask: Sigset 218 discard sigemptyset(nmask) 219 discard sigemptyset(omask) 220 discard sigaddset(nmask, cint(s.fds[fdi].param)) 221 unblockSignals(nmask, omask) 222 dec(s.count) 223 if posix.close(cint(fdi)) != 0: 224 raiseIOSelectorsError(osLastError()) 225 elif Event.Process in pkey.events: 226 if Event.Finished notin pkey.events: 227 var epv = EpollEvent() 228 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 229 raiseIOSelectorsError(osLastError()) 230 var nmask, omask: Sigset 231 discard sigemptyset(nmask) 232 discard sigemptyset(omask) 233 discard sigaddset(nmask, SIGCHLD) 234 unblockSignals(nmask, omask) 235 dec(s.count) 236 if posix.close(cint(fdi)) != 0: 237 raiseIOSelectorsError(osLastError()) 238 else: 239 if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events: 240 var epv = EpollEvent() 241 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 242 raiseIOSelectorsError(osLastError()) 243 dec(s.count) 244 elif Event.Timer in pkey.events: 245 if Event.Finished notin pkey.events: 246 var epv = EpollEvent() 247 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 248 raiseIOSelectorsError(osLastError()) 249 dec(s.count) 250 if posix.close(cint(fdi)) != 0: 251 raiseIOSelectorsError(osLastError()) 252 clearKey(pkey) 253 254proc unregister*[T](s: Selector[T], ev: SelectEvent) = 255 let fdi = int(ev.efd) 256 s.checkFd(fdi) 257 var pkey = addr(s.fds[fdi]) 258 doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") 259 doAssert(Event.User in pkey.events) 260 var epv = EpollEvent() 261 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: 262 raiseIOSelectorsError(osLastError()) 263 dec(s.count) 264 clearKey(pkey) 265 266proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, 267 data: T): int {.discardable.} = 268 var 269 newTs: Itimerspec 270 oldTs: Itimerspec 271 let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK).int 272 if fdi == -1: 273 raiseIOSelectorsError(osLastError()) 274 275 s.checkFd(fdi) 276 doAssert(s.fds[fdi].ident == InvalidIdent) 277 278 var events = {Event.Timer} 279 var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) 280 epv.data.u64 = fdi.uint 281 282 if oneshot: 283 newTs.it_interval.tv_sec = posix.Time(0) 284 newTs.it_interval.tv_nsec = 0 285 newTs.it_value.tv_sec = posix.Time(timeout div 1_000) 286 newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 287 incl(events, Event.Oneshot) 288 epv.events = epv.events or EPOLLONESHOT 289 else: 290 newTs.it_interval.tv_sec = posix.Time(timeout div 1000) 291 newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 292 newTs.it_value.tv_sec = newTs.it_interval.tv_sec 293 newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec 294 295 if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0: 296 raiseIOSelectorsError(osLastError()) 297 if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: 298 raiseIOSelectorsError(osLastError()) 299 s.setKey(fdi, events, 0, data) 300 inc(s.count) 301 result = fdi 302 303when not defined(android): 304 proc registerSignal*[T](s: Selector[T], signal: int, 305 data: T): int {.discardable.} = 306 var 307 nmask: Sigset 308 omask: Sigset 309 310 discard sigemptyset(nmask) 311 discard sigemptyset(omask) 312 discard sigaddset(nmask, cint(signal)) 313 blockSignals(nmask, omask) 314 315 let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int 316 if fdi == -1: 317 raiseIOSelectorsError(osLastError()) 318 319 s.checkFd(fdi) 320 doAssert(s.fds[fdi].ident == InvalidIdent) 321 322 var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) 323 epv.data.u64 = fdi.uint 324 if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: 325 raiseIOSelectorsError(osLastError()) 326 s.setKey(fdi, {Event.Signal}, signal, data) 327 inc(s.count) 328 result = fdi 329 330 proc registerProcess*[T](s: Selector, pid: int, 331 data: T): int {.discardable.} = 332 var 333 nmask: Sigset 334 omask: Sigset 335 336 discard sigemptyset(nmask) 337 discard sigemptyset(omask) 338 discard sigaddset(nmask, posix.SIGCHLD) 339 blockSignals(nmask, omask) 340 341 let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int 342 if fdi == -1: 343 raiseIOSelectorsError(osLastError()) 344 345 s.checkFd(fdi) 346 doAssert(s.fds[fdi].ident == InvalidIdent) 347 348 var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) 349 epv.data.u64 = fdi.uint 350 epv.events = EPOLLIN or EPOLLRDHUP 351 if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: 352 raiseIOSelectorsError(osLastError()) 353 s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data) 354 inc(s.count) 355 result = fdi 356 357proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = 358 let fdi = int(ev.efd) 359 doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") 360 s.setKey(fdi, {Event.User}, 0, data) 361 var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) 362 epv.data.u64 = ev.efd.uint 363 if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0: 364 raiseIOSelectorsError(osLastError()) 365 inc(s.count) 366 367proc selectInto*[T](s: Selector[T], timeout: int, 368 results: var openArray[ReadyKey]): int = 369 var 370 resTable: array[MAX_EPOLL_EVENTS, EpollEvent] 371 maxres = MAX_EPOLL_EVENTS 372 i, k: int 373 374 if maxres > len(results): 375 maxres = len(results) 376 377 verifySelectParams(timeout) 378 379 let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint, 380 timeout.cint) 381 if count < 0: 382 result = 0 383 let err = osLastError() 384 if cint(err) != EINTR: 385 raiseIOSelectorsError(err) 386 elif count == 0: 387 result = 0 388 else: 389 i = 0 390 k = 0 391 while i < count: 392 let fdi = int(resTable[i].data.u64) 393 let pevents = resTable[i].events 394 var pkey = addr(s.fds[fdi]) 395 doAssert(pkey.ident != InvalidIdent) 396 var rkey = ReadyKey(fd: fdi, events: {}) 397 398 if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0: 399 if (pevents and EPOLLHUP) != 0: 400 rkey.errorCode = OSErrorCode ECONNRESET 401 else: 402 # Try reading SO_ERROR from fd. 403 var error: cint 404 var size = SockLen sizeof(error) 405 if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error), 406 addr(size)) == 0'i32: 407 rkey.errorCode = OSErrorCode error 408 409 rkey.events.incl(Event.Error) 410 if (pevents and EPOLLOUT) != 0: 411 rkey.events.incl(Event.Write) 412 when not defined(android): 413 if (pevents and EPOLLIN) != 0: 414 if Event.Read in pkey.events: 415 rkey.events.incl(Event.Read) 416 elif Event.Timer in pkey.events: 417 var data: uint64 = 0 418 if posix.read(cint(fdi), addr data, 419 sizeof(uint64)) != sizeof(uint64): 420 raiseIOSelectorsError(osLastError()) 421 rkey.events.incl(Event.Timer) 422 elif Event.Signal in pkey.events: 423 var data = SignalFdInfo() 424 if posix.read(cint(fdi), addr data, 425 sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): 426 raiseIOSelectorsError(osLastError()) 427 rkey.events.incl(Event.Signal) 428 elif Event.Process in pkey.events: 429 var data = SignalFdInfo() 430 if posix.read(cint(fdi), addr data, 431 sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): 432 raiseIOSelectorsError(osLastError()) 433 if cast[int](data.ssi_pid) == pkey.param: 434 rkey.events.incl(Event.Process) 435 else: 436 inc(i) 437 continue 438 elif Event.User in pkey.events: 439 var data: uint64 = 0 440 if posix.read(cint(fdi), addr data, 441 sizeof(uint64)) != sizeof(uint64): 442 let err = osLastError() 443 if err == OSErrorCode(EAGAIN): 444 inc(i) 445 continue 446 else: 447 raiseIOSelectorsError(err) 448 rkey.events.incl(Event.User) 449 else: 450 if (pevents and EPOLLIN) != 0: 451 if Event.Read in pkey.events: 452 rkey.events.incl(Event.Read) 453 elif Event.Timer in pkey.events: 454 var data: uint64 = 0 455 if posix.read(cint(fdi), addr data, 456 sizeof(uint64)) != sizeof(uint64): 457 raiseIOSelectorsError(osLastError()) 458 rkey.events.incl(Event.Timer) 459 elif Event.User in pkey.events: 460 var data: uint64 = 0 461 if posix.read(cint(fdi), addr data, 462 sizeof(uint64)) != sizeof(uint64): 463 let err = osLastError() 464 if err == OSErrorCode(EAGAIN): 465 inc(i) 466 continue 467 else: 468 raiseIOSelectorsError(err) 469 rkey.events.incl(Event.User) 470 471 if Event.Oneshot in pkey.events: 472 var epv = EpollEvent() 473 if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0: 474 raiseIOSelectorsError(osLastError()) 475 # we will not clear key until it will be unregistered, so 476 # application can obtain data, but we will decrease counter, 477 # because epoll is empty. 478 dec(s.count) 479 # we are marking key with `Finished` event, to avoid double decrease. 480 pkey.events.incl(Event.Finished) 481 482 results[k] = rkey 483 inc(k) 484 inc(i) 485 result = k 486 487proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = 488 result = newSeq[ReadyKey](MAX_EPOLL_EVENTS) 489 let count = selectInto(s, timeout, result) 490 result.setLen(count) 491 492template isEmpty*[T](s: Selector[T]): bool = 493 (s.count == 0) 494 495proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = 496 return s.fds[fd.int].ident != InvalidIdent 497 498proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T = 499 let fdi = int(fd) 500 s.checkFd(fdi) 501 if fdi in s: 502 result = s.fds[fdi].data 503 504proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = 505 let fdi = int(fd) 506 s.checkFd(fdi) 507 if fdi in s: 508 s.fds[fdi].data = data 509 result = true 510 511template withData*[T](s: Selector[T], fd: SocketHandle|int, value, 512 body: untyped) = 513 mixin checkFd 514 let fdi = int(fd) 515 s.checkFd(fdi) 516 if fdi in s: 517 var value = addr(s.fds[fdi].data) 518 body 519 520template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, 521 body2: untyped) = 522 mixin checkFd 523 let fdi = int(fd) 524 s.checkFd(fdi) 525 if fdi in s: 526 var value = addr(s.fds[fdi].data) 527 body1 528 else: 529 body2 530 531proc getFd*[T](s: Selector[T]): int = 532 return s.epollFd.int 533