1//
2//  Load-balancing broker.
3//  Clients and workers are shown here in-process
4//
5
6package main
7
8import (
9	zmq "github.com/pebbe/zmq4"
10
11	"fmt"
12	//"math/rand"
13	"time"
14)
15
//  NBR_CLIENTS is the number of client tasks started by main; the broker
//  loop runs until that many replies have been forwarded.
const NBR_CLIENTS = 10

//  NBR_WORKERS is the number of worker tasks started by main, and the
//  upper bound on the length of the broker's queue of ready workers.
const NBR_WORKERS = 3
20
21//  Basic request-reply client using REQ socket
22//  Since Go Send and Recv can handle 0MQ binary identities we
23//  don't need printable text identity to allow routing.
24
25func client_task() {
26	client, _ := zmq.NewSocket(zmq.REQ)
27	defer client.Close()
28	// set_id(client) //  Set a printable identity
29	client.Connect("ipc://frontend.ipc")
30
31	//  Send request, get reply
32	client.Send("HELLO", 0)
33	reply, _ := client.Recv(0)
34	fmt.Println("Client:", reply)
35}
36
37//  While this example runs in a single process, that is just to make
38//  it easier to start and stop the example.
39//  This is the worker task, using a REQ socket to do load-balancing.
40//  Since Go Send and Recv can handle 0MQ binary identities we
41//  don't need printable text identity to allow routing.
42
43func worker_task() {
44	worker, _ := zmq.NewSocket(zmq.REQ)
45	defer worker.Close()
46	// set_id(worker)
47	worker.Connect("ipc://backend.ipc")
48
49	//  Tell broker we're ready for work
50	worker.Send("READY", 0)
51
52	for {
53		//  Read and save all frames until we get an empty frame
54		//  In this example there is only 1 but it could be more
55		identity, _ := worker.Recv(0)
56		empty, _ := worker.Recv(0)
57		if empty != "" {
58			panic(fmt.Sprintf("empty is not \"\": %q", empty))
59		}
60
61		//  Get request, send reply
62		request, _ := worker.Recv(0)
63		fmt.Println("Worker:", request)
64
65		worker.Send(identity, zmq.SNDMORE)
66		worker.Send("", zmq.SNDMORE)
67		worker.Send("OK", 0)
68	}
69}
70
71//  This is the main task. It starts the clients and workers, and then
72//  routes requests between the two layers. Workers signal READY when
73//  they start; after that we treat them as ready when they reply with
74//  a response back to a client. The load-balancing data structure is
75//  just a queue of next available workers.
76
77func main() {
78	//  Prepare our sockets
79	frontend, _ := zmq.NewSocket(zmq.ROUTER)
80	backend, _ := zmq.NewSocket(zmq.ROUTER)
81	defer frontend.Close()
82	defer backend.Close()
83	frontend.Bind("ipc://frontend.ipc")
84	backend.Bind("ipc://backend.ipc")
85
86	client_nbr := 0
87	for ; client_nbr < NBR_CLIENTS; client_nbr++ {
88		go client_task()
89	}
90	for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
91		go worker_task()
92	}
93
94	//  Here is the main loop for the least-recently-used queue. It has two
95	//  sockets; a frontend for clients and a backend for workers. It polls
96	//  the backend in all cases, and polls the frontend only when there are
97	//  one or more workers ready. This is a neat way to use 0MQ's own queues
98	//  to hold messages we're not ready to process yet. When we get a client
99	//  reply, we pop the next available worker, and send the request to it,
100	//  including the originating client identity. When a worker replies, we
101	//  re-queue that worker, and we forward the reply to the original client,
102	//  using the reply envelope.
103
104	//  Queue of available workers
105	worker_queue := make([]string, 0, 10)
106
107	poller1 := zmq.NewPoller()
108	poller1.Add(backend, zmq.POLLIN)
109	poller2 := zmq.NewPoller()
110	poller2.Add(backend, zmq.POLLIN)
111	poller2.Add(frontend, zmq.POLLIN)
112
113	for client_nbr > 0 {
114		//  Poll frontend only if we have available workers
115		var sockets []zmq.Polled
116		if len(worker_queue) > 0 {
117			sockets, _ = poller2.Poll(-1)
118		} else {
119			sockets, _ = poller1.Poll(-1)
120		}
121		for _, socket := range sockets {
122			switch socket.Socket {
123			case backend:
124
125				//  Handle worker activity on backend
126				//  Queue worker identity for load-balancing
127				worker_id, _ := backend.Recv(0)
128				if !(len(worker_queue) < NBR_WORKERS) {
129					panic("!(len(worker_queue) < NBR_WORKERS)")
130				}
131				worker_queue = append(worker_queue, worker_id)
132
133				//  Second frame is empty
134				empty, _ := backend.Recv(0)
135				if empty != "" {
136					panic(fmt.Sprintf("empty is not \"\": %q", empty))
137				}
138
139				//  Third frame is READY or else a client reply identity
140				client_id, _ := backend.Recv(0)
141
142				//  If client reply, send rest back to frontend
143				if client_id != "READY" {
144					empty, _ := backend.Recv(0)
145					if empty != "" {
146						panic(fmt.Sprintf("empty is not \"\": %q", empty))
147					}
148					reply, _ := backend.Recv(0)
149					frontend.Send(client_id, zmq.SNDMORE)
150					frontend.Send("", zmq.SNDMORE)
151					frontend.Send(reply, 0)
152					client_nbr--
153				}
154
155			case frontend:
156				//  Here is how we handle a client request:
157
158				//  Now get next client request, route to last-used worker
159				//  Client request is [identity][empty][request]
160				client_id, _ := frontend.Recv(0)
161				empty, _ := frontend.Recv(0)
162				if empty != "" {
163					panic(fmt.Sprintf("empty is not \"\": %q", empty))
164				}
165				request, _ := frontend.Recv(0)
166
167				backend.Send(worker_queue[0], zmq.SNDMORE)
168				backend.Send("", zmq.SNDMORE)
169				backend.Send(client_id, zmq.SNDMORE)
170				backend.Send("", zmq.SNDMORE)
171				backend.Send(request, 0)
172
173				//  Dequeue and drop the next worker identity
174				worker_queue = worker_queue[1:]
175
176			}
177		}
178	}
179
180	time.Sleep(100 * time.Millisecond)
181}
182
183/*
184func set_id(soc *zmq.Socket) {
185	identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000))
186	soc.SetIdentity(identity)
187}
188*/
189