1#
2#
3#            Nim's Runtime Library
4#        (c) Copyright 2016 Eugene Kabanov
5#
6#    See the file "copying.txt", included in this
7#    distribution, for details about the copyright.
8#
9
10#  This module implements BSD kqueue().
11
12import posix, times, kqueue, nativesockets
13
const
  # Maximum number of events that can be returned by a single kevent() call.
  # It is the size of the on-stack result table used by `selectInto` and the
  # initial capacity of the selector's pending-changes buffer.
  MAX_KQUEUE_EVENTS = 64
  # SIG_IGN and SIG_DFL are declared in posix.nim as variables, but we need
  # them to be constants and GC-safe. They are passed to `posix.signal` when a
  # signal is registered (SIG_IGN) or unregistered (SIG_DFL).
  SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0)
  SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1)
21
# Compiling with `-d:kqcache` declares CACHE_EVENTS. When it is declared, the
# register/unregister procs only queue their changes (no `flushKQueue` call),
# and `selectInto` hands the queued changes to kevent() together with the wait.
when defined(kqcache):
  const CACHE_EVENTS = true
24
# Platform specific binding of sysctl() and of the identifier that
# `newSelector` queries to size the descriptor table.
when defined(macosx) or defined(freebsd) or defined(dragonfly):
  when defined(macosx):
    const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS)
  else:
    const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD)
  # Raw import of the C sysctl() function; a non-zero return value is treated
  # as failure by the caller.
  proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
              newp: pointer, newplen: csize_t): cint
       {.importc: "sysctl",header: """#include <sys/types.h>
                                      #include <sys/sysctl.h>""".}
elif defined(netbsd) or defined(openbsd):
  # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we use
  # KERN_MAXFILES instead, because KERN_MAXFILES is always bigger
  # than KERN_MAXFILESPERPROC.
  const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES
  # Raw import of the C sysctl() function; a non-zero return value is treated
  # as failure by the caller.
  proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
              newp: pointer, newplen: csize_t): cint
       {.importc: "sysctl",header: """#include <sys/param.h>
                                      #include <sys/sysctl.h>""".}
43
# The selector state. With thread support the object and its arrays are
# allocated with the shared allocator and the handle is a raw `ptr`; without
# thread support ordinary `seq`s and a `ref` handle are used.
when hasThreadSupport:
  type
    SelectorImpl[T] = object
      # Descriptor returned by kqueue().
      kqFD: cint
      # Number of slots in `fds`; descriptors >= maxFD are rejected by checkFd.
      maxFD: int
      # Buffer of KEvent changes waiting to be submitted to the kernel.
      changes: ptr SharedArray[KEvent]
      # Registration keys, indexed by descriptor number.
      fds: ptr SharedArray[SelectorKey[T]]
      # Counter incremented on registration and decremented on unregistration
      # (or when a one-shot timer / process event fires); see `isEmpty`.
      count*: int
      # Guards `changes`, `changesSize` and `changesLength`.
      changesLock: Lock
      # Capacity of `changes`, in elements.
      changesSize: int
      # Number of elements of `changes` currently in use.
      changesLength: int
      # Socket that `getUnique` duplicates to obtain unique `fds` indexes.
      sock: cint
    Selector*[T] = ptr SelectorImpl[T]
else:
  type
    SelectorImpl[T] = object
      # Descriptor returned by kqueue().
      kqFD: cint
      # Number of slots in `fds`; descriptors >= maxFD are rejected by checkFd.
      maxFD: int
      # KEvent changes waiting to be submitted to the kernel.
      changes: seq[KEvent]
      # Registration keys, indexed by descriptor number.
      fds: seq[SelectorKey[T]]
      # Counter incremented on registration and decremented on unregistration
      # (or when a one-shot timer / process event fires); see `isEmpty`.
      count*: int
      # Socket that `getUnique` duplicates to obtain unique `fds` indexes.
      sock: cint
    Selector*[T] = ref SelectorImpl[T]
67
type
  # User-triggered event, implemented on top of a pipe (see `newSelectEvent`).
  SelectEventImpl = object
    # Read end of the pipe; this is the descriptor registered in the selector.
    rfd: cint
    # Write end of the pipe; `trigger` writes to it.
    wfd: cint

  SelectEvent* = ptr SelectEventImpl
  # SelectEvent is declared as `ptr` to be placed in `shared memory`,
  # so you can share one SelectEvent handle between threads.
