1// Copyright 2018 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package runtime 6 7import ( 8 "runtime/internal/atomic" 9 "unsafe" 10) 11 12// This is based on the former libgo/runtime/netpoll_select.c implementation 13// except that it uses poll instead of select and is written in Go. 14// It's also based on Solaris implementation for the arming mechanisms 15 16//go:noescape 17//extern poll 18func libc_poll(pfds *pollfd, npfds uintptr, timeout uintptr) int32 19 20// pollfd represents the poll structure for AIX operating system. 21type pollfd struct { 22 fd int32 23 events int16 24 revents int16 25} 26 27const _POLLIN = 0x0001 28const _POLLOUT = 0x0002 29const _POLLHUP = 0x2000 30const _POLLERR = 0x4000 31 32var ( 33 pfds []pollfd 34 pds []*pollDesc 35 mtxpoll mutex 36 mtxset mutex 37 rdwake int32 38 wrwake int32 39 pendingUpdates int32 40 41 netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak 42) 43 44func netpollinit() { 45 // Create the pipe we use to wakeup poll. 46 r, w, errno := nonblockingPipe() 47 if errno != 0 { 48 throw("netpollinit: failed to create pipe") 49 } 50 rdwake = r 51 wrwake = w 52 53 // Pre-allocate array of pollfd structures for poll. 54 pfds = make([]pollfd, 1, 128) 55 56 // Poll the read side of the pipe. 57 pfds[0].fd = rdwake 58 pfds[0].events = _POLLIN 59 60 pds = make([]*pollDesc, 1, 128) 61 pds[0] = nil 62} 63 64func netpollIsPollDescriptor(fd uintptr) bool { 65 return fd == uintptr(rdwake) || fd == uintptr(wrwake) 66} 67 68// netpollwakeup writes on wrwake to wakeup poll before any changes. 69func netpollwakeup() { 70 if pendingUpdates == 0 { 71 pendingUpdates = 1 72 b := [1]byte{0} 73 write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1) 74 } 75} 76 77func netpollopen(fd uintptr, pd *pollDesc) int32 { 78 lock(&mtxpoll) 79 netpollwakeup() 80 81 lock(&mtxset) 82 unlock(&mtxpoll) 83 84 pd.user = uint32(len(pfds)) 85 pfds = append(pfds, pollfd{fd: int32(fd)}) 86 pds = append(pds, pd) 87 unlock(&mtxset) 88 return 0 89} 90 91func netpollclose(fd uintptr) int32 { 92 lock(&mtxpoll) 93 netpollwakeup() 94 95 lock(&mtxset) 96 unlock(&mtxpoll) 97 98 for i := 0; i < len(pfds); i++ { 99 if pfds[i].fd == int32(fd) { 100 pfds[i] = pfds[len(pfds)-1] 101 pfds = pfds[:len(pfds)-1] 102 103 pds[i] = pds[len(pds)-1] 104 pds[i].user = uint32(i) 105 pds = pds[:len(pds)-1] 106 break 107 } 108 } 109 unlock(&mtxset) 110 return 0 111} 112 113func netpollarm(pd *pollDesc, mode int) { 114 lock(&mtxpoll) 115 netpollwakeup() 116 117 lock(&mtxset) 118 unlock(&mtxpoll) 119 120 switch mode { 121 case 'r': 122 pfds[pd.user].events |= _POLLIN 123 case 'w': 124 pfds[pd.user].events |= _POLLOUT 125 } 126 unlock(&mtxset) 127} 128 129// netpollBreak interrupts a poll. 130func netpollBreak() { 131 if atomic.Cas(&netpollWakeSig, 0, 1) { 132 b := [1]byte{0} 133 write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1) 134 } 135} 136 137// netpoll checks for ready network connections. 138// Returns list of goroutines that become runnable. 139// delay < 0: blocks indefinitely 140// delay == 0: does not block, just polls 141// delay > 0: block for up to that many nanoseconds 142//go:nowritebarrierrec 143func netpoll(delay int64) gList { 144 var timeout uintptr 145 if delay < 0 { 146 timeout = ^uintptr(0) 147 } else if delay == 0 { 148 // TODO: call poll with timeout == 0 149 return gList{} 150 } else if delay < 1e6 { 151 timeout = 1 152 } else if delay < 1e15 { 153 timeout = uintptr(delay / 1e6) 154 } else { 155 // An arbitrary cap on how long to wait for a timer. 156 // 1e9 ms == ~11.5 days. 157 timeout = 1e9 158 } 159retry: 160 lock(&mtxpoll) 161 lock(&mtxset) 162 pendingUpdates = 0 163 unlock(&mtxpoll) 164 165 n := libc_poll(&pfds[0], uintptr(len(pfds)), timeout) 166 if n < 0 { 167 e := errno() 168 if e != _EINTR { 169 println("errno=", e, " len(pfds)=", len(pfds)) 170 throw("poll failed") 171 } 172 unlock(&mtxset) 173 // If a timed sleep was interrupted, just return to 174 // recalculate how long we should sleep now. 175 if timeout > 0 { 176 return gList{} 177 } 178 goto retry 179 } 180 // Check if some descriptors need to be changed 181 if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 { 182 if delay != 0 { 183 // A netpollwakeup could be picked up by a 184 // non-blocking poll. Only clear the wakeup 185 // if blocking. 186 var b [1]byte 187 for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 { 188 } 189 atomic.Store(&netpollWakeSig, 0) 190 } 191 // Still look at the other fds even if the mode may have 192 // changed, as netpollBreak might have been called. 193 n-- 194 } 195 var toRun gList 196 for i := 1; i < len(pfds) && n > 0; i++ { 197 pfd := &pfds[i] 198 199 var mode int32 200 if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 { 201 mode += 'r' 202 pfd.events &= ^_POLLIN 203 } 204 if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 { 205 mode += 'w' 206 pfd.events &= ^_POLLOUT 207 } 208 if mode != 0 { 209 pds[i].everr = false 210 if pfd.revents == _POLLERR { 211 pds[i].everr = true 212 } 213 netpollready(&toRun, pds[i], mode) 214 n-- 215 } 216 } 217 unlock(&mtxset) 218 return toRun 219} 220