1// 2// Clone server Model Five 3// 4 5package main 6 7import ( 8 zmq "github.com/pebbe/zmq4" 9 "github.com/pebbe/zmq4/examples/kvmsg" 10 11 "errors" 12 "fmt" 13 "log" 14 "strconv" 15 "strings" 16 "time" 17) 18 19// Our server is defined by these properties 20type clonesrv_t struct { 21 kvmap map[string]*kvmsg.Kvmsg // Key-value store 22 port int // Main port we're working on 23 sequence int64 // How many updates we're at 24 snapshot *zmq.Socket // Handle snapshot requests 25 publisher *zmq.Socket // Publish updates to clients 26 collector *zmq.Socket // Collect updates from clients 27} 28 29func main() { 30 31 srv := &clonesrv_t{ 32 port: 5556, 33 kvmap: make(map[string]*kvmsg.Kvmsg), 34 } 35 36 // Set up our clone server sockets 37 srv.snapshot, _ = zmq.NewSocket(zmq.ROUTER) 38 srv.snapshot.Bind(fmt.Sprint("tcp://*:", srv.port)) 39 srv.publisher, _ = zmq.NewSocket(zmq.PUB) 40 srv.publisher.Bind(fmt.Sprint("tcp://*:", srv.port+1)) 41 srv.collector, _ = zmq.NewSocket(zmq.PULL) 42 srv.collector.Bind(fmt.Sprint("tcp://*:", srv.port+2)) 43 44 // Register our handlers with reactor 45 reactor := zmq.NewReactor() 46 reactor.AddSocket(srv.snapshot, zmq.POLLIN, 47 func(e zmq.State) error { return snapshots(srv) }) 48 reactor.AddSocket(srv.collector, zmq.POLLIN, 49 func(e zmq.State) error { return collector(srv) }) 50 reactor.AddChannelTime(time.Tick(1000*time.Millisecond), 1, 51 func(v interface{}) error { return flush_ttl(srv) }) 52 53 log.Println(reactor.Run(100 * time.Millisecond)) // precision: .1 seconds 54} 55 56// This is the reactor handler for the snapshot socket; it accepts 57// just the ICANHAZ? request and replies with a state snapshot ending 58// with a KTHXBAI message: 59 60func snapshots(srv *clonesrv_t) (err error) { 61 62 msg, err := srv.snapshot.RecvMessage(0) 63 if err != nil { 64 return 65 } 66 identity := msg[0] 67 68 // Request is in second frame of message 69 request := msg[1] 70 if request != "ICANHAZ?" { 71 err = errors.New("E: bad request, aborting") 72 return 73 } 74 subtree := msg[2] 75 76 // Send state socket to client 77 for _, kvmsg := range srv.kvmap { 78 if key, _ := kvmsg.GetKey(); strings.HasPrefix(key, subtree) { 79 srv.snapshot.Send(identity, zmq.SNDMORE) 80 kvmsg.Send(srv.snapshot) 81 } 82 } 83 84 // Now send END message with sequence number 85 log.Println("I: sending shapshot =", srv.sequence) 86 srv.snapshot.Send(identity, zmq.SNDMORE) 87 kvmsg := kvmsg.NewKvmsg(srv.sequence) 88 kvmsg.SetKey("KTHXBAI") 89 kvmsg.SetBody(subtree) 90 kvmsg.Send(srv.snapshot) 91 92 return 93} 94 95// We store each update with a new sequence number, and if necessary, a 96// time-to-live. We publish updates immediately on our publisher socket: 97 98func collector(srv *clonesrv_t) (err error) { 99 kvmsg, err := kvmsg.RecvKvmsg(srv.collector) 100 if err != nil { 101 return 102 } 103 104 srv.sequence++ 105 kvmsg.SetSequence(srv.sequence) 106 kvmsg.Send(srv.publisher) 107 if ttls, e := kvmsg.GetProp("ttl"); e == nil { 108 // change duration into specific time, using the same property: ugly! 109 ttl, e := strconv.ParseInt(ttls, 10, 64) 110 if e != nil { 111 err = e 112 return 113 } 114 kvmsg.SetProp("ttl", fmt.Sprint(time.Now().Add(time.Duration(ttl)*time.Second).Unix())) 115 } 116 kvmsg.Store(srv.kvmap) 117 log.Println("I: publishing update =", srv.sequence) 118 119 return 120} 121 122// At regular intervals we flush ephemeral values that have expired. This 123// could be slow on very large data sets: 124 125func flush_ttl(srv *clonesrv_t) (err error) { 126 127 for _, kvmsg := range srv.kvmap { 128 129 // If key-value pair has expired, delete it and publish the 130 // fact to listening clients. 131 132 if ttls, e := kvmsg.GetProp("ttl"); e == nil { 133 ttl, e := strconv.ParseInt(ttls, 10, 64) 134 if e != nil { 135 err = e 136 continue 137 } 138 if time.Now().After(time.Unix(ttl, 0)) { 139 srv.sequence++ 140 kvmsg.SetSequence(srv.sequence) 141 kvmsg.SetBody("") 142 e = kvmsg.Send(srv.publisher) 143 if e != nil { 144 err = e 145 } 146 kvmsg.Store(srv.kvmap) 147 log.Println("I: publishing delete =", srv.sequence) 148 } 149 } 150 } 151 return 152} 153