1package overlay
2
3import (
4	"fmt"
5	"net"
6	"strings"
7	"time"
8
9	"github.com/hashicorp/serf/serf"
10	"github.com/sirupsen/logrus"
11)
12
// ovNotify is a request, received by startSerfLoop on its notify channel, to
// broadcast an endpoint change to the cluster via notifyEvent.
type ovNotify struct {
	// action is written as the first word of the user-event payload;
	// processEvent on receiving nodes acts on "join" and "leave".
	action string
	// ep supplies the advertised IP, mask, MAC and endpoint id.
	ep *endpoint
	// nw supplies the network id placed in the user-event name.
	nw *network
}
18
19type logWriter struct{}
20
21func (l *logWriter) Write(p []byte) (int, error) {
22	str := string(p)
23
24	switch {
25	case strings.Contains(str, "[WARN]"):
26		logrus.Warn(str)
27	case strings.Contains(str, "[DEBUG]"):
28		logrus.Debug(str)
29	case strings.Contains(str, "[INFO]"):
30		logrus.Info(str)
31	case strings.Contains(str, "[ERR]"):
32		logrus.Error(str)
33	}
34
35	return len(p), nil
36}
37
38func (d *driver) serfInit() error {
39	var err error
40
41	config := serf.DefaultConfig()
42	config.Init()
43	config.MemberlistConfig.BindAddr = d.advertiseAddress
44
45	d.eventCh = make(chan serf.Event, 4)
46	config.EventCh = d.eventCh
47	config.UserCoalescePeriod = 1 * time.Second
48	config.UserQuiescentPeriod = 50 * time.Millisecond
49
50	config.LogOutput = &logWriter{}
51	config.MemberlistConfig.LogOutput = config.LogOutput
52
53	s, err := serf.Create(config)
54	if err != nil {
55		return fmt.Errorf("failed to create cluster node: %v", err)
56	}
57	defer func() {
58		if err != nil {
59			s.Shutdown()
60		}
61	}()
62
63	d.serfInstance = s
64
65	d.notifyCh = make(chan ovNotify)
66	d.exitCh = make(chan chan struct{})
67
68	go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh)
69	return nil
70}
71
72func (d *driver) serfJoin(neighIP string) error {
73	if neighIP == "" {
74		return fmt.Errorf("no neighbor to join")
75	}
76	if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil {
77		return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
78			neighIP, err)
79	}
80	return nil
81}
82
83func (d *driver) notifyEvent(event ovNotify) {
84	ep := event.ep
85
86	ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
87		net.IP(ep.addr.Mask).String(), ep.mac.String())
88	eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
89		event.nw.id, ep.id)
90
91	if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
92		logrus.Errorf("Sending user event failed: %v\n", err)
93	}
94}
95
96func (d *driver) processEvent(u serf.UserEvent) {
97	logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name,
98		string(u.Payload), uint64(u.LTime))
99
100	var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
101	if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
102		fmt.Printf("Failed to scan name string: %v\n", err)
103	}
104
105	if _, err := fmt.Sscan(string(u.Payload), &action,
106		&ipStr, &maskStr, &macStr); err != nil {
107		fmt.Printf("Failed to scan value string: %v\n", err)
108	}
109
110	logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr)
111
112	mac, err := net.ParseMAC(macStr)
113	if err != nil {
114		logrus.Errorf("Failed to parse mac: %v\n", err)
115	}
116
117	if d.serfInstance.LocalMember().Addr.String() == vtepStr {
118		return
119	}
120
121	switch action {
122	case "join":
123		d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false, false, false)
124	case "leave":
125		d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false)
126	}
127}
128
129func (d *driver) processQuery(q *serf.Query) {
130	logrus.Debugf("Received query name:%s, payload:%s\n", q.Name,
131		string(q.Payload))
132
133	var nid, ipStr string
134	if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil {
135		fmt.Printf("Failed to scan query payload string: %v\n", err)
136	}
137
138	pKey, pEntry, err := d.peerDbSearch(nid, net.ParseIP(ipStr))
139	if err != nil {
140		return
141	}
142
143	logrus.Debugf("Sending peer query resp mac %v, mask %s, vtep %s", pKey.peerMac, net.IP(pEntry.peerIPMask).String(), pEntry.vtep)
144	q.Respond([]byte(fmt.Sprintf("%s %s %s", pKey.peerMac.String(), net.IP(pEntry.peerIPMask).String(), pEntry.vtep.String())))
145}
146
147func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) {
148	if d.serfInstance == nil {
149		return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized")
150	}
151
152	qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String())
153	resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil)
154	if err != nil {
155		return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
156	}
157
158	respCh := resp.ResponseCh()
159	select {
160	case r := <-respCh:
161		var macStr, maskStr, vtepStr string
162		if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil {
163			return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
164		}
165
166		mac, err := net.ParseMAC(macStr)
167		if err != nil {
168			return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
169		}
170
171		logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr)
172		return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil
173
174	case <-time.After(time.Second):
175		return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
176	}
177}
178
179func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify,
180	exitCh chan chan struct{}) {
181
182	for {
183		select {
184		case notify, ok := <-notifyCh:
185			if !ok {
186				break
187			}
188
189			d.notifyEvent(notify)
190		case ch, ok := <-exitCh:
191			if !ok {
192				break
193			}
194
195			if err := d.serfInstance.Leave(); err != nil {
196				logrus.Errorf("failed leaving the cluster: %v\n", err)
197			}
198
199			d.serfInstance.Shutdown()
200			close(ch)
201			return
202		case e, ok := <-eventCh:
203			if !ok {
204				break
205			}
206
207			if e.EventType() == serf.EventQuery {
208				d.processQuery(e.(*serf.Query))
209				break
210			}
211
212			u, ok := e.(serf.UserEvent)
213			if !ok {
214				break
215			}
216			d.processEvent(u)
217		}
218	}
219}
220
221func (d *driver) isSerfAlive() bool {
222	d.Lock()
223	serfInstance := d.serfInstance
224	d.Unlock()
225	if serfInstance == nil || serfInstance.State() != serf.SerfAlive {
226		return false
227	}
228	return true
229}
230