1//
2//  Paranoid Pirate queue.
3//
4
5package main
6
7import (
8	zmq "github.com/pebbe/zmq4"
9
10	"fmt"
11	"time"
12)
13
14const (
15	HEARTBEAT_LIVENESS = 3                       //  3-5 is reasonable
16	HEARTBEAT_INTERVAL = 1000 * time.Millisecond //  msecs
17
18	PPP_READY     = "\001" //  Signals worker is ready
19	PPP_HEARTBEAT = "\002" //  Signals worker heartbeat
20)
21
22//  Here we define the worker class; a structure and a set of functions that
23//  as constructor, destructor, and methods on worker objects:
24
25type worker_t struct {
26	identity  string    //  Identity of worker
27	id_string string    //  Printable identity
28	expire    time.Time //  Expires at this time
29}
30
31//  Construct new worker
32func s_worker_new(identity string) worker_t {
33	return worker_t{
34		identity:  identity,
35		id_string: identity,
36		expire:    time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
37	}
38}
39
40//  The ready method puts a worker to the end of the ready list:
41
42func s_worker_ready(self worker_t, workers []worker_t) []worker_t {
43	for i, worker := range workers {
44		if self.id_string == worker.id_string {
45			if i == 0 {
46				workers = workers[1:]
47			} else if i == len(workers)-1 {
48				workers = workers[:i]
49			} else {
50				workers = append(workers[:i], workers[i+1:]...)
51			}
52			break
53		}
54	}
55	return append(workers, self)
56}
57
58//  The purge method looks for and kills expired workers. We hold workers
59//  from oldest to most recent, so we stop at the first alive worker:
60
61func s_workers_purge(workers []worker_t) []worker_t {
62	now := time.Now()
63	for i, worker := range workers {
64		if now.Before(worker.expire) {
65			return workers[i:] //  Worker is alive, we're done here
66		}
67	}
68	return workers[0:0]
69}
70
71//  The main task is a load-balancer with heartbeating on workers so we
72//  can detect crashed or blocked worker tasks:
73
74func main() {
75	frontend, _ := zmq.NewSocket(zmq.ROUTER)
76	backend, _ := zmq.NewSocket(zmq.ROUTER)
77	defer frontend.Close()
78	defer backend.Close()
79	frontend.Bind("tcp://*:5555") //  For clients
80	backend.Bind("tcp://*:5556")  //  For workers
81
82	//  List of available workers
83	workers := make([]worker_t, 0)
84
85	//  Send out heartbeats at regular intervals
86	heartbeat_at := time.Tick(HEARTBEAT_INTERVAL)
87
88	poller1 := zmq.NewPoller()
89	poller1.Add(backend, zmq.POLLIN)
90	poller2 := zmq.NewPoller()
91	poller2.Add(backend, zmq.POLLIN)
92	poller2.Add(frontend, zmq.POLLIN)
93
94	for {
95		//  Poll frontend only if we have available workers
96		var sockets []zmq.Polled
97		var err error
98		if len(workers) > 0 {
99			sockets, err = poller2.Poll(HEARTBEAT_INTERVAL)
100		} else {
101			sockets, err = poller1.Poll(HEARTBEAT_INTERVAL)
102		}
103		if err != nil {
104			break //  Interrupted
105		}
106
107		for _, socket := range sockets {
108			switch socket.Socket {
109			case backend:
110				//  Handle worker activity on backend
111				//  Use worker identity for load-balancing
112				msg, err := backend.RecvMessage(0)
113				if err != nil {
114					break //  Interrupted
115				}
116
117				//  Any sign of life from worker means it's ready
118				identity, msg := unwrap(msg)
119				workers = s_worker_ready(s_worker_new(identity), workers)
120
121				//  Validate control message, or return reply to client
122				if len(msg) == 1 {
123					if msg[0] != PPP_READY && msg[0] != PPP_HEARTBEAT {
124						fmt.Println("E: invalid message from worker", msg)
125					}
126				} else {
127					frontend.SendMessage(msg)
128				}
129			case frontend:
130				//  Now get next client request, route to next worker
131				msg, err := frontend.RecvMessage(0)
132				if err != nil {
133					break //  Interrupted
134				}
135				backend.SendMessage(workers[0].identity, msg)
136				workers = workers[1:]
137			}
138		}
139
140		//  We handle heartbeating after any socket activity. First we send
141		//  heartbeats to any idle workers if it's time. Then we purge any
142		//  dead workers:
143
144		select {
145		case <-heartbeat_at:
146			for _, worker := range workers {
147				backend.SendMessage(worker.identity, PPP_HEARTBEAT)
148			}
149		default:
150		}
151		workers = s_workers_purge(workers)
152	}
153}
154
155//  Pops frame off front of message and returns it as 'head'
156//  If next frame is empty, pops that empty frame.
157//  Return remaining frames of message as 'tail'
158func unwrap(msg []string) (head string, tail []string) {
159	head = msg[0]
160	if len(msg) > 1 && msg[1] == "" {
161		tail = msg[2:]
162	} else {
163		tail = msg[1:]
164	}
165	return
166}
167