1package overlay 2 3import ( 4 "fmt" 5 "net" 6 "strings" 7 "time" 8 9 "github.com/sirupsen/logrus" 10 "github.com/hashicorp/serf/serf" 11) 12 13type ovNotify struct { 14 action string 15 ep *endpoint 16 nw *network 17} 18 19type logWriter struct{} 20 21func (l *logWriter) Write(p []byte) (int, error) { 22 str := string(p) 23 24 switch { 25 case strings.Contains(str, "[WARN]"): 26 logrus.Warn(str) 27 case strings.Contains(str, "[DEBUG]"): 28 logrus.Debug(str) 29 case strings.Contains(str, "[INFO]"): 30 logrus.Info(str) 31 case strings.Contains(str, "[ERR]"): 32 logrus.Error(str) 33 } 34 35 return len(p), nil 36} 37 38func (d *driver) serfInit() error { 39 var err error 40 41 config := serf.DefaultConfig() 42 config.Init() 43 config.MemberlistConfig.BindAddr = d.advertiseAddress 44 45 d.eventCh = make(chan serf.Event, 4) 46 config.EventCh = d.eventCh 47 config.UserCoalescePeriod = 1 * time.Second 48 config.UserQuiescentPeriod = 50 * time.Millisecond 49 50 config.LogOutput = &logWriter{} 51 config.MemberlistConfig.LogOutput = config.LogOutput 52 53 s, err := serf.Create(config) 54 if err != nil { 55 return fmt.Errorf("failed to create cluster node: %v", err) 56 } 57 defer func() { 58 if err != nil { 59 s.Shutdown() 60 } 61 }() 62 63 d.serfInstance = s 64 65 d.notifyCh = make(chan ovNotify) 66 d.exitCh = make(chan chan struct{}) 67 68 go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh) 69 return nil 70} 71 72func (d *driver) serfJoin(neighIP string) error { 73 if neighIP == "" { 74 return fmt.Errorf("no neighbor to join") 75 } 76 if _, err := d.serfInstance.Join([]string{neighIP}, false); err != nil { 77 return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", 78 neighIP, err) 79 } 80 return nil 81} 82 83func (d *driver) notifyEvent(event ovNotify) { 84 ep := event.ep 85 86 ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(), 87 net.IP(ep.addr.Mask).String(), ep.mac.String()) 88 eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(), 89 event.nw.id, ep.id) 90 91 if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil { 92 logrus.Errorf("Sending user event failed: %v\n", err) 93 } 94} 95 96func (d *driver) processEvent(u serf.UserEvent) { 97 logrus.Debugf("Received user event name:%s, payload:%s\n", u.Name, 98 string(u.Payload)) 99 100 var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string 101 if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil { 102 fmt.Printf("Failed to scan name string: %v\n", err) 103 } 104 105 if _, err := fmt.Sscan(string(u.Payload), &action, 106 &ipStr, &maskStr, &macStr); err != nil { 107 fmt.Printf("Failed to scan value string: %v\n", err) 108 } 109 110 logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr) 111 112 mac, err := net.ParseMAC(macStr) 113 if err != nil { 114 logrus.Errorf("Failed to parse mac: %v\n", err) 115 } 116 117 if d.serfInstance.LocalMember().Addr.String() == vtepStr { 118 return 119 } 120 121 switch action { 122 case "join": 123 if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, 124 net.ParseIP(vtepStr), true); err != nil { 125 logrus.Errorf("Peer add failed in the driver: %v\n", err) 126 } 127 case "leave": 128 if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, 129 net.ParseIP(vtepStr), true); err != nil { 130 logrus.Errorf("Peer delete failed in the driver: %v\n", err) 131 } 132 } 133} 134 135func (d *driver) processQuery(q *serf.Query) { 136 logrus.Debugf("Received query name:%s, payload:%s\n", q.Name, 137 string(q.Payload)) 138 139 var nid, ipStr string 140 if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil { 141 fmt.Printf("Failed to scan query payload string: %v\n", err) 142 } 143 144 peerMac, peerIPMask, vtep, err := d.peerDbSearch(nid, net.ParseIP(ipStr)) 145 if err != nil { 146 return 147 } 148 149 q.Respond([]byte(fmt.Sprintf("%s %s %s", peerMac.String(), net.IP(peerIPMask).String(), vtep.String()))) 150} 151 152func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) { 153 if d.serfInstance == nil { 154 return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized") 155 } 156 157 qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String()) 158 resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil) 159 if err != nil { 160 return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err) 161 } 162 163 respCh := resp.ResponseCh() 164 select { 165 case r := <-respCh: 166 var macStr, maskStr, vtepStr string 167 if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil { 168 return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err) 169 } 170 171 mac, err := net.ParseMAC(macStr) 172 if err != nil { 173 return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err) 174 } 175 176 return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil 177 178 case <-time.After(time.Second): 179 return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster") 180 } 181} 182 183func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, 184 exitCh chan chan struct{}) { 185 186 for { 187 select { 188 case notify, ok := <-notifyCh: 189 if !ok { 190 break 191 } 192 193 d.notifyEvent(notify) 194 case ch, ok := <-exitCh: 195 if !ok { 196 break 197 } 198 199 if err := d.serfInstance.Leave(); err != nil { 200 logrus.Errorf("failed leaving the cluster: %v\n", err) 201 } 202 203 d.serfInstance.Shutdown() 204 close(ch) 205 return 206 case e, ok := <-eventCh: 207 if !ok { 208 break 209 } 210 211 if e.EventType() == serf.EventQuery { 212 d.processQuery(e.(*serf.Query)) 213 break 214 } 215 216 u, ok := e.(serf.UserEvent) 217 if !ok { 218 break 219 } 220 d.processEvent(u) 221 } 222 } 223} 224 225func (d *driver) isSerfAlive() bool { 226 d.Lock() 227 serfInstance := d.serfInstance 228 d.Unlock() 229 if serfInstance == nil || serfInstance.State() != serf.SerfAlive { 230 return false 231 } 232 return true 233} 234