1//
2//  Clone server Model Two
3//
4//  In the original C example, the client misses updates between snapshot
5//  and further updates. Sometimes, it even misses the END message of
6//  the snapshot, so it waits for it forever.
7//  This Go implementation has some modifications to improve this, but it
8//  is still not fully reliable.
9
10package main
11
12import (
13	zmq "github.com/pebbe/zmq4"
14	"github.com/pebbe/zmq4/examples/kvsimple"
15
16	"fmt"
17	"math/rand"
18	"time"
19)
20
21func main() {
22	//  Prepare our context and sockets
23	publisher, _ := zmq.NewSocket(zmq.PUB)
24	publisher.Bind("tcp://*:5557")
25
26	sequence := int64(0)
27	rand.Seed(time.Now().UnixNano())
28
29	//  Start state manager and wait for synchronization signal
30	updates, _ := zmq.NewSocket(zmq.PAIR)
31	updates.Bind("inproc://pipe")
32	go state_manager()
33	updates.RecvMessage(0) // "READY"
34
35	for {
36		//  Distribute as key-value message
37		sequence++
38		kvmsg := kvsimple.NewKvmsg(sequence)
39		kvmsg.SetKey(fmt.Sprint(rand.Intn(10000)))
40		kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000)))
41		if kvmsg.Send(publisher) != nil {
42			break
43		}
44		if kvmsg.Send(updates) != nil {
45			break
46		}
47	}
48	fmt.Printf("Interrupted\n%d messages out\n", sequence)
49}
50
51//  The state manager task maintains the state and handles requests from
52//  clients for snapshots:
53
54func state_manager() {
55	kvmap := make(map[string]*kvsimple.Kvmsg)
56
57	pipe, _ := zmq.NewSocket(zmq.PAIR)
58	pipe.Connect("inproc://pipe")
59	pipe.SendMessage("READY")
60	snapshot, _ := zmq.NewSocket(zmq.ROUTER)
61	snapshot.Bind("tcp://*:5556")
62
63	poller := zmq.NewPoller()
64	poller.Add(pipe, zmq.POLLIN)
65	poller.Add(snapshot, zmq.POLLIN)
66	sequence := int64(0) //  Current snapshot version number
67LOOP:
68	for {
69		polled, err := poller.Poll(-1)
70		if err != nil {
71			break //  Context has been shut down
72		}
73		for _, item := range polled {
74			switch socket := item.Socket; socket {
75			case pipe:
76				//  Apply state update from main thread
77				kvmsg, err := kvsimple.RecvKvmsg(pipe)
78				if err != nil {
79					break LOOP //  Interrupted
80				}
81				sequence, _ = kvmsg.GetSequence()
82				kvmsg.Store(kvmap)
83			case snapshot:
84				//  Execute state snapshot request
85				msg, err := snapshot.RecvMessage(0)
86				if err != nil {
87					break LOOP //  Interrupted
88				}
89				identity := msg[0]
90				//  Request is in second frame of message
91				request := msg[1]
92				if request != "ICANHAZ?" {
93					fmt.Println("E: bad request, aborting")
94					break LOOP
95				}
96				//  Send state snapshot to client
97
98				//  For each entry in kvmap, send kvmsg to client
99				for _, kvmsg := range kvmap {
100					snapshot.Send(identity, zmq.SNDMORE)
101					kvmsg.Send(snapshot)
102				}
103
104				// Give client some time to deal with it.
105				// This reduces the risk that the client won't see
106				// the END message, but it doesn't eliminate the risk.
107				time.Sleep(100 * time.Millisecond)
108
109				//  Now send END message with sequence number
110				fmt.Printf("Sending state shapshot=%d\n", sequence)
111				snapshot.Send(identity, zmq.SNDMORE)
112				kvmsg := kvsimple.NewKvmsg(sequence)
113				kvmsg.SetKey("KTHXBAI")
114				kvmsg.SetBody("")
115				kvmsg.Send(snapshot)
116			}
117		}
118	}
119}
120