1// bstar - Binary Star reactor. 2package bstar 3 4import ( 5 zmq "github.com/pebbe/zmq4" 6 7 "errors" 8 "log" 9 "strconv" 10 "time" 11) 12 13const ( 14 PRIMARY = true 15 BACKUP = false 16) 17 18// States we can be in at any point in time 19type state_t int 20 21const ( 22 _ = state_t(iota) 23 state_PRIMARY // Primary, waiting for peer to connect 24 state_BACKUP // Backup, waiting for peer to connect 25 state_ACTIVE // Active - accepting connections 26 state_PASSIVE // Passive - not accepting connections 27) 28 29// Events, which start with the states our peer can be in 30type event_t int 31 32const ( 33 _ = event_t(iota) 34 peer_PRIMARY // HA peer is pending primary 35 peer_BACKUP // HA peer is pending backup 36 peer_ACTIVE // HA peer is active 37 peer_PASSIVE // HA peer is passive 38 client_REQUEST // Client makes request 39) 40 41// Structure of our class 42 43type Bstar struct { 44 Reactor *zmq.Reactor // Reactor loop 45 statepub *zmq.Socket // State publisher 46 statesub *zmq.Socket // State subscriber 47 state state_t // Current state 48 event event_t // Current event 49 peer_expiry time.Time // When peer is considered 'dead' 50 voter_fn func(*zmq.Socket) error // Voting socket handler 51 active_fn func() error // Call when become active 52 passive_fn func() error // Call when become passive 53} 54 55// The finite-state machine is the same as in the proof-of-concept server. 56// To understand this reactor in detail, first read the CZMQ zloop class. 57 58// We send state information every this often 59// If peer doesn't respond in two heartbeats, it is 'dead' 60const ( 61 bstar_HEARTBEAT = 1000 * time.Millisecond // In msecs 62) 63 64// --------------------------------------------------------------------- 65// Binary Star finite state machine (applies event to state) 66// Returns error if there was an exception, nil if event was valid. 67 68func (bstar *Bstar) execute_fsm() (exception error) { 69 // Primary server is waiting for peer to connect 70 // Accepts client_REQUEST events in this state 71 if bstar.state == state_PRIMARY { 72 if bstar.event == peer_BACKUP { 73 log.Println("I: connected to backup (passive), ready as active") 74 bstar.state = state_ACTIVE 75 if bstar.active_fn != nil { 76 bstar.active_fn() 77 } 78 } else if bstar.event == peer_ACTIVE { 79 log.Println("I: connected to backup (active), ready as passive") 80 bstar.state = state_PASSIVE 81 if bstar.passive_fn != nil { 82 bstar.passive_fn() 83 } 84 } else if bstar.event == client_REQUEST { 85 // Allow client requests to turn us into the active if we've 86 // waited sufficiently long to believe the backup is not 87 // currently acting as active (i.e., after a failover) 88 if time.Now().After(bstar.peer_expiry) { 89 log.Println("I: request from client, ready as active") 90 bstar.state = state_ACTIVE 91 if bstar.active_fn != nil { 92 bstar.active_fn() 93 } 94 } else { 95 // Don't respond to clients yet - it's possible we're 96 // performing a failback and the backup is currently active 97 exception = errors.New("Exception") 98 } 99 } 100 } else if bstar.state == state_BACKUP { 101 // Backup server is waiting for peer to connect 102 // Rejects client_REQUEST events in this state 103 if bstar.event == peer_ACTIVE { 104 log.Println("I: connected to primary (active), ready as passive") 105 bstar.state = state_PASSIVE 106 if bstar.passive_fn != nil { 107 bstar.passive_fn() 108 } 109 } else if bstar.event == client_REQUEST { 110 exception = errors.New("Exception") 111 } 112 } else if bstar.state == state_ACTIVE { 113 // Server is active 114 // Accepts client_REQUEST events in this state 115 // The only way out of ACTIVE is death 116 if bstar.event == peer_ACTIVE { 117 // Two actives would mean split-brain 118 log.Println("E: fatal error - dual actives, aborting") 119 exception = errors.New("Exception") 120 } 121 } else if bstar.state == state_PASSIVE { 122 // Server is passive 123 // client_REQUEST events can trigger failover if peer looks dead 124 if bstar.event == peer_PRIMARY { 125 // Peer is restarting - become active, peer will go passive 126 log.Println("I: primary (passive) is restarting, ready as active") 127 bstar.state = state_ACTIVE 128 } else if bstar.event == peer_BACKUP { 129 // Peer is restarting - become active, peer will go passive 130 log.Println("I: backup (passive) is restarting, ready as active") 131 bstar.state = state_ACTIVE 132 } else if bstar.event == peer_PASSIVE { 133 // Two passives would mean cluster would be non-responsive 134 log.Println("E: fatal error - dual passives, aborting") 135 exception = errors.New("Exception") 136 } else if bstar.event == client_REQUEST { 137 // Peer becomes active if timeout has passed 138 // It's the client request that triggers the failover 139 if time.Now().After(bstar.peer_expiry) { 140 // If peer is dead, switch to the active state 141 log.Println("I: failover successful, ready as active") 142 bstar.state = state_ACTIVE 143 } else { 144 // If peer is alive, reject connections 145 exception = errors.New("Exception") 146 } 147 } 148 // Call state change handler if necessary 149 if bstar.state == state_ACTIVE && bstar.active_fn != nil { 150 bstar.active_fn() 151 } 152 } 153 return 154} 155 156func (bstar *Bstar) update_peer_expiry() { 157 bstar.peer_expiry = time.Now().Add(2 * bstar_HEARTBEAT) 158} 159 160// --------------------------------------------------------------------- 161// Reactor event handlers... 162 163// Publish our state to peer 164func (bstar *Bstar) send_state() (err error) { 165 _, err = bstar.statepub.SendMessage(int(bstar.state)) 166 return 167} 168 169// Receive state from peer, execute finite state machine 170func (bstar *Bstar) recv_state() (err error) { 171 msg, err := bstar.statesub.RecvMessage(0) 172 if err == nil { 173 e, _ := strconv.Atoi(msg[0]) 174 bstar.event = event_t(e) 175 } 176 return bstar.execute_fsm() 177} 178 179// Application wants to speak to us, see if it's possible 180func (bstar *Bstar) voter_ready(socket *zmq.Socket) error { 181 // If server can accept input now, call appl handler 182 bstar.event = client_REQUEST 183 err := bstar.execute_fsm() 184 if err == nil { 185 bstar.voter_fn(socket) 186 } else { 187 // Destroy waiting message, no-one to read it 188 socket.RecvMessage(0) 189 } 190 return nil 191} 192 193// This is the constructor for our bstar class. We have to tell it whether 194// we're primary or backup server, and our local and remote endpoints to 195// bind and connect to: 196 197func New(primary bool, local, remote string) (bstar *Bstar, err error) { 198 199 bstar = &Bstar{} 200 201 // Initialize the Binary Star 202 bstar.Reactor = zmq.NewReactor() 203 if primary { 204 bstar.state = state_PRIMARY 205 } else { 206 bstar.state = state_BACKUP 207 } 208 209 // Create publisher for state going to peer 210 bstar.statepub, err = zmq.NewSocket(zmq.PUB) 211 bstar.statepub.Bind(local) 212 213 // Create subscriber for state coming from peer 214 bstar.statesub, err = zmq.NewSocket(zmq.SUB) 215 bstar.statesub.SetSubscribe("") 216 bstar.statesub.Connect(remote) 217 218 // Set-up basic reactor events 219 bstar.Reactor.AddChannelTime(time.Tick(bstar_HEARTBEAT), 1, 220 func(i interface{}) error { return bstar.send_state() }) 221 bstar.Reactor.AddSocket(bstar.statesub, zmq.POLLIN, 222 func(e zmq.State) error { return bstar.recv_state() }) 223 224 return 225} 226 227// The voter method registers a client voter socket. Messages received 228// on this socket provide the client_REQUEST events for the Binary Star 229// FSM and are passed to the provided application handler. We require 230// exactly one voter per bstar instance: 231 232func (bstar *Bstar) Voter(endpoint string, socket_type zmq.Type, handler func(*zmq.Socket) error) { 233 // Hold actual handler so we can call this later 234 socket, _ := zmq.NewSocket(socket_type) 235 socket.Bind(endpoint) 236 if bstar.voter_fn != nil { 237 panic("Double voter function") 238 } 239 bstar.voter_fn = handler 240 bstar.Reactor.AddSocket(socket, zmq.POLLIN, 241 func(e zmq.State) error { return bstar.voter_ready(socket) }) 242} 243 244// Register handlers to be called each time there's a state change: 245 246func (bstar *Bstar) NewActive(handler func() error) { 247 if bstar.active_fn != nil { 248 panic("Double Active") 249 } 250 bstar.active_fn = handler 251} 252 253func (bstar *Bstar) NewPassive(handler func() error) { 254 if bstar.passive_fn != nil { 255 panic("Double Passive") 256 } 257 bstar.passive_fn = handler 258} 259 260// Enable/disable verbose tracing, for debugging: 261 262func (bstar *Bstar) SetVerbose(verbose bool) { 263 bstar.Reactor.SetVerbose(verbose) 264} 265 266//? Finally, start the configured reactor. It will end if any handler 267//? returns error to the reactor, or if the process receives SIGINT or SIGTERM: 268 269func (bstar *Bstar) Start() error { 270 if bstar.voter_fn == nil { 271 panic("Missing voter function") 272 } 273 bstar.update_peer_expiry() 274 return bstar.Reactor.Run(bstar_HEARTBEAT / 5) 275} 276