1// 2// Paranoid Pirate queue. 3// 4 5package main 6 7import ( 8 zmq "github.com/pebbe/zmq4" 9 10 "fmt" 11 "time" 12) 13 14const ( 15 HEARTBEAT_LIVENESS = 3 // 3-5 is reasonable 16 HEARTBEAT_INTERVAL = 1000 * time.Millisecond // msecs 17 18 PPP_READY = "\001" // Signals worker is ready 19 PPP_HEARTBEAT = "\002" // Signals worker heartbeat 20) 21 22// Here we define the worker class; a structure and a set of functions that 23// as constructor, destructor, and methods on worker objects: 24 25type worker_t struct { 26 identity string // Identity of worker 27 id_string string // Printable identity 28 expire time.Time // Expires at this time 29} 30 31// Construct new worker 32func s_worker_new(identity string) worker_t { 33 return worker_t{ 34 identity: identity, 35 id_string: identity, 36 expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS), 37 } 38} 39 40// The ready method puts a worker to the end of the ready list: 41 42func s_worker_ready(self worker_t, workers []worker_t) []worker_t { 43 for i, worker := range workers { 44 if self.id_string == worker.id_string { 45 if i == 0 { 46 workers = workers[1:] 47 } else if i == len(workers)-1 { 48 workers = workers[:i] 49 } else { 50 workers = append(workers[:i], workers[i+1:]...) 51 } 52 break 53 } 54 } 55 return append(workers, self) 56} 57 58// The purge method looks for and kills expired workers. We hold workers 59// from oldest to most recent, so we stop at the first alive worker: 60 61func s_workers_purge(workers []worker_t) []worker_t { 62 now := time.Now() 63 for i, worker := range workers { 64 if now.Before(worker.expire) { 65 return workers[i:] // Worker is alive, we're done here 66 } 67 } 68 return workers[0:0] 69} 70 71// The main task is a load-balancer with heartbeating on workers so we 72// can detect crashed or blocked worker tasks: 73 74func main() { 75 frontend, _ := zmq.NewSocket(zmq.ROUTER) 76 backend, _ := zmq.NewSocket(zmq.ROUTER) 77 defer frontend.Close() 78 defer backend.Close() 79 frontend.Bind("tcp://*:5555") // For clients 80 backend.Bind("tcp://*:5556") // For workers 81 82 // List of available workers 83 workers := make([]worker_t, 0) 84 85 // Send out heartbeats at regular intervals 86 heartbeat_at := time.Tick(HEARTBEAT_INTERVAL) 87 88 poller1 := zmq.NewPoller() 89 poller1.Add(backend, zmq.POLLIN) 90 poller2 := zmq.NewPoller() 91 poller2.Add(backend, zmq.POLLIN) 92 poller2.Add(frontend, zmq.POLLIN) 93 94 for { 95 // Poll frontend only if we have available workers 96 var sockets []zmq.Polled 97 var err error 98 if len(workers) > 0 { 99 sockets, err = poller2.Poll(HEARTBEAT_INTERVAL) 100 } else { 101 sockets, err = poller1.Poll(HEARTBEAT_INTERVAL) 102 } 103 if err != nil { 104 break // Interrupted 105 } 106 107 for _, socket := range sockets { 108 switch socket.Socket { 109 case backend: 110 // Handle worker activity on backend 111 // Use worker identity for load-balancing 112 msg, err := backend.RecvMessage(0) 113 if err != nil { 114 break // Interrupted 115 } 116 117 // Any sign of life from worker means it's ready 118 identity, msg := unwrap(msg) 119 workers = s_worker_ready(s_worker_new(identity), workers) 120 121 // Validate control message, or return reply to client 122 if len(msg) == 1 { 123 if msg[0] != PPP_READY && msg[0] != PPP_HEARTBEAT { 124 fmt.Println("E: invalid message from worker", msg) 125 } 126 } else { 127 frontend.SendMessage(msg) 128 } 129 case frontend: 130 // Now get next client request, route to next worker 131 msg, err := frontend.RecvMessage(0) 132 if err != nil { 133 break // Interrupted 134 } 135 backend.SendMessage(workers[0].identity, msg) 136 workers = workers[1:] 137 } 138 } 139 140 // We handle heartbeating after any socket activity. First we send 141 // heartbeats to any idle workers if it's time. Then we purge any 142 // dead workers: 143 144 select { 145 case <-heartbeat_at: 146 for _, worker := range workers { 147 backend.SendMessage(worker.identity, PPP_HEARTBEAT) 148 } 149 default: 150 } 151 workers = s_workers_purge(workers) 152 } 153} 154 155// Pops frame off front of message and returns it as 'head' 156// If next frame is empty, pops that empty frame. 157// Return remaining frames of message as 'tail' 158func unwrap(msg []string) (head string, tail []string) { 159 head = msg[0] 160 if len(msg) > 1 && msg[1] == "" { 161 tail = msg[2:] 162 } else { 163 tail = msg[1:] 164 } 165 return 166} 167