1//
2//  Clone server Model Four
3//
4
5package main
6
7import (
8	zmq "github.com/pebbe/zmq4"
9	"github.com/pebbe/zmq4/examples/kvsimple"
10
11	"fmt"
12	"strings"
13	"time"
14)
15
16//  The main task is identical to clonesrv3 except for where it
17//  handles subtrees.
18
19func main() {
20	snapshot, _ := zmq.NewSocket(zmq.ROUTER)
21	snapshot.Bind("tcp://*:5556")
22	publisher, _ := zmq.NewSocket(zmq.PUB)
23	publisher.Bind("tcp://*:5557")
24	collector, _ := zmq.NewSocket(zmq.PULL)
25	collector.Bind("tcp://*:5558")
26
27	//  The body of the main task collects updates from clients and
28	//  publishes them back out to clients:
29
30	sequence := int64(0)
31	kvmap := make(map[string]*kvsimple.Kvmsg)
32
33	poller := zmq.NewPoller()
34	poller.Add(collector, zmq.POLLIN)
35	poller.Add(snapshot, zmq.POLLIN)
36LOOP:
37	for {
38		polled, err := poller.Poll(1000 * time.Millisecond)
39		if err != nil {
40			break
41		}
42		for _, item := range polled {
43			switch socket := item.Socket; socket {
44			case collector:
45				//  Apply state update sent from client
46				kvmsg, err := kvsimple.RecvKvmsg(collector)
47				if err != nil {
48					break LOOP //  Interrupted
49				}
50				sequence++
51				kvmsg.SetSequence(sequence)
52				kvmsg.Send(publisher)
53				kvmsg.Store(kvmap)
54				fmt.Println("I: publishing update", sequence)
55			case snapshot:
56				//  Execute state snapshot request
57				msg, err := snapshot.RecvMessage(0)
58				if err != nil {
59					break LOOP
60				}
61				identity := msg[0]
62
63				//  Request is in second frame of message
64				request := msg[1]
65				if request != "ICANHAZ?" {
66					fmt.Println("E: bad request, aborting")
67					break LOOP
68				}
69				subtree := msg[2]
70				//  Send state snapshot to client
71
72				//  For each entry in kvmap, send kvmsg to client
73				for _, kvmsg := range kvmap {
74					if key, _ := kvmsg.GetKey(); strings.HasPrefix(key, subtree) {
75						snapshot.Send(identity, zmq.SNDMORE)
76						kvmsg.Send(snapshot)
77					}
78				}
79
80				//  Now send END message with sequence number
81				fmt.Println("I: sending shapshot =", sequence)
82				snapshot.Send(identity, zmq.SNDMORE)
83				kvmsg := kvsimple.NewKvmsg(sequence)
84				kvmsg.SetKey("KTHXBAI")
85				kvmsg.SetBody(subtree)
86				kvmsg.Send(snapshot)
87			}
88		}
89	}
90	fmt.Printf("Interrupted\n%d messages handled\n", sequence)
91}
92