76
proc getUnique[T](s: Selector[T]): int {.inline.} =
  ## Duplicates the selector's helper socket and returns the new descriptor
  ## number, which callers use as a unique index into the `fds` array.
  ## Raises an IOSelectors error if the duplication fails.
  let dupFd = posix.fcntl(s.sock, F_DUPFD_CLOEXEC, s.sock)
  if dupFd == -1:
    raiseIOSelectorsError(osLastError())
  result = int(dupFd)
82
proc newSelector*[T](): owned(Selector[T]) =
  ## Creates a new kqueue based selector.
  ##
  ## The descriptor table is sized from the limit reported by sysctl(), a
  ## kqueue descriptor is opened and a helper socket is created whose
  ## duplicates later provide unique table indexes.
  ## Raises an IOSelectors error if any of these system calls fails.
  var
    limit = 0.cint
    limitLen = csize_t(sizeof(cint))
    mib = [1.cint, MAX_DESCRIPTORS_ID.cint]
  # Obtain the maximum number of open file descriptors.
  if sysctl(addr(mib[0]), 2, cast[pointer](addr limit), addr limitLen,
            nil, 0) != 0:
    raiseIOSelectorsError(osLastError())

  let queueFd = kqueue()
  if queueFd < 0:
    raiseIOSelectorsError(osLastError())

  # An otherwise unused socket is allocated so that its handle can be
  # duplicated later to get unique indexes for the `fds` array. This is needed
  # to properly identify {Event.Timer, Event.Signal, Event.Process} events.
  let baseSock = createNativeSocket(posix.AF_INET, posix.SOCK_STREAM,
                                    posix.IPPROTO_TCP).cint
  if baseSock == -1:
    # Remember the error before close() can overwrite it.
    let sockErr = osLastError()
    discard posix.close(queueFd)
    raiseIOSelectorsError(sockErr)

  when hasThreadSupport:
    result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
    result.fds = allocSharedArray[SelectorKey[T]](limit)
    result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS)
    result.changesSize = MAX_KQUEUE_EVENTS
    initLock(result.changesLock)
  else:
    result = Selector[T]()
    result.fds = newSeq[SelectorKey[T]](limit)
    result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)

  result.kqFD = queueFd
  result.sock = baseSock
  result.maxFD = limit.int

  # Mark every slot of the table as unused.
  for slot in 0 ..< result.maxFD:
    result.fds[slot].ident = InvalidIdent
123
proc close*[T](s: Selector[T]) =
  ## Closes the selector: both the kqueue descriptor and the helper socket are
  ## closed and, when compiled with thread support, the lock is destroyed and
  ## all shared memory owned by the selector (including the pending-changes
  ## buffer) is released.
  ## Raises an IOSelectors error if closing either descriptor failed; the
  ## memory is released in that case as well.
  let res1 = posix.close(s.kqFD)
  let res2 = posix.close(s.sock)
  # Read the error code right away so that nothing executed below can
  # overwrite it before it is reported.
  let err = osLastError()
  when hasThreadSupport:
    deinitLock(s.changesLock)
    # `changes` is allocated in `newSelector` next to `fds` and must be freed
    # here too, otherwise it leaks.
    deallocSharedArray(s.changes)
    deallocSharedArray(s.fds)
    deallocShared(cast[pointer](s))
  if res1 != 0 or res2 != 0:
    raiseIOSelectorsError(err)
133
proc newSelectEvent*(): SelectEvent =
  ## Creates a new user event backed by a pipe whose two ends are switched to
  ## non-blocking mode. The returned handle lives in shared memory and must be
  ## released with `close`.
  ## Raises an IOSelectors error if the pipe cannot be created.
  var fds: array[2, cint]
  if posix.pipe(fds) != 0:
    raiseIOSelectorsError(osLastError())
  try:
    setNonBlocking(fds[0])
    setNonBlocking(fds[1])
  except CatchableError:
    # Do not leak the pipe if it cannot be configured; the original error is
    # propagated unchanged.
    discard posix.close(fds[0])
    discard posix.close(fds[1])
    raise
  result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  result.rfd = fds[0]
  result.wfd = fds[1]
143
proc trigger*(ev: SelectEvent) =
  ## Signals the event by writing one 64-bit value to the write end of its
  ## pipe. Raises an IOSelectors error unless the whole value was written.
  var token: uint64 = 1
  let written = posix.write(ev.wfd, addr token, sizeof(uint64))
  if written != sizeof(uint64):
    raiseIOSelectorsError(osLastError())
