1// Interface class for Chapter 8. 2// This implements an "interface" to our network of nodes. 3package intface 4 5import ( 6 zmq "github.com/pebbe/zmq4" 7 8 "github.com/google/uuid" 9 10 "bytes" 11 "errors" 12 "net" 13 "time" 14) 15 16// ===================================================================== 17// Synchronous part, works in our application thread 18 19// --------------------------------------------------------------------- 20// Structure of our class 21 22type Intface struct { 23 pipe *zmq.Socket // Pipe through to agent 24} 25 26// This is the thread that handles our real interface class 27 28// Here is the constructor for the interface class. 29// Note that the class has barely any properties, it is just an excuse 30// to start the background thread, and a wrapper around zmsg_recv(): 31 32func New() (iface *Intface) { 33 iface = &Intface{} 34 var err error 35 iface.pipe, err = zmq.NewSocket(zmq.PAIR) 36 if err != nil { 37 panic(err) 38 } 39 err = iface.pipe.Bind("inproc://iface") 40 if err != nil { 41 panic(err) 42 } 43 go iface.agent() 44 time.Sleep(100 * time.Millisecond) 45 return 46} 47 48// Here we wait for a message from the interface. This returns 49// us a []string, or error if interrupted: 50 51func (iface *Intface) Recv() (msg []string, err error) { 52 msg, err = iface.pipe.RecvMessage(0) 53 return 54} 55 56// ===================================================================== 57// // Asynchronous part, works in the background 58 59// This structure defines each peer that we discover and track: 60 61type peer_t struct { 62 uuid_bytes []byte 63 uuid_string string 64 expires_at time.Time 65} 66 67const ( 68 PING_PORT_NUMBER = 9999 69 PING_INTERVAL = 1000 * time.Millisecond // Once per second 70 PEER_EXPIRY = 5000 * time.Millisecond // Five seconds and it's gone 71) 72 73// We have a constructor for the peer class: 74 75func new_peer(uuid uuid.UUID) (peer *peer_t) { 76 uuid_bytes, _ := uuid.MarshalBinary() 77 peer = &peer_t{ 78 uuid_bytes: uuid_bytes, 79 uuid_string: uuid.String(), 80 } 81 return 82} 83 84// Just resets the peers expiry time; we call this method 85// whenever we get any activity from a peer. 86 87func (peer *peer_t) is_alive() { 88 peer.expires_at = time.Now().Add(PEER_EXPIRY) 89} 90 91// This structure holds the context for our agent, so we can 92// pass that around cleanly to methods which need it: 93 94type agent_t struct { 95 pipe *zmq.Socket // Pipe back to application 96 udp *zmq.Socket 97 conn *net.UDPConn 98 uuid_bytes []byte // Our UUID 99 uuid_string string 100 peers map[string]*peer_t // Hash of known peers, fast lookup 101} 102 103// Now the constructor for our agent. Each interface 104// has one agent object, which implements its background thread: 105 106func new_agent() (agent *agent_t) { 107 108 // push output from udp into zmq socket 109 addr := &net.UDPAddr{Port: PING_PORT_NUMBER, IP: net.IPv4allsys} 110 conn, e := net.ListenMulticastUDP("udp", nil, addr) 111 if e != nil { 112 panic(e) 113 } 114 go func() { 115 buffer := make([]byte, 1024) 116 udp, _ := zmq.NewSocket(zmq.PAIR) 117 udp.Bind("inproc://udp") 118 for { 119 if n, _, err := conn.ReadFrom(buffer); err == nil { 120 udp.SendBytes(buffer[:n], 0) 121 } 122 } 123 }() 124 time.Sleep(100 * time.Millisecond) 125 126 pipe, _ := zmq.NewSocket(zmq.PAIR) 127 pipe.Connect("inproc://iface") 128 udp, _ := zmq.NewSocket(zmq.PAIR) 129 udp.Connect("inproc://udp") 130 131 uuID := uuid.New() 132 uuid_bytes, _ := uuID.MarshalBinary() 133 agent = &agent_t{ 134 pipe: pipe, 135 udp: udp, 136 conn: conn, 137 uuid_bytes: uuid_bytes, 138 uuid_string: uuID.String(), 139 peers: make(map[string]*peer_t), 140 } 141 142 pipe.SendMessage("AGENT ", uuID.String()) 143 144 return 145} 146 147// Here we handle the different control messages from the front-end. 148 149func (agent *agent_t) control_message() (err error) { 150 // Get the whole message off the pipe in one go 151 msg, e := agent.pipe.RecvMessage(0) 152 if e != nil { 153 return e 154 } 155 command := msg[0] 156 157 // We don't actually implement any control commands yet 158 // but if we did, this would be where we did it.. 159 switch command { 160 case "EXAMPLE": 161 default: 162 } 163 164 return 165} 166 167// This is how we handle a beacon coming into our UDP socket; 168// this may be from other peers or an echo of our own broadcast 169// beacon: 170 171func (agent *agent_t) handle_beacon() (err error) { 172 173 msg, err := agent.udp.RecvMessage(0) 174 if len(msg[0]) != 16 { 175 return errors.New("Not a uuid") 176 } 177 178 // If we got a UUID and it's not our own beacon, we have a peer 179 uuid_bytes := []byte(msg[0]) 180 if bytes.Compare(uuid_bytes, agent.uuid_bytes) != 0 { 181 // Find or create peer via its UUID string 182 uuID, _ := uuid.FromBytes(uuid_bytes) 183 uuid_string := uuID.String() 184 peer, ok := agent.peers[uuid_string] 185 if !ok { 186 peer = new_peer(uuID) 187 agent.peers[uuid_string] = peer 188 189 // Report peer joined the network 190 agent.pipe.SendMessage("JOINED", uuid_string) 191 } 192 // Any activity from the peer means it's alive 193 peer.is_alive() 194 } 195 return 196} 197 198// This method checks one peer item for expiry; if the peer hasn't 199// sent us anything by now, it's 'dead' and we can delete it: 200 201func (agent *agent_t) reap_peer(peer *peer_t) { 202 if time.Now().After(peer.expires_at) { 203 // Report peer left the network 204 agent.pipe.SendMessage("LEFT ", peer.uuid_string) 205 delete(agent.peers, peer.uuid_string) 206 } 207} 208 209// This is the main loop for the background agent. It uses zmq_poll 210// to monitor the front-end pipe (commands from the API) and the 211// back-end UDP handle (beacons): 212 213func (iface *Intface) agent() { 214 // Create agent instance to pass around 215 agent := new_agent() 216 217 // Send first beacon immediately 218 ping_at := time.Now() 219 220 poller := zmq.NewPoller() 221 poller.Add(agent.pipe, zmq.POLLIN) 222 poller.Add(agent.udp, zmq.POLLIN) 223 224 bcast := &net.UDPAddr{Port: PING_PORT_NUMBER, IP: net.IPv4bcast} 225 for { 226 timeout := ping_at.Add(time.Millisecond).Sub(time.Now()) 227 if timeout < 0 { 228 timeout = 0 229 } 230 polled, err := poller.Poll(timeout) 231 if err != nil { 232 break 233 } 234 235 for _, item := range polled { 236 switch socket := item.Socket; socket { 237 case agent.pipe: 238 // If we had activity on the pipe, go handle the control 239 // message. Current code never sends control messages. 240 agent.control_message() 241 242 case agent.udp: 243 // If we had input on the UDP socket, go process that 244 agent.handle_beacon() 245 } 246 } 247 248 // If we passed the 1-second mark, broadcast our beacon 249 now := time.Now() 250 if now.After(ping_at) { 251 agent.conn.WriteTo(agent.uuid_bytes, bcast) 252 ping_at = now.Add(PING_INTERVAL) 253 } 254 // Delete and report any expired peers 255 for _, peer := range agent.peers { 256 agent.reap_peer(peer) 257 } 258 } 259} 260