#
#
#            Nim's Runtime Library
#        (c) Copyright 2016 Eugene Kabanov
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

#  This module implements the selector API on top of BSD kqueue().

import posix, times, kqueue, nativesockets

const
  # Maximum number of events that a single kevent() call may return.
  MAX_KQUEUE_EVENTS = 64
  # SIG_IGN and SIG_DFL are declared in posix.nim as variables, but we need
  # them to be constants and GC-safe.
  SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0)
  SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1)

when defined(kqcache):
  # When declared, changes are not flushed immediately; they are submitted
  # together with the next kevent() call made by `selectInto`.
  const CACHE_EVENTS = true

when defined(macosx) or defined(freebsd) or defined(dragonfly):
  when defined(macosx):
    const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS)
  else:
    const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD)
  proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
              newp: pointer, newplen: csize_t): cint
       {.importc: "sysctl",header: """#include <sys/types.h>
                                      #include <sys/sysctl.h>""".}
elif defined(netbsd) or defined(openbsd):
  # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we use
  # KERN_MAXFILES instead, because KERN_MAXFILES is always bigger
  # than KERN_MAXFILESPERPROC.
  const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES
  proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
              newp: pointer, newplen: csize_t): cint
       {.importc: "sysctl",header: """#include <sys/param.h>
                                      #include <sys/sysctl.h>""".}

when hasThreadSupport:
  type
    SelectorImpl[T] = object
      kqFD: cint                            # kqueue descriptor
      maxFD: int                            # number of slots in `fds`
      changes: ptr SharedArray[KEvent]      # pending changelist
      fds: ptr SharedArray[SelectorKey[T]]  # keys, indexed by descriptor
      count*: int                           # number of active registrations
      changesLock: Lock                     # guards the three fields below
      changesSize: int                      # capacity of `changes`
      changesLength: int                    # used entries in `changes`
      sock: cint                            # socket duplicated by `getUnique`
    Selector*[T] = ptr SelectorImpl[T]
else:
  type
    SelectorImpl[T] = object
      kqFD: cint                  # kqueue descriptor
      maxFD: int                  # number of slots in `fds`
      changes: seq[KEvent]        # pending changelist
      fds: seq[SelectorKey[T]]    # keys, indexed by descriptor
      count*: int                 # number of active registrations
      sock: cint                  # socket duplicated by `getUnique`
    Selector*[T] = ref SelectorImpl[T]

type
  SelectEventImpl = object
    rfd: cint   # read end of the pipe
    wfd: cint   # write end of the pipe

  SelectEvent* = ptr SelectEventImpl
  # SelectEvent is declared as `ptr` to be placed in `shared memory`,
  # so you can share one SelectEvent handle between threads.

proc getUnique[T](s: Selector[T]): int {.inline.} =
  ## Duplicates `s.sock` and returns the new descriptor. The descriptor is
  ## used only as a unique index into the `fds` array.
  ##
  ## Raises an IO selectors error when `fcntl` fails.
  result = posix.fcntl(s.sock, F_DUPFD_CLOEXEC, s.sock)
  if result == -1:
    raiseIOSelectorsError(osLastError())

proc newSelector*[T](): owned(Selector[T]) =
  ## Creates a new selector backed by a fresh kqueue descriptor.
  ##
  ## Raises an IO selectors error when the descriptor limit cannot be
  ## queried, or when the kqueue or the helper socket cannot be created.
  var maxFD = 0.cint
  var size = csize_t(sizeof(cint))
  var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint]
  # Obtain the maximum number of opened file descriptors for the process.
  if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size,
            nil, 0) != 0:
    raiseIOSelectorsError(osLastError())

  var kqFD = kqueue()
  if kqFD < 0:
    raiseIOSelectorsError(osLastError())

  # We allocate an empty socket so that its handle can be duplicated later
  # to obtain unique indexes for the `fds` array. This is needed to properly
  # identify {Event.Timer, Event.Signal, Event.Process} events.
  let usock = createNativeSocket(posix.AF_INET, posix.SOCK_STREAM,
                                 posix.IPPROTO_TCP).cint
  if usock == -1:
    let err = osLastError()
    # do not leak the kqueue descriptor on failure
    discard posix.close(kqFD)
    raiseIOSelectorsError(err)

  when hasThreadSupport:
    result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
    result.fds = allocSharedArray[SelectorKey[T]](maxFD)
    result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS)
    result.changesSize = MAX_KQUEUE_EVENTS
    initLock(result.changesLock)
  else:
    result = Selector[T]()
    result.fds = newSeq[SelectorKey[T]](maxFD)
    result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)

  # mark every slot as unused
  for i in 0 ..< maxFD:
    result.fds[i].ident = InvalidIdent

  result.sock = usock
  result.kqFD = kqFD
  result.maxFD = maxFD.int

