1package zmq4 2 3import ( 4 "errors" 5 "fmt" 6 "time" 7) 8 9type reactor_socket struct { 10 e State 11 f func(State) error 12} 13 14type reactor_channel struct { 15 ch <-chan interface{} 16 f func(interface{}) error 17 limit int 18} 19 20type Reactor struct { 21 sockets map[*Socket]*reactor_socket 22 channels map[uint64]*reactor_channel 23 p *Poller 24 idx uint64 25 remove []uint64 26 verbose bool 27} 28 29/* 30Create a reactor to mix the handling of sockets and channels (timers or other channels). 31 32Example: 33 34 reactor := zmq.NewReactor() 35 reactor.AddSocket(socket1, zmq.POLLIN, socket1_handler) 36 reactor.AddSocket(socket2, zmq.POLLIN, socket2_handler) 37 reactor.AddChannelTime(time.Tick(time.Second), 1, ticker_handler) 38 reactor.Run(time.Second) 39 40Warning: 41 42There are problems with the reactor showing up with Go 1.14 (and later) 43such as data race occurrences and code lock-up. Using SetRetryAfterEINTR 44seems an effective fix, but at the moment there is no guaranty. 45*/ 46func NewReactor() *Reactor { 47 r := &Reactor{ 48 sockets: make(map[*Socket]*reactor_socket), 49 channels: make(map[uint64]*reactor_channel), 50 p: NewPoller(), 51 remove: make([]uint64, 0), 52 } 53 return r 54} 55 56// Add socket handler to the reactor. 57// 58// You can have only one handler per socket. Adding a second one will remove the first. 59// 60// The handler receives the socket state as an argument: POLLIN, POLLOUT, or both. 61func (r *Reactor) AddSocket(soc *Socket, events State, handler func(State) error) { 62 r.RemoveSocket(soc) 63 r.sockets[soc] = &reactor_socket{e: events, f: handler} 64 r.p.Add(soc, events) 65} 66 67// Remove a socket handler from the reactor. 68func (r *Reactor) RemoveSocket(soc *Socket) { 69 if _, ok := r.sockets[soc]; ok { 70 delete(r.sockets, soc) 71 // rebuild poller 72 r.p = NewPoller() 73 for s, props := range r.sockets { 74 r.p.Add(s, props.e) 75 } 76 } 77} 78 79// Add channel handler to the reactor. 80// 81// Returns id of added handler, that can be used later to remove it. 82// 83// If limit is positive, at most this many items will be handled in each run through the main loop, 84// otherwise it will process as many items as possible. 85// 86// The handler function receives the value received from the channel. 87func (r *Reactor) AddChannel(ch <-chan interface{}, limit int, handler func(interface{}) error) (id uint64) { 88 r.idx++ 89 id = r.idx 90 r.channels[id] = &reactor_channel{ch: ch, f: handler, limit: limit} 91 return 92} 93 94// This function wraps AddChannel, using a channel of type time.Time instead of type interface{}. 95func (r *Reactor) AddChannelTime(ch <-chan time.Time, limit int, handler func(interface{}) error) (id uint64) { 96 ch2 := make(chan interface{}) 97 go func() { 98 for { 99 a, ok := <-ch 100 if !ok { 101 close(ch2) 102 break 103 } 104 ch2 <- a 105 } 106 }() 107 return r.AddChannel(ch2, limit, handler) 108} 109 110// Remove a channel from the reactor. 111// 112// Closed channels are removed automatically. 113func (r *Reactor) RemoveChannel(id uint64) { 114 r.remove = append(r.remove, id) 115} 116 117func (r *Reactor) SetVerbose(verbose bool) { 118 r.verbose = verbose 119} 120 121// Run the reactor. 122// 123// The interval determines the time-out on the polling of sockets. 124// Interval must be positive if there are channels. 125// If there are no channels, you can set interval to -1. 126// 127// The run alternates between polling/handling sockets (using the interval as timeout), 128// and reading/handling channels. The reading of channels is without time-out: if there 129// is no activity on any channel, the run continues to poll sockets immediately. 130// 131// The run exits when any handler returns an error, returning that same error. 132func (r *Reactor) Run(interval time.Duration) (err error) { 133 for { 134 135 // process requests to remove channels 136 for _, id := range r.remove { 137 delete(r.channels, id) 138 } 139 r.remove = r.remove[0:0] 140 141 CHANNELS: 142 for id, ch := range r.channels { 143 limit := ch.limit 144 for { 145 select { 146 case val, ok := <-ch.ch: 147 if !ok { 148 if r.verbose { 149 fmt.Printf("Reactor(%p) removing closed channel %d\n", r, id) 150 } 151 r.RemoveChannel(id) 152 continue CHANNELS 153 } 154 if r.verbose { 155 fmt.Printf("Reactor(%p) channel %d: %v\n", r, id, val) 156 } 157 err = ch.f(val) 158 if err != nil { 159 return 160 } 161 if ch.limit > 0 { 162 limit-- 163 if limit == 0 { 164 continue CHANNELS 165 } 166 } 167 default: 168 continue CHANNELS 169 } 170 } 171 } 172 173 if len(r.channels) > 0 && interval < 0 { 174 return errors.New("There are channels, but polling time-out is infinite") 175 } 176 177 if len(r.sockets) == 0 { 178 if len(r.channels) == 0 { 179 return errors.New("No sockets to poll, no channels to read") 180 } 181 time.Sleep(interval) 182 continue 183 } 184 185 polled, e := r.p.Poll(interval) 186 if e != nil { 187 return e 188 } 189 for _, item := range polled { 190 if r.verbose { 191 fmt.Printf("Reactor(%p) %v\n", r, item) 192 } 193 err = r.sockets[item.Socket].f(item.Events) 194 if err != nil { 195 return 196 } 197 } 198 } 199 return 200} 201