1// Copyright 2018 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package runtime 6 7import ( 8 "runtime/internal/atomic" 9 "unsafe" 10) 11 12// This is based on the former libgo/runtime/netpoll_select.c implementation 13// except that it uses poll instead of select and is written in Go. 14// It's also based on Solaris implementation for the arming mechanisms 15 16//go:cgo_import_dynamic libc_poll poll "libc.a/shr_64.o" 17//go:linkname libc_poll libc_poll 18 19var libc_poll libFunc 20 21//go:nosplit 22func poll(pfds *pollfd, npfds uintptr, timeout uintptr) (int32, int32) { 23 r, err := syscall3(&libc_poll, uintptr(unsafe.Pointer(pfds)), npfds, timeout) 24 return int32(r), int32(err) 25} 26 27// pollfd represents the poll structure for AIX operating system. 28type pollfd struct { 29 fd int32 30 events int16 31 revents int16 32} 33 34const _POLLIN = 0x0001 35const _POLLOUT = 0x0002 36const _POLLHUP = 0x2000 37const _POLLERR = 0x4000 38 39var ( 40 pfds []pollfd 41 pds []*pollDesc 42 mtxpoll mutex 43 mtxset mutex 44 rdwake int32 45 wrwake int32 46 pendingUpdates int32 47 48 netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak 49) 50 51func netpollinit() { 52 // Create the pipe we use to wakeup poll. 53 r, w, errno := nonblockingPipe() 54 if errno != 0 { 55 throw("netpollinit: failed to create pipe") 56 } 57 rdwake = r 58 wrwake = w 59 60 // Pre-allocate array of pollfd structures for poll. 61 pfds = make([]pollfd, 1, 128) 62 63 // Poll the read side of the pipe. 64 pfds[0].fd = rdwake 65 pfds[0].events = _POLLIN 66 67 pds = make([]*pollDesc, 1, 128) 68 pds[0] = nil 69} 70 71func netpollIsPollDescriptor(fd uintptr) bool { 72 return fd == uintptr(rdwake) || fd == uintptr(wrwake) 73} 74 75// netpollwakeup writes on wrwake to wakeup poll before any changes. 
func netpollwakeup() {
	// Only the first caller since the last netpoll pass writes a byte:
	// pendingUpdates stays 1 until netpoll resets it to 0, so further
	// calls in between are no-ops. Callers in this file hold mtxpoll.
	if pendingUpdates == 0 {
		pendingUpdates = 1
		b := [1]byte{0}
		// The return value of write is not checked.
		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
	}
}

// netpollopen registers fd with the poller. It appends a pollfd for fd to
// pfds and pd to pds, and records their shared index in pd.user. The new
// entry has no events requested; netpollarm sets them. It always returns 0.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	// netpoll holds mtxset for the whole poll call, so write the wakeup
	// byte first (pfds[0] watches the pipe for _POLLIN), then wait for
	// mtxset. mtxpoll is released only once mtxset has been acquired.
	lock(&mtxpoll)
	netpollwakeup()

	lock(&mtxset)
	unlock(&mtxpoll)

	pd.user = uint32(len(pfds))
	pfds = append(pfds, pollfd{fd: int32(fd)})
	pds = append(pds, pd)
	unlock(&mtxset)
	return 0
}

// netpollclose removes fd from the poller. The first pfds entry whose fd
// matches is overwritten with the last entry and both slices shrink by one;
// the pollDesc that moved gets its user index updated. If no entry matches,
// nothing is changed. It always returns 0.
func netpollclose(fd uintptr) int32 {
	// Same wake-then-lock hand-off as netpollopen.
	lock(&mtxpoll)
	netpollwakeup()

	lock(&mtxset)
	unlock(&mtxpoll)

	// NOTE(review): the scan starts at slot 0, which holds the wakeup
	// pipe; presumably callers never pass rdwake here - confirm.
	for i := 0; i < len(pfds); i++ {
		if pfds[i].fd == int32(fd) {
			pfds[i] = pfds[len(pfds)-1]
			pfds = pfds[:len(pfds)-1]

			pds[i] = pds[len(pds)-1]
			// The moved descriptor now lives at index i.
			pds[i].user = uint32(i)
			pds = pds[:len(pds)-1]
			break
		}
	}
	unlock(&mtxset)
	return 0
}

