1package zmq4
2
3import (
4	"errors"
5	"fmt"
6	"time"
7)
8
9type reactor_socket struct {
10	e State
11	f func(State) error
12}
13
14type reactor_channel struct {
15	ch    <-chan interface{}
16	f     func(interface{}) error
17	limit int
18}
19
20type Reactor struct {
21	sockets  map[*Socket]*reactor_socket
22	channels map[uint64]*reactor_channel
23	p        *Poller
24	idx      uint64
25	remove   []uint64
26	verbose  bool
27}
28
29/*
30Create a reactor to mix the handling of sockets and channels (timers or other channels).
31
32Example:
33
34    reactor := zmq.NewReactor()
35    reactor.AddSocket(socket1, zmq.POLLIN, socket1_handler)
36    reactor.AddSocket(socket2, zmq.POLLIN, socket2_handler)
37    reactor.AddChannelTime(time.Tick(time.Second), 1, ticker_handler)
38    reactor.Run(time.Second)
39
40Warning:
41
42There are problems with the reactor showing up with Go 1.14 (and later)
43such as data race occurrences and code lock-up. Using SetRetryAfterEINTR
44seems an effective fix, but at the moment there is no guaranty.
45*/
46func NewReactor() *Reactor {
47	r := &Reactor{
48		sockets:  make(map[*Socket]*reactor_socket),
49		channels: make(map[uint64]*reactor_channel),
50		p:        NewPoller(),
51		remove:   make([]uint64, 0),
52	}
53	return r
54}
55
56// Add socket handler to the reactor.
57//
58// You can have only one handler per socket. Adding a second one will remove the first.
59//
60// The handler receives the socket state as an argument: POLLIN, POLLOUT, or both.
61func (r *Reactor) AddSocket(soc *Socket, events State, handler func(State) error) {
62	r.RemoveSocket(soc)
63	r.sockets[soc] = &reactor_socket{e: events, f: handler}
64	r.p.Add(soc, events)
65}
66
67// Remove a socket handler from the reactor.
68func (r *Reactor) RemoveSocket(soc *Socket) {
69	if _, ok := r.sockets[soc]; ok {
70		delete(r.sockets, soc)
71		// rebuild poller
72		r.p = NewPoller()
73		for s, props := range r.sockets {
74			r.p.Add(s, props.e)
75		}
76	}
77}
78
79// Add channel handler to the reactor.
80//
81// Returns id of added handler, that can be used later to remove it.
82//
83// If limit is positive, at most this many items will be handled in each run through the main loop,
84// otherwise it will process as many items as possible.
85//
86// The handler function receives the value received from the channel.
87func (r *Reactor) AddChannel(ch <-chan interface{}, limit int, handler func(interface{}) error) (id uint64) {
88	r.idx++
89	id = r.idx
90	r.channels[id] = &reactor_channel{ch: ch, f: handler, limit: limit}
91	return
92}
93
94// This function wraps AddChannel, using a channel of type time.Time instead of type interface{}.
95func (r *Reactor) AddChannelTime(ch <-chan time.Time, limit int, handler func(interface{}) error) (id uint64) {
96	ch2 := make(chan interface{})
97	go func() {
98		for {
99			a, ok := <-ch
100			if !ok {
101				close(ch2)
102				break
103			}
104			ch2 <- a
105		}
106	}()
107	return r.AddChannel(ch2, limit, handler)
108}
109
110// Remove a channel from the reactor.
111//
112// Closed channels are removed automatically.
113func (r *Reactor) RemoveChannel(id uint64) {
114	r.remove = append(r.remove, id)
115}
116
117func (r *Reactor) SetVerbose(verbose bool) {
118	r.verbose = verbose
119}
120
121// Run the reactor.
122//
123// The interval determines the time-out on the polling of sockets.
124// Interval must be positive if there are channels.
125// If there are no channels, you can set interval to -1.
126//
127// The run alternates between polling/handling sockets (using the interval as timeout),
128// and reading/handling channels. The reading of channels is without time-out: if there
129// is no activity on any channel, the run continues to poll sockets immediately.
130//
131// The run exits when any handler returns an error, returning that same error.
132func (r *Reactor) Run(interval time.Duration) (err error) {
133	for {
134
135		// process requests to remove channels
136		for _, id := range r.remove {
137			delete(r.channels, id)
138		}
139		r.remove = r.remove[0:0]
140
141	CHANNELS:
142		for id, ch := range r.channels {
143			limit := ch.limit
144			for {
145				select {
146				case val, ok := <-ch.ch:
147					if !ok {
148						if r.verbose {
149							fmt.Printf("Reactor(%p) removing closed channel %d\n", r, id)
150						}
151						r.RemoveChannel(id)
152						continue CHANNELS
153					}
154					if r.verbose {
155						fmt.Printf("Reactor(%p) channel %d: %v\n", r, id, val)
156					}
157					err = ch.f(val)
158					if err != nil {
159						return
160					}
161					if ch.limit > 0 {
162						limit--
163						if limit == 0 {
164							continue CHANNELS
165						}
166					}
167				default:
168					continue CHANNELS
169				}
170			}
171		}
172
173		if len(r.channels) > 0 && interval < 0 {
174			return errors.New("There are channels, but polling time-out is infinite")
175		}
176
177		if len(r.sockets) == 0 {
178			if len(r.channels) == 0 {
179				return errors.New("No sockets to poll, no channels to read")
180			}
181			time.Sleep(interval)
182			continue
183		}
184
185		polled, e := r.p.Poll(interval)
186		if e != nil {
187			return e
188		}
189		for _, item := range polled {
190			if r.verbose {
191				fmt.Printf("Reactor(%p) %v\n", r, item)
192			}
193			err = r.sockets[item.Socket].f(item.Events)
194			if err != nil {
195				return
196			}
197		}
198	}
199	return
200}
201