1// 2// Clone client Model Three 3// 4 5package main 6 7import ( 8 zmq "github.com/pebbe/zmq4" 9 "github.com/pebbe/zmq4/examples/kvsimple" 10 11 "fmt" 12 "math/rand" 13 "time" 14) 15 16func main() { 17 snapshot, _ := zmq.NewSocket(zmq.DEALER) 18 snapshot.Connect("tcp://localhost:5556") 19 subscriber, _ := zmq.NewSocket(zmq.SUB) 20 subscriber.SetSubscribe("") 21 subscriber.Connect("tcp://localhost:5557") 22 publisher, _ := zmq.NewSocket(zmq.PUSH) 23 publisher.Connect("tcp://localhost:5558") 24 25 kvmap := make(map[string]*kvsimple.Kvmsg) 26 rand.Seed(time.Now().UnixNano()) 27 28 // We first request a state snapshot: 29 sequence := int64(0) 30 snapshot.SendMessage("ICANHAZ?") 31 for { 32 kvmsg, err := kvsimple.RecvKvmsg(snapshot) 33 if err != nil { 34 break // Interrupted 35 } 36 if key, _ := kvmsg.GetKey(); key == "KTHXBAI" { 37 sequence, _ := kvmsg.GetSequence() 38 fmt.Println("I: received snapshot =", sequence) 39 break // Done 40 } 41 kvmsg.Store(kvmap) 42 } 43 snapshot.Close() 44 45 // Now we wait for updates from the server, and every so often, we 46 // send a random key-value update to the server: 47 48 poller := zmq.NewPoller() 49 poller.Add(subscriber, zmq.POLLIN) 50 alarm := time.Now().Add(1000 * time.Millisecond) 51 for { 52 tickless := alarm.Sub(time.Now()) 53 if tickless < 0 { 54 tickless = 0 55 } 56 polled, err := poller.Poll(tickless) 57 if err != nil { 58 break // Context has been shut down 59 } 60 if len(polled) == 1 { 61 kvmsg, err := kvsimple.RecvKvmsg(subscriber) 62 if err != nil { 63 break // Interrupted 64 } 65 66 // Discard out-of-sequence kvmsgs, incl. heartbeats 67 if seq, _ := kvmsg.GetSequence(); seq > sequence { 68 sequence = seq 69 kvmsg.Store(kvmap) 70 fmt.Println("I: received update =", sequence) 71 } 72 } 73 // If we timed-out, generate a random kvmsg 74 if time.Now().After(alarm) { 75 kvmsg := kvsimple.NewKvmsg(0) 76 kvmsg.SetKey(fmt.Sprint(rand.Intn(10000))) 77 kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000))) 78 kvmsg.Send(publisher) 79 alarm = time.Now().Add(1000 * time.Millisecond) 80 } 81 } 82 fmt.Printf("Interrupted\n%d messages in\n", sequence) 83} 84