// netpollarm requests notification for pd's descriptor: mode 'r' adds
// _POLLIN and mode 'w' adds _POLLOUT to the events of the pfds entry at
// index pd.user. Any other mode leaves the entry unchanged.
func netpollarm(pd *pollDesc, mode int) {
	// Same wake-then-lock hand-off as netpollopen.
	lock(&mtxpoll)
	netpollwakeup()

	lock(&mtxset)
	unlock(&mtxpoll)

	switch mode {
	case 'r':
		pfds[pd.user].events |= _POLLIN
	case 'w':
		pfds[pd.user].events |= _POLLOUT
	}
	unlock(&mtxset)
}

// netpollBreak interrupts a poll.
// It writes one byte to wrwake, but only when it is the caller that flips
// netpollWakeSig from 0 to 1; netpoll stores 0 again after draining the pipe.
func netpollBreak() {
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		b := [1]byte{0}
		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
	}
}

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: returns an empty list without calling poll (see the TODO below)
// delay > 0: block for up to that many nanoseconds
//
// An empty list is also returned when poll is interrupted (_EINTR).
// Any other poll failure is fatal.
//
//go:nowritebarrierrec
func netpoll(delay int64) gList {
	// Convert delay from nanoseconds to the millisecond timeout given
	// to poll.
	var timeout uintptr
	if delay < 0 {
		// All bits set; presumably read as -1 (no timeout) by poll.
		timeout = ^uintptr(0)
	} else if delay == 0 {
		// TODO: call poll with timeout == 0
		return gList{}
	} else if delay < 1e6 {
		// Less than one millisecond: round up to 1ms.
		timeout = 1
	} else if delay < 1e15 {
		timeout = uintptr(delay / 1e6)
	} else {
		// An arbitrary cap on how long to wait for a timer.
		// 1e9 ms == ~11.5 days.
		timeout = 1e9
	}
retry:
	// Take mtxset via mtxpoll, clearing pendingUpdates so that the next
	// netpollwakeup writes a fresh wakeup byte. mtxset stays held across
	// the poll call and is released on every return path below.
	lock(&mtxpoll)
	lock(&mtxset)
	pendingUpdates = 0
	unlock(&mtxpoll)

	n, e := poll(&pfds[0], uintptr(len(pfds)), timeout)
	if n < 0 {
		if e != _EINTR {
			println("errno=", e, " len(pfds)=", len(pfds))
			throw("poll failed")
		}
		unlock(&mtxset)
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		// NOTE(review): timeout is unsigned and is never 0 at this
		// point (delay == 0 returned earlier), so this branch is
		// always taken and the goto below is not reached as written.
		if timeout > 0 {
			return gList{}
		}
		goto retry
	}
	// Check if some descriptors need to be changed
	if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
		// NOTE(review): delay is never 0 here because of the early
		// return above, so the pipe is always drained.
		if delay != 0 {
			// A netpollwakeup could be picked up by a
			// non-blocking poll. Only clear the wakeup
			// if blocking.
			var b [1]byte
			// Read single bytes until read stops returning 1.
			for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
			}
			// Allow the next netpollBreak to write again.
			atomic.Store(&netpollWakeSig, 0)
		}
		// Still look at the other fds even if the mode may have
		// changed, as netpollBreak might have been called.
		// The wakeup pipe accounts for one of the n ready entries.
		n--
	}
	var toRun gList
	// Slot 0 is the wakeup pipe, so start at 1. n counts the ready
	// entries not yet handled, letting the scan stop early.
	for i := 1; i < len(pfds) && n > 0; i++ {
		pfd := &pfds[i]

		// mode ends up as 'r', 'w' or 'r'+'w'. Hang-up and error
		// conditions count as ready for both directions. Each reported
		// direction is removed from events, so it has to be requested
		// again through netpollarm.
		var mode int32
		if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
			mode += 'r'
			pfd.events &= ^_POLLIN
		}
		if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 {
			mode += 'w'
			pfd.events &= ^_POLLOUT
		}
		if mode != 0 {
			// everr is set only when _POLLERR is the sole bit
			// in revents.
			pds[i].everr = false
			if pfd.revents == _POLLERR {
				pds[i].everr = true
			}
			netpollready(&toRun, pds[i], mode)
			n--
		}
	}
	unlock(&mtxset)
	return toRun
}