1#
2#
3#            Nim's Runtime Library
4#        (c) Copyright 2016 Eugene Kabanov
5#
6#    See the file "copying.txt", included in this
7#    distribution, for details about the copyright.
8#
9
# This module implements the selector API (Selector, SelectEvent) on top of
# Linux epoll().
11
12import posix, times, epoll
13
# Upper bound on the number of events a single selectInto() call asks
# epoll_wait() for; select() also uses it to size its result sequence.
const MAX_EPOLL_EVENTS = 64
16
when not defined(android):
  # signalfd is only used on non-Android targets (every use below is behind
  # the same `when` guard), so the siginfo record is declared only there.
  type
    SignalFdInfo* {.importc: "struct signalfd_siginfo",
                    header: "<sys/signalfd.h>", pure, final.} = object
      ## Record returned by read() on a signalfd descriptor; imported from the
      ## C `struct signalfd_siginfo` declared in <sys/signalfd.h>.
      ## selectInto reads one per notification and compares `ssi_pid` for
      ## process watches.
      ssi_signo*: uint32
      ssi_errno*: int32
      ssi_code*: int32
      ssi_pid*: uint32
      ssi_uid*: uint32
      ssi_fd*: int32
      ssi_tid*: uint32
      ssi_band*: uint32
      ssi_overrun*: uint32
      ssi_trapno*: uint32
      ssi_status*: int32
      ssi_int*: int32
      ssi_ptr*: uint64
      ssi_utime*: uint64
      ssi_stime*: uint64
      ssi_addr*: uint64
      # NOTE(review): this padding size is not checked against the system
      # header; it is never accessed here — confirm before relying on it.
      pad* {.importc: "__pad".}: array[0..47, uint8]
