1//  Interface class for Chapter 8.
2//  This implements an "interface" to our network of nodes.
3package intface
4
5import (
6	zmq "github.com/pebbe/zmq4"
7
8	"github.com/google/uuid"
9
10	"bytes"
11	"errors"
12	"net"
13	"time"
14)
15
16//  =====================================================================
17//  Synchronous part, works in our application thread
18
19//  ---------------------------------------------------------------------
20//  Structure of our class
21
22type Intface struct {
23	pipe *zmq.Socket //  Pipe through to agent
24}
25
26//  This is the thread that handles our real interface class
27
//  Here is the constructor for the interface class.
//  Note that the class has barely any properties; it is just an excuse
//  to start the background thread, and a wrapper around the pipe
//  socket's RecvMessage():
31
32func New() (iface *Intface) {
33	iface = &Intface{}
34	var err error
35	iface.pipe, err = zmq.NewSocket(zmq.PAIR)
36	if err != nil {
37		panic(err)
38	}
39	err = iface.pipe.Bind("inproc://iface")
40	if err != nil {
41		panic(err)
42	}
43	go iface.agent()
44	time.Sleep(100 * time.Millisecond)
45	return
46}
47
48//  Here we wait for a message from the interface. This returns
49//  us a []string, or error if interrupted:
50
51func (iface *Intface) Recv() (msg []string, err error) {
52	msg, err = iface.pipe.RecvMessage(0)
53	return
54}
55
56//  =====================================================================
//  Asynchronous part, works in the background
58
59//  This structure defines each peer that we discover and track:
60
//  peer_t is the record kept for every remote node we have heard from.
type peer_t struct {
	uuid_bytes  []byte    //  Binary form of the peer's UUID
	uuid_string string    //  Printable form; also the key in agent_t.peers
	expires_at  time.Time //  Moment after which the peer counts as gone
}
66
//  Beacon protocol settings
const (
	PING_PORT_NUMBER = 9999            //  UDP port used for beacons
	PING_INTERVAL    = time.Second     //  Once per second
	PEER_EXPIRY      = 5 * time.Second //  Five seconds and it's gone
)
72
73//  We have a constructor for the peer class:
74
75func new_peer(uuid uuid.UUID) (peer *peer_t) {
76	uuid_bytes, _ := uuid.MarshalBinary()
77	peer = &peer_t{
78		uuid_bytes:  uuid_bytes,
79		uuid_string: uuid.String(),
80	}
81	return
82}
83
//  Just resets the peer's expiry time; we call this method
//  whenever we get any activity from a peer.
86
87func (peer *peer_t) is_alive() {
88	peer.expires_at = time.Now().Add(PEER_EXPIRY)
89}
90
91//  This structure holds the context for our agent, so we can
92//  pass that around cleanly to methods which need it:
93
94type agent_t struct {
95	pipe        *zmq.Socket //  Pipe back to application
96	udp         *zmq.Socket
97	conn        *net.UDPConn
98	uuid_bytes  []byte //  Our UUID
99	uuid_string string
100	peers       map[string]*peer_t //  Hash of known peers, fast lookup
101}
102
103//  Now the constructor for our agent. Each interface
104//  has one agent object, which implements its background thread:
105
106func new_agent() (agent *agent_t) {
107
108	// push output from udp into zmq socket
109	addr := &net.UDPAddr{Port: PING_PORT_NUMBER, IP: net.IPv4allsys}
110	conn, e := net.ListenMulticastUDP("udp", nil, addr)
111	if e != nil {
112		panic(e)
113	}
114	go func() {
115		buffer := make([]byte, 1024)
116		udp, _ := zmq.NewSocket(zmq.PAIR)
117		udp.Bind("inproc://udp")
118		for {
119			if n, _, err := conn.ReadFrom(buffer); err == nil {
120				udp.SendBytes(buffer[:n], 0)
121			}
122		}
123	}()
124	time.Sleep(100 * time.Millisecond)
125
126	pipe, _ := zmq.NewSocket(zmq.PAIR)
127	pipe.Connect("inproc://iface")
128	udp, _ := zmq.NewSocket(zmq.PAIR)
129	udp.Connect("inproc://udp")
130
131	uuID := uuid.New()
132	uuid_bytes, _ := uuID.MarshalBinary()
133	agent = &agent_t{
134		pipe:        pipe,
135		udp:         udp,
136		conn:        conn,
137		uuid_bytes:  uuid_bytes,
138		uuid_string: uuID.String(),
139		peers:       make(map[string]*peer_t),
140	}
141
142	pipe.SendMessage("AGENT ", uuID.String())
143
144	return
145}
146
147//  Here we handle the different control messages from the front-end.
148
149func (agent *agent_t) control_message() (err error) {
150	//  Get the whole message off the pipe in one go
151	msg, e := agent.pipe.RecvMessage(0)
152	if e != nil {
153		return e
154	}
155	command := msg[0]
156
157	//  We don't actually implement any control commands yet
158	//  but if we did, this would be where we did it..
159	switch command {
160	case "EXAMPLE":
161	default:
162	}
163
164	return
165}
166
167//  This is how we handle a beacon coming into our UDP socket;
168//  this may be from other peers or an echo of our own broadcast
169//  beacon:
170
171func (agent *agent_t) handle_beacon() (err error) {
172
173	msg, err := agent.udp.RecvMessage(0)
174	if len(msg[0]) != 16 {
175		return errors.New("Not a uuid")
176	}
177
178	//  If we got a UUID and it's not our own beacon, we have a peer
179	uuid_bytes := []byte(msg[0])
180	if bytes.Compare(uuid_bytes, agent.uuid_bytes) != 0 {
181		//  Find or create peer via its UUID string
182		uuID, _ := uuid.FromBytes(uuid_bytes)
183		uuid_string := uuID.String()
184		peer, ok := agent.peers[uuid_string]
185		if !ok {
186			peer = new_peer(uuID)
187			agent.peers[uuid_string] = peer
188
189			//  Report peer joined the network
190			agent.pipe.SendMessage("JOINED", uuid_string)
191		}
192		//  Any activity from the peer means it's alive
193		peer.is_alive()
194	}
195	return
196}
197
198//  This method checks one peer item for expiry; if the peer hasn't
199//  sent us anything by now, it's 'dead' and we can delete it:
200
201func (agent *agent_t) reap_peer(peer *peer_t) {
202	if time.Now().After(peer.expires_at) {
203		//  Report peer left the network
204		agent.pipe.SendMessage("LEFT  ", peer.uuid_string)
205		delete(agent.peers, peer.uuid_string)
206	}
207}
208
//  This is the main loop for the background agent. It uses a zmq.Poller
//  to monitor the front-end pipe (commands from the API) and the
//  back-end UDP relay socket (beacons):
212
213func (iface *Intface) agent() {
214	//  Create agent instance to pass around
215	agent := new_agent()
216
217	//  Send first beacon immediately
218	ping_at := time.Now()
219
220	poller := zmq.NewPoller()
221	poller.Add(agent.pipe, zmq.POLLIN)
222	poller.Add(agent.udp, zmq.POLLIN)
223
224	bcast := &net.UDPAddr{Port: PING_PORT_NUMBER, IP: net.IPv4bcast}
225	for {
226		timeout := ping_at.Add(time.Millisecond).Sub(time.Now())
227		if timeout < 0 {
228			timeout = 0
229		}
230		polled, err := poller.Poll(timeout)
231		if err != nil {
232			break
233		}
234
235		for _, item := range polled {
236			switch socket := item.Socket; socket {
237			case agent.pipe:
238				//  If we had activity on the pipe, go handle the control
239				//  message. Current code never sends control messages.
240				agent.control_message()
241
242			case agent.udp:
243				//  If we had input on the UDP socket, go process that
244				agent.handle_beacon()
245			}
246		}
247
248		//  If we passed the 1-second mark, broadcast our beacon
249		now := time.Now()
250		if now.After(ping_at) {
251			agent.conn.WriteTo(agent.uuid_bytes, bcast)
252			ping_at = now.Add(PING_INTERVAL)
253		}
254		//  Delete and report any expired peers
255		for _, peer := range agent.peers {
256			agent.reap_peer(peer)
257		}
258	}
259}
260