1// 2// Clone client Model Four 3// 4 5package main 6 7import ( 8 zmq "github.com/pebbe/zmq4" 9 "github.com/pebbe/zmq4/examples/kvsimple" 10 11 "fmt" 12 "math/rand" 13 "time" 14) 15 16const ( 17 SUBTREE = "/client/" 18) 19 20func main() { 21 snapshot, _ := zmq.NewSocket(zmq.DEALER) 22 snapshot.Connect("tcp://localhost:5556") 23 subscriber, _ := zmq.NewSocket(zmq.SUB) 24 subscriber.SetSubscribe(SUBTREE) 25 subscriber.Connect("tcp://localhost:5557") 26 publisher, _ := zmq.NewSocket(zmq.PUSH) 27 publisher.Connect("tcp://localhost:5558") 28 29 kvmap := make(map[string]*kvsimple.Kvmsg) 30 rand.Seed(time.Now().UnixNano()) 31 32 // We first request a state snapshot: 33 sequence := int64(0) 34 snapshot.SendMessage("ICANHAZ?", SUBTREE) 35 for { 36 kvmsg, err := kvsimple.RecvKvmsg(snapshot) 37 if err != nil { 38 break // Interrupted 39 } 40 if key, _ := kvmsg.GetKey(); key == "KTHXBAI" { 41 sequence, _ := kvmsg.GetSequence() 42 fmt.Println("I: received snapshot =", sequence) 43 break // Done 44 } 45 kvmsg.Store(kvmap) 46 } 47 snapshot.Close() 48 49 poller := zmq.NewPoller() 50 poller.Add(subscriber, zmq.POLLIN) 51 alarm := time.Now().Add(1000 * time.Millisecond) 52 for { 53 tickless := alarm.Sub(time.Now()) 54 if tickless < 0 { 55 tickless = 0 56 } 57 polled, err := poller.Poll(tickless) 58 if err != nil { 59 break // Context has been shut down 60 } 61 if len(polled) == 1 { 62 kvmsg, err := kvsimple.RecvKvmsg(subscriber) 63 if err != nil { 64 break // Interrupted 65 } 66 67 // Discard out-of-sequence kvmsgs, incl. heartbeats 68 if seq, _ := kvmsg.GetSequence(); seq > sequence { 69 sequence = seq 70 kvmsg.Store(kvmap) 71 fmt.Println("I: received update =", sequence) 72 } 73 } 74 // If we timed-out, generate a random kvmsg 75 if time.Now().After(alarm) { 76 kvmsg := kvsimple.NewKvmsg(0) 77 kvmsg.SetKey(fmt.Sprintf("%s%d", SUBTREE, rand.Intn(10000))) 78 kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000))) 79 kvmsg.Send(publisher) 80 alarm = time.Now().Add(1000 * time.Millisecond) 81 } 82 } 83 fmt.Printf("Interrupted\n%d messages in\n", sequence) 84} 85