1// +build linux 2 3package singlepoll 4 5import ( 6 "context" 7 "errors" 8 "golang.org/x/sys/unix" 9 "sync" 10 11 "github.com/gxed/eventfd" 12 logging "github.com/ipfs/go-log" 13) 14 15var ( 16 ErrUnsupportedMode error = errors.New("only 'w' and 'r' modes are supported on this arch") 17) 18 19var ( 20 initOnce sync.Once 21 workChan chan interface{} 22 log logging.EventLogger = logging.Logger("reuseport-poll") 23) 24 25type addPoll struct { 26 fd int 27 events uint32 28 ctx context.Context 29 wakeUp chan<- error 30} 31 32type ctxDone struct { 33 fd int 34} 35 36func PollPark(reqctx context.Context, fd int, mode string) error { 37 initOnce.Do(func() { 38 workChan = make(chan interface{}, 128) 39 go worker() 40 }) 41 42 events := uint32(0) 43 for _, c := range mode { 44 switch c { 45 case 'w': 46 events |= unix.EPOLLOUT 47 case 'r': 48 events |= unix.EPOLLIN 49 default: 50 return ErrUnsupportedMode 51 } 52 } 53 54 wakeUp := make(chan error) 55 workChan <- addPoll{ 56 fd: fd, 57 events: events, 58 ctx: reqctx, 59 wakeUp: wakeUp, 60 } 61 62 return <-wakeUp 63} 64 65func criticalError(msg string, err error) { 66 log.Errorf("%s: %s.", msg, err.Error()) 67 log.Errorf("This is critical error, please report it at https://github.com/libp2p/go-reuseport/issues/new") 68 log.Errorf("Bailing out. You are on your own. Good luck.") 69 70 for { 71 select { 72 case <-backgroundctx.Done(): 73 return 74 case unit := <-workChan: 75 switch u := unit.(type) { 76 case addPoll: 77 u.wakeUp <- err 78 default: 79 } 80 } 81 } 82} 83 84func worker() { 85 epfd, err := unix.EpollCreate1(0) 86 if err != nil { 87 criticalError("EpollCreate1(0) failed", err) 88 } 89 evfd, err := eventfd.New() 90 if err != nil { 91 criticalError("eventfd.New() failed", err) 92 } 93 94 pool := make(map[int]addPoll) 95 96 { 97 event := unix.EpollEvent{ 98 Events: unix.EPOLLIN, 99 Fd: int32(evfd.Fd()), 100 } 101 unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, evfd.Fd(), &event) 102 } 103 go poller(epfd, evfd) 104 105 remove := func(fd int) *addPoll { 106 unit, ok := pool[fd] 107 if ok { 108 unix.EpollCtl(epfd, unix.EPOLL_CTL_DEL, unit.fd, nil) 109 delete(pool, fd) 110 close(unit.wakeUp) 111 } 112 return &unit 113 } 114 for { 115 select { 116 case <-backgroundctx.Done(): 117 evfd.WriteEvents(1) 118 return 119 case unit := <-workChan: 120 switch u := unit.(type) { 121 case addPoll: 122 event := unix.EpollEvent{ 123 Events: u.events | unix.EPOLLONESHOT, 124 Fd: int32(u.fd), 125 } 126 127 // Make copies for *I* before we add it to Epoll group 128 wrapWakeUp := make(chan error) 129 wakeUp := u.wakeUp 130 u.wakeUp = wrapWakeUp 131 132 if _, ok := pool[u.fd]; ok { 133 panic("duplicate fd") // safe guard against bad close calls 134 } 135 pool[u.fd] = u 136 137 err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, u.fd, &event) 138 if err != nil { 139 delete(pool, u.fd) 140 u.wakeUp <- err 141 } 142 143 // *I* 144 reqCtx := u.ctx 145 fd := u.fd 146 147 go func() { 148 select { 149 case err := <-wrapWakeUp: 150 wakeUp <- err 151 case <-reqCtx.Done(): 152 workChan <- ctxDone{ 153 fd: fd, 154 } 155 <-wrapWakeUp 156 wakeUp <- reqCtx.Err() 157 } 158 }() 159 160 case []unix.EpollEvent: 161 for _, event := range u { 162 remove(int(event.Fd)) 163 } 164 case ctxDone: 165 remove(u.fd) 166 } 167 } 168 } 169 170} 171 172func poller(epfd int, evfd *eventfd.EventFD) { 173 for { 174 // do not reuse the array as we will be passing it over channel 175 // 128 is quite arbitrary 176 // to small and number of EpollWait calls would increase 177 // to big and GC overhead increases 178 events := make([]unix.EpollEvent, 128) 179 n, err := unix.EpollWait(epfd, events, -1) 180 181 switch err { 182 case nil: 183 // everything is great 184 case unix.EINTR: 185 // ignore 186 continue 187 default: 188 // log 189 log.Errorf("EpollWait returned error: %s. Continuing.", err.Error()) 190 continue 191 } 192 193 for i := 0; i < n; i++ { 194 if int(events[i].Fd) == evfd.Fd() { 195 unix.Close(epfd) 196 evfd.Close() 197 return 198 } 199 } 200 workChan <- events[:n] 201 } 202} 203