1//
2//  Clone server Model Five
3//
4
5package main
6
7import (
8	zmq "github.com/pebbe/zmq4"
9	"github.com/pebbe/zmq4/examples/kvmsg"
10
11	"errors"
12	"fmt"
13	"log"
14	"strconv"
15	"strings"
16	"time"
17)
18
19//  Our server is defined by these properties
20type clonesrv_t struct {
21	kvmap     map[string]*kvmsg.Kvmsg //  Key-value store
22	port      int                     //  Main port we're working on
23	sequence  int64                   //  How many updates we're at
24	snapshot  *zmq.Socket             //  Handle snapshot requests
25	publisher *zmq.Socket             //  Publish updates to clients
26	collector *zmq.Socket             //  Collect updates from clients
27}
28
29func main() {
30
31	srv := &clonesrv_t{
32		port:  5556,
33		kvmap: make(map[string]*kvmsg.Kvmsg),
34	}
35
36	//  Set up our clone server sockets
37	srv.snapshot, _ = zmq.NewSocket(zmq.ROUTER)
38	srv.snapshot.Bind(fmt.Sprint("tcp://*:", srv.port))
39	srv.publisher, _ = zmq.NewSocket(zmq.PUB)
40	srv.publisher.Bind(fmt.Sprint("tcp://*:", srv.port+1))
41	srv.collector, _ = zmq.NewSocket(zmq.PULL)
42	srv.collector.Bind(fmt.Sprint("tcp://*:", srv.port+2))
43
44	//  Register our handlers with reactor
45	reactor := zmq.NewReactor()
46	reactor.AddSocket(srv.snapshot, zmq.POLLIN,
47		func(e zmq.State) error { return snapshots(srv) })
48	reactor.AddSocket(srv.collector, zmq.POLLIN,
49		func(e zmq.State) error { return collector(srv) })
50	reactor.AddChannelTime(time.Tick(1000*time.Millisecond), 1,
51		func(v interface{}) error { return flush_ttl(srv) })
52
53	log.Println(reactor.Run(100 * time.Millisecond)) // precision: .1 seconds
54}
55
56//  This is the reactor handler for the snapshot socket; it accepts
57//  just the ICANHAZ? request and replies with a state snapshot ending
58//  with a KTHXBAI message:
59
60func snapshots(srv *clonesrv_t) (err error) {
61
62	msg, err := srv.snapshot.RecvMessage(0)
63	if err != nil {
64		return
65	}
66	identity := msg[0]
67
68	//  Request is in second frame of message
69	request := msg[1]
70	if request != "ICANHAZ?" {
71		err = errors.New("E: bad request, aborting")
72		return
73	}
74	subtree := msg[2]
75
76	//  Send state socket to client
77	for _, kvmsg := range srv.kvmap {
78		if key, _ := kvmsg.GetKey(); strings.HasPrefix(key, subtree) {
79			srv.snapshot.Send(identity, zmq.SNDMORE)
80			kvmsg.Send(srv.snapshot)
81		}
82	}
83
84	//  Now send END message with sequence number
85	log.Println("I: sending shapshot =", srv.sequence)
86	srv.snapshot.Send(identity, zmq.SNDMORE)
87	kvmsg := kvmsg.NewKvmsg(srv.sequence)
88	kvmsg.SetKey("KTHXBAI")
89	kvmsg.SetBody(subtree)
90	kvmsg.Send(srv.snapshot)
91
92	return
93}
94
95//  We store each update with a new sequence number, and if necessary, a
96//  time-to-live. We publish updates immediately on our publisher socket:
97
98func collector(srv *clonesrv_t) (err error) {
99	kvmsg, err := kvmsg.RecvKvmsg(srv.collector)
100	if err != nil {
101		return
102	}
103
104	srv.sequence++
105	kvmsg.SetSequence(srv.sequence)
106	kvmsg.Send(srv.publisher)
107	if ttls, e := kvmsg.GetProp("ttl"); e == nil {
108		// change duration into specific time, using the same property: ugly!
109		ttl, e := strconv.ParseInt(ttls, 10, 64)
110		if e != nil {
111			err = e
112			return
113		}
114		kvmsg.SetProp("ttl", fmt.Sprint(time.Now().Add(time.Duration(ttl)*time.Second).Unix()))
115	}
116	kvmsg.Store(srv.kvmap)
117	log.Println("I: publishing update =", srv.sequence)
118
119	return
120}
121
122//  At regular intervals we flush ephemeral values that have expired. This
123//  could be slow on very large data sets:
124
125func flush_ttl(srv *clonesrv_t) (err error) {
126
127	for _, kvmsg := range srv.kvmap {
128
129		//  If key-value pair has expired, delete it and publish the
130		//  fact to listening clients.
131
132		if ttls, e := kvmsg.GetProp("ttl"); e == nil {
133			ttl, e := strconv.ParseInt(ttls, 10, 64)
134			if e != nil {
135				err = e
136				continue
137			}
138			if time.Now().After(time.Unix(ttl, 0)) {
139				srv.sequence++
140				kvmsg.SetSequence(srv.sequence)
141				kvmsg.SetBody("")
142				e = kvmsg.Send(srv.publisher)
143				if e != nil {
144					err = e
145				}
146				kvmsg.Store(srv.kvmap)
147				log.Println("I: publishing delete =", srv.sequence)
148			}
149		}
150	}
151	return
152}
153