proc close*[T](s: Selector[T]) =
  ## Closes the kqueue descriptor and the helper socket and, with thread
  ## support, releases the shared memory owned by the selector.
  ##
  ## Raises an IO selectors error when closing either descriptor failed;
  ## the memory is released before the error is raised.
  let res1 = posix.close(s.kqFD)
  let res2 = posix.close(s.sock)
  when hasThreadSupport:
    deinitLock(s.changesLock)
    deallocSharedArray(s.fds)
    # the changelist lives in shared memory too and must be released
    deallocSharedArray(s.changes)
    deallocShared(cast[pointer](s))
  if res1 != 0 or res2 != 0:
    raiseIOSelectorsError(osLastError())

proc newSelectEvent*(): SelectEvent =
  ## Creates a user event backed by a non-blocking pipe. The handle is
  ## allocated in shared memory.
  ##
  ## Raises an IO selectors error when the pipe cannot be created.
  var fds: array[2, cint]
  if posix.pipe(fds) != 0:
    raiseIOSelectorsError(osLastError())
  setNonBlocking(fds[0])
  setNonBlocking(fds[1])
  result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  result.rfd = fds[0]
  result.wfd = fds[1]

proc trigger*(ev: SelectEvent) =
  ## Signals `ev` by writing a `uint64` to the write end of its pipe.
  ##
  ## Raises an IO selectors error when the write is incomplete or fails.
  var data: uint64 = 1
  if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
    raiseIOSelectorsError(osLastError())

proc close*(ev: SelectEvent) =
  ## Closes both ends of the pipe and frees the shared handle.
  ##
  ## Raises an IO selectors error when closing either end failed; the
  ## handle is freed before the error is raised.
  let res1 = posix.close(ev.rfd)
  let res2 = posix.close(ev.wfd)
  deallocShared(cast[pointer](ev))
  if res1 != 0 or res2 != 0:
    raiseIOSelectorsError(osLastError())

# Raises an IO selectors error when descriptor `f` does not fit into the
# `fds` array of selector `s`.
template checkFd(s, f) =
  if f >= s.maxFD:
    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")

when hasThreadSupport:
  # Runs `body` while holding `s.changesLock`; the lock is released even
  # when `body` raises.
  template withChangeLock[T](s: Selector[T], body: untyped) =
    acquire(s.changesLock)
    {.locks: [s.changesLock].}:
      try:
        body
      finally:
        release(s.changesLock)
else:
  # Without thread support no locking is needed.
  template withChangeLock(s, body: untyped) =
    body

when hasThreadSupport:
  # Appends one change to the pending changelist of `s`, growing the shared
  # array when it is full.
  template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
                           nflags: cushort, nfflags: cuint, ndata: int,
                           nudata: pointer) =
    mixin withChangeLock
    s.withChangeLock():
      if s.changesLength == s.changesSize:
        # The cache array is full: allocate a new one of twice the size,
        # move the pending changes over and release the old array.
        let newSize = s.changesSize shl 1
        let rdata = allocSharedArray[KEvent](newSize)
        copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent))
        deallocSharedArray(s.changes)
        s.changes = rdata
        s.changesSize = newSize
      s.changes[s.changesLength] = KEvent(ident: nident,
                                          filter: nfilter, flags: nflags,
                                          fflags: nfflags, data: ndata,
                                          udata: nudata)
      inc(s.changesLength)

  when not declared(CACHE_EVENTS):
    # Submits all pending changes to the kernel and empties the changelist.
    template flushKQueue[T](s: Selector[T]) =
      mixin withChangeLock
      s.withChangeLock():
        if s.changesLength > 0:
          if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
                    nil, 0, nil) == -1:
            raiseIOSelectorsError(osLastError())
          s.changesLength = 0
