1package overlay
2
3//go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
4
5import (
6	"context"
7	"fmt"
8	"net"
9	"sync"
10
11	"github.com/docker/libnetwork/datastore"
12	"github.com/docker/libnetwork/discoverapi"
13	"github.com/docker/libnetwork/driverapi"
14	"github.com/docker/libnetwork/idm"
15	"github.com/docker/libnetwork/netlabel"
16	"github.com/docker/libnetwork/osl"
17	"github.com/docker/libnetwork/types"
18	"github.com/hashicorp/serf/serf"
19	"github.com/sirupsen/logrus"
20)
21
// Driver-wide constants.
const (
	// networkType is the name the driver registers under (see Init).
	networkType = "overlay"

	// vethPrefix and vethLen: presumably the name prefix and suffix length
	// used for generated veth interface names — TODO confirm against usage.
	vethPrefix = "veth"
	vethLen    = 7

	// vxlanIDStart and vxlanIDEnd bound the range handed to the "vxlan-id"
	// ID manager in initializeVxlanIdm; vxlanIDEnd is the largest 24-bit value.
	vxlanIDStart = 256
	vxlanIDEnd   = 1<<24 - 1

	// vxlanPort is the VXLAN UDP port; 4789 is the IANA-assigned value.
	vxlanPort = 4789

	// vxlanEncap: presumably the per-packet VXLAN encapsulation overhead in
	// bytes — confirm against the MTU computation that uses it.
	vxlanEncap = 50

	// secureOption: presumably the network option key that requests an
	// encrypted overlay — TODO confirm.
	secureOption = "encrypted"
)

