1// +build linux
2
3package singlepoll
4
5import (
6	"context"
7	"errors"
8	"golang.org/x/sys/unix"
9	"sync"
10
11	"github.com/gxed/eventfd"
12	logging "github.com/ipfs/go-log"
13)
14
15var (
16	ErrUnsupportedMode error = errors.New("only 'w' and 'r' modes are supported on this arch")
17)
18
19var (
20	initOnce sync.Once
21	workChan chan interface{}
22	log      logging.EventLogger = logging.Logger("reuseport-poll")
23)
24
25type addPoll struct {
26	fd     int
27	events uint32
28	ctx    context.Context
29	wakeUp chan<- error
30}
31
32type ctxDone struct {
33	fd int
34}
35
36func PollPark(reqctx context.Context, fd int, mode string) error {
37	initOnce.Do(func() {
38		workChan = make(chan interface{}, 128)
39		go worker()
40	})
41
42	events := uint32(0)
43	for _, c := range mode {
44		switch c {
45		case 'w':
46			events |= unix.EPOLLOUT
47		case 'r':
48			events |= unix.EPOLLIN
49		default:
50			return ErrUnsupportedMode
51		}
52	}
53
54	wakeUp := make(chan error)
55	workChan <- addPoll{
56		fd:     fd,
57		events: events,
58		ctx:    reqctx,
59		wakeUp: wakeUp,
60	}
61
62	return <-wakeUp
63}
64
65func criticalError(msg string, err error) {
66	log.Errorf("%s: %s.", msg, err.Error())
67	log.Errorf("This is critical error, please report it at https://github.com/libp2p/go-reuseport/issues/new")
68	log.Errorf("Bailing out. You are on your own. Good luck.")
69
70	for {
71		select {
72		case <-backgroundctx.Done():
73			return
74		case unit := <-workChan:
75			switch u := unit.(type) {
76			case addPoll:
77				u.wakeUp <- err
78			default:
79			}
80		}
81	}
82}
83
84func worker() {
85	epfd, err := unix.EpollCreate1(0)
86	if err != nil {
87		criticalError("EpollCreate1(0) failed", err)
88	}
89	evfd, err := eventfd.New()
90	if err != nil {
91		criticalError("eventfd.New() failed", err)
92	}
93
94	pool := make(map[int]addPoll)
95
96	{
97		event := unix.EpollEvent{
98			Events: unix.EPOLLIN,
99			Fd:     int32(evfd.Fd()),
100		}
101		unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, evfd.Fd(), &event)
102	}
103	go poller(epfd, evfd)
104
105	remove := func(fd int) *addPoll {
106		unit, ok := pool[fd]
107		if ok {
108			unix.EpollCtl(epfd, unix.EPOLL_CTL_DEL, unit.fd, nil)
109			delete(pool, fd)
110			close(unit.wakeUp)
111		}
112		return &unit
113	}
114	for {
115		select {
116		case <-backgroundctx.Done():
117			evfd.WriteEvents(1)
118			return
119		case unit := <-workChan:
120			switch u := unit.(type) {
121			case addPoll:
122				event := unix.EpollEvent{
123					Events: u.events | unix.EPOLLONESHOT,
124					Fd:     int32(u.fd),
125				}
126
127				// Make copies for *I* before we add it to Epoll group
128				wrapWakeUp := make(chan error)
129				wakeUp := u.wakeUp
130				u.wakeUp = wrapWakeUp
131
132				if _, ok := pool[u.fd]; ok {
133					panic("duplicate fd") // safe guard against bad close calls
134				}
135				pool[u.fd] = u
136
137				err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, u.fd, &event)
138				if err != nil {
139					delete(pool, u.fd)
140					u.wakeUp <- err
141				}
142
143				// *I*
144				reqCtx := u.ctx
145				fd := u.fd
146
147				go func() {
148					select {
149					case err := <-wrapWakeUp:
150						wakeUp <- err
151					case <-reqCtx.Done():
152						workChan <- ctxDone{
153							fd: fd,
154						}
155						<-wrapWakeUp
156						wakeUp <- reqCtx.Err()
157					}
158				}()
159
160			case []unix.EpollEvent:
161				for _, event := range u {
162					remove(int(event.Fd))
163				}
164			case ctxDone:
165				remove(u.fd)
166			}
167		}
168	}
169
170}
171
172func poller(epfd int, evfd *eventfd.EventFD) {
173	for {
174		// do not reuse the array as we will be passing it over channel
175		// 128 is quite arbitrary
176		// to small and number of EpollWait calls would increase
177		// to big and GC overhead increases
178		events := make([]unix.EpollEvent, 128)
179		n, err := unix.EpollWait(epfd, events, -1)
180
181		switch err {
182		case nil:
183			// everything is great
184		case unix.EINTR:
185			// ignore
186			continue
187		default:
188			// log
189			log.Errorf("EpollWait returned error: %s. Continuing.", err.Error())
190			continue
191		}
192
193		for i := 0; i < n; i++ {
194			if int(events[i].Fd) == evfd.Fd() {
195				unix.Close(epfd)
196				evfd.Close()
197				return
198			}
199		}
200		workChan <- events[:n]
201	}
202}
203