else:
  # Appends one change to the pending changelist of `s`.
  template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
                           nflags: cushort, nfflags: cuint, ndata: int,
                           nudata: pointer) =
    s.changes.add(KEvent(ident: nident,
                         filter: nfilter, flags: nflags,
                         fflags: nfflags, data: ndata,
                         udata: nudata))

  when not declared(CACHE_EVENTS):
    # Submits all pending changes to the kernel and empties the changelist.
    template flushKQueue[T](s: Selector[T]) =
      let length = cint(len(s.changes))
      if length > 0:
        if kevent(s.kqFD, addr(s.changes[0]), length,
                  nil, 0, nil) == -1:
          raiseIOSelectorsError(osLastError())
        s.changes.setLen(0)

proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
                        events: set[Event], data: T) =
  ## Registers `fd` with user `data` and adds a read and/or write filter
  ## according to `events`. `fd` must not be registered already.
  let fdi = int(fd)
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)
  s.setKey(fdi, events, 0, data)

  if events != {}:
    if Event.Read in events:
      modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil)
      inc(s.count)
    if Event.Write in events:
      modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil)
      inc(s.count)

    when not declared(CACHE_EVENTS):
      flushKQueue(s)

proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
                      events: set[Event]) =
  ## Replaces the read/write interest of an already registered `fd` with
  ## `events`. Only filters that actually change are added or deleted.
  let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
                    Event.User, Event.Oneshot, Event.Error}
  let fdi = int(fd)
  s.checkFd(fdi)
  var pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent,
           "Descriptor $# is not registered in the queue!" % $fdi)
  # only plain read/write registrations may be updated
  doAssert(pkey.events * maskEvents == {})

  if pkey.events != events:
    if (Event.Read in pkey.events) and (Event.Read notin events):
      modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
      dec(s.count)
    if (Event.Write in pkey.events) and (Event.Write notin events):
      modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
      dec(s.count)
    if (Event.Read notin pkey.events) and (Event.Read in events):
      modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
      inc(s.count)
    if (Event.Write notin pkey.events) and (Event.Write in events):
      modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
      inc(s.count)

    when not declared(CACHE_EVENTS):
      flushKQueue(s)

    pkey.events = events

proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
                       data: T): int {.discardable.} =
  ## Registers a timer with the given `timeout` and returns the unique
  ## descriptor identifying it. With `oneshot` the timer is added with
  ## `EV_ONESHOT`.
  let fdi = getUnique(s)
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)

  let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer}
  let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD

  s.setKey(fdi, events, 0, data)

  # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds,
  # but MacOS and FreeBSD allow the use of `0` as `fflags` to use
  # milliseconds too.
  modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
  result = fdi

proc registerSignal*[T](s: Selector[T], signal: int,
                        data: T): int {.discardable.} =
  ## Registers interest in `signal` and returns the unique descriptor
  ## identifying the registration. The signal is blocked and its handler
  ## is set to `SIG_IGN`.
  let fdi = getUnique(s)
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)

  s.setKey(fdi, {Event.Signal}, signal, data)
  var nmask, omask: Sigset
  discard sigemptyset(nmask)
  discard sigemptyset(omask)
  discard sigaddset(nmask, cint(signal))
  blockSignals(nmask, omask)
  # to be compatible with linux semantic we need to "eat" signals
  posix.signal(cint(signal), SIG_IGN)

  # the kqueue ident is the signal number, so the key index travels in
  # `udata`
  modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0,
               cast[pointer](fdi))

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
  result = fdi