// initVxlanIdm is a one-slot channel used as a lock: initializeVxlanIdm
// sends on it to acquire and receives from it to release.
var initVxlanIdm = make(chan bool, 1)
34
// driver is the overlay network driver instance created by Init.
type driver struct {
	// eventCh carries serf events.
	eventCh          chan serf.Event
	// notifyCh receives endpoint notifications (ovNotify) pushed by
	// pushLocalEndpointEvent.
	notifyCh         chan ovNotify
	// exitCh: Fini sends a reply channel on it and then waits for a signal
	// on that reply channel before returning.
	exitCh           chan chan struct{}
	// bindAddress and advertiseAddress are the local node's addresses as
	// recorded by nodeJoin; both are cleared again if serf fails to start.
	bindAddress      string
	advertiseAddress string
	// neighIP is the address of the last remote node reported to nodeJoin
	// and is used as the serf join target.
	neighIP          string
	// config is the configuration map passed to Init.
	config           map[string]interface{}
	peerDb           peerNetworkMap
	// secMap: presumably per-node encryption state (SPIs) — TODO confirm.
	secMap           *encrMap
	serfInstance     *serf.Serf
	networks         networkTable
	// store is the global-scope datastore, set from netlabel.GlobalKVClient
	// in Init or from a DatastoreConfig discovery event.
	store            datastore.DataStore
	// localStore is the local datastore (netlabel.LocalKVClient) from which
	// endpoints are restored.
	localStore       datastore.DataStore
	// vxlanIdm is the "vxlan-id" ID manager, created lazily by
	// initializeVxlanIdm.
	vxlanIdm         *idm.Idm
	// initOS guards the one-time applyOStweaks call in configure.
	initOS           sync.Once
	// joinOnce guards the serf join in nodeJoin; it is reset after a failed join.
	joinOnce         sync.Once
	// localJoinOnce guards the one-time peerDBUpdateSelf call in nodeJoin.
	localJoinOnce    sync.Once
	// keys: presumably the current encryption keys — TODO confirm.
	keys             []*key
	// peerOpCh feeds peerOpRoutine, which Init starts.
	peerOpCh         chan *peerOperation
	// peerOpCancel stops peerOpRoutine; Fini calls it.
	peerOpCancel     context.CancelFunc
	// The embedded mutex guards (at least) advertiseAddress, bindAddress,
	// neighIP and the joinOnce reset in nodeJoin.
	sync.Mutex
}
58
59// Init registers a new instance of overlay driver
60func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
61	c := driverapi.Capability{
62		DataScope:         datastore.GlobalScope,
63		ConnectivityScope: datastore.GlobalScope,
64	}
65	d := &driver{
66		networks: networkTable{},
67		peerDb: peerNetworkMap{
68			mp: map[string]*peerMap{},
69		},
70		secMap:   &encrMap{nodes: map[string][]*spi{}},
71		config:   config,
72		peerOpCh: make(chan *peerOperation),
73	}
74
75	// Launch the go routine for processing peer operations
76	ctx, cancel := context.WithCancel(context.Background())
77	d.peerOpCancel = cancel
78	go d.peerOpRoutine(ctx, d.peerOpCh)
79
80	if data, ok := config[netlabel.GlobalKVClient]; ok {
81		var err error
82		dsc, ok := data.(discoverapi.DatastoreConfigData)
83		if !ok {
84			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
85		}
86		d.store, err = datastore.NewDataStoreFromConfig(dsc)
87		if err != nil {
88			return types.InternalErrorf("failed to initialize data store: %v", err)
89		}
90	}
91
92	if data, ok := config[netlabel.LocalKVClient]; ok {
93		var err error
94		dsc, ok := data.(discoverapi.DatastoreConfigData)
95		if !ok {
96			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
97		}
98		d.localStore, err = datastore.NewDataStoreFromConfig(dsc)
99		if err != nil {
100			return types.InternalErrorf("failed to initialize local data store: %v", err)
101		}
102	}
103
104	if err := d.restoreEndpoints(); err != nil {
105		logrus.Warnf("Failure during overlay endpoints restore: %v", err)
106	}
107
108	// If an error happened when the network join the sandbox during the endpoints restore
109	// we should reset it now along with the once variable, so that subsequent endpoint joins
110	// outside of the restore path can potentially fix the network join and succeed.
111	for nid, n := range d.networks {
112		if n.initErr != nil {
113			logrus.Infof("resetting init error and once variable for network %s after unsuccessful endpoint restore: %v", nid, n.initErr)
114			n.initErr = nil
115			n.once = &sync.Once{}
116		}
117	}
118
119	return dc.RegisterDriver(networkType, d, c)
120}
121
122// Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox
123func (d *driver) restoreEndpoints() error {
124	if d.localStore == nil {
125		logrus.Warn("Cannot restore overlay endpoints because local datastore is missing")
126		return nil
127	}
128	kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{})
129	if err != nil && err != datastore.ErrKeyNotFound {
130		return fmt.Errorf("failed to read overlay endpoint from store: %v", err)
131	}
132
133	if err == datastore.ErrKeyNotFound {
134		return nil
135	}
136	for _, kvo := range kvol {
137		ep := kvo.(*endpoint)
138		n := d.network(ep.nid)
139		if n == nil {
140			logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7])
141			logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7])
142			if err := d.deleteEndpointFromStore(ep); err != nil {
143				logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
144			}
145			continue
146		}
147		n.addEndpoint(ep)
148
149		s := n.getSubnetforIP(ep.addr)
150		if s == nil {
151			return fmt.Errorf("could not find subnet for endpoint %s", ep.id)
152		}
153
154		if err := n.joinSandbox(true); err != nil {
155			return fmt.Errorf("restore network sandbox failed: %v", err)
156		}
157
158		if err := n.joinSubnetSandbox(s, true); err != nil {
159			return fmt.Errorf("restore subnet sandbox failed for %q: %v", s.subnetIP.String(), err)
160		}
161
162		Ifaces := make(map[string][]osl.IfaceOption)
163		vethIfaceOption := make([]osl.IfaceOption, 1)
164		vethIfaceOption = append(vethIfaceOption, n.sbox.InterfaceOptions().Master(s.brName))
165		Ifaces["veth+veth"] = vethIfaceOption
166
167		err := n.sbox.Restore(Ifaces, nil, nil, nil)
168		if err != nil {
169			return fmt.Errorf("failed to restore overlay sandbox: %v", err)
170		}
171
172		n.incEndpointCount()
173		d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), false, false, true)
174	}
175	return nil
176}
177
178// Fini cleans up the driver resources
179func Fini(drv driverapi.Driver) {
180	d := drv.(*driver)
181
182	// Notify the peer go routine to return
183	if d.peerOpCancel != nil {
184		d.peerOpCancel()
185	}
186
187	if d.exitCh != nil {
188		waitCh := make(chan struct{})
189
190		d.exitCh <- waitCh
191
192		<-waitCh
193	}
194}
195
196func (d *driver) configure() error {
197
198	// Apply OS specific kernel configs if needed
199	d.initOS.Do(applyOStweaks)
200
201	if d.store == nil {
202		return nil
203	}
204
205	if d.vxlanIdm == nil {
206		return d.initializeVxlanIdm()
207	}
208
209	return nil
210}
211
212func (d *driver) initializeVxlanIdm() error {
213	var err error
214
215	initVxlanIdm <- true
216	defer func() { <-initVxlanIdm }()
217
218	if d.vxlanIdm != nil {
219		return nil
220	}
221
222	d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
223	if err != nil {
224		return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
225	}
226
227	return nil
228}
229
230func (d *driver) Type() string {
231	return networkType
232}
233
234func (d *driver) IsBuiltIn() bool {
235	return true
236}
237
// validateSelf checks that node is a valid IP address that is assigned to
// one of the host's network interfaces.
//
// It returns an error when node does not parse as an IP, when the interface
// addresses cannot be listed, or when no local interface carries the address.
func validateSelf(node string) error {
	advIP := net.ParseIP(node)
	if advIP == nil {
		return fmt.Errorf("invalid self address (%s)", node)
	}

	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return fmt.Errorf("unable to get interface addresses: %v", err)
	}
	for _, addr := range addrs {
		// Read the IP from the concrete address type instead of
		// round-tripping through String/ParseCIDR. The round-trip silently
		// skips addresses reported without a prefix length (*net.IPAddr).
		var ip net.IP
		switch a := addr.(type) {
		case *net.IPNet:
			ip = a.IP
		case *net.IPAddr:
			ip = a.IP
		default:
			ip, _, _ = net.ParseCIDR(addr.String())
		}
		if ip != nil && ip.Equal(advIP) {
			return nil
		}
	}
	return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
}
256
257func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
258	if self && !d.isSerfAlive() {
259		d.Lock()
260		d.advertiseAddress = advertiseAddress
261		d.bindAddress = bindAddress
262		d.Unlock()
263
264		// If containers are already running on this network update the
265		// advertise address in the peerDB
266		d.localJoinOnce.Do(func() {
267			d.peerDBUpdateSelf()
268		})
269
270		// If there is no cluster store there is no need to start serf.
271		if d.store != nil {
272			if err := validateSelf(advertiseAddress); err != nil {
273				logrus.Warn(err.Error())
274			}
275			err := d.serfInit()
276			if err != nil {
277				logrus.Errorf("initializing serf instance failed: %v", err)
278				d.Lock()
279				d.advertiseAddress = ""
280				d.bindAddress = ""
281				d.Unlock()
282				return
283			}
284		}
285	}
286
287	d.Lock()
288	if !self {
289		d.neighIP = advertiseAddress
290	}
291	neighIP := d.neighIP
292	d.Unlock()
293
294	if d.serfInstance != nil && neighIP != "" {
295		var err error
296		d.joinOnce.Do(func() {
297			err = d.serfJoin(neighIP)
298			if err == nil {
299				d.pushLocalDb()
300			}
301		})
302		if err != nil {
303			logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
304			d.Lock()
305			d.joinOnce = sync.Once{}
306			d.Unlock()
307			return
308		}
309	}
310}
311
312func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
313	n := d.network(nid)
314	if n == nil {
315		logrus.Debugf("Error pushing local endpoint event for network %s", nid)
316		return
317	}
318	ep := n.endpoint(eid)
319	if ep == nil {
320		logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
321		return
322	}
323
324	if !d.isSerfAlive() {
325		return
326	}
327	d.notifyCh <- ovNotify{
328		action: "join",
329		nw:     n,
330		ep:     ep,
331	}
332}
333
334// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
335func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
336	var err error
337	switch dType {
338	case discoverapi.NodeDiscovery:
339		nodeData, ok := data.(discoverapi.NodeDiscoveryData)
340		if !ok || nodeData.Address == "" {
341			return fmt.Errorf("invalid discovery data")
342		}
343		d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self)
344	case discoverapi.DatastoreConfig:
345		if d.store != nil {
346			return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
347		}
348		dsc, ok := data.(discoverapi.DatastoreConfigData)
349		if !ok {
350			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
351		}
352		d.store, err = datastore.NewDataStoreFromConfig(dsc)
353		if err != nil {
354			return types.InternalErrorf("failed to initialize data store: %v", err)
355		}
356	case discoverapi.EncryptionKeysConfig:
357		encrData, ok := data.(discoverapi.DriverEncryptionConfig)
358		if !ok {
359			return fmt.Errorf("invalid encryption key notification data")
360		}
361		keys := make([]*key, 0, len(encrData.Keys))
362		for i := 0; i < len(encrData.Keys); i++ {
363			k := &key{
364				value: encrData.Keys[i],
365				tag:   uint32(encrData.Tags[i]),
366			}
367			keys = append(keys, k)
368		}
369		if err := d.setKeys(keys); err != nil {
370			logrus.Warn(err)
371		}
372	case discoverapi.EncryptionKeysUpdate:
373		var newKey, delKey, priKey *key
374		encrData, ok := data.(discoverapi.DriverEncryptionUpdate)
375		if !ok {
376			return fmt.Errorf("invalid encryption key notification data")
377		}
378		if encrData.Key != nil {
379			newKey = &key{
380				value: encrData.Key,
381				tag:   uint32(encrData.Tag),
382			}
383		}
384		if encrData.Primary != nil {
385			priKey = &key{
386				value: encrData.Primary,
387				tag:   uint32(encrData.PrimaryTag),
388			}
389		}
390		if encrData.Prune != nil {
391			delKey = &key{
392				value: encrData.Prune,
393				tag:   uint32(encrData.PruneTag),
394			}
395		}
396		if err := d.updateKeys(newKey, priKey, delKey); err != nil {
397			logrus.Warn(err)
398		}
399	default:
400	}
401	return nil
402}
403
404// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
405func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
406	return nil
407}
408