38
# Raw FFI bindings to timerfd_create(2), timerfd_settime(2) and eventfd(2).
# They return the C result unchanged; callers in this file check for -1 /
# non-zero and raise with osLastError().
proc timerfd_create(clock_id: ClockId, flags: cint): cint
     {.cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
# `otmr` receives the previous timer setting.
proc timerfd_settime(ufd: cint, flags: cint,
                      utmr: var Itimerspec, otmr: var Itimerspec): cint
     {.cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
proc eventfd(count: cuint, flags: cint): cint
     {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
46
when not defined(android):
  # Raw FFI binding to signalfd(2). registerSignal/registerProcess call it
  # with fd = -1 to obtain a new descriptor for the signals in `mask`.
  proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint
       {.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
50
# Selector state. With thread support the object is allocated with
# allocShared0 (see newSelector) and handed out as a raw `ptr`; otherwise it is
# a GC-managed `ref`.
when hasThreadSupport:
  type
    SelectorImpl[T] = object
      epollFD: cint   # epoll instance created by newSelector
      maxFD: int      # hard RLIMIT_NOFILE limit; checkFd rejects fds >= this
      numFD: int      # current number of slots in `fds`; checkFd grows it
      fds: ptr SharedArray[SelectorKey[T]]  # per-descriptor keys, indexed by fd
      count*: int     # descriptors currently added to the epoll instance
    Selector*[T] = ptr SelectorImpl[T]
else:
  type
    SelectorImpl[T] = object
      epollFD: cint   # epoll instance created by newSelector
      maxFD: int      # hard RLIMIT_NOFILE limit; checkFd rejects fds >= this
      numFD: int      # current number of slots in `fds`; checkFd grows it
      fds: seq[SelectorKey[T]]  # per-descriptor keys, indexed by fd
      count*: int     # descriptors currently added to the epoll instance
    Selector*[T] = ref SelectorImpl[T]
type
  # User-triggerable event backed by an eventfd; allocated with allocShared0
  # in newSelectEvent and freed by close(ev).
  SelectEventImpl = object
    efd: cint
  SelectEvent* = ptr SelectEventImpl
73
proc newSelector*[T](): Selector[T] =
  ## Creates an epoll-backed selector.
  ##
  ## The key table starts with 1024 slots (grown on demand by `checkFd`) and
  ## descriptors are capped at the hard RLIMIT_NOFILE limit. Raises `OSError`
  ## when the limit cannot be queried or the epoll instance cannot be created.
  const initialSlots = 1024

  var limits = RLimit()
  if getrlimit(posix.RLIMIT_NOFILE, limits) != 0:
    raiseOSError(osLastError())
  let hardLimit = int(limits.rlim_max)
  doAssert(hardLimit > 0)

  let epfd = epoll_create1(O_CLOEXEC)
  if epfd < 0:
    raiseOSError(osLastError())

  when hasThreadSupport:
    result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
    result.fds = allocSharedArray[SelectorKey[T]](initialSlots)
  else:
    result = Selector[T]()
    result.fds = newSeq[SelectorKey[T]](initialSlots)
  result.epollFD = epfd
  result.maxFD = hardLimit
  result.numFD = initialSlots

  # Every slot starts out unused.
  for slot in 0 ..< initialSlots:
    result.fds[slot].ident = InvalidIdent
103
proc close*[T](s: Selector[T]) =
  ## Closes the epoll descriptor of `s`. With thread support the shared key
  ## table and selector memory are freed too, even if close() failed; the
  ## failure is raised only after the memory has been released.
  let closeFailed = posix.close(s.epollFD) != 0
  when hasThreadSupport:
    deallocSharedArray(s.fds)
    deallocShared(cast[pointer](s))
  if closeFailed:
    raiseIOSelectorsError(osLastError())
111
proc newSelectEvent*(): SelectEvent =
  ## Creates a user event backed by a close-on-exec, non-blocking eventfd
  ## whose counter starts at 0. The returned object lives in shared memory
  ## and must be released with `close(ev)`.
  let eventFd = eventfd(0, O_CLOEXEC or O_NONBLOCK)
  if eventFd == -1:
    raiseIOSelectorsError(osLastError())
  result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  result.efd = eventFd
118
proc trigger*(ev: SelectEvent) =
  ## Signals `ev` by adding 1 to its eventfd counter, which makes the
  ## descriptor readable for any selector it is registered with.
  var increment = 1'u64
  let written = posix.write(ev.efd, addr increment, sizeof(increment))
  if written == -1:
    raiseIOSelectorsError(osLastError())
123
proc close*(ev: SelectEvent) =
  ## Closes the eventfd behind `ev` and frees its shared memory. The memory is
  ## released even when close() fails; the error is raised afterwards.
  let closeFailed = posix.close(ev.efd) != 0
  deallocShared(cast[pointer](ev))
  if closeFailed:
    raiseIOSelectorsError(osLastError())
129
template checkFd(s, f) =
  # Ensures descriptor `f` has a slot in `s.fds`, growing the table by
  # doubling when needed and marking the new slots as unused.
  #
  # Descriptors at or above `s.maxFD` (the hard RLIMIT_NOFILE limit captured
  # by newSelector) are rejected. NOTE(review): the original author doubted
  # the kernel can hand out such a descriptor; it still guards integers
  # supplied by callers — confirm whether it is needed.
  if f >= s.maxFD:
    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  if f >= s.numFD:
    var grown = s.numFD
    while grown <= f:
      grown = grown * 2
    when hasThreadSupport:
      s.fds = reallocSharedArray(s.fds, grown)
    else:
      s.fds.setLen(grown)
    for slot in s.numFD ..< grown:
      s.fds[slot].ident = InvalidIdent
    s.numFD = grown
145
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
                        events: set[Event], data: T) =
  ## Registers descriptor `fd` with `s` and attaches `data` to it.
  ##
  ## `Event.Read` and `Event.Write` in `events` are translated to EPOLLIN /
  ## EPOLLOUT (EPOLLRDHUP is always requested). With an empty `events` set the
  ## descriptor is only recorded in the selector; `updateHandle` adds it to
  ## epoll later. If `epoll_ctl` rejects the descriptor, the key is cleared
  ## again before the error is raised, so `fd` is not left half-registered.
  let fdi = int(fd)
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor $# already registered" % $fdi)
  s.setKey(fdi, events, 0, data)
  if events != {}:
    var epv = EpollEvent(events: EPOLLRDHUP)
    epv.data.u64 = fdi.uint
    if Event.Read in events: epv.events = epv.events or EPOLLIN
    if Event.Write in events: epv.events = epv.events or EPOLLOUT
    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
      # Capture errno first, then roll the key back so that `fd in s` stays
      # false and the descriptor number can be registered again later.
      let err = osLastError()
      clearKey(addr(s.fds[fdi]))
      raiseIOSelectorsError(err)
    inc(s.count)
160
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
                      events: set[Event]) =
  ## Replaces the set of events watched for `fd`, which must have been
  ## registered as a plain handle (no timer/signal/process/user flags).
  ##
  ## Going from no events to some adds `fd` to epoll; going to no events
  ## removes it; any other change modifies the existing epoll entry.
  let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
                    Event.User, Event.Oneshot, Event.Error}
  let fdi = int(fd)
  s.checkFd(fdi)
  var pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent,
           "Descriptor $# is not registered in the selector!" % $fdi)
  doAssert(pkey.events * maskEvents == {})

  if pkey.events == events:
    return

  var epv = EpollEvent(events: EPOLLRDHUP)
  epv.data.u64 = fdi.uint
  if Event.Read in events: epv.events = epv.events or EPOLLIN
  if Event.Write in events: epv.events = epv.events or EPOLLOUT

  let wasWatched = pkey.events != {}
  let nowWatched = events != {}
  let op =
    if not wasWatched: cint(EPOLL_CTL_ADD)
    elif nowWatched: cint(EPOLL_CTL_MOD)
    else: cint(EPOLL_CTL_DEL)
  if epoll_ctl(s.epollFD, op, fdi.cint, addr epv) != 0:
    raiseIOSelectorsError(osLastError())

  if not wasWatched:
    inc(s.count)
  elif not nowWatched:
    dec(s.count)
  pkey.events = events
