//
// Broker peering simulation (part 3).
// Prototypes the full flow of status and tasks
//

/*

One of the differences between peering2 and peering3 is that
peering2 always uses Poll() and then uses a helper function socketInPolled()
to check if a specific socket returned a result, while peering3 uses PollAll()
and checks the event state of the socket at a specific index in the list.

*/

package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"
)

const (
	NBR_CLIENTS  = 10
	NBR_WORKERS  = 5
	WORKER_READY = "**READY**" // Signals worker is ready
)

var (
	// Our own name; in practice this would be configured per node
	self string
)

// This is the client task. It issues a burst of requests and then
// sleeps for a few seconds. This simulates sporadic activity; when
// a number of clients are active at once, the local workers should
// be overloaded. The client uses a REQ socket for requests and also
// pushes statistics to the monitor socket:

func client_task(i int) {
	client, _ := zmq.NewSocket(zmq.REQ)
	defer client.Close()
	client.Connect("ipc://" + self + "-localfe.ipc")
	monitor, _ := zmq.NewSocket(zmq.PUSH)
	defer monitor.Close()
	monitor.Connect("ipc://" + self + "-monitor.ipc")

	poller := zmq.NewPoller()
	poller.Add(client, zmq.POLLIN)
	for {
		time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
		for burst := rand.Intn(15); burst > 0; burst-- {
			task_id := fmt.Sprintf("%04X-%s-%d", rand.Intn(0x10000), self, i)

			// Send request with random hex ID
			client.Send(task_id, 0)

			// Wait max ten seconds for a reply, then complain
			sockets, err := poller.Poll(10 * time.Second)
			if err != nil {
				break // Interrupted
			}

			if len(sockets) == 1 {
				reply, err := client.Recv(0)
				if err != nil {
					break // Interrupted
				}
				// Worker is supposed to answer us with our task id
				id := strings.Fields(reply)[0]
				if id != task_id {
					panic("id != task_id")
				}
				monitor.Send(reply, 0)
			} else {
				monitor.Send("E: CLIENT EXIT - lost task "+task_id, 0)
				return
			}
		}
	}
}

// This is the worker task, which uses a REQ socket to plug into the
// load-balancer. It's the same stub worker task you've seen in other
// examples:

func worker_task(i int) {
	worker, _ := zmq.NewSocket(zmq.REQ)
	defer worker.Close()
	worker.Connect("ipc://" + self + "-localbe.ipc")

	// Tell broker we're ready for work
	worker.SendMessage(WORKER_READY)

	// Process messages as they arrive
	for {
		msg, err := worker.RecvMessage(0)
		if err != nil {
			break // Interrupted
		}

		// Workers are busy for zero or one second
		time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
		n := len(msg) - 1
		worker.SendMessage(msg[:n], fmt.Sprintf("%s %s-%d", msg[n], self, i))
	}
}
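
// The helper below is not part of the original example; it is a small,
// self-contained sketch of the PollAll() pattern described in the header
// comment, using two arbitrary sockets. PollAll returns one zmq.Polled entry
// per registered socket, in the order the sockets were added, so readiness is
// checked by index instead of searching the result the way peering2 does with
// its socketInPolled() helper.
func pollAllSketch(first, second *zmq.Socket) error {
	poller := zmq.NewPoller()
	poller.Add(first, zmq.POLLIN)  // index 0
	poller.Add(second, zmq.POLLIN) // index 1

	polled, err := poller.PollAll(time.Second)
	if err != nil {
		return err
	}
	if polled[0].Events&zmq.POLLIN != 0 { // 0 == first
		msg, _ := first.RecvMessage(0)
		fmt.Println("first:", msg)
	}
	if polled[1].Events&zmq.POLLIN != 0 { // 1 == second
		msg, _ := second.RecvMessage(0)
		fmt.Println("second:", msg)
	}
	return nil
}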

// The main task begins by setting up all its sockets. The local frontend
// talks to clients, and our local backend talks to workers. The cloud
// frontend talks to peer brokers as if they were clients, and the cloud
// backend talks to peer brokers as if they were workers. The state
// backend publishes regular state messages, and the state frontend
// subscribes to all state backends to collect these messages. Finally,
// we use a PULL monitor socket to collect printable messages from tasks:

func main() {
	// First argument is this broker's name
	// Other arguments are our peers' names
	//
	if len(os.Args) < 2 {
		fmt.Println("syntax: peering3 me {you}...")
		os.Exit(1)
	}
	self = os.Args[1]
	fmt.Printf("I: preparing broker at %s...\n", self)
	rand.Seed(time.Now().UnixNano())

	// Prepare local frontend and backend
	localfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localfe.Close()
	localfe.Bind("ipc://" + self + "-localfe.ipc")

	localbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localbe.Close()
	localbe.Bind("ipc://" + self + "-localbe.ipc")

	// Bind cloud frontend to endpoint
	cloudfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudfe.Close()
	cloudfe.SetIdentity(self)
	cloudfe.Bind("ipc://" + self + "-cloud.ipc")

	// Connect cloud backend to all peers
	cloudbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudbe.Close()
	cloudbe.SetIdentity(self)
	for _, peer := range os.Args[2:] {
		fmt.Printf("I: connecting to cloud frontend at '%s'\n", peer)
		cloudbe.Connect("ipc://" + peer + "-cloud.ipc")
	}

	// Bind state backend to endpoint
	statebe, _ := zmq.NewSocket(zmq.PUB)
	defer statebe.Close()
	statebe.Bind("ipc://" + self + "-state.ipc")

	// Connect state frontend to all peers
	statefe, _ := zmq.NewSocket(zmq.SUB)
	defer statefe.Close()
	statefe.SetSubscribe("")
	for _, peer := range os.Args[2:] {
		fmt.Printf("I: connecting to state backend at '%s'\n", peer)
		statefe.Connect("ipc://" + peer + "-state.ipc")
	}

	// Prepare monitor socket
	monitor, _ := zmq.NewSocket(zmq.PULL)
	defer monitor.Close()
	monitor.Bind("ipc://" + self + "-monitor.ipc")

	// After binding and connecting all our sockets, we start our child
	// tasks - workers and clients:

	for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
		go worker_task(worker_nbr)
	}

	// Start local clients
	for client_nbr := 0; client_nbr < NBR_CLIENTS; client_nbr++ {
		go client_task(client_nbr)
	}

	// Queue of available workers
	local_capacity := 0
	cloud_capacity := 0
	workers := make([]string, 0)

	primary := zmq.NewPoller()
	primary.Add(localbe, zmq.POLLIN)
	primary.Add(cloudbe, zmq.POLLIN)
	primary.Add(statefe, zmq.POLLIN)
	primary.Add(monitor, zmq.POLLIN)

	secondary1 := zmq.NewPoller()
	secondary1.Add(localfe, zmq.POLLIN)
	secondary2 := zmq.NewPoller()
	secondary2.Add(localfe, zmq.POLLIN)
	secondary2.Add(cloudfe, zmq.POLLIN)

	msg := make([]string, 0)
	for {
		// If we have no workers ready, wait indefinitely
		timeout := time.Second
		if local_capacity == 0 {
			timeout = -1
		}
		sockets, err := primary.PollAll(timeout)
		if err != nil {
			break // Interrupted
		}

		// Track if capacity changes during this iteration
		previous := local_capacity

		// Handle reply from local worker
		msg = msg[0:0]

		if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localbe
			msg, err = localbe.RecvMessage(0)
			if err != nil {
				break // Interrupted
			}
			var identity string
			identity, msg = unwrap(msg)
			workers = append(workers, identity)
			local_capacity++

			// If it's READY, don't route the message any further
			if msg[0] == WORKER_READY {
				msg = msg[0:0]
			}
		} else if sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudbe
			// Or handle reply from peer broker
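			// (The first frame here is the peer broker's identity, added by
			// our ROUTER socket on receipt, followed by an empty delimiter;
			// unwrap() strips both, leaving the original client's address
			// envelope plus the reply itself.)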
			msg, err = cloudbe.RecvMessage(0)
			if err != nil {
				break // Interrupted
			}
			// We don't use peer broker identity for anything
			_, msg = unwrap(msg)
		}

		if len(msg) > 0 {
			// Route reply to cloud if it's addressed to a broker
			to_broker := false
			for _, peer := range os.Args[2:] {
				if peer == msg[0] {
					to_broker = true
					break
				}
			}
			if to_broker {
				cloudfe.SendMessage(msg)
			} else {
				localfe.SendMessage(msg)
			}
		}

		// If we have input messages on our statefe or monitor sockets, we
		// can process these immediately:

		if sockets[2].Events&zmq.POLLIN != 0 { // 2 == statefe
			var status string
			m, _ := statefe.RecvMessage(0)
			_, m = unwrap(m) // peer name
			status, _ = unwrap(m)
			cloud_capacity, _ = strconv.Atoi(status)
		}
		if sockets[3].Events&zmq.POLLIN != 0 { // 3 == monitor
			status, _ := monitor.Recv(0)
			fmt.Println(status)
		}

		// Now route as many client requests as we can handle. If we have
		// local capacity, we poll both localfe and cloudfe. If we have cloud
		// capacity only, we poll just localfe. We route any request locally
		// if we can, else we route to the cloud.

		for local_capacity+cloud_capacity > 0 {
			var sockets []zmq.Polled
			var err error
			if local_capacity > 0 {
				sockets, err = secondary2.PollAll(0)
			} else {
				sockets, err = secondary1.PollAll(0)
			}
			if err != nil {
				panic(err)
			}

			if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localfe
				msg, _ = localfe.RecvMessage(0)
			} else if len(sockets) > 1 && sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudfe
				msg, _ = cloudfe.RecvMessage(0)
			} else {
				break // No work, go back to primary
			}

			if local_capacity > 0 {
				localbe.SendMessage(workers[0], "", msg)
				workers = workers[1:]
				local_capacity--
			} else {
				// Route to random broker peer
				random_peer := rand.Intn(len(os.Args)-2) + 2
				cloudbe.SendMessage(os.Args[random_peer], "", msg)
			}
		}

		// We broadcast capacity messages to other peers; to reduce chatter,
		// we do this only if our capacity changed.

		if local_capacity != previous {
			// We stick our own identity onto the envelope
			// Broadcast new capacity
			statebe.SendMessage(self, "", local_capacity)
		}
	}
}

// Pops the frame off the front of the message and returns it as 'head'.
// If the next frame is empty, pops that empty frame too.
// Returns the remaining frames of the message as 'tail'.
func unwrap(msg []string) (head string, tail []string) {
	head = msg[0]
	if len(msg) > 1 && msg[1] == "" {
		tail = msg[2:]
	} else {
		tail = msg[1:]
	}
	return
}
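
// One way to exercise this simulation (a sketch; the binary name and broker
// names below are arbitrary) is to start one process per broker, each given
// its own name followed by the names of all its peers, so that the ipc
// endpoints match up:
//
//	./peering3 DC1 DC2 DC3
//	./peering3 DC2 DC1 DC3
//	./peering3 DC3 DC1 DC2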