148
proc close*(ev: SelectEvent) =
  ## Closes both ends of the event's pipe and frees the event handle.
  ## Raises an IOSelectors error if closing either end failed; the handle is
  ## freed in that case as well.
  let
    readRes = posix.close(ev.rfd)
    writeRes = posix.close(ev.wfd)
  deallocShared(cast[pointer](ev))
  if not (readRes == 0 and writeRes == 0):
    raiseIOSelectorsError(osLastError())
155
template checkFd(s, f) =
  ## Raises an IOSelectors error unless descriptor `f` is below the size of
  ## the descriptor table of selector `s`.
  if not (f < s.maxFD):
    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
159
# `withChangeLock` runs `body` with exclusive access to the selector's
# pending-changes buffer.
when hasThreadSupport:
  template withChangeLock[T](s: Selector[T], body: untyped) =
    # Acquire the lock, run `body`, and release the lock even if `body`
    # raises.
    acquire(s.changesLock)
    {.locks: [s.changesLock].}:
      try:
        body
      finally:
        release(s.changesLock)
else:
  template withChangeLock(s, body: untyped) =
    # This variant takes no lock; `body` is executed directly.
    body
171
# `modifyKQueue` appends one change record to the selector's pending-changes
# buffer; `flushKQueue` (only defined when CACHE_EVENTS is not declared)
# submits all pending changes to the kernel and empties the buffer.
when hasThreadSupport:
  template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
                           nflags: cushort, nfflags: cuint, ndata: int,
                           nudata: pointer) =
    ## Appends a KEvent built from the given fields to `s.changes` while
    ## holding the changes lock, growing the buffer when it is full.
    mixin withChangeLock
    s.withChangeLock():
      if s.changesLength == s.changesSize:
        # The buffer is full: allocate a new one of twice the size, move the
        # queued changes over and release the old buffer.
        let newSize = s.changesSize shl 1
        let rdata = allocSharedArray[KEvent](newSize)
        copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent))
        deallocSharedArray(s.changes)
        # Install the new buffer; without this the write below would run past
        # the end of the old one.
        s.changes = rdata
        s.changesSize = newSize
      s.changes[s.changesLength] = KEvent(ident: nident,
                                          filter: nfilter, flags: nflags,
                                          fflags: nfflags, data: ndata,
                                          udata: nudata)
      inc(s.changesLength)

  when not declared(CACHE_EVENTS):
    template flushKQueue[T](s: Selector[T]) =
      ## Submits all pending changes with kevent() while holding the changes
      ## lock. Raises an IOSelectors error if kevent() fails; the buffer is
      ## only emptied on success.
      mixin withChangeLock
      s.withChangeLock():
        if s.changesLength > 0:
          if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
                    nil, 0, nil) == -1:
            raiseIOSelectorsError(osLastError())
          s.changesLength = 0
else:
  template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
                           nflags: cushort, nfflags: cuint, ndata: int,
                           nudata: pointer) =
    ## Appends a KEvent built from the given fields to `s.changes`.
    s.changes.add(KEvent(ident: nident,
                         filter: nfilter, flags: nflags,
                         fflags: nfflags, data: ndata,
                         udata: nudata))

  when not declared(CACHE_EVENTS):
    template flushKQueue[T](s: Selector[T]) =
      ## Submits all pending changes with kevent(). Raises an IOSelectors
      ## error if kevent() fails; the list is only emptied on success.
      let length = cint(len(s.changes))
      if length > 0:
        if kevent(s.kqFD, addr(s.changes[0]), length,
                  nil, 0, nil) == -1:
          raiseIOSelectorsError(osLastError())
        s.changes.setLen(0)
216
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
                        events: set[Event], data: T) =
  ## Registers descriptor `fd` with the selector, associates `data` with it
  ## and starts watching it for the `Event.Read` / `Event.Write` members of
  ## `events`. The descriptor must not be registered yet.
  let fdi = int(fd)
  checkFd(s, fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)
  setKey(s, fdi, events, 0, data)

  # An empty event set only records the key; nothing is sent to the kernel.
  if events == {}:
    return

  let wantsRead = Event.Read in events
  let wantsWrite = Event.Write in events

  if wantsRead:
    modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil)
    inc(s.count)
  if wantsWrite:
    modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil)
    inc(s.count)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)
