1// 2// Load-balancing broker. 3// Clients and workers are shown here in-process 4// 5 6package main 7 8import ( 9 zmq "github.com/pebbe/zmq4" 10 11 "fmt" 12 //"math/rand" 13 "time" 14) 15 16const ( 17 NBR_CLIENTS = 10 18 NBR_WORKERS = 3 19) 20 21// Basic request-reply client using REQ socket 22// Since Go Send and Recv can handle 0MQ binary identities we 23// don't need printable text identity to allow routing. 24 25func client_task() { 26 client, _ := zmq.NewSocket(zmq.REQ) 27 defer client.Close() 28 // set_id(client) // Set a printable identity 29 client.Connect("ipc://frontend.ipc") 30 31 // Send request, get reply 32 client.Send("HELLO", 0) 33 reply, _ := client.Recv(0) 34 fmt.Println("Client:", reply) 35} 36 37// While this example runs in a single process, that is just to make 38// it easier to start and stop the example. 39// This is the worker task, using a REQ socket to do load-balancing. 40// Since Go Send and Recv can handle 0MQ binary identities we 41// don't need printable text identity to allow routing. 42 43func worker_task() { 44 worker, _ := zmq.NewSocket(zmq.REQ) 45 defer worker.Close() 46 // set_id(worker) 47 worker.Connect("ipc://backend.ipc") 48 49 // Tell broker we're ready for work 50 worker.Send("READY", 0) 51 52 for { 53 // Read and save all frames until we get an empty frame 54 // In this example there is only 1 but it could be more 55 identity, _ := worker.Recv(0) 56 empty, _ := worker.Recv(0) 57 if empty != "" { 58 panic(fmt.Sprintf("empty is not \"\": %q", empty)) 59 } 60 61 // Get request, send reply 62 request, _ := worker.Recv(0) 63 fmt.Println("Worker:", request) 64 65 worker.Send(identity, zmq.SNDMORE) 66 worker.Send("", zmq.SNDMORE) 67 worker.Send("OK", 0) 68 } 69} 70 71// This is the main task. It starts the clients and workers, and then 72// routes requests between the two layers. Workers signal READY when 73// they start; after that we treat them as ready when they reply with 74// a response back to a client. The load-balancing data structure is 75// just a queue of next available workers. 76 77func main() { 78 // Prepare our sockets 79 frontend, _ := zmq.NewSocket(zmq.ROUTER) 80 backend, _ := zmq.NewSocket(zmq.ROUTER) 81 defer frontend.Close() 82 defer backend.Close() 83 frontend.Bind("ipc://frontend.ipc") 84 backend.Bind("ipc://backend.ipc") 85 86 client_nbr := 0 87 for ; client_nbr < NBR_CLIENTS; client_nbr++ { 88 go client_task() 89 } 90 for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ { 91 go worker_task() 92 } 93 94 // Here is the main loop for the least-recently-used queue. It has two 95 // sockets; a frontend for clients and a backend for workers. It polls 96 // the backend in all cases, and polls the frontend only when there are 97 // one or more workers ready. This is a neat way to use 0MQ's own queues 98 // to hold messages we're not ready to process yet. When we get a client 99 // reply, we pop the next available worker, and send the request to it, 100 // including the originating client identity. When a worker replies, we 101 // re-queue that worker, and we forward the reply to the original client, 102 // using the reply envelope. 103 104 // Queue of available workers 105 worker_queue := make([]string, 0, 10) 106 107 poller1 := zmq.NewPoller() 108 poller1.Add(backend, zmq.POLLIN) 109 poller2 := zmq.NewPoller() 110 poller2.Add(backend, zmq.POLLIN) 111 poller2.Add(frontend, zmq.POLLIN) 112 113 for client_nbr > 0 { 114 // Poll frontend only if we have available workers 115 var sockets []zmq.Polled 116 if len(worker_queue) > 0 { 117 sockets, _ = poller2.Poll(-1) 118 } else { 119 sockets, _ = poller1.Poll(-1) 120 } 121 for _, socket := range sockets { 122 switch socket.Socket { 123 case backend: 124 125 // Handle worker activity on backend 126 // Queue worker identity for load-balancing 127 worker_id, _ := backend.Recv(0) 128 if !(len(worker_queue) < NBR_WORKERS) { 129 panic("!(len(worker_queue) < NBR_WORKERS)") 130 } 131 worker_queue = append(worker_queue, worker_id) 132 133 // Second frame is empty 134 empty, _ := backend.Recv(0) 135 if empty != "" { 136 panic(fmt.Sprintf("empty is not \"\": %q", empty)) 137 } 138 139 // Third frame is READY or else a client reply identity 140 client_id, _ := backend.Recv(0) 141 142 // If client reply, send rest back to frontend 143 if client_id != "READY" { 144 empty, _ := backend.Recv(0) 145 if empty != "" { 146 panic(fmt.Sprintf("empty is not \"\": %q", empty)) 147 } 148 reply, _ := backend.Recv(0) 149 frontend.Send(client_id, zmq.SNDMORE) 150 frontend.Send("", zmq.SNDMORE) 151 frontend.Send(reply, 0) 152 client_nbr-- 153 } 154 155 case frontend: 156 // Here is how we handle a client request: 157 158 // Now get next client request, route to last-used worker 159 // Client request is [identity][empty][request] 160 client_id, _ := frontend.Recv(0) 161 empty, _ := frontend.Recv(0) 162 if empty != "" { 163 panic(fmt.Sprintf("empty is not \"\": %q", empty)) 164 } 165 request, _ := frontend.Recv(0) 166 167 backend.Send(worker_queue[0], zmq.SNDMORE) 168 backend.Send("", zmq.SNDMORE) 169 backend.Send(client_id, zmq.SNDMORE) 170 backend.Send("", zmq.SNDMORE) 171 backend.Send(request, 0) 172 173 // Dequeue and drop the next worker identity 174 worker_queue = worker_queue[1:] 175 176 } 177 } 178 } 179 180 time.Sleep(100 * time.Millisecond) 181} 182 183/* 184func set_id(soc *zmq.Socket) { 185 identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000)) 186 soc.SetIdentity(identity) 187} 188*/ 189