190
proc epollRemove[T](s: Selector[T], fdi: int) =
  ## Removes descriptor `fdi` from the epoll instance of `s` and decrements
  ## `s.count`. Raises (leaving `s.count` unchanged) if `epoll_ctl` fails.
  var epv = EpollEvent()
  if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
    raiseIOSelectorsError(osLastError())
  dec(s.count)

proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  ## Unregisters descriptor `fd` from `s` and clears its key.
  ##
  ## Read/write handles and user events are only removed from epoll; they are
  ## not closed. Timer descriptors (and, on non-Android targets, signal and
  ## process descriptors) were created by the selector and are closed here.
  ## Signal registrations also unblock their signal. Oneshot keys already
  ## marked `Event.Finished` by `selectInto` were removed from epoll then, so
  ## they are only closed.
  let fdi = int(fd)
  s.checkFd(fdi)
  var pkey = addr(s.fds[fdi])
  doAssert(pkey.ident != InvalidIdent,
           "Descriptor $# is not registered in the selector!" % $fdi)
  if pkey.events != {}:
    if ({Event.Read, Event.Write, Event.User} * pkey.events) != {}:
      s.epollRemove(fdi)
    elif Event.Timer in pkey.events:
      if Event.Finished notin pkey.events:
        s.epollRemove(fdi)
      if posix.close(cint(fdi)) != 0:
        raiseIOSelectorsError(osLastError())
    else:
      when not defined(android):
        if Event.Signal in pkey.events:
          s.epollRemove(fdi)
          var nmask, omask: Sigset
          discard sigemptyset(nmask)
          discard sigemptyset(omask)
          discard sigaddset(nmask, cint(pkey.param))
          unblockSignals(nmask, omask)
          if posix.close(cint(fdi)) != 0:
            raiseIOSelectorsError(osLastError())
        elif Event.Process in pkey.events:
          if Event.Finished notin pkey.events:
            s.epollRemove(fdi)
            var nmask, omask: Sigset
            discard sigemptyset(nmask)
            discard sigemptyset(omask)
            discard sigaddset(nmask, SIGCHLD)
            unblockSignals(nmask, omask)
          # NOTE(review): after a process event has fired (Finished), SIGCHLD
          # is left blocked by this path — confirm that is intended.
          if posix.close(cint(fdi)) != 0:
            raiseIOSelectorsError(osLastError())
  clearKey(pkey)
253
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  ## Removes user event `ev` from `s` and clears its key. The eventfd itself
  ## stays open; it is released by `close(ev)`.
  let efdIndex = int(ev.efd)
  s.checkFd(efdIndex)
  let entry = addr(s.fds[efdIndex])
  doAssert(entry.ident != InvalidIdent, "Event is not registered in the queue!")
  doAssert(Event.User in entry.events)
  var ignored = EpollEvent()
  if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, efdIndex.cint, addr ignored) != 0:
    raiseIOSelectorsError(osLastError())
  dec(s.count)
  clearKey(entry)
