//
//  Broker peering simulation (part 3).
//  Prototypes the full flow of status and tasks.
//

/*

One of the differences between peering2 and peering3 is that
peering2 always uses Poll() and then a helper function, socketInPolled(),
to check whether a specific socket returned a result, while peering3 uses
PollAll() and checks the event state of the socket at a specific index in
the returned list.

*/
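//  As an illustrative sketch only (socketInPolled is peering2's helper and is
//  not defined in this file), the two styles look roughly like this:
//
//      //  peering2 style: Poll() returns only the sockets that have events
//      sockets, _ := poller.Poll(timeout)
//      if socketInPolled(localbe, sockets) {
//          //  handle localbe
//      }
//
//      //  peering3 style: PollAll() returns every registered socket, in the
//      //  order it was added, so we test by index
//      sockets, _ := poller.PollAll(timeout)
//      if sockets[0].Events&zmq.POLLIN != 0 {
//          //  handle localbe
//      }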

package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"
)

const (
	NBR_CLIENTS  = 10
	NBR_WORKERS  = 5
	WORKER_READY = "**READY**" //  Signals worker is ready
)

var (
	//  Our own name; in practice this would be configured per node
	self string
)

//  This is the client task. It issues a burst of requests and then
//  sleeps for a few seconds. This simulates sporadic activity; when
//  a number of clients are active at once, the local workers should
//  be overloaded. The client uses a REQ socket for requests and also
//  pushes statistics to the monitor socket:

func client_task(i int) {
	client, _ := zmq.NewSocket(zmq.REQ)
	defer client.Close()
	client.Connect("ipc://" + self + "-localfe.ipc")
	monitor, _ := zmq.NewSocket(zmq.PUSH)
	defer monitor.Close()
	monitor.Connect("ipc://" + self + "-monitor.ipc")

	poller := zmq.NewPoller()
	poller.Add(client, zmq.POLLIN)
	for {
		time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
		for burst := rand.Intn(15); burst > 0; burst-- {
			task_id := fmt.Sprintf("%04X-%s-%d", rand.Intn(0x10000), self, i)

			//  Send request with random hex ID
			client.Send(task_id, 0)

			//  Wait max ten seconds for a reply, then complain
			sockets, err := poller.Poll(10 * time.Second)
			if err != nil {
				break //  Interrupted
			}

			if len(sockets) == 1 {
				reply, err := client.Recv(0)
				if err != nil {
					break //  Interrupted
				}
				//  Worker is supposed to answer us with our task id
				id := strings.Fields(reply)[0]
				if id != task_id {
					panic("id != task_id")
				}
				monitor.Send(reply, 0)
			} else {
				monitor.Send("E: CLIENT EXIT - lost task "+task_id, 0)
				return
			}
		}
	}
}

//  This is the worker task, which uses a REQ socket to plug into the
//  load-balancer. It's the same stub worker task you've seen in other
//  examples:

func worker_task(i int) {
	worker, _ := zmq.NewSocket(zmq.REQ)
	defer worker.Close()
	worker.Connect("ipc://" + self + "-localbe.ipc")

	//  Tell broker we're ready for work
	worker.SendMessage(WORKER_READY)

	//  Process messages as they arrive
	for {
		msg, err := worker.RecvMessage(0)
		if err != nil {
			break //  Interrupted
		}

		//  Workers are busy for 0/1 seconds
		time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
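		//  Send the reply back with the request's routing envelope intact,
		//  replacing the payload frame with the task id plus this broker's
		//  name and our worker number, which the client checks on receipt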
		n := len(msg) - 1
		worker.SendMessage(msg[:n], fmt.Sprintf("%s %s-%d", msg[n], self, i))
	}
}

//  The main task begins by setting up all its sockets. The local frontend
//  talks to clients, and our local backend talks to workers. The cloud
//  frontend talks to peer brokers as if they were clients, and the cloud
//  backend talks to peer brokers as if they were workers. The state
//  backend publishes regular state messages, and the state frontend
//  subscribes to all state backends to collect these messages. Finally,
//  we use a PULL monitor socket to collect printable messages from tasks:

func main() {
	//  First argument is this broker's name
	//  Other arguments are our peers' names
	//
	if len(os.Args) < 2 {
		fmt.Println("syntax: peering3 me {you}...")
		os.Exit(1)
	}
	self = os.Args[1]
	fmt.Printf("I: preparing broker at %s...\n", self)
	rand.Seed(time.Now().UnixNano())

	//  Prepare local frontend and backend
	localfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localfe.Close()
	localfe.Bind("ipc://" + self + "-localfe.ipc")

	localbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localbe.Close()
	localbe.Bind("ipc://" + self + "-localbe.ipc")

	//  Bind cloud frontend to endpoint
	cloudfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudfe.Close()
	cloudfe.SetIdentity(self)
	cloudfe.Bind("ipc://" + self + "-cloud.ipc")
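	//  The explicit ROUTER identities (set here and on cloudbe below) let
	//  peer brokers address each other by broker name rather than by an
	//  auto-generated identity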

	//  Connect cloud backend to all peers
	cloudbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudbe.Close()
	cloudbe.SetIdentity(self)
	for _, peer := range os.Args[2:] {
		fmt.Printf("I: connecting to cloud frontend at '%s'\n", peer)
		cloudbe.Connect("ipc://" + peer + "-cloud.ipc")
	}
	//  Bind state backend to endpoint
	statebe, _ := zmq.NewSocket(zmq.PUB)
	defer statebe.Close()
	statebe.Bind("ipc://" + self + "-state.ipc")

	//  Connect state frontend to all peers
	statefe, _ := zmq.NewSocket(zmq.SUB)
	defer statefe.Close()
	statefe.SetSubscribe("")
	for _, peer := range os.Args[2:] {
		fmt.Printf("I: connecting to state backend at '%s'\n", peer)
		statefe.Connect("ipc://" + peer + "-state.ipc")
	}
	//  Prepare monitor socket
	monitor, _ := zmq.NewSocket(zmq.PULL)
	defer monitor.Close()
	monitor.Bind("ipc://" + self + "-monitor.ipc")

	//  After binding and connecting all our sockets, we start our child
	//  tasks - workers and clients:

	for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
		go worker_task(worker_nbr)
	}

	//  Start local clients
	for client_nbr := 0; client_nbr < NBR_CLIENTS; client_nbr++ {
		go client_task(client_nbr)
	}

	//  Queue of available workers
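	//  local_capacity counts our ready local workers; cloud_capacity holds
	//  the capacity most recently advertised by peers on the state sockets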
	local_capacity := 0
	cloud_capacity := 0
	workers := make([]string, 0)

	primary := zmq.NewPoller()
	primary.Add(localbe, zmq.POLLIN)
	primary.Add(cloudbe, zmq.POLLIN)
	primary.Add(statefe, zmq.POLLIN)
	primary.Add(monitor, zmq.POLLIN)

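	//  Two secondary pollers: when we have only cloud capacity we poll just
	//  localfe (secondary1); when we have local capacity we poll localfe and
	//  cloudfe together (secondary2)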
	secondary1 := zmq.NewPoller()
	secondary1.Add(localfe, zmq.POLLIN)
	secondary2 := zmq.NewPoller()
	secondary2.Add(localfe, zmq.POLLIN)
	secondary2.Add(cloudfe, zmq.POLLIN)

	msg := make([]string, 0)
	for {

		//  If we have no workers ready, wait indefinitely
		timeout := time.Second
		if local_capacity == 0 {
			timeout = -1
		}
		sockets, err := primary.PollAll(timeout)
		if err != nil {
			break //  Interrupted
		}

		//  Track if capacity changes during this iteration
		previous := local_capacity

		//  Handle reply from local worker
		msg = msg[0:0]

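		//  PollAll returns one entry per registered socket, in the order the
		//  sockets were added, so we can check each socket by index:
		//  0 == localbe, 1 == cloudbe, 2 == statefe, 3 == monitor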
		if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localbe
			msg, err = localbe.RecvMessage(0)
			if err != nil {
				break //  Interrupted
			}
			var identity string
			identity, msg = unwrap(msg)
			workers = append(workers, identity)
			local_capacity++

			//  If it's READY, don't route the message any further
			if msg[0] == WORKER_READY {
				msg = msg[0:0]
			}
		} else if sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudbe
			//  Or handle reply from peer broker
			msg, err = cloudbe.RecvMessage(0)
			if err != nil {
				break //  Interrupted
			}
			//  We don't use peer broker identity for anything
			_, msg = unwrap(msg)
		}

		if len(msg) > 0 {

			//  Route reply to cloud if it's addressed to a broker
			to_broker := false
			for _, peer := range os.Args[2:] {
				if peer == msg[0] {
					to_broker = true
					break
				}
			}
			if to_broker {
				cloudfe.SendMessage(msg)
			} else {
				localfe.SendMessage(msg)
			}
		}

		//  If we have input messages on our statefe or monitor sockets, we
		//  can process these immediately:

		if sockets[2].Events&zmq.POLLIN != 0 { // 2 == statefe
			var status string
			m, _ := statefe.RecvMessage(0)
			_, m = unwrap(m) // peer
			status, _ = unwrap(m)
			cloud_capacity, _ = strconv.Atoi(status)
		}
		if sockets[3].Events&zmq.POLLIN != 0 { // 3 == monitor
			status, _ := monitor.Recv(0)
			fmt.Println(status)
		}
		//  Now route as many client requests as we can handle. If we have
		//  local capacity we poll both localfe and cloudfe. If we have cloud
		//  capacity only, we poll just localfe. We route any request locally
		//  if we can, else we route to the cloud.

		for local_capacity+cloud_capacity > 0 {
			var sockets []zmq.Polled
			var err error
			if local_capacity > 0 {
				sockets, err = secondary2.PollAll(0)
			} else {
				sockets, err = secondary1.PollAll(0)
			}
			if err != nil {
				panic(err)
			}

			if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localfe
				msg, _ = localfe.RecvMessage(0)
			} else if len(sockets) > 1 && sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudfe
				msg, _ = cloudfe.RecvMessage(0)
			} else {
				break //  No work, go back to primary
			}

			if local_capacity > 0 {
				localbe.SendMessage(workers[0], "", msg)
				workers = workers[1:]
				local_capacity--
			} else {
				//  Route to random broker peer
				random_peer := rand.Intn(len(os.Args)-2) + 2
				cloudbe.SendMessage(os.Args[random_peer], "", msg)
			}
		}
		//  We broadcast capacity messages to other peers; to reduce chatter
		//  we do this only if our capacity changed.

		if local_capacity != previous {
			//  We stick our own identity onto the envelope
			//  Broadcast new capacity
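			//  The state message is [our name][empty][capacity], which is
			//  what unwrap() strips again on the receiving side above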
			statebe.SendMessage(self, "", local_capacity)
		}
	}
}

//  Pops the frame off the front of a message and returns it as 'head'.
//  If the next frame is empty, pops that empty frame too.
//  Returns the remaining frames of the message as 'tail'.
func unwrap(msg []string) (head string, tail []string) {
	head = msg[0]
	if len(msg) > 1 && msg[1] == "" {
		tail = msg[2:]
	} else {
		tail = msg[1:]
	}
	return
}