1//
2//  Clone client Model Four
3//
4
5package main
6
7import (
8	zmq "github.com/pebbe/zmq4"
9	"github.com/pebbe/zmq4/examples/kvsimple"
10
11	"fmt"
12	"math/rand"
13	"time"
14)
15
16const (
17	SUBTREE = "/client/"
18)
19
20func main() {
21	snapshot, _ := zmq.NewSocket(zmq.DEALER)
22	snapshot.Connect("tcp://localhost:5556")
23	subscriber, _ := zmq.NewSocket(zmq.SUB)
24	subscriber.SetSubscribe(SUBTREE)
25	subscriber.Connect("tcp://localhost:5557")
26	publisher, _ := zmq.NewSocket(zmq.PUSH)
27	publisher.Connect("tcp://localhost:5558")
28
29	kvmap := make(map[string]*kvsimple.Kvmsg)
30	rand.Seed(time.Now().UnixNano())
31
32	//  We first request a state snapshot:
33	sequence := int64(0)
34	snapshot.SendMessage("ICANHAZ?", SUBTREE)
35	for {
36		kvmsg, err := kvsimple.RecvKvmsg(snapshot)
37		if err != nil {
38			break //  Interrupted
39		}
40		if key, _ := kvmsg.GetKey(); key == "KTHXBAI" {
41			sequence, _ := kvmsg.GetSequence()
42			fmt.Println("I: received snapshot =", sequence)
43			break //  Done
44		}
45		kvmsg.Store(kvmap)
46	}
47	snapshot.Close()
48
49	poller := zmq.NewPoller()
50	poller.Add(subscriber, zmq.POLLIN)
51	alarm := time.Now().Add(1000 * time.Millisecond)
52	for {
53		tickless := alarm.Sub(time.Now())
54		if tickless < 0 {
55			tickless = 0
56		}
57		polled, err := poller.Poll(tickless)
58		if err != nil {
59			break //  Context has been shut down
60		}
61		if len(polled) == 1 {
62			kvmsg, err := kvsimple.RecvKvmsg(subscriber)
63			if err != nil {
64				break //  Interrupted
65			}
66
67			//  Discard out-of-sequence kvmsgs, incl. heartbeats
68			if seq, _ := kvmsg.GetSequence(); seq > sequence {
69				sequence = seq
70				kvmsg.Store(kvmap)
71				fmt.Println("I: received update =", sequence)
72			}
73		}
74		//  If we timed-out, generate a random kvmsg
75		if time.Now().After(alarm) {
76			kvmsg := kvsimple.NewKvmsg(0)
77			kvmsg.SetKey(fmt.Sprintf("%s%d", SUBTREE, rand.Intn(10000)))
78			kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000)))
79			kvmsg.Send(publisher)
80			alarm = time.Now().Add(1000 * time.Millisecond)
81		}
82	}
83	fmt.Printf("Interrupted\n%d messages in\n", sequence)
84}
85