1// 2// Clone server Model Four 3// 4 5package main 6 7import ( 8 zmq "github.com/pebbe/zmq4" 9 "github.com/pebbe/zmq4/examples/kvsimple" 10 11 "fmt" 12 "strings" 13 "time" 14) 15 16// The main task is identical to clonesrv3 except for where it 17// handles subtrees. 18 19func main() { 20 snapshot, _ := zmq.NewSocket(zmq.ROUTER) 21 snapshot.Bind("tcp://*:5556") 22 publisher, _ := zmq.NewSocket(zmq.PUB) 23 publisher.Bind("tcp://*:5557") 24 collector, _ := zmq.NewSocket(zmq.PULL) 25 collector.Bind("tcp://*:5558") 26 27 // The body of the main task collects updates from clients and 28 // publishes them back out to clients: 29 30 sequence := int64(0) 31 kvmap := make(map[string]*kvsimple.Kvmsg) 32 33 poller := zmq.NewPoller() 34 poller.Add(collector, zmq.POLLIN) 35 poller.Add(snapshot, zmq.POLLIN) 36LOOP: 37 for { 38 polled, err := poller.Poll(1000 * time.Millisecond) 39 if err != nil { 40 break 41 } 42 for _, item := range polled { 43 switch socket := item.Socket; socket { 44 case collector: 45 // Apply state update sent from client 46 kvmsg, err := kvsimple.RecvKvmsg(collector) 47 if err != nil { 48 break LOOP // Interrupted 49 } 50 sequence++ 51 kvmsg.SetSequence(sequence) 52 kvmsg.Send(publisher) 53 kvmsg.Store(kvmap) 54 fmt.Println("I: publishing update", sequence) 55 case snapshot: 56 // Execute state snapshot request 57 msg, err := snapshot.RecvMessage(0) 58 if err != nil { 59 break LOOP 60 } 61 identity := msg[0] 62 63 // Request is in second frame of message 64 request := msg[1] 65 if request != "ICANHAZ?" { 66 fmt.Println("E: bad request, aborting") 67 break LOOP 68 } 69 subtree := msg[2] 70 // Send state snapshot to client 71 72 // For each entry in kvmap, send kvmsg to client 73 for _, kvmsg := range kvmap { 74 if key, _ := kvmsg.GetKey(); strings.HasPrefix(key, subtree) { 75 snapshot.Send(identity, zmq.SNDMORE) 76 kvmsg.Send(snapshot) 77 } 78 } 79 80 // Now send END message with sequence number 81 fmt.Println("I: sending shapshot =", sequence) 82 snapshot.Send(identity, zmq.SNDMORE) 83 kvmsg := kvsimple.NewKvmsg(sequence) 84 kvmsg.SetKey("KTHXBAI") 85 kvmsg.SetBody(subtree) 86 kvmsg.Send(snapshot) 87 } 88 } 89 } 90 fmt.Printf("Interrupted\n%d messages handled\n", sequence) 91} 92