1package libnetwork
2
3import (
4	"fmt"
5	"strings"
6
7	"github.com/docker/libkv/store/boltdb"
8	"github.com/docker/libkv/store/consul"
9	"github.com/docker/libkv/store/etcd"
10	"github.com/docker/libkv/store/zookeeper"
11	"github.com/docker/libnetwork/datastore"
12	"github.com/sirupsen/logrus"
13)
14
15func registerKVStores() {
16	consul.Register()
17	zookeeper.Register()
18	etcd.Register()
19	boltdb.Register()
20}
21
22func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error {
23	store, err := datastore.NewDataStore(scope, scfg)
24	if err != nil {
25		return err
26	}
27	c.Lock()
28	c.stores = append(c.stores, store)
29	c.Unlock()
30
31	return nil
32}
33
34func (c *controller) initStores() error {
35	registerKVStores()
36
37	c.Lock()
38	if c.cfg == nil {
39		c.Unlock()
40		return nil
41	}
42	scopeConfigs := c.cfg.Scopes
43	c.stores = nil
44	c.Unlock()
45
46	for scope, scfg := range scopeConfigs {
47		if err := c.initScopedStore(scope, scfg); err != nil {
48			return err
49		}
50	}
51
52	c.startWatch()
53	return nil
54}
55
56func (c *controller) closeStores() {
57	for _, store := range c.getStores() {
58		store.Close()
59	}
60}
61
62func (c *controller) getStore(scope string) datastore.DataStore {
63	c.Lock()
64	defer c.Unlock()
65
66	for _, store := range c.stores {
67		if store.Scope() == scope {
68			return store
69		}
70	}
71
72	return nil
73}
74
75func (c *controller) getStores() []datastore.DataStore {
76	c.Lock()
77	defer c.Unlock()
78
79	return c.stores
80}
81
82func (c *controller) getNetworkFromStore(nid string) (*network, error) {
83	for _, store := range c.getStores() {
84		n := &network{id: nid, ctrlr: c}
85		err := store.GetObject(datastore.Key(n.Key()...), n)
86		// Continue searching in the next store if the key is not found in this store
87		if err != nil {
88			if err != datastore.ErrKeyNotFound {
89				logrus.Debugf("could not find network %s: %v", nid, err)
90			}
91			continue
92		}
93
94		ec := &endpointCnt{n: n}
95		err = store.GetObject(datastore.Key(ec.Key()...), ec)
96		if err != nil && !n.inDelete {
97			return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
98		}
99
100		n.epCnt = ec
101		if n.scope == "" {
102			n.scope = store.Scope()
103		}
104		return n, nil
105	}
106
107	return nil, fmt.Errorf("network %s not found", nid)
108}
109
110func (c *controller) getNetworksForScope(scope string) ([]*network, error) {
111	var nl []*network
112
113	store := c.getStore(scope)
114	if store == nil {
115		return nil, nil
116	}
117
118	kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
119		&network{ctrlr: c})
120	if err != nil && err != datastore.ErrKeyNotFound {
121		return nil, fmt.Errorf("failed to get networks for scope %s: %v",
122			scope, err)
123	}
124
125	for _, kvo := range kvol {
126		n := kvo.(*network)
127		n.ctrlr = c
128
129		ec := &endpointCnt{n: n}
130		err = store.GetObject(datastore.Key(ec.Key()...), ec)
131		if err != nil && !n.inDelete {
132			logrus.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
133			continue
134		}
135
136		n.epCnt = ec
137		if n.scope == "" {
138			n.scope = scope
139		}
140		nl = append(nl, n)
141	}
142
143	return nl, nil
144}
145
146func (c *controller) getNetworksFromStore() ([]*network, error) {
147	var nl []*network
148
149	for _, store := range c.getStores() {
150		kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
151			&network{ctrlr: c})
152		// Continue searching in the next store if no keys found in this store
153		if err != nil {
154			if err != datastore.ErrKeyNotFound {
155				logrus.Debugf("failed to get networks for scope %s: %v", store.Scope(), err)
156			}
157			continue
158		}
159
160		kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
161		if err != nil {
162			if err != datastore.ErrKeyNotFound {
163				logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
164			}
165		}
166
167		for _, kvo := range kvol {
168			n := kvo.(*network)
169			n.Lock()
170			n.ctrlr = c
171			ec := &endpointCnt{n: n}
172			// Trim the leading & trailing "/" to make it consistent across all stores
173			if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
174				ec = val.(*endpointCnt)
175				ec.n = n
176				n.epCnt = ec
177			}
178			if n.scope == "" {
179				n.scope = store.Scope()
180			}
181			n.Unlock()
182			nl = append(nl, n)
183		}
184	}
185
186	return nl, nil
187}
188
189func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
190	var errors []string
191	for _, store := range n.ctrlr.getStores() {
192		ep := &endpoint{id: eid, network: n}
193		err := store.GetObject(datastore.Key(ep.Key()...), ep)
194		// Continue searching in the next store if the key is not found in this store
195		if err != nil {
196			if err != datastore.ErrKeyNotFound {
197				errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err))
198				logrus.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err)
199			}
200			continue
201		}
202		return ep, nil
203	}
204	return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors)
205}
206
207func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
208	var epl []*endpoint
209
210	tmp := endpoint{network: n}
211	for _, store := range n.getController().getStores() {
212		kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
213		// Continue searching in the next store if no keys found in this store
214		if err != nil {
215			if err != datastore.ErrKeyNotFound {
216				logrus.Debugf("failed to get endpoints for network %s scope %s: %v",
217					n.Name(), store.Scope(), err)
218			}
219			continue
220		}
221
222		for _, kvo := range kvol {
223			ep := kvo.(*endpoint)
224			epl = append(epl, ep)
225		}
226	}
227
228	return epl, nil
229}
230
231func (c *controller) updateToStore(kvObject datastore.KVObject) error {
232	cs := c.getStore(kvObject.DataScope())
233	if cs == nil {
234		return ErrDataStoreNotInitialized(kvObject.DataScope())
235	}
236
237	if err := cs.PutObjectAtomic(kvObject); err != nil {
238		if err == datastore.ErrKeyModified {
239			return err
240		}
241		return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
242	}
243
244	return nil
245}
246
247func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
248	cs := c.getStore(kvObject.DataScope())
249	if cs == nil {
250		return ErrDataStoreNotInitialized(kvObject.DataScope())
251	}
252
253retry:
254	if err := cs.DeleteObjectAtomic(kvObject); err != nil {
255		if err == datastore.ErrKeyModified {
256			if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
257				return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
258			}
259			logrus.Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key())
260			goto retry
261		}
262		return err
263	}
264
265	return nil
266}
267
// netWatch is the per-network bookkeeping kept in the controller's nmap.
//
// localEps holds the endpoints added by processEndpointCreate and removed by
// processEndpointDelete, keyed by endpoint ID. remoteEps holds the endpoints
// that networkWatchLoop learned from the datastore, keyed by endpoint ID.
// stopCh is created by processEndpointCreate, passed to the store's Watch
// call, and closed by processEndpointDelete once localEps becomes empty,
// which also makes networkWatchLoop return.
//
// In this file the two maps are read and written while holding the
// controller lock.
type netWatch struct {
	localEps  map[string]*endpoint
	remoteEps map[string]*endpoint
	stopCh    chan struct{}
}
273
274func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
275	c.Lock()
276	defer c.Unlock()
277
278	var epl []*endpoint
279	for _, ep := range nw.localEps {
280		epl = append(epl, ep)
281	}
282
283	return epl
284}
285
// watchSvcRecord hands ep to the watch loop by sending it on c.watchCh;
// watchLoop receives it and calls processEndpointCreate. startWatch creates
// the channel unbuffered, so the send blocks until watchLoop takes it (and
// blocks indefinitely if startWatch has not created the channel yet).
func (c *controller) watchSvcRecord(ep *endpoint) {
	c.watchCh <- ep
}
289
// unWatchSvcRecord hands ep to the watch loop by sending it on c.unWatchCh;
// watchLoop receives it and calls processEndpointDelete. startWatch creates
// the channel unbuffered, so the send blocks until watchLoop takes it (and
// blocks indefinitely if startWatch has not created the channel yet).
func (c *controller) unWatchSvcRecord(ep *endpoint) {
	c.unWatchCh <- ep
}
293
// networkWatchLoop reconciles nw.remoteEps with the endpoints found in the
// datastore each time an object arrives on ecCh, and returns once nw.stopCh
// becomes readable (processEndpointDelete closes it).
//
// nw is the watch state of the network, ep is only used to reach the network
// through ep.getNetwork(), and ecCh is the channel returned by the store's
// Watch call in processEndpointCreate.
//
// NOTE(review): every value received from ecCh is asserted to *endpointCnt
// without a check; if ecCh were ever closed the receive would yield nil and
// the assertion would panic. Confirm the producer never closes the channel.
func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
	for {
		select {
		case <-nw.stopCh:
			return
		case o := <-ecCh:
			ec := o.(*endpointCnt)

			epl, err := ec.n.getEndpointsFromStore()
			if err != nil {
				// break only leaves the select; the for loop then waits
				// for the next event.
				break
			}

			c.Lock()
			// Endpoints seen for the first time, or seen again under a new name.
			var addEp []*endpoint

			// delEpMap starts as a copy of every known remote endpoint;
			// entries that are still present in the store with an unchanged
			// name are removed from it below.
			delEpMap := make(map[string]*endpoint)
			renameEpMap := make(map[string]bool)
			for k, v := range nw.remoteEps {
				delEpMap[k] = v
			}

			for _, lEp := range epl {
				// Endpoints tracked as local are not handled here.
				if _, ok := nw.localEps[lEp.ID()]; ok {
					continue
				}

				// This ep shadows the function parameter for the duration
				// of the if statement only.
				if ep, ok := nw.remoteEps[lEp.ID()]; ok {
					// On a container rename EP ID will remain
					// the same but the name will change. service
					// records should reflect the change.
					// Keep old EP entry in the delEpMap and add
					// EP from the store (which has the new name)
					// into the new list
					if lEp.name == ep.name {
						delete(delEpMap, lEp.ID())
						continue
					}
					renameEpMap[lEp.ID()] = true
				}
				nw.remoteEps[lEp.ID()] = lEp
				addEp = append(addEp, lEp)
			}

			// EPs whose name are to be deleted from the svc records
			// should also be removed from nw's remote EP list, except
			// the ones that are getting renamed.
			for _, lEp := range delEpMap {
				if !renameEpMap[lEp.ID()] {
					delete(nw.remoteEps, lEp.ID())
				}
			}
			c.Unlock()

			// The service records are updated after the controller lock is
			// released; getLocalEps takes that lock itself. Removals are
			// applied before additions.
			for _, lEp := range delEpMap {
				ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)

			}
			for _, lEp := range addEp {
				ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
			}
		}
	}
}
358
359func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
360	n := ep.getNetwork()
361	if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
362		return
363	}
364
365	c.Lock()
366	nw, ok := nmap[n.ID()]
367	c.Unlock()
368
369	if ok {
370		// Update the svc db for the local endpoint join right away
371		n.updateSvcRecord(ep, c.getLocalEps(nw), true)
372
373		c.Lock()
374		nw.localEps[ep.ID()] = ep
375
376		// If we had learned that from the kv store remove it
377		// from remote ep list now that we know that this is
378		// indeed a local endpoint
379		delete(nw.remoteEps, ep.ID())
380		c.Unlock()
381		return
382	}
383
384	nw = &netWatch{
385		localEps:  make(map[string]*endpoint),
386		remoteEps: make(map[string]*endpoint),
387	}
388
389	// Update the svc db for the local endpoint join right away
390	// Do this before adding this ep to localEps so that we don't
391	// try to update this ep's container's svc records
392	n.updateSvcRecord(ep, c.getLocalEps(nw), true)
393
394	c.Lock()
395	nw.localEps[ep.ID()] = ep
396	nmap[n.ID()] = nw
397	nw.stopCh = make(chan struct{})
398	c.Unlock()
399
400	store := c.getStore(n.DataScope())
401	if store == nil {
402		return
403	}
404
405	if !store.Watchable() {
406		return
407	}
408
409	ch, err := store.Watch(n.getEpCnt(), nw.stopCh)
410	if err != nil {
411		logrus.Warnf("Error creating watch for network: %v", err)
412		return
413	}
414
415	go c.networkWatchLoop(nw, ep, ch)
416}
417
418func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
419	n := ep.getNetwork()
420	if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
421		return
422	}
423
424	c.Lock()
425	nw, ok := nmap[n.ID()]
426
427	if ok {
428		delete(nw.localEps, ep.ID())
429		c.Unlock()
430
431		// Update the svc db about local endpoint leave right away
432		// Do this after we remove this ep from localEps so that we
433		// don't try to remove this svc record from this ep's container.
434		n.updateSvcRecord(ep, c.getLocalEps(nw), false)
435
436		c.Lock()
437		if len(nw.localEps) == 0 {
438			close(nw.stopCh)
439
440			// This is the last container going away for the network. Destroy
441			// this network's svc db entry
442			delete(c.svcRecords, n.ID())
443
444			delete(nmap, n.ID())
445		}
446	}
447	c.Unlock()
448}
449
450func (c *controller) watchLoop() {
451	for {
452		select {
453		case ep := <-c.watchCh:
454			c.processEndpointCreate(c.nmap, ep)
455		case ep := <-c.unWatchCh:
456			c.processEndpointDelete(c.nmap, ep)
457		}
458	}
459}
460
461func (c *controller) startWatch() {
462	if c.watchCh != nil {
463		return
464	}
465	c.watchCh = make(chan *endpoint)
466	c.unWatchCh = make(chan *endpoint)
467	c.nmap = make(map[string]*netWatch)
468
469	go c.watchLoop()
470}
471
472func (c *controller) networkCleanup() {
473	networks, err := c.getNetworksFromStore()
474	if err != nil {
475		logrus.Warnf("Could not retrieve networks from store(s) during network cleanup: %v", err)
476		return
477	}
478
479	for _, n := range networks {
480		if n.inDelete {
481			logrus.Infof("Removing stale network %s (%s)", n.Name(), n.ID())
482			if err := n.delete(true, true); err != nil {
483				logrus.Debugf("Error while removing stale network: %v", err)
484			}
485		}
486	}
487}
488
489var populateSpecial NetworkWalker = func(nw Network) bool {
490	if n := nw.(*network); n.hasSpecialDriver() && !n.ConfigOnly() {
491		if err := n.getController().addNetwork(n); err != nil {
492			logrus.Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type())
493		}
494	}
495	return false
496}
497