proc registerProcess*[T](s: Selector[T], pid: int,
                         data: T): int {.discardable.} =
  ## Registers a oneshot `NOTE_EXIT` watch for process `pid` and returns
  ## the unique descriptor identifying the registration.
  let fdi = getUnique(s)
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)

  var kflags: cushort = EV_ONESHOT or EV_ADD
  setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data)

  # the kqueue ident is the pid, so the key index travels in `udata`
  modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0,
               cast[pointer](fdi))

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
  result = fdi

proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  ## Registers the user event `ev` with `data` by watching the read end of
  ## its pipe. `ev` must not be registered already.
  let fdi = ev.rfd.int
  # validate the index before touching `fds`, like the other register procs
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  setKey(s, fdi, {Event.User}, 0, data)

  modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)

# Translates the `Event.Vnode*` members of `events` into the matching
# `NOTE_*` bit mask.
template processVnodeEvents(events: set[Event]): cuint =
  var rfflags = 0.cuint
  if events == {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend,
                Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename,
                Event.VnodeRevoke}:
    rfflags = NOTE_DELETE or NOTE_WRITE or NOTE_EXTEND or NOTE_ATTRIB or
              NOTE_LINK or NOTE_RENAME or NOTE_REVOKE
  else:
    if Event.VnodeDelete in events: rfflags = rfflags or NOTE_DELETE
    if Event.VnodeWrite in events: rfflags = rfflags or NOTE_WRITE
    if Event.VnodeExtend in events: rfflags = rfflags or NOTE_EXTEND
    if Event.VnodeAttrib in events: rfflags = rfflags or NOTE_ATTRIB
    if Event.VnodeLink in events: rfflags = rfflags or NOTE_LINK
    if Event.VnodeRename in events: rfflags = rfflags or NOTE_RENAME
    if Event.VnodeRevoke in events: rfflags = rfflags or NOTE_REVOKE
  rfflags

proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) =
  ## Registers a vnode watch on `fd` for the `Event.Vnode*` members of
  ## `events`.
  let fdi = fd.int
  # validate the index before touching `fds`, like the other register procs
  s.checkFd(fdi)
  setKey(s, fdi, {Event.Vnode} + events, 0, data)
  var fflags = processVnodeEvents(events)

  modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)

proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  ## Removes the registration stored under `fd`, deleting its kqueue
  ## filter(s). For timers, signals and processes the duplicated descriptor
  ## obtained at registration is closed as well.
  let fdi = int(fd)
  s.checkFd(fdi)
  var pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent,
           "Descriptor [" & $fdi & "] is not registered in the queue!")

  if pkey.events != {}:
    if pkey.events * {Event.Read, Event.Write} != {}:
      if Event.Read in pkey.events:
        modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
        dec(s.count)
      if Event.Write in pkey.events:
        modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil)
        dec(s.count)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
    elif Event.Timer in pkey.events:
      # a finished timer was already counted down by `selectInto`
      if Event.Finished notin pkey.events:
        modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil)
        when not declared(CACHE_EVENTS):
          flushKQueue(s)
        dec(s.count)
      if posix.close(cint(pkey.ident)) != 0:
        raiseIOSelectorsError(osLastError())
    elif Event.Signal in pkey.events:
      var nmask, omask: Sigset
      let signal = cint(pkey.param)
      discard sigemptyset(nmask)
      discard sigemptyset(omask)
      discard sigaddset(nmask, signal)
      unblockSignals(nmask, omask)
      posix.signal(signal, SIG_DFL)
      modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
      dec(s.count)
      if posix.close(cint(pkey.ident)) != 0:
        raiseIOSelectorsError(osLastError())
    elif Event.Process in pkey.events:
      # a finished process watch was already counted down by `selectInto`
      if Event.Finished notin pkey.events:
        modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil)
        when not declared(CACHE_EVENTS):
          flushKQueue(s)
        dec(s.count)
      if posix.close(cint(pkey.ident)) != 0:
        raiseIOSelectorsError(osLastError())
    elif Event.Vnode in pkey.events:
      modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
      dec(s.count)
    elif Event.User in pkey.events:
      modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
      dec(s.count)

  clearKey(pkey)

proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  ## Removes the registration of user event `ev`. The event itself is not
  ## closed.
  let fdi = int(ev.rfd)
  s.checkFd(fdi)
  var pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  doAssert(Event.User in pkey.events)
  modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  when not declared(CACHE_EVENTS):
    flushKQueue(s)
  clearKey(pkey)
  dec(s.count)

proc selectInto*[T](s: Selector[T], timeout: int,
                    results: var openArray[ReadyKey]): int =
  ## Waits for events for at most `timeout` milliseconds (`-1` passes no
  ## timeout to kevent()) and stores the ready keys into `results`.
  ## Returns the number of keys stored.
  ##
  ## At most `min(MAX_KQUEUE_EVENTS, results.len)` events are fetched per
  ## call. An interrupted wait (`EINTR`) yields `0`; any other kevent()
  ## failure raises an IO selectors error.
  var
    tv: Timespec
    resTable: array[MAX_KQUEUE_EVENTS, KEvent]
    ptv = addr tv
    maxres = MAX_KQUEUE_EVENTS

  verifySelectParams(timeout)

  if timeout != -1:
    if timeout >= 1000:
      tv.tv_sec = posix.Time(timeout div 1_000)
      tv.tv_nsec = (timeout %% 1_000) * 1_000_000
    else:
      tv.tv_sec = posix.Time(0)
      tv.tv_nsec = timeout * 1_000_000
  else:
    ptv = nil

  if maxres > len(results):
    maxres = len(results)

  var count = 0
  when not declared(CACHE_EVENTS):
    count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv)
  else:
    # cached mode: pending changes are submitted by the same kevent() call
    when hasThreadSupport:
      s.withChangeLock():
        if s.changesLength > 0:
          count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
                         addr(resTable[0]), cint(maxres), ptv)
          s.changesLength = 0
        else:
          count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
                         ptv)
    else:
      let length = cint(len(s.changes))
      if length > 0:
        count = kevent(s.kqFD, addr(s.changes[0]), length,
                       addr(resTable[0]), cint(maxres), ptv)
        s.changes.setLen(0)
      else:
        count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
                       ptv)

  if count < 0:
    result = 0
    let err = osLastError()
    if cint(err) != EINTR:
      raiseIOSelectorsError(err)
  elif count == 0:
    result = 0
  else:
    var i = 0
    var k = 0 # do not delete this, because `continue` is used in the cycle.
    var pkey: ptr SelectorKey[T]
    while i < count:
      let kevent = addr(resTable[i])
      var rkey = ReadyKey(fd: int(kevent.ident), events: {})

      if (kevent.flags and EV_ERROR) != 0:
        rkey.events = {Event.Error}
        rkey.errorCode = OSErrorCode(kevent.data)

      case kevent.filter:
      of EVFILT_READ:
        pkey = addr(s.fds[int(kevent.ident)])
        rkey.events.incl(Event.Read)
        if Event.User in pkey.events:
          # drain the value written by `trigger`
          var data: uint64 = 0
          if posix.read(cint(kevent.ident), addr data,
                        sizeof(uint64)) != sizeof(uint64):
            let err = osLastError()
            if err == OSErrorCode(EAGAIN):
              # someone already consumed event data
              inc(i)
              continue
            else:
              raiseIOSelectorsError(err)
          rkey.events = {Event.User}
      of EVFILT_WRITE:
        pkey = addr(s.fds[int(kevent.ident)])
        # `incl` keeps an `Event.Error` set above, as the read branch does
        rkey.events.incl(Event.Write)
      of EVFILT_TIMER:
        pkey = addr(s.fds[int(kevent.ident)])
        if Event.Oneshot in pkey.events:
          # we will not clear key until it will be unregistered, so
          # application can obtain data, but we will decrease counter,
          # because kqueue is empty.
          dec(s.count)
          # we are marking key with `Finished` event, to avoid double decrease.
          pkey.events.incl(Event.Finished)
        rkey.events.incl(Event.Timer)
      of EVFILT_VNODE:
        pkey = addr(s.fds[int(kevent.ident)])
        rkey.events.incl(Event.Vnode)
        if (kevent.fflags and NOTE_DELETE) != 0:
          rkey.events.incl(Event.VnodeDelete)
        if (kevent.fflags and NOTE_WRITE) != 0:
          rkey.events.incl(Event.VnodeWrite)
        if (kevent.fflags and NOTE_EXTEND) != 0:
          rkey.events.incl(Event.VnodeExtend)
        if (kevent.fflags and NOTE_ATTRIB) != 0:
          rkey.events.incl(Event.VnodeAttrib)
        if (kevent.fflags and NOTE_LINK) != 0:
          rkey.events.incl(Event.VnodeLink)
        if (kevent.fflags and NOTE_RENAME) != 0:
          rkey.events.incl(Event.VnodeRename)
        if (kevent.fflags and NOTE_REVOKE) != 0:
          rkey.events.incl(Event.VnodeRevoke)
      of EVFILT_SIGNAL:
        # the key index was stored in `udata` at registration
        pkey = addr(s.fds[cast[int](kevent.udata)])
        rkey.fd = cast[int](kevent.udata)
        rkey.events.incl(Event.Signal)
      of EVFILT_PROC:
        # the key index was stored in `udata` at registration
        rkey.fd = cast[int](kevent.udata)
        pkey = addr(s.fds[cast[int](kevent.udata)])
        # we will not clear key, until it will be unregistered, so
        # application can obtain data, but we will decrease counter,
        # because kqueue is empty.
        dec(s.count)
        # we are marking key with `Finished` event, to avoid double decrease.
        pkey.events.incl(Event.Finished)
        rkey.events.incl(Event.Process)
      else:
        # no other filter is ever registered by this module
        doAssert(false, "Unsupported kqueue filter in the queue!")

      if (kevent.flags and EV_EOF) != 0:
        # TODO this error handling needs to be rethought.
        # `fflags` can sometimes be `0x80000000` and thus we use 'cast'
        # here:
        if kevent.fflags != 0:
          rkey.errorCode = cast[OSErrorCode](kevent.fflags)
        else:
          # This assumes we are dealing with sockets.
          # TODO: For future-proofing it might be a good idea to give the
          #       user access to the raw `kevent`.
          rkey.errorCode = OSErrorCode(ECONNRESET)
        rkey.events.incl(Event.Error)

      results[k] = rkey
      inc(k)
      inc(i)
    result = k

proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  ## Like `selectInto`, but returns the ready keys as a new sequence of at
  ## most `MAX_KQUEUE_EVENTS` elements.
  result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS)
  let count = selectInto(s, timeout, result)
  result.setLen(count)

# True when the selector's registration counter is zero.
template isEmpty*[T](s: Selector[T]): bool =
  (s.count == 0)

proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  ## Returns true when `fd` is registered in `s`. Descriptors outside the
  ## `fds` array are reported as not registered.
  let fdi = fd.int
  result = fdi >= 0 and fdi < s.maxFD and s.fds[fdi].ident != InvalidIdent

proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  ## Returns the data stored for `fd`. `result` is only assigned when `fd`
  ## is registered.
  let fdi = int(fd)
  s.checkFd(fdi)
  if fdi in s:
    result = s.fds[fdi].data

proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  ## Stores `data` for `fd`. Returns true when `fd` is registered and false
  ## otherwise.
  let fdi = int(fd)
  s.checkFd(fdi)
  if fdi in s:
    s.fds[fdi].data = data
    result = true

# Runs `body` with `value` bound to a pointer to the data stored for `fd`,
# but only when `fd` is registered.
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                        body: untyped) =
  mixin checkFd
  let fdi = int(fd)
  s.checkFd(fdi)
  if fdi in s:
    var value = addr(s.fds[fdi].data)
    body

# Runs `body1` with `value` bound to a pointer to the data stored for `fd`
# when `fd` is registered, otherwise runs `body2`.
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
                        body2: untyped) =
  mixin checkFd
  let fdi = int(fd)
  s.checkFd(fdi)
  if fdi in s:
    var value = addr(s.fds[fdi].data)
    body1
  else:
    body2


proc getFd*[T](s: Selector[T]): int =
  ## Returns the underlying kqueue descriptor.
  return s.kqFD.int