1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package runtime
6
7import (
8	"runtime/internal/atomic"
9	"unsafe"
10)
11
12// This is based on the former libgo/runtime/netpoll_select.c implementation
13// except that it uses poll instead of select and is written in Go.
// It's also based on the Solaris implementation for the arming mechanisms.
15
//go:cgo_import_dynamic libc_poll poll "libc.a/shr_64.o"
//go:linkname libc_poll libc_poll

// libc_poll is the handle that the poll wrapper in this file passes to
// syscall3. The cgo_import_dynamic directive above binds it to the "poll"
// symbol of libc.a/shr_64.o.
var libc_poll libFunc
20
21//go:nosplit
22func poll(pfds *pollfd, npfds uintptr, timeout uintptr) (int32, int32) {
23	r, err := syscall3(&libc_poll, uintptr(unsafe.Pointer(pfds)), npfds, timeout)
24	return int32(r), int32(err)
25}
26
// pollfd represents the poll structure for AIX operating system.
// A pointer to the first element of a []pollfd is what gets handed to poll,
// so the field order and widths must not be changed.
type pollfd struct {
	fd      int32 // descriptor being watched
	events  int16 // _POLL* bits requested; set by netpollinit/netpollarm, cleared by netpoll
	revents int16 // _POLL* bits reported back; inspected by netpoll after poll returns
}
33
// Event bits stored in pollfd.events and pollfd.revents.
const (
	_POLLIN  = 0x0001 // requested when a descriptor is armed for mode 'r'
	_POLLOUT = 0x0002 // requested when a descriptor is armed for mode 'w'
	_POLLHUP = 0x2000 // only inspected in revents; netpoll treats it as readable and writable
	_POLLERR = 0x4000 // only inspected in revents; netpoll treats it as readable and writable
)
38
// Poller state shared by every function in this file.
var (
	// pfds is the array handed to poll. Slot 0 is always the read end of
	// the wakeup pipe; the remaining slots are appended by netpollopen.
	pfds []pollfd
	// pds runs parallel to pfds: pds[i] is the pollDesc registered for
	// pfds[i]. Slot 0 is nil because the wakeup pipe has no pollDesc.
	pds []*pollDesc
	// mtxpoll is held by netpollopen/netpollclose/netpollarm while they call
	// netpollwakeup, and by netpoll while it resets pendingUpdates.
	mtxpoll mutex
	// mtxset is held while pfds and pds are read or modified; netpoll keeps
	// it locked for the whole duration of the poll call.
	mtxset mutex
	// rdwake and wrwake are the read and write ends of the wakeup pipe
	// created by netpollinit.
	rdwake int32
	wrwake int32
	// pendingUpdates is set to 1 by netpollwakeup once a wakeup byte has
	// been written and reset to 0 by netpoll right before it calls poll.
	pendingUpdates int32

	netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak
)
50
51func netpollinit() {
52	// Create the pipe we use to wakeup poll.
53	r, w, errno := nonblockingPipe()
54	if errno != 0 {
55		throw("netpollinit: failed to create pipe")
56	}
57	rdwake = r
58	wrwake = w
59
60	// Pre-allocate array of pollfd structures for poll.
61	pfds = make([]pollfd, 1, 128)
62
63	// Poll the read side of the pipe.
64	pfds[0].fd = rdwake
65	pfds[0].events = _POLLIN
66
67	pds = make([]*pollDesc, 1, 128)
68	pds[0] = nil
69}
70
71func netpollIsPollDescriptor(fd uintptr) bool {
72	return fd == uintptr(rdwake) || fd == uintptr(wrwake)
73}
74
75// netpollwakeup writes on wrwake to wakeup poll before any changes.
76func netpollwakeup() {
77	if pendingUpdates == 0 {
78		pendingUpdates = 1
79		b := [1]byte{0}
80		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
81	}
82}
83
84func netpollopen(fd uintptr, pd *pollDesc) int32 {
85	lock(&mtxpoll)
86	netpollwakeup()
87
88	lock(&mtxset)
89	unlock(&mtxpoll)
90
91	pd.user = uint32(len(pfds))
92	pfds = append(pfds, pollfd{fd: int32(fd)})
93	pds = append(pds, pd)
94	unlock(&mtxset)
95	return 0
96}
97
98func netpollclose(fd uintptr) int32 {
99	lock(&mtxpoll)
100	netpollwakeup()
101
102	lock(&mtxset)
103	unlock(&mtxpoll)
104
105	for i := 0; i < len(pfds); i++ {
106		if pfds[i].fd == int32(fd) {
107			pfds[i] = pfds[len(pfds)-1]
108			pfds = pfds[:len(pfds)-1]
109
110			pds[i] = pds[len(pds)-1]
111			pds[i].user = uint32(i)
112			pds = pds[:len(pds)-1]
113			break
114		}
115	}
116	unlock(&mtxset)
117	return 0
118}
119
120func netpollarm(pd *pollDesc, mode int) {
121	lock(&mtxpoll)
122	netpollwakeup()
123
124	lock(&mtxset)
125	unlock(&mtxpoll)
126
127	switch mode {
128	case 'r':
129		pfds[pd.user].events |= _POLLIN
130	case 'w':
131		pfds[pd.user].events |= _POLLOUT
132	}
133	unlock(&mtxset)
134}
135
136// netpollBreak interrupts a poll.
137func netpollBreak() {
138	if atomic.Cas(&netpollWakeSig, 0, 1) {
139		b := [1]byte{0}
140		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
141	}
142}
143
144// netpoll checks for ready network connections.
145// Returns list of goroutines that become runnable.
146// delay < 0: blocks indefinitely
147// delay == 0: does not block, just polls
148// delay > 0: block for up to that many nanoseconds
149//go:nowritebarrierrec
150func netpoll(delay int64) gList {
151	var timeout uintptr
152	if delay < 0 {
153		timeout = ^uintptr(0)
154	} else if delay == 0 {
155		// TODO: call poll with timeout == 0
156		return gList{}
157	} else if delay < 1e6 {
158		timeout = 1
159	} else if delay < 1e15 {
160		timeout = uintptr(delay / 1e6)
161	} else {
162		// An arbitrary cap on how long to wait for a timer.
163		// 1e9 ms == ~11.5 days.
164		timeout = 1e9
165	}
166retry:
167	lock(&mtxpoll)
168	lock(&mtxset)
169	pendingUpdates = 0
170	unlock(&mtxpoll)
171
172	n, e := poll(&pfds[0], uintptr(len(pfds)), timeout)
173	if n < 0 {
174		if e != _EINTR {
175			println("errno=", e, " len(pfds)=", len(pfds))
176			throw("poll failed")
177		}
178		unlock(&mtxset)
179		// If a timed sleep was interrupted, just return to
180		// recalculate how long we should sleep now.
181		if timeout > 0 {
182			return gList{}
183		}
184		goto retry
185	}
186	// Check if some descriptors need to be changed
187	if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
188		if delay != 0 {
189			// A netpollwakeup could be picked up by a
190			// non-blocking poll. Only clear the wakeup
191			// if blocking.
192			var b [1]byte
193			for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
194			}
195			atomic.Store(&netpollWakeSig, 0)
196		}
197		// Still look at the other fds even if the mode may have
198		// changed, as netpollBreak might have been called.
199		n--
200	}
201	var toRun gList
202	for i := 1; i < len(pfds) && n > 0; i++ {
203		pfd := &pfds[i]
204
205		var mode int32
206		if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
207			mode += 'r'
208			pfd.events &= ^_POLLIN
209		}
210		if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 {
211			mode += 'w'
212			pfd.events &= ^_POLLOUT
213		}
214		if mode != 0 {
215			pds[i].everr = false
216			if pfd.revents == _POLLERR {
217				pds[i].everr = true
218			}
219			netpollready(&toRun, pds[i], mode)
220			n--
221		}
222	}
223	unlock(&mtxset)
224	return toRun
225}
226