234
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
                      events: set[Event]) =
  ## Changes the set of events watched for the already registered descriptor
  ## `fd` to `events`. Only the read/write filters that differ between the old
  ## and the new set are deleted or added. Keys registered as timer, signal,
  ## process, vnode or user events cannot be updated.
  let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
                    Event.User, Event.Oneshot, Event.Error}
  let fdi = int(fd)
  checkFd(s, fdi)
  let pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent,
           "Descriptor $# is not registered in the queue!" % $fdi)
  doAssert(pkey.events * maskEvents == {})

  # Nothing to do when the requested set equals the current one.
  if pkey.events == events:
    return

  let
    hadRead = Event.Read in pkey.events
    hadWrite = Event.Write in pkey.events
    wantsRead = Event.Read in events
    wantsWrite = Event.Write in events

  # Filters that are no longer wanted are removed first ...
  if hadRead and not wantsRead:
    modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
    dec(s.count)
  if hadWrite and not wantsWrite:
    modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
    dec(s.count)
  # ... then the newly requested ones are added.
  if wantsRead and not hadRead:
    modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
    inc(s.count)
  if wantsWrite and not hadWrite:
    modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
    inc(s.count)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  pkey.events = events
264
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
                       data: T): int {.discardable.} =
  ## Registers a timer that fires after `timeout` (once if `oneshot` is true)
  ## and associates `data` with it. Returns the unique descriptor that
  ## identifies the timer in the selector.
  let fdi = getUnique(s)
  checkFd(s, fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)

  let events =
    if oneshot: {Event.Timer, Event.Oneshot}
    else: {Event.Timer}
  let flags: cushort =
    if oneshot: EV_ONESHOT or EV_ADD
    else: EV_ADD

  setKey(s, fdi, events, 0, data)

  # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds,
  # but MacOS and FreeBSD allow `0` as `fflags` to use milliseconds too.
  modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
  return fdi
286
proc registerSignal*[T](s: Selector[T], signal: int,
                        data: T): int {.discardable.} =
  ## Registers signal number `signal` with the selector and associates `data`
  ## with it. The signal is blocked and its disposition is set to SIG_IGN
  ## before the kqueue filter is added. Returns the unique descriptor that
  ## identifies the registration in the selector.
  let fdi = getUnique(s)
  checkFd(s, fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)

  setKey(s, fdi, {Event.Signal}, signal, data)

  let signo = cint(signal)
  var blocked, previous: Sigset
  discard sigemptyset(blocked)
  discard sigemptyset(previous)
  discard sigaddset(blocked, signo)
  blockSignals(blocked, previous)
  # to be compatible with linux semantic we need to "eat" signals
  posix.signal(signo, SIG_IGN)

  # The kqueue ident is the signal number; the unique descriptor travels in
  # `udata` so the event can be mapped back to its key.
  modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0,
               cast[pointer](fdi))

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
  return fdi
310
proc registerProcess*[T](s: Selector[T], pid: int,
                         data: T): int {.discardable.} =
  ## Registers a one-shot notification for the exit of process `pid` and
  ## associates `data` with it. Returns the unique descriptor that identifies
  ## the registration in the selector.
  let fdi = getUnique(s)
  checkFd(s, fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent)

  let kflags: cushort = EV_ONESHOT or EV_ADD
  s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)

  # The kqueue ident is the pid; the unique descriptor travels in `udata` so
  # the event can be mapped back to its key.
  modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0,
               cast[pointer](fdi))

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
  return fdi
328
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  ## Registers user event `ev` with the selector and associates `data` with
  ## it. The read end of the event's pipe is watched for readability.
  ## Raises an IOSelectors error if the event's descriptor does not fit into
  ## the selector's descriptor table.
  let fdi = ev.rfd.int
  # Validate the index before touching `fds`, like the other register procs.
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  setKey(s, fdi, {Event.User}, 0, data)

  modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
340
template processVnodeEvents(events: set[Event]): cuint =
  ## Translates the `Event.Vnode*` members of `events` into the matching
  ## bitwise combination of kqueue `NOTE_*` flags.
  var mask = 0.cuint
  if Event.VnodeDelete in events:
    mask = mask or NOTE_DELETE
  if Event.VnodeWrite in events:
    mask = mask or NOTE_WRITE
  if Event.VnodeExtend in events:
    mask = mask or NOTE_EXTEND
  if Event.VnodeAttrib in events:
    mask = mask or NOTE_ATTRIB
  if Event.VnodeLink in events:
    mask = mask or NOTE_LINK
  if Event.VnodeRename in events:
    mask = mask or NOTE_RENAME
  if Event.VnodeRevoke in events:
    mask = mask or NOTE_REVOKE
  mask