265
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
                       data: T): int {.discardable.} =
  ## Registers a timer that expires after `timeout` milliseconds and attaches
  ## `data` to it. With `oneshot` the timer fires once (its key carries
  ## `Event.Oneshot`); otherwise it re-fires every `timeout` milliseconds.
  ##
  ## Returns the timerfd descriptor that identifies the timer; pass it to
  ## `unregister` to stop the timer and close the descriptor. If any step
  ## after creating the timerfd fails, the descriptor is closed before the
  ## error propagates, so it is not leaked.
  ##
  ## NOTE(review): `timeout == 0` produces an all-zero `it_value`, which
  ## disarms a timerfd, so such a timer never fires — confirm callers avoid it.
  var
    newTs: Itimerspec
    oldTs: Itimerspec
  let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK).int
  if fdi == -1:
    raiseIOSelectorsError(osLastError())

  var registered = false
  try:
    s.checkFd(fdi)
    doAssert(s.fds[fdi].ident == InvalidIdent)

    var events = {Event.Timer}
    var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
    epv.data.u64 = fdi.uint

    if oneshot:
      # A zero interval makes the timer expire only once.
      newTs.it_interval.tv_sec = posix.Time(0)
      newTs.it_interval.tv_nsec = 0
      newTs.it_value.tv_sec = posix.Time(timeout div 1_000)
      newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000
      incl(events, Event.Oneshot)
      epv.events = epv.events or EPOLLONESHOT
    else:
      newTs.it_interval.tv_sec = posix.Time(timeout div 1_000)
      newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000
      newTs.it_value.tv_sec = newTs.it_interval.tv_sec
      newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec

    if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0:
      raiseIOSelectorsError(osLastError())
    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
      raiseIOSelectorsError(osLastError())
    s.setKey(fdi, events, 0, data)
    inc(s.count)
    registered = true
  finally:
    if not registered:
      # Best-effort cleanup; the original error is what the caller sees.
      discard posix.close(fdi.cint)
  result = fdi
302
when not defined(android):
  proc registerSignal*[T](s: Selector[T], signal: int,
                          data: T): int {.discardable.} =
    ## Registers POSIX signal `signal` with `s` and attaches `data` to it.
    ## The signal is blocked (via `blockSignals`) and delivered through a
    ## signalfd instead; `unregister` unblocks it again.
    ##
    ## Returns the signalfd descriptor identifying the registration. If a
    ## step after creating the signalfd fails, the descriptor is closed before
    ## the error propagates.
    ## NOTE(review): on failure the signal is left blocked — confirm callers
    ## tolerate this.
    var
      nmask: Sigset
      omask: Sigset

    discard sigemptyset(nmask)
    discard sigemptyset(omask)
    discard sigaddset(nmask, cint(signal))
    blockSignals(nmask, omask)

    let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
    if fdi == -1:
      raiseIOSelectorsError(osLastError())

    var registered = false
    try:
      s.checkFd(fdi)
      doAssert(s.fds[fdi].ident == InvalidIdent)

      var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
      epv.data.u64 = fdi.uint
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      s.setKey(fdi, {Event.Signal}, signal, data)
      inc(s.count)
      registered = true
    finally:
      if not registered:
        # Best-effort cleanup; the original error is what the caller sees.
        discard posix.close(fdi.cint)
    result = fdi

  proc registerProcess*[T](s: Selector[T], pid: int,
                           data: T): int {.discardable.} =
    ## Watches for termination of child process `pid` and attaches `data`.
    ## SIGCHLD is blocked and read from a signalfd; `selectInto` reports
    ## `Event.Process` only when the received `ssi_pid` equals `pid`. The
    ## registration is oneshot (`Event.Oneshot`).
    ##
    ## Returns the signalfd descriptor identifying the watch. If a step after
    ## creating the signalfd fails, the descriptor is closed before the error
    ## propagates.
    ## NOTE(review): on failure SIGCHLD is left blocked — confirm callers
    ## tolerate this.
    var
      nmask: Sigset
      omask: Sigset

    discard sigemptyset(nmask)
    discard sigemptyset(omask)
    discard sigaddset(nmask, posix.SIGCHLD)
    blockSignals(nmask, omask)

    let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
    if fdi == -1:
      raiseIOSelectorsError(osLastError())

    var registered = false
    try:
      s.checkFd(fdi)
      doAssert(s.fds[fdi].ident == InvalidIdent)

      var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
      epv.data.u64 = fdi.uint
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)
      inc(s.count)
      registered = true
    finally:
      if not registered:
        # Best-effort cleanup; the original error is what the caller sees.
        discard posix.close(fdi.cint)
    result = fdi
