1//
2//  Binary Star server proof-of-concept implementation. This server does no
3//  real work; it just demonstrates the Binary Star failover model.
4
5package main
6
7import (
8	zmq "github.com/pebbe/zmq4"
9
10	"fmt"
11	"os"
12	"strconv"
13	"time"
14)
15
//  state_t is the role this server currently holds. Valid states are
//  numbered from 1, so the zero value never names a real state.
type state_t int

const (
	STATE_PRIMARY state_t = iota + 1 //  Primary, waiting for peer to connect
	STATE_BACKUP                     //  Backup, waiting for peer to connect
	STATE_ACTIVE                     //  Active - accepting connections
	STATE_PASSIVE                    //  Passive - not accepting connections
)
26
//  event_t is an input to the state machine. The first four events are the
//  states our peer can report; the last one is a request from a client.
//  Valid events are numbered from 1, so the zero value is not an event.
type event_t int

const (
	PEER_PRIMARY   event_t = iota + 1 //  HA peer is pending primary
	PEER_BACKUP                       //  HA peer is pending backup
	PEER_ACTIVE                       //  HA peer is active
	PEER_PASSIVE                      //  HA peer is passive
	CLIENT_REQUEST                    //  Client makes request
)
38
39//  Our finite state machine
40type bstar_t struct {
41	state       state_t   //  Current state
42	event       event_t   //  Current event
43	peer_expiry time.Time //  When peer is considered 'dead'
44}
45
//  HEARTBEAT is how often we send our state to the peer (one second).
//  If peer doesn't respond in two heartbeats, it is 'dead'.
const HEARTBEAT = time.Second
51
52//  The heart of the Binary Star design is its finite-state machine (FSM).
53//  The FSM runs one event at a time. We apply an event to the current state,
54//  which checks if the event is accepted, and if so sets a new state:
55
56func StateMachine(fsm *bstar_t) (exception bool) {
57	//  These are the PRIMARY and BACKUP states; we're waiting to become
58	//  ACTIVE or PASSIVE depending on events we get from our peer:
59	if fsm.state == STATE_PRIMARY {
60		if fsm.event == PEER_BACKUP {
61			fmt.Println("I: connected to backup (passive), ready as active")
62			fsm.state = STATE_ACTIVE
63		} else if fsm.event == PEER_ACTIVE {
64			fmt.Println("I: connected to backup (active), ready as passive")
65			fsm.state = STATE_PASSIVE
66		}
67		//  Accept client connections
68	} else if fsm.state == STATE_BACKUP {
69		if fsm.event == PEER_ACTIVE {
70			fmt.Println("I: connected to primary (active), ready as passive")
71			fsm.state = STATE_PASSIVE
72		} else if fsm.event == CLIENT_REQUEST {
73			//  Reject client connections when acting as backup
74			exception = true
75		}
76	} else if fsm.state == STATE_ACTIVE {
77		//  These are the ACTIVE and PASSIVE states:
78		if fsm.event == PEER_ACTIVE {
79			//  Two actives would mean split-brain
80			fmt.Println("E: fatal error - dual actives, aborting")
81			exception = true
82		}
83	} else if fsm.state == STATE_PASSIVE {
84		//  Server is passive
85		//  CLIENT_REQUEST events can trigger failover if peer looks dead
86		if fsm.event == PEER_PRIMARY {
87			//  Peer is restarting - become active, peer will go passive
88			fmt.Println("I: primary (passive) is restarting, ready as active")
89			fsm.state = STATE_ACTIVE
90		} else if fsm.event == PEER_BACKUP {
91			//  Peer is restarting - become active, peer will go passive
92			fmt.Println("I: backup (passive) is restarting, ready as active")
93			fsm.state = STATE_ACTIVE
94		} else if fsm.event == PEER_PASSIVE {
95			//  Two passives would mean cluster would be non-responsive
96			fmt.Println("E: fatal error - dual passives, aborting")
97			exception = true
98		} else if fsm.event == CLIENT_REQUEST {
99			//  Peer becomes active if timeout has passed
100			//  It's the client request that triggers the failover
101			if time.Now().After(fsm.peer_expiry) {
102				//  If peer is dead, switch to the active state
103				fmt.Println("I: failover successful, ready as active")
104				fsm.state = STATE_ACTIVE
105			} else {
106				//  If peer is alive, reject connections
107				exception = true
108			}
109		}
110	}
111	return
112}
113
114//  This is our main task. First we bind/connect our sockets with our
115//  peer and make sure we will get state messages correctly. We use
116//  three sockets; one to publish state, one to subscribe to state, and
117//  one for client requests/replies:
118
119func main() {
120	//  Arguments can be either of:
121	//      -p  primary server, at tcp://localhost:5001
122	//      -b  backup server, at tcp://localhost:5002
123	statepub, _ := zmq.NewSocket(zmq.PUB)
124	statesub, _ := zmq.NewSocket(zmq.SUB)
125	statesub.SetSubscribe("")
126	frontend, _ := zmq.NewSocket(zmq.ROUTER)
127	fsm := &bstar_t{peer_expiry: time.Now().Add(2 * HEARTBEAT)}
128
129	if len(os.Args) == 2 && os.Args[1] == "-p" {
130		fmt.Println("I: Primary active, waiting for backup (passive)")
131		frontend.Bind("tcp://*:5001")
132		statepub.Bind("tcp://*:5003")
133		statesub.Connect("tcp://localhost:5004")
134		fsm.state = STATE_PRIMARY
135	} else if len(os.Args) == 2 && os.Args[1] == "-b" {
136		fmt.Println("I: Backup passive, waiting for primary (active)")
137		frontend.Bind("tcp://*:5002")
138		statepub.Bind("tcp://*:5004")
139		statesub.Connect("tcp://localhost:5003")
140		fsm.state = STATE_BACKUP
141	} else {
142		fmt.Println("Usage: bstarsrv { -p | -b }")
143		return
144	}
145	//  We now process events on our two input sockets, and process these
146	//  events one at a time via our finite-state machine. Our "work" for
147	//  a client request is simply to echo it back:
148
149	//  Set timer for next outgoing state message
150	send_state_at := time.Now().Add(HEARTBEAT)
151
152	poller := zmq.NewPoller()
153	poller.Add(frontend, zmq.POLLIN)
154	poller.Add(statesub, zmq.POLLIN)
155
156LOOP:
157	for {
158		time_left := send_state_at.Sub(time.Now())
159		if time_left < 0 {
160			time_left = 0
161		}
162		polled, err := poller.Poll(time_left)
163		if err != nil {
164			break //  Context has been shut down
165		}
166		for _, socket := range polled {
167			switch socket.Socket {
168			case frontend:
169				//  Have a client request
170				msg, _ := frontend.RecvMessage(0)
171				fsm.event = CLIENT_REQUEST
172				if !StateMachine(fsm) {
173					//  Answer client by echoing request back
174					frontend.SendMessage(msg)
175				}
176			case statesub:
177				//  Have state from our peer, execute as event
178				message, _ := statesub.RecvMessage(0)
179				i, _ := strconv.Atoi(message[0])
180				fsm.event = event_t(i)
181				if StateMachine(fsm) {
182					break LOOP //  Error, so exit
183				}
184				fsm.peer_expiry = time.Now().Add(2 * HEARTBEAT)
185			}
186		}
187		//  If we timed-out, send state to peer
188		if time.Now().After(send_state_at) {
189			statepub.SendMessage(int(fsm.state))
190			send_state_at = time.Now().Add(HEARTBEAT)
191		}
192	}
193	fmt.Println("W: interrupted")
194}
195