357
proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) =
  ## Registers file descriptor `fd` for the vnode notifications listed in
  ## `events` (the `Event.Vnode*` values) and associates `data` with it.
  ## Raises an IOSelectors error if `fd` does not fit into the selector's
  ## descriptor table.
  let fdi = fd.int
  # Validate the index before touching `fds`, like the other register procs.
  s.checkFd(fdi)
  setKey(s, fdi, {Event.Vnode} + events, 0, data)
  let fflags = processVnodeEvents(events)

  modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil)

  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  inc(s.count)
369
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  ## Removes the registration stored under descriptor `fd` from the selector.
  ##
  ## Depending on the kind of the key the matching kqueue filter(s) are
  ## deleted; for timers, signals and processes the duplicated descriptor that
  ## served as unique identifier is closed as well. Finally the key is
  ## cleared. Raises an IOSelectors error if `fd` is out of range or if a
  ## system call fails.
  let fdi = int(fd)
  s.checkFd(fdi)
  var pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent,
           "Descriptor [" & $fdi & "] is not registered in the queue!")

  if pkey.events != {}:
    if pkey.events * {Event.Read, Event.Write} != {}:
      # Ordinary handle: drop whichever of the read/write filters is active.
      if Event.Read in pkey.events:
        modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
        dec(s.count)
      if Event.Write in pkey.events:
        modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil)
        dec(s.count)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
    elif Event.Timer in pkey.events:
      # A timer marked `Finished` (a one-shot timer that already fired, see
      # `selectInto`) was already subtracted from the counter and is not
      # deleted from the queue again.
      if Event.Finished notin pkey.events:
        modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil)
        when not declared(CACHE_EVENTS):
          flushKQueue(s)
        dec(s.count)
      # Release the duplicated descriptor obtained from `getUnique`.
      if posix.close(cint(pkey.ident)) != 0:
        raiseIOSelectorsError(osLastError())
    elif Event.Signal in pkey.events:
      # Undo what `registerSignal` did: unblock the signal and restore the
      # default disposition. The signal number is stored in `pkey.param`.
      var nmask, omask: Sigset
      let signal = cint(pkey.param)
      discard sigemptyset(nmask)
      discard sigemptyset(omask)
      discard sigaddset(nmask, signal)
      unblockSignals(nmask, omask)
      posix.signal(signal, SIG_DFL)
      modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
      dec(s.count)
      # Release the duplicated descriptor obtained from `getUnique`.
      if posix.close(cint(pkey.ident)) != 0:
        raiseIOSelectorsError(osLastError())
    elif Event.Process in pkey.events:
      # The pid is stored in `pkey.param`. A process event marked `Finished`
      # already fired and was already subtracted from the counter.
      if Event.Finished notin pkey.events:
        modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil)
        when not declared(CACHE_EVENTS):
          flushKQueue(s)
        dec(s.count)
      # Release the duplicated descriptor obtained from `getUnique`.
      if posix.close(cint(pkey.ident)) != 0:
        raiseIOSelectorsError(osLastError())
    elif Event.Vnode in pkey.events:
      modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
      dec(s.count)
    elif Event.User in pkey.events:
      # User events are watched through a read filter on the pipe's read end.
      modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
      when not declared(CACHE_EVENTS):
        flushKQueue(s)
      dec(s.count)

  clearKey(pkey)
429
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  ## Removes user event `ev` from the selector: the read filter on the read
  ## end of the event's pipe is deleted and the key is cleared. The event
  ## must be registered as a user event.
  let fdi = int(ev.rfd)
  checkFd(s, fdi)
  let pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  doAssert(Event.User in pkey.events)

  modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
  when not declared(CACHE_EVENTS):
    flushKQueue(s)

  dec(s.count)
  clearKey(pkey)
