1//  bstar - Binary Star reactor.
2package bstar
3
4import (
5	zmq "github.com/pebbe/zmq4"
6
7	"errors"
8	"log"
9	"strconv"
10	"time"
11)
12
//  PRIMARY is the value to pass as the first argument of New to create
//  the primary server of the pair.
const PRIMARY = true

//  BACKUP is the value to pass as the first argument of New to create
//  the backup server of the pair.
const BACKUP = false
17
//  States we can be in at any point in time.
//  Values start at 1; the number is what send_state publishes to the
//  peer, so the order must stay in step with the peer_* events.
type state_t int

const (
	//  Primary, waiting for peer to connect
	state_PRIMARY state_t = iota + 1
	//  Backup, waiting for peer to connect
	state_BACKUP
	//  Active - accepting connections
	state_ACTIVE
	//  Passive - not accepting connections
	state_PASSIVE
)
28
//  Events, which start with the states our peer can be in.
//  The four peer_* values line up one-to-one with the state_t values
//  (1..4), because recv_state turns the number published by the peer
//  straight into an event.
type event_t int

const (
	//  HA peer is pending primary
	peer_PRIMARY event_t = iota + 1
	//  HA peer is pending backup
	peer_BACKUP
	//  HA peer is active
	peer_ACTIVE
	//  HA peer is passive
	peer_PASSIVE
	//  Client makes request
	client_REQUEST
)
40
41//  Structure of our class
42
43type Bstar struct {
44	Reactor     *zmq.Reactor            //  Reactor loop
45	statepub    *zmq.Socket             //  State publisher
46	statesub    *zmq.Socket             //  State subscriber
47	state       state_t                 //  Current state
48	event       event_t                 //  Current event
49	peer_expiry time.Time               //  When peer is considered 'dead'
50	voter_fn    func(*zmq.Socket) error //  Voting socket handler
51	active_fn   func() error            //  Call when become active
52	passive_fn  func() error            //  Call when become passive
53}
54
55//  The finite-state machine is the same as in the proof-of-concept server.
56//  To understand this reactor in detail, first read the CZMQ zloop class.
57
//  Interval at which we send our state information to the peer.
//  If peer doesn't respond in two heartbeats, it is 'dead'
//  (see update_peer_expiry).
const bstar_HEARTBEAT = time.Second
63
64//  ---------------------------------------------------------------------
65//  Binary Star finite state machine (applies event to state)
66//  Returns error if there was an exception, nil if event was valid.
67
68func (bstar *Bstar) execute_fsm() (exception error) {
69	//  Primary server is waiting for peer to connect
70	//  Accepts client_REQUEST events in this state
71	if bstar.state == state_PRIMARY {
72		if bstar.event == peer_BACKUP {
73			log.Println("I: connected to backup (passive), ready as active")
74			bstar.state = state_ACTIVE
75			if bstar.active_fn != nil {
76				bstar.active_fn()
77			}
78		} else if bstar.event == peer_ACTIVE {
79			log.Println("I: connected to backup (active), ready as passive")
80			bstar.state = state_PASSIVE
81			if bstar.passive_fn != nil {
82				bstar.passive_fn()
83			}
84		} else if bstar.event == client_REQUEST {
85			// Allow client requests to turn us into the active if we've
86			// waited sufficiently long to believe the backup is not
87			// currently acting as active (i.e., after a failover)
88			if time.Now().After(bstar.peer_expiry) {
89				log.Println("I: request from client, ready as active")
90				bstar.state = state_ACTIVE
91				if bstar.active_fn != nil {
92					bstar.active_fn()
93				}
94			} else {
95				// Don't respond to clients yet - it's possible we're
96				// performing a failback and the backup is currently active
97				exception = errors.New("Exception")
98			}
99		}
100	} else if bstar.state == state_BACKUP {
101		//  Backup server is waiting for peer to connect
102		//  Rejects client_REQUEST events in this state
103		if bstar.event == peer_ACTIVE {
104			log.Println("I: connected to primary (active), ready as passive")
105			bstar.state = state_PASSIVE
106			if bstar.passive_fn != nil {
107				bstar.passive_fn()
108			}
109		} else if bstar.event == client_REQUEST {
110			exception = errors.New("Exception")
111		}
112	} else if bstar.state == state_ACTIVE {
113		//  Server is active
114		//  Accepts client_REQUEST events in this state
115		//  The only way out of ACTIVE is death
116		if bstar.event == peer_ACTIVE {
117			//  Two actives would mean split-brain
118			log.Println("E: fatal error - dual actives, aborting")
119			exception = errors.New("Exception")
120		}
121	} else if bstar.state == state_PASSIVE {
122		//  Server is passive
123		//  client_REQUEST events can trigger failover if peer looks dead
124		if bstar.event == peer_PRIMARY {
125			//  Peer is restarting - become active, peer will go passive
126			log.Println("I: primary (passive) is restarting, ready as active")
127			bstar.state = state_ACTIVE
128		} else if bstar.event == peer_BACKUP {
129			//  Peer is restarting - become active, peer will go passive
130			log.Println("I: backup (passive) is restarting, ready as active")
131			bstar.state = state_ACTIVE
132		} else if bstar.event == peer_PASSIVE {
133			//  Two passives would mean cluster would be non-responsive
134			log.Println("E: fatal error - dual passives, aborting")
135			exception = errors.New("Exception")
136		} else if bstar.event == client_REQUEST {
137			//  Peer becomes active if timeout has passed
138			//  It's the client request that triggers the failover
139			if time.Now().After(bstar.peer_expiry) {
140				//  If peer is dead, switch to the active state
141				log.Println("I: failover successful, ready as active")
142				bstar.state = state_ACTIVE
143			} else {
144				//  If peer is alive, reject connections
145				exception = errors.New("Exception")
146			}
147		}
148		//  Call state change handler if necessary
149		if bstar.state == state_ACTIVE && bstar.active_fn != nil {
150			bstar.active_fn()
151		}
152	}
153	return
154}
155
156func (bstar *Bstar) update_peer_expiry() {
157	bstar.peer_expiry = time.Now().Add(2 * bstar_HEARTBEAT)
158}
159
160//  ---------------------------------------------------------------------
161//  Reactor event handlers...
162
163//  Publish our state to peer
164func (bstar *Bstar) send_state() (err error) {
165	_, err = bstar.statepub.SendMessage(int(bstar.state))
166	return
167}
168
169//  Receive state from peer, execute finite state machine
170func (bstar *Bstar) recv_state() (err error) {
171	msg, err := bstar.statesub.RecvMessage(0)
172	if err == nil {
173		e, _ := strconv.Atoi(msg[0])
174		bstar.event = event_t(e)
175	}
176	return bstar.execute_fsm()
177}
178
179//  Application wants to speak to us, see if it's possible
180func (bstar *Bstar) voter_ready(socket *zmq.Socket) error {
181	//  If server can accept input now, call appl handler
182	bstar.event = client_REQUEST
183	err := bstar.execute_fsm()
184	if err == nil {
185		bstar.voter_fn(socket)
186	} else {
187		//  Destroy waiting message, no-one to read it
188		socket.RecvMessage(0)
189	}
190	return nil
191}
192
193//  This is the constructor for our bstar class. We have to tell it whether
194//  we're primary or backup server, and our local and remote endpoints to
195//  bind and connect to:
196
197func New(primary bool, local, remote string) (bstar *Bstar, err error) {
198
199	bstar = &Bstar{}
200
201	//  Initialize the Binary Star
202	bstar.Reactor = zmq.NewReactor()
203	if primary {
204		bstar.state = state_PRIMARY
205	} else {
206		bstar.state = state_BACKUP
207	}
208
209	//  Create publisher for state going to peer
210	bstar.statepub, err = zmq.NewSocket(zmq.PUB)
211	bstar.statepub.Bind(local)
212
213	//  Create subscriber for state coming from peer
214	bstar.statesub, err = zmq.NewSocket(zmq.SUB)
215	bstar.statesub.SetSubscribe("")
216	bstar.statesub.Connect(remote)
217
218	//  Set-up basic reactor events
219	bstar.Reactor.AddChannelTime(time.Tick(bstar_HEARTBEAT), 1,
220		func(i interface{}) error { return bstar.send_state() })
221	bstar.Reactor.AddSocket(bstar.statesub, zmq.POLLIN,
222		func(e zmq.State) error { return bstar.recv_state() })
223
224	return
225}
226
227//  The voter method registers a client voter socket. Messages received
228//  on this socket provide the client_REQUEST events for the Binary Star
229//  FSM and are passed to the provided application handler. We require
230//  exactly one voter per bstar instance:
231
232func (bstar *Bstar) Voter(endpoint string, socket_type zmq.Type, handler func(*zmq.Socket) error) {
233	//  Hold actual handler so we can call this later
234	socket, _ := zmq.NewSocket(socket_type)
235	socket.Bind(endpoint)
236	if bstar.voter_fn != nil {
237		panic("Double voter function")
238	}
239	bstar.voter_fn = handler
240	bstar.Reactor.AddSocket(socket, zmq.POLLIN,
241		func(e zmq.State) error { return bstar.voter_ready(socket) })
242}
243
244//  Register handlers to be called each time there's a state change:
245
246func (bstar *Bstar) NewActive(handler func() error) {
247	if bstar.active_fn != nil {
248		panic("Double Active")
249	}
250	bstar.active_fn = handler
251}
252
253func (bstar *Bstar) NewPassive(handler func() error) {
254	if bstar.passive_fn != nil {
255		panic("Double Passive")
256	}
257	bstar.passive_fn = handler
258}
259
260//  Enable/disable verbose tracing, for debugging:
261
262func (bstar *Bstar) SetVerbose(verbose bool) {
263	bstar.Reactor.SetVerbose(verbose)
264}
265
266//?  Finally, start the configured reactor. It will end if any handler
267//?  returns error to the reactor, or if the process receives SIGINT or SIGTERM:
268
269func (bstar *Bstar) Start() error {
270	if bstar.voter_fn == nil {
271		panic("Missing voter function")
272	}
273	bstar.update_peer_expiry()
274	return bstar.Reactor.Run(bstar_HEARTBEAT / 5)
275}
276