356
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  ## Registers user event `ev` with `s` and attaches `data` to it.
  ##
  ## The eventfd number may lie beyond the current key table, so the table is
  ## grown via `checkFd` before it is indexed. If `epoll_ctl` fails, the key
  ## is cleared again before the error is raised.
  let fdi = int(ev.efd)
  s.checkFd(fdi)
  doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  s.setKey(fdi, {Event.User}, 0, data)
  var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
  epv.data.u64 = ev.efd.uint
  if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0:
    # Capture errno first, then roll the key back so the event can be
    # registered again later.
    let err = osLastError()
    clearKey(addr(s.fds[fdi]))
    raiseIOSelectorsError(err)
  inc(s.count)
366
proc selectInto*[T](s: Selector[T], timeout: int,
                    results: var openArray[ReadyKey]): int =
  ## Waits for events on `s` and stores the ready keys in `results`.
  ##
  ## `timeout` is checked by `verifySelectParams` and passed to epoll_wait.
  ## At most `min(len(results), MAX_EPOLL_EVENTS)` events are fetched per
  ## call. Returns the number of entries written to `results`. This can be
  ## lower than the number of epoll events, because some notifications are
  ## skipped: a SIGCHLD for a different pid, or a user eventfd whose read
  ## reports EAGAIN. An interrupted wait (EINTR) returns 0; any other
  ## epoll_wait error raises.
  ##
  ## Timer, signal, process and user-event descriptors are read here to
  ## consume the notification. Keys marked `Event.Oneshot` are removed from
  ## epoll and marked `Event.Finished`, but stay registered until `unregister`.
  var
    resTable: array[MAX_EPOLL_EVENTS, EpollEvent]
    maxres = MAX_EPOLL_EVENTS
    i, k: int

  # Never ask epoll for more events than `results` can hold.
  if maxres > len(results):
    maxres = len(results)

  verifySelectParams(timeout)

  let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint,
                         timeout.cint)
  if count < 0:
    result = 0
    let err = osLastError()
    # A signal interrupting the wait is not an error: report no events.
    if cint(err) != EINTR:
      raiseIOSelectorsError(err)
  elif count == 0:
    result = 0
  else:
    i = 0   # next entry of resTable (events returned by epoll)
    k = 0   # next free slot in results (events reported to the caller)
    while i < count:
      # Registration stores the descriptor itself in data.u64.
      let fdi = int(resTable[i].data.u64)
      let pevents = resTable[i].events
      var pkey = addr(s.fds[fdi])
      doAssert(pkey.ident != InvalidIdent)
      var rkey = ReadyKey(fd: fdi, events: {})

      if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
        if (pevents and EPOLLHUP) != 0:
          rkey.errorCode = OSErrorCode ECONNRESET
        else:
          # Try reading SO_ERROR from fd (only meaningful for sockets; on
          # failure errorCode is left unset).
          var error: cint
          var size = SockLen sizeof(error)
          if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error),
                        addr(size)) == 0'i32:
            rkey.errorCode = OSErrorCode error

        rkey.events.incl(Event.Error)
      if (pevents and EPOLLOUT) != 0:
        rkey.events.incl(Event.Write)
      when not defined(android):
        if (pevents and EPOLLIN) != 0:
          if Event.Read in pkey.events:
            rkey.events.incl(Event.Read)
          elif Event.Timer in pkey.events:
            # Consume the timerfd expiration counter.
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Timer)
          elif Event.Signal in pkey.events:
            # Consume one signalfd record.
            var data = SignalFdInfo()
            if posix.read(cint(fdi), addr data,
                          sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Signal)
          elif Event.Process in pkey.events:
            var data = SignalFdInfo()
            if posix.read(cint(fdi), addr data,
                          sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
              raiseIOSelectorsError(osLastError())
            if cast[int](data.ssi_pid) == pkey.param:
              rkey.events.incl(Event.Process)
            else:
              # SIGCHLD for another pid: it has been consumed, but nothing
              # is reported and the oneshot handling below is skipped.
              inc(i)
              continue
          elif Event.User in pkey.events:
            # Drain the eventfd counter; EAGAIN means the counter was already
            # drained, so the notification is skipped.
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              let err = osLastError()
              if err == OSErrorCode(EAGAIN):
                inc(i)
                continue
              else:
                raiseIOSelectorsError(err)
            rkey.events.incl(Event.User)
      else:
        # Android: same as above, but there are no signal/process watches.
        if (pevents and EPOLLIN) != 0:
          if Event.Read in pkey.events:
            rkey.events.incl(Event.Read)
          elif Event.Timer in pkey.events:
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Timer)
          elif Event.User in pkey.events:
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              let err = osLastError()
              if err == OSErrorCode(EAGAIN):
                inc(i)
                continue
              else:
                raiseIOSelectorsError(err)
            rkey.events.incl(Event.User)

      if Event.Oneshot in pkey.events:
        var epv = EpollEvent()
        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0:
          raiseIOSelectorsError(osLastError())
        # The key is kept until unregister() so the application can still
        # read its data, but the counter is decreased now because the
        # descriptor is no longer in epoll.
        dec(s.count)
        # Mark the key Finished so unregister() does not remove it from epoll
        # and decrease the counter a second time.
        pkey.events.incl(Event.Finished)

      results[k] = rkey
      inc(k)
      inc(i)
    result = k
