1// 2// Binary Star server proof-of-concept implementation. This server does no 3// real work; it just demonstrates the Binary Star failover model. 4 5package main 6 7import ( 8 zmq "github.com/pebbe/zmq4" 9 10 "fmt" 11 "os" 12 "strconv" 13 "time" 14) 15 16// States we can be in at any point in time 17type state_t int 18 19const ( 20 _ = state_t(iota) 21 STATE_PRIMARY // Primary, waiting for peer to connect 22 STATE_BACKUP // Backup, waiting for peer to connect 23 STATE_ACTIVE // Active - accepting connections 24 STATE_PASSIVE // Passive - not accepting connections 25) 26 27// Events, which start with the states our peer can be in 28type event_t int 29 30const ( 31 _ = event_t(iota) 32 PEER_PRIMARY // HA peer is pending primary 33 PEER_BACKUP // HA peer is pending backup 34 PEER_ACTIVE // HA peer is active 35 PEER_PASSIVE // HA peer is passive 36 CLIENT_REQUEST // Client makes request 37) 38 39// Our finite state machine 40type bstar_t struct { 41 state state_t // Current state 42 event event_t // Current event 43 peer_expiry time.Time // When peer is considered 'dead' 44} 45 46// We send state information every this often 47// If peer doesn't respond in two heartbeats, it is 'dead' 48const ( 49 HEARTBEAT = 1000 * time.Millisecond // In msecs 50) 51 52// The heart of the Binary Star design is its finite-state machine (FSM). 53// The FSM runs one event at a time. We apply an event to the current state, 54// which checks if the event is accepted, and if so sets a new state: 55 56func StateMachine(fsm *bstar_t) (exception bool) { 57 // These are the PRIMARY and BACKUP states; we're waiting to become 58 // ACTIVE or PASSIVE depending on events we get from our peer: 59 if fsm.state == STATE_PRIMARY { 60 if fsm.event == PEER_BACKUP { 61 fmt.Println("I: connected to backup (passive), ready as active") 62 fsm.state = STATE_ACTIVE 63 } else if fsm.event == PEER_ACTIVE { 64 fmt.Println("I: connected to backup (active), ready as passive") 65 fsm.state = STATE_PASSIVE 66 } 67 // Accept client connections 68 } else if fsm.state == STATE_BACKUP { 69 if fsm.event == PEER_ACTIVE { 70 fmt.Println("I: connected to primary (active), ready as passive") 71 fsm.state = STATE_PASSIVE 72 } else if fsm.event == CLIENT_REQUEST { 73 // Reject client connections when acting as backup 74 exception = true 75 } 76 } else if fsm.state == STATE_ACTIVE { 77 // These are the ACTIVE and PASSIVE states: 78 if fsm.event == PEER_ACTIVE { 79 // Two actives would mean split-brain 80 fmt.Println("E: fatal error - dual actives, aborting") 81 exception = true 82 } 83 } else if fsm.state == STATE_PASSIVE { 84 // Server is passive 85 // CLIENT_REQUEST events can trigger failover if peer looks dead 86 if fsm.event == PEER_PRIMARY { 87 // Peer is restarting - become active, peer will go passive 88 fmt.Println("I: primary (passive) is restarting, ready as active") 89 fsm.state = STATE_ACTIVE 90 } else if fsm.event == PEER_BACKUP { 91 // Peer is restarting - become active, peer will go passive 92 fmt.Println("I: backup (passive) is restarting, ready as active") 93 fsm.state = STATE_ACTIVE 94 } else if fsm.event == PEER_PASSIVE { 95 // Two passives would mean cluster would be non-responsive 96 fmt.Println("E: fatal error - dual passives, aborting") 97 exception = true 98 } else if fsm.event == CLIENT_REQUEST { 99 // Peer becomes active if timeout has passed 100 // It's the client request that triggers the failover 101 if time.Now().After(fsm.peer_expiry) { 102 // If peer is dead, switch to the active state 103 fmt.Println("I: failover successful, ready as active") 104 fsm.state = STATE_ACTIVE 105 } else { 106 // If peer is alive, reject connections 107 exception = true 108 } 109 } 110 } 111 return 112} 113 114// This is our main task. First we bind/connect our sockets with our 115// peer and make sure we will get state messages correctly. We use 116// three sockets; one to publish state, one to subscribe to state, and 117// one for client requests/replies: 118 119func main() { 120 // Arguments can be either of: 121 // -p primary server, at tcp://localhost:5001 122 // -b backup server, at tcp://localhost:5002 123 statepub, _ := zmq.NewSocket(zmq.PUB) 124 statesub, _ := zmq.NewSocket(zmq.SUB) 125 statesub.SetSubscribe("") 126 frontend, _ := zmq.NewSocket(zmq.ROUTER) 127 fsm := &bstar_t{peer_expiry: time.Now().Add(2 * HEARTBEAT)} 128 129 if len(os.Args) == 2 && os.Args[1] == "-p" { 130 fmt.Println("I: Primary active, waiting for backup (passive)") 131 frontend.Bind("tcp://*:5001") 132 statepub.Bind("tcp://*:5003") 133 statesub.Connect("tcp://localhost:5004") 134 fsm.state = STATE_PRIMARY 135 } else if len(os.Args) == 2 && os.Args[1] == "-b" { 136 fmt.Println("I: Backup passive, waiting for primary (active)") 137 frontend.Bind("tcp://*:5002") 138 statepub.Bind("tcp://*:5004") 139 statesub.Connect("tcp://localhost:5003") 140 fsm.state = STATE_BACKUP 141 } else { 142 fmt.Println("Usage: bstarsrv { -p | -b }") 143 return 144 } 145 // We now process events on our two input sockets, and process these 146 // events one at a time via our finite-state machine. Our "work" for 147 // a client request is simply to echo it back: 148 149 // Set timer for next outgoing state message 150 send_state_at := time.Now().Add(HEARTBEAT) 151 152 poller := zmq.NewPoller() 153 poller.Add(frontend, zmq.POLLIN) 154 poller.Add(statesub, zmq.POLLIN) 155 156LOOP: 157 for { 158 time_left := send_state_at.Sub(time.Now()) 159 if time_left < 0 { 160 time_left = 0 161 } 162 polled, err := poller.Poll(time_left) 163 if err != nil { 164 break // Context has been shut down 165 } 166 for _, socket := range polled { 167 switch socket.Socket { 168 case frontend: 169 // Have a client request 170 msg, _ := frontend.RecvMessage(0) 171 fsm.event = CLIENT_REQUEST 172 if !StateMachine(fsm) { 173 // Answer client by echoing request back 174 frontend.SendMessage(msg) 175 } 176 case statesub: 177 // Have state from our peer, execute as event 178 message, _ := statesub.RecvMessage(0) 179 i, _ := strconv.Atoi(message[0]) 180 fsm.event = event_t(i) 181 if StateMachine(fsm) { 182 break LOOP // Error, so exit 183 } 184 fsm.peer_expiry = time.Now().Add(2 * HEARTBEAT) 185 } 186 } 187 // If we timed-out, send state to peer 188 if time.Now().After(send_state_at) { 189 statepub.SendMessage(int(fsm.state)) 190 send_state_at = time.Now().Add(HEARTBEAT) 191 } 192 } 193 fmt.Println("W: interrupted") 194} 195