441
proc selectInto*[T](s: Selector[T], timeout: int,
                    results: var openArray[ReadyKey]): int =
  ## Waits for events on the selector and stores them in `results`.
  ##
  ## `timeout` is given in milliseconds; `-1` waits without a time limit. At
  ## most `min(len(results), MAX_KQUEUE_EVENTS)` events are fetched per call.
  ## Returns the number of entries written to `results`; `0` is returned on
  ## timeout and when the wait was interrupted (EINTR).
  ## Raises an IOSelectors error for any other kevent() failure and when
  ## reading a triggered user event fails with an error other than EAGAIN.
  var
    tv: Timespec
    resTable: array[MAX_KQUEUE_EVENTS, KEvent]
    ptv = addr tv
    maxres = MAX_KQUEUE_EVENTS

  verifySelectParams(timeout)

  # Convert the millisecond timeout to a timespec; a nil timespec makes
  # kevent() wait indefinitely.
  if timeout != -1:
    if timeout >= 1000:
      tv.tv_sec = posix.Time(timeout div 1_000)
      tv.tv_nsec = (timeout %% 1_000) * 1_000_000
    else:
      tv.tv_sec = posix.Time(0)
      tv.tv_nsec = timeout * 1_000_000
  else:
    ptv = nil

  if maxres > len(results):
    maxres = len(results)

  var count = 0
  when not declared(CACHE_EVENTS):
    count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv)
  else:
    # With cached events the queued changes are submitted together with the
    # wait, and the cache is emptied afterwards.
    when hasThreadSupport:
      s.withChangeLock():
        if s.changesLength > 0:
          count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
                         addr(resTable[0]), cint(maxres), ptv)
          s.changesLength = 0
        else:
          count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
                         ptv)
    else:
      let length = cint(len(s.changes))
      if length > 0:
        count = kevent(s.kqFD, addr(s.changes[0]), length,
                       addr(resTable[0]), cint(maxres), ptv)
        s.changes.setLen(0)
      else:
        count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
                       ptv)

  if count < 0:
    result = 0
    let err = osLastError()
    # An interrupted wait is reported as "no events" instead of an error.
    if cint(err) != EINTR:
      raiseIOSelectorsError(err)
  elif count == 0:
    result = 0
  else:
    var i = 0
    var k = 0 # do not delete this, because `continue` used in cycle.
    var pkey: ptr SelectorKey[T]
    while i < count:
      let kevent = addr(resTable[i])
      var rkey = ReadyKey(fd: int(kevent.ident), events: {})

      if (kevent.flags and EV_ERROR) != 0:
        rkey.events = {Event.Error}
        rkey.errorCode = OSErrorCode(kevent.data)

      case kevent.filter:
      of EVFILT_READ:
        pkey = addr(s.fds[int(kevent.ident)])
        rkey.events.incl(Event.Read)
        if Event.User in pkey.events:
          # A user event: consume the value written by `trigger`.
          var data: uint64 = 0
          if posix.read(cint(kevent.ident), addr data,
                        sizeof(uint64)) != sizeof(uint64):
            let err = osLastError()
            if err == OSErrorCode(EAGAIN):
              # someone already consumed event data
              inc(i)
              continue
            else:
              raiseIOSelectorsError(err)
          rkey.events = {Event.User}
      of EVFILT_WRITE:
        pkey = addr(s.fds[int(kevent.ident)])
        # Add to the set instead of overwriting it, so that an `Event.Error`
        # recorded above for EV_ERROR is kept, as in the read branch.
        rkey.events.incl(Event.Write)
      of EVFILT_TIMER:
        pkey = addr(s.fds[int(kevent.ident)])
        if Event.Oneshot in pkey.events:
          # we will not clear key until it will be unregistered, so
          # application can obtain data, but we will decrease counter,
          # because kqueue is empty.
          dec(s.count)
          # we are marking key with `Finished` event, to avoid double decrease.
          pkey.events.incl(Event.Finished)
        rkey.events.incl(Event.Timer)
      of EVFILT_VNODE:
        pkey = addr(s.fds[int(kevent.ident)])
        rkey.events.incl(Event.Vnode)
        if (kevent.fflags and NOTE_DELETE) != 0:
          rkey.events.incl(Event.VnodeDelete)
        if (kevent.fflags and NOTE_WRITE) != 0:
          rkey.events.incl(Event.VnodeWrite)
        if (kevent.fflags and NOTE_EXTEND) != 0:
          rkey.events.incl(Event.VnodeExtend)
        if (kevent.fflags and NOTE_ATTRIB) != 0:
          rkey.events.incl(Event.VnodeAttrib)
        if (kevent.fflags and NOTE_LINK) != 0:
          rkey.events.incl(Event.VnodeLink)
        if (kevent.fflags and NOTE_RENAME) != 0:
          rkey.events.incl(Event.VnodeRename)
        if (kevent.fflags and NOTE_REVOKE) != 0:
          rkey.events.incl(Event.VnodeRevoke)
      of EVFILT_SIGNAL:
        # For signals the key index is carried in `udata`, because the kqueue
        # ident is the signal number.
        pkey = addr(s.fds[cast[int](kevent.udata)])
        rkey.fd = cast[int](kevent.udata)
        rkey.events.incl(Event.Signal)
      of EVFILT_PROC:
        # For processes the key index is carried in `udata`, because the
        # kqueue ident is the pid.
        rkey.fd = cast[int](kevent.udata)
        pkey = addr(s.fds[cast[int](kevent.udata)])
        # we will not clear key, until it will be unregistered, so
        # application can obtain data, but we will decrease counter,
        # because kqueue is empty.
        dec(s.count)
        # we are marking key with `Finished` event, to avoid double decrease.
        pkey.events.incl(Event.Finished)
        rkey.events.incl(Event.Process)
      else:
        # The condition must be `false`: asserting `true` can never fail and
        # would let events of unsupported filters through silently.
        doAssert(false, "Unsupported kqueue filter in the queue!")

      if (kevent.flags and EV_EOF) != 0:
        # TODO this error handling needs to be rethought.
        # `fflags` can sometimes be `0x80000000` and thus we use 'cast'
        # here:
        if kevent.fflags != 0:
          rkey.errorCode = cast[OSErrorCode](kevent.fflags)
        else:
          # This assumes we are dealing with sockets.
          # TODO: For future-proofing it might be a good idea to give the
          #       user access to the raw `kevent`.
          rkey.errorCode = OSErrorCode(ECONNRESET)
        rkey.events.incl(Event.Error)

      results[k] = rkey
      inc(k)
      inc(i)
    result = k
