1//
2//  Clone client Model Three
3//
4
5package main
6
7import (
8	zmq "github.com/pebbe/zmq4"
9	"github.com/pebbe/zmq4/examples/kvsimple"
10
11	"fmt"
12	"math/rand"
13	"time"
14)
15
16func main() {
17	snapshot, _ := zmq.NewSocket(zmq.DEALER)
18	snapshot.Connect("tcp://localhost:5556")
19	subscriber, _ := zmq.NewSocket(zmq.SUB)
20	subscriber.SetSubscribe("")
21	subscriber.Connect("tcp://localhost:5557")
22	publisher, _ := zmq.NewSocket(zmq.PUSH)
23	publisher.Connect("tcp://localhost:5558")
24
25	kvmap := make(map[string]*kvsimple.Kvmsg)
26	rand.Seed(time.Now().UnixNano())
27
28	//  We first request a state snapshot:
29	sequence := int64(0)
30	snapshot.SendMessage("ICANHAZ?")
31	for {
32		kvmsg, err := kvsimple.RecvKvmsg(snapshot)
33		if err != nil {
34			break //  Interrupted
35		}
36		if key, _ := kvmsg.GetKey(); key == "KTHXBAI" {
37			sequence, _ := kvmsg.GetSequence()
38			fmt.Println("I: received snapshot =", sequence)
39			break //  Done
40		}
41		kvmsg.Store(kvmap)
42	}
43	snapshot.Close()
44
45	//  Now we wait for updates from the server, and every so often, we
46	//  send a random key-value update to the server:
47
48	poller := zmq.NewPoller()
49	poller.Add(subscriber, zmq.POLLIN)
50	alarm := time.Now().Add(1000 * time.Millisecond)
51	for {
52		tickless := alarm.Sub(time.Now())
53		if tickless < 0 {
54			tickless = 0
55		}
56		polled, err := poller.Poll(tickless)
57		if err != nil {
58			break //  Context has been shut down
59		}
60		if len(polled) == 1 {
61			kvmsg, err := kvsimple.RecvKvmsg(subscriber)
62			if err != nil {
63				break //  Interrupted
64			}
65
66			//  Discard out-of-sequence kvmsgs, incl. heartbeats
67			if seq, _ := kvmsg.GetSequence(); seq > sequence {
68				sequence = seq
69				kvmsg.Store(kvmap)
70				fmt.Println("I: received update =", sequence)
71			}
72		}
73		//  If we timed-out, generate a random kvmsg
74		if time.Now().After(alarm) {
75			kvmsg := kvsimple.NewKvmsg(0)
76			kvmsg.SetKey(fmt.Sprint(rand.Intn(10000)))
77			kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000)))
78			kvmsg.Send(publisher)
79			alarm = time.Now().Add(1000 * time.Millisecond)
80		}
81	}
82	fmt.Printf("Interrupted\n%d messages in\n", sequence)
83}
84