486
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  ## Convenience wrapper around `selectInto`: returns a fresh sequence
  ## holding the ready keys (at most `MAX_EPOLL_EVENTS` of them).
  var ready = newSeq[ReadyKey](MAX_EPOLL_EVENTS)
  let readyCount = selectInto(s, timeout, ready)
  ready.setLen(readyCount)
  result = ready
491
template isEmpty*[T](s: Selector[T]): bool =
  ## True when no descriptor is currently counted as added to epoll.
  0 == s.count
494
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  ## Returns true if `fd` is currently registered in `s`.
  ##
  ## Descriptors outside the key table (negative, or not yet covered because
  ## the table was never grown that far) cannot be registered, so they report
  ## false instead of indexing past the end of `s.fds`.
  let fdi = int(fd)
  fdi >= 0 and fdi < s.numFD and s.fds[fdi].ident != InvalidIdent
497
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  ## Returns a mutable reference to the data attached to `fd`.
  ##
  ## NOTE(review): when `fd` is not registered nothing is assigned to the
  ## result, so the returned reference is not valid — callers should check
  ## `fd in s` first.
  let idx = int(fd)
  s.checkFd(idx)
  if idx in s:
    return s.fds[idx].data
503
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  ## Replaces the data attached to `fd`. Returns false, leaving the selector
  ## untouched, when `fd` is not registered in `s`.
  let idx = int(fd)
  s.checkFd(idx)
  result = idx in s
  if result:
    s.fds[idx].data = data
510
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                        body: untyped) =
  ## Evaluates `body` with `value` bound to a pointer to the data attached to
  ## `fd`; `body` is skipped when `fd` is not registered in `s`.
  mixin checkFd
  let fdIndex = int(fd)
  s.checkFd(fdIndex)
  if fdIndex in s:
    var value = addr(s.fds[fdIndex].data)
    body
519
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
                        body2: untyped) =
  ## Evaluates `body1` with `value` bound to a pointer to the data attached
  ## to `fd` when `fd` is registered in `s`; otherwise evaluates `body2`.
  mixin checkFd
  let fdIndex = int(fd)
  s.checkFd(fdIndex)
  if fdIndex notin s:
    body2
  else:
    var value = addr(s.fds[fdIndex].data)
    body1
530
proc getFd*[T](s: Selector[T]): int =
  ## Returns the epoll descriptor used by `s`, e.g. for nesting it in another
  ## event loop.
  int(s.epollFD)
533