1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package runtime
6
7import (
8	"runtime/internal/atomic"
9	"unsafe"
10)
11
// This is based on the former libgo/runtime/netpoll_select.c implementation,
// except that it uses poll instead of select and is written in Go.
// Its arming mechanism is also based on the Solaris implementation.
15
// libc_poll is the C library's poll, bound via the gccgo //extern directive
// below. pfds points at npfds contiguous pollfd entries. timeout is in
// milliseconds (netpoll converts its nanosecond delay before calling).
// netpoll treats a negative result as an error and then consults errno;
// otherwise the result is the number of entries with events in revents.
//
//go:noescape
//extern poll
func libc_poll(pfds *pollfd, npfds uintptr, timeout uintptr) int32
19
// pollfd represents the poll structure for AIX operating system.
// A pointer to the first element of a []pollfd is handed straight to
// libc_poll, so the field order and sizes must match the C struct pollfd.
type pollfd struct {
	fd      int32 // descriptor to watch
	events  int16 // requested events (_POLLIN / _POLLOUT)
	revents int16 // events reported back by poll
}
26
// Poll event bits used in pollfd.events and pollfd.revents.
// They stay untyped so expressions like ^_POLLIN adapt to the int16 fields.
const (
	_POLLIN  = 0x0001 // readable
	_POLLOUT = 0x0002 // writable
	_POLLHUP = 0x2000 // hang-up; netpoll treats it as both readable and writable
	_POLLERR = 0x4000 // error; netpoll treats it as both readable and writable
)
31
var (
	// pfds is the array passed to poll. Slot 0 watches the read end of the
	// wakeup pipe; the remaining slots belong to registered descriptors.
	pfds           []pollfd
	// pds parallels pfds: pds[i] is the pollDesc for pfds[i], and each
	// pollDesc's user field records its index. pds[0] is nil (the pipe).
	pds            []*pollDesc
	// mtxpoll is taken before mtxset by netpoll and by the updaters
	// (netpollopen/close/arm). It guards pendingUpdates and lets an updater
	// wake a running poll before it waits for mtxset.
	mtxpoll        mutex
	// mtxset guards pfds and pds; netpoll holds it for the whole poll call.
	mtxset         mutex
	// rdwake and wrwake are the read and write ends of the wakeup pipe
	// created by netpollinit.
	rdwake         int32
	wrwake         int32
	// pendingUpdates is 1 once netpollwakeup has written a wakeup byte for
	// the current poll cycle; netpoll resets it to 0 before polling.
	pendingUpdates int32

	netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak
)
43
44func netpollinit() {
45	// Create the pipe we use to wakeup poll.
46	r, w, errno := nonblockingPipe()
47	if errno != 0 {
48		throw("netpollinit: failed to create pipe")
49	}
50	rdwake = r
51	wrwake = w
52
53	// Pre-allocate array of pollfd structures for poll.
54	pfds = make([]pollfd, 1, 128)
55
56	// Poll the read side of the pipe.
57	pfds[0].fd = rdwake
58	pfds[0].events = _POLLIN
59
60	pds = make([]*pollDesc, 1, 128)
61	pds[0] = nil
62}
63
64func netpollIsPollDescriptor(fd uintptr) bool {
65	return fd == uintptr(rdwake) || fd == uintptr(wrwake)
66}
67
68// netpollwakeup writes on wrwake to wakeup poll before any changes.
69func netpollwakeup() {
70	if pendingUpdates == 0 {
71		pendingUpdates = 1
72		b := [1]byte{0}
73		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
74	}
75}
76
77func netpollopen(fd uintptr, pd *pollDesc) int32 {
78	lock(&mtxpoll)
79	netpollwakeup()
80
81	lock(&mtxset)
82	unlock(&mtxpoll)
83
84	pd.user = uint32(len(pfds))
85	pfds = append(pfds, pollfd{fd: int32(fd)})
86	pds = append(pds, pd)
87	unlock(&mtxset)
88	return 0
89}
90
91func netpollclose(fd uintptr) int32 {
92	lock(&mtxpoll)
93	netpollwakeup()
94
95	lock(&mtxset)
96	unlock(&mtxpoll)
97
98	for i := 0; i < len(pfds); i++ {
99		if pfds[i].fd == int32(fd) {
100			pfds[i] = pfds[len(pfds)-1]
101			pfds = pfds[:len(pfds)-1]
102
103			pds[i] = pds[len(pds)-1]
104			pds[i].user = uint32(i)
105			pds = pds[:len(pds)-1]
106			break
107		}
108	}
109	unlock(&mtxset)
110	return 0
111}
112
113func netpollarm(pd *pollDesc, mode int) {
114	lock(&mtxpoll)
115	netpollwakeup()
116
117	lock(&mtxset)
118	unlock(&mtxpoll)
119
120	switch mode {
121	case 'r':
122		pfds[pd.user].events |= _POLLIN
123	case 'w':
124		pfds[pd.user].events |= _POLLOUT
125	}
126	unlock(&mtxset)
127}
128
129// netpollBreak interrupts a poll.
130func netpollBreak() {
131	if atomic.Cas(&netpollWakeSig, 0, 1) {
132		b := [1]byte{0}
133		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
134	}
135}
136
137// netpoll checks for ready network connections.
138// Returns list of goroutines that become runnable.
139// delay < 0: blocks indefinitely
140// delay == 0: does not block, just polls
141// delay > 0: block for up to that many nanoseconds
142//go:nowritebarrierrec
143func netpoll(delay int64) gList {
144	var timeout uintptr
145	if delay < 0 {
146		timeout = ^uintptr(0)
147	} else if delay == 0 {
148		// TODO: call poll with timeout == 0
149		return gList{}
150	} else if delay < 1e6 {
151		timeout = 1
152	} else if delay < 1e15 {
153		timeout = uintptr(delay / 1e6)
154	} else {
155		// An arbitrary cap on how long to wait for a timer.
156		// 1e9 ms == ~11.5 days.
157		timeout = 1e9
158	}
159retry:
160	lock(&mtxpoll)
161	lock(&mtxset)
162	pendingUpdates = 0
163	unlock(&mtxpoll)
164
165	n := libc_poll(&pfds[0], uintptr(len(pfds)), timeout)
166	if n < 0 {
167		e := errno()
168		if e != _EINTR {
169			println("errno=", e, " len(pfds)=", len(pfds))
170			throw("poll failed")
171		}
172		unlock(&mtxset)
173		// If a timed sleep was interrupted, just return to
174		// recalculate how long we should sleep now.
175		if timeout > 0 {
176			return gList{}
177		}
178		goto retry
179	}
180	// Check if some descriptors need to be changed
181	if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
182		if delay != 0 {
183			// A netpollwakeup could be picked up by a
184			// non-blocking poll. Only clear the wakeup
185			// if blocking.
186			var b [1]byte
187			for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
188			}
189			atomic.Store(&netpollWakeSig, 0)
190		}
191		// Still look at the other fds even if the mode may have
192		// changed, as netpollBreak might have been called.
193		n--
194	}
195	var toRun gList
196	for i := 1; i < len(pfds) && n > 0; i++ {
197		pfd := &pfds[i]
198
199		var mode int32
200		if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
201			mode += 'r'
202			pfd.events &= ^_POLLIN
203		}
204		if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 {
205			mode += 'w'
206			pfd.events &= ^_POLLOUT
207		}
208		if mode != 0 {
209			pds[i].everr = false
210			if pfd.revents == _POLLERR {
211				pds[i].everr = true
212			}
213			netpollready(&toRun, pds[i], mode)
214			n--
215		}
216	}
217	unlock(&mtxset)
218	return toRun
219}
220