588
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  ## Waits for events like `selectInto` and returns them as a new sequence
  ## holding at most `MAX_KQUEUE_EVENTS` entries.
  var ready = newSeq[ReadyKey](MAX_KQUEUE_EVENTS)
  let fetched = selectInto(s, timeout, ready)
  ready.setLen(fetched)
  result = ready
593
template isEmpty*[T](s: Selector[T]): bool =
  ## Evaluates to `true` when the selector's registration counter is zero.
  s.count == 0
596
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  ## Returns `true` if descriptor `fd` is currently registered in the
  ## selector. Descriptors outside of the selector's descriptor table (for
  ## example an invalid handle) are reported as not registered instead of
  ## being used as an out-of-range index.
  let fdi = int(fd)
  result = fdi >= 0 and fdi < s.maxFD and s.fds[fdi].ident != InvalidIdent
599
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  ## Gives access to the application data stored for descriptor `fd`.
  ## `result` is only assigned when `fd` is registered in the selector.
  ## Raises an IOSelectors error if `fd` exceeds the descriptor table.
  let idx = fd.int
  checkFd(s, idx)
  if contains(s, idx):
    result = s.fds[idx].data
605
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  ## Replaces the application data stored for descriptor `fd` with `data`.
  ## Returns `true` if `fd` is registered, otherwise `false` and nothing is
  ## changed. Raises an IOSelectors error if `fd` exceeds the descriptor table.
  let idx = fd.int
  checkFd(s, idx)
  result = contains(s, idx)
  if result:
    s.fds[idx].data = data
612
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                      body: untyped) =
  ## Runs `body` with `value` bound to a pointer to the application data of
  ## descriptor `fd`, but only if `fd` is registered in the selector.
  mixin checkFd
  let idx = int(fd)
  checkFd(s, idx)
  if contains(s, idx):
    var value = addr(s.fds[idx].data)
    body
621
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
                      body2: untyped) =
  ## Runs `body1` with `value` bound to a pointer to the application data of
  ## descriptor `fd` if `fd` is registered in the selector; runs `body2`
  ## otherwise.
  mixin checkFd
  let idx = int(fd)
  checkFd(s, idx)
  if idx notin s:
    body2
  else:
    var value = addr(s.fds[idx].data)
    body1
632
633
proc getFd*[T](s: Selector[T]): int =
  ## Returns the kqueue descriptor used by the selector.
  result = int(s.kqFD)
636