1// 2// Clone server Model Two 3// 4// In the original C example, the client misses updates between snapshot 5// and further updates. Sometimes, it even misses the END message of 6// the snapshot, so it waits for it forever. 7// This Go implementation has some modifications to improve this, but it 8// is still not fully reliable. 9 10package main 11 12import ( 13 zmq "github.com/pebbe/zmq4" 14 "github.com/pebbe/zmq4/examples/kvsimple" 15 16 "fmt" 17 "math/rand" 18 "time" 19) 20 21func main() { 22 // Prepare our context and sockets 23 publisher, _ := zmq.NewSocket(zmq.PUB) 24 publisher.Bind("tcp://*:5557") 25 26 sequence := int64(0) 27 rand.Seed(time.Now().UnixNano()) 28 29 // Start state manager and wait for synchronization signal 30 updates, _ := zmq.NewSocket(zmq.PAIR) 31 updates.Bind("inproc://pipe") 32 go state_manager() 33 updates.RecvMessage(0) // "READY" 34 35 for { 36 // Distribute as key-value message 37 sequence++ 38 kvmsg := kvsimple.NewKvmsg(sequence) 39 kvmsg.SetKey(fmt.Sprint(rand.Intn(10000))) 40 kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000))) 41 if kvmsg.Send(publisher) != nil { 42 break 43 } 44 if kvmsg.Send(updates) != nil { 45 break 46 } 47 } 48 fmt.Printf("Interrupted\n%d messages out\n", sequence) 49} 50 51// The state manager task maintains the state and handles requests from 52// clients for snapshots: 53 54func state_manager() { 55 kvmap := make(map[string]*kvsimple.Kvmsg) 56 57 pipe, _ := zmq.NewSocket(zmq.PAIR) 58 pipe.Connect("inproc://pipe") 59 pipe.SendMessage("READY") 60 snapshot, _ := zmq.NewSocket(zmq.ROUTER) 61 snapshot.Bind("tcp://*:5556") 62 63 poller := zmq.NewPoller() 64 poller.Add(pipe, zmq.POLLIN) 65 poller.Add(snapshot, zmq.POLLIN) 66 sequence := int64(0) // Current snapshot version number 67LOOP: 68 for { 69 polled, err := poller.Poll(-1) 70 if err != nil { 71 break // Context has been shut down 72 } 73 for _, item := range polled { 74 switch socket := item.Socket; socket { 75 case pipe: 76 // Apply state update from main thread 77 kvmsg, err := kvsimple.RecvKvmsg(pipe) 78 if err != nil { 79 break LOOP // Interrupted 80 } 81 sequence, _ = kvmsg.GetSequence() 82 kvmsg.Store(kvmap) 83 case snapshot: 84 // Execute state snapshot request 85 msg, err := snapshot.RecvMessage(0) 86 if err != nil { 87 break LOOP // Interrupted 88 } 89 identity := msg[0] 90 // Request is in second frame of message 91 request := msg[1] 92 if request != "ICANHAZ?" { 93 fmt.Println("E: bad request, aborting") 94 break LOOP 95 } 96 // Send state snapshot to client 97 98 // For each entry in kvmap, send kvmsg to client 99 for _, kvmsg := range kvmap { 100 snapshot.Send(identity, zmq.SNDMORE) 101 kvmsg.Send(snapshot) 102 } 103 104 // Give client some time to deal with it. 105 // This reduces the risk that the client won't see 106 // the END message, but it doesn't eliminate the risk. 107 time.Sleep(100 * time.Millisecond) 108 109 // Now send END message with sequence number 110 fmt.Printf("Sending state shapshot=%d\n", sequence) 111 snapshot.Send(identity, zmq.SNDMORE) 112 kvmsg := kvsimple.NewKvmsg(sequence) 113 kvmsg.SetKey("KTHXBAI") 114 kvmsg.SetBody("") 115 kvmsg.Send(snapshot) 116 } 117 } 118 } 119} 120