1package overlay 2 3//go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto 4 5import ( 6 "context" 7 "fmt" 8 "net" 9 "sync" 10 11 "github.com/docker/libnetwork/datastore" 12 "github.com/docker/libnetwork/discoverapi" 13 "github.com/docker/libnetwork/driverapi" 14 "github.com/docker/libnetwork/idm" 15 "github.com/docker/libnetwork/netlabel" 16 "github.com/docker/libnetwork/osl" 17 "github.com/docker/libnetwork/types" 18 "github.com/hashicorp/serf/serf" 19 "github.com/sirupsen/logrus" 20) 21 22const ( 23 networkType = "overlay" 24 vethPrefix = "veth" 25 vethLen = 7 26 vxlanIDStart = 256 27 vxlanIDEnd = (1 << 24) - 1 28 vxlanPort = 4789 29 vxlanEncap = 50 30 secureOption = "encrypted" 31) 32 33var initVxlanIdm = make(chan (bool), 1) 34 35type driver struct { 36 eventCh chan serf.Event 37 notifyCh chan ovNotify 38 exitCh chan chan struct{} 39 bindAddress string 40 advertiseAddress string 41 neighIP string 42 config map[string]interface{} 43 peerDb peerNetworkMap 44 secMap *encrMap 45 serfInstance *serf.Serf 46 networks networkTable 47 store datastore.DataStore 48 localStore datastore.DataStore 49 vxlanIdm *idm.Idm 50 initOS sync.Once 51 joinOnce sync.Once 52 localJoinOnce sync.Once 53 keys []*key 54 peerOpCh chan *peerOperation 55 peerOpCancel context.CancelFunc 56 sync.Mutex 57} 58 59// Init registers a new instance of overlay driver 60func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { 61 c := driverapi.Capability{ 62 DataScope: datastore.GlobalScope, 63 ConnectivityScope: datastore.GlobalScope, 64 } 65 d := &driver{ 66 networks: networkTable{}, 67 peerDb: peerNetworkMap{ 68 mp: map[string]*peerMap{}, 69 }, 70 secMap: &encrMap{nodes: map[string][]*spi{}}, 71 config: config, 72 peerOpCh: make(chan *peerOperation), 73 } 74 75 // Launch the go routine for processing peer operations 76 ctx, cancel := context.WithCancel(context.Background()) 77 d.peerOpCancel = cancel 78 go d.peerOpRoutine(ctx, d.peerOpCh) 79 80 if data, ok := config[netlabel.GlobalKVClient]; ok { 81 var err error 82 dsc, ok := data.(discoverapi.DatastoreConfigData) 83 if !ok { 84 return types.InternalErrorf("incorrect data in datastore configuration: %v", data) 85 } 86 d.store, err = datastore.NewDataStoreFromConfig(dsc) 87 if err != nil { 88 return types.InternalErrorf("failed to initialize data store: %v", err) 89 } 90 } 91 92 if data, ok := config[netlabel.LocalKVClient]; ok { 93 var err error 94 dsc, ok := data.(discoverapi.DatastoreConfigData) 95 if !ok { 96 return types.InternalErrorf("incorrect data in datastore configuration: %v", data) 97 } 98 d.localStore, err = datastore.NewDataStoreFromConfig(dsc) 99 if err != nil { 100 return types.InternalErrorf("failed to initialize local data store: %v", err) 101 } 102 } 103 104 if err := d.restoreEndpoints(); err != nil { 105 logrus.Warnf("Failure during overlay endpoints restore: %v", err) 106 } 107 108 // If an error happened when the network join the sandbox during the endpoints restore 109 // we should reset it now along with the once variable, so that subsequent endpoint joins 110 // outside of the restore path can potentially fix the network join and succeed. 111 for nid, n := range d.networks { 112 if n.initErr != nil { 113 logrus.Infof("resetting init error and once variable for network %s after unsuccessful endpoint restore: %v", nid, n.initErr) 114 n.initErr = nil 115 n.once = &sync.Once{} 116 } 117 } 118 119 return dc.RegisterDriver(networkType, d, c) 120} 121 122// Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox 123func (d *driver) restoreEndpoints() error { 124 if d.localStore == nil { 125 logrus.Warn("Cannot restore overlay endpoints because local datastore is missing") 126 return nil 127 } 128 kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{}) 129 if err != nil && err != datastore.ErrKeyNotFound { 130 return fmt.Errorf("failed to read overlay endpoint from store: %v", err) 131 } 132 133 if err == datastore.ErrKeyNotFound { 134 return nil 135 } 136 for _, kvo := range kvol { 137 ep := kvo.(*endpoint) 138 n := d.network(ep.nid) 139 if n == nil { 140 logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7]) 141 logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7]) 142 if err := d.deleteEndpointFromStore(ep); err != nil { 143 logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7]) 144 } 145 continue 146 } 147 n.addEndpoint(ep) 148 149 s := n.getSubnetforIP(ep.addr) 150 if s == nil { 151 return fmt.Errorf("could not find subnet for endpoint %s", ep.id) 152 } 153 154 if err := n.joinSandbox(true); err != nil { 155 return fmt.Errorf("restore network sandbox failed: %v", err) 156 } 157 158 if err := n.joinSubnetSandbox(s, true); err != nil { 159 return fmt.Errorf("restore subnet sandbox failed for %q: %v", s.subnetIP.String(), err) 160 } 161 162 Ifaces := make(map[string][]osl.IfaceOption) 163 vethIfaceOption := make([]osl.IfaceOption, 1) 164 vethIfaceOption = append(vethIfaceOption, n.sbox.InterfaceOptions().Master(s.brName)) 165 Ifaces["veth+veth"] = vethIfaceOption 166 167 err := n.sbox.Restore(Ifaces, nil, nil, nil) 168 if err != nil { 169 return fmt.Errorf("failed to restore overlay sandbox: %v", err) 170 } 171 172 n.incEndpointCount() 173 d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), false, false, true) 174 } 175 return nil 176} 177 178// Fini cleans up the driver resources 179func Fini(drv driverapi.Driver) { 180 d := drv.(*driver) 181 182 // Notify the peer go routine to return 183 if d.peerOpCancel != nil { 184 d.peerOpCancel() 185 } 186 187 if d.exitCh != nil { 188 waitCh := make(chan struct{}) 189 190 d.exitCh <- waitCh 191 192 <-waitCh 193 } 194} 195 196func (d *driver) configure() error { 197 198 // Apply OS specific kernel configs if needed 199 d.initOS.Do(applyOStweaks) 200 201 if d.store == nil { 202 return nil 203 } 204 205 if d.vxlanIdm == nil { 206 return d.initializeVxlanIdm() 207 } 208 209 return nil 210} 211 212func (d *driver) initializeVxlanIdm() error { 213 var err error 214 215 initVxlanIdm <- true 216 defer func() { <-initVxlanIdm }() 217 218 if d.vxlanIdm != nil { 219 return nil 220 } 221 222 d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd) 223 if err != nil { 224 return fmt.Errorf("failed to initialize vxlan id manager: %v", err) 225 } 226 227 return nil 228} 229 230func (d *driver) Type() string { 231 return networkType 232} 233 234func (d *driver) IsBuiltIn() bool { 235 return true 236} 237 238func validateSelf(node string) error { 239 advIP := net.ParseIP(node) 240 if advIP == nil { 241 return fmt.Errorf("invalid self address (%s)", node) 242 } 243 244 addrs, err := net.InterfaceAddrs() 245 if err != nil { 246 return fmt.Errorf("Unable to get interface addresses %v", err) 247 } 248 for _, addr := range addrs { 249 ip, _, err := net.ParseCIDR(addr.String()) 250 if err == nil && ip.Equal(advIP) { 251 return nil 252 } 253 } 254 return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String()) 255} 256 257func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) { 258 if self && !d.isSerfAlive() { 259 d.Lock() 260 d.advertiseAddress = advertiseAddress 261 d.bindAddress = bindAddress 262 d.Unlock() 263 264 // If containers are already running on this network update the 265 // advertise address in the peerDB 266 d.localJoinOnce.Do(func() { 267 d.peerDBUpdateSelf() 268 }) 269 270 // If there is no cluster store there is no need to start serf. 271 if d.store != nil { 272 if err := validateSelf(advertiseAddress); err != nil { 273 logrus.Warn(err.Error()) 274 } 275 err := d.serfInit() 276 if err != nil { 277 logrus.Errorf("initializing serf instance failed: %v", err) 278 d.Lock() 279 d.advertiseAddress = "" 280 d.bindAddress = "" 281 d.Unlock() 282 return 283 } 284 } 285 } 286 287 d.Lock() 288 if !self { 289 d.neighIP = advertiseAddress 290 } 291 neighIP := d.neighIP 292 d.Unlock() 293 294 if d.serfInstance != nil && neighIP != "" { 295 var err error 296 d.joinOnce.Do(func() { 297 err = d.serfJoin(neighIP) 298 if err == nil { 299 d.pushLocalDb() 300 } 301 }) 302 if err != nil { 303 logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err) 304 d.Lock() 305 d.joinOnce = sync.Once{} 306 d.Unlock() 307 return 308 } 309 } 310} 311 312func (d *driver) pushLocalEndpointEvent(action, nid, eid string) { 313 n := d.network(nid) 314 if n == nil { 315 logrus.Debugf("Error pushing local endpoint event for network %s", nid) 316 return 317 } 318 ep := n.endpoint(eid) 319 if ep == nil { 320 logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid) 321 return 322 } 323 324 if !d.isSerfAlive() { 325 return 326 } 327 d.notifyCh <- ovNotify{ 328 action: "join", 329 nw: n, 330 ep: ep, 331 } 332} 333 334// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster 335func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error { 336 var err error 337 switch dType { 338 case discoverapi.NodeDiscovery: 339 nodeData, ok := data.(discoverapi.NodeDiscoveryData) 340 if !ok || nodeData.Address == "" { 341 return fmt.Errorf("invalid discovery data") 342 } 343 d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self) 344 case discoverapi.DatastoreConfig: 345 if d.store != nil { 346 return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already") 347 } 348 dsc, ok := data.(discoverapi.DatastoreConfigData) 349 if !ok { 350 return types.InternalErrorf("incorrect data in datastore configuration: %v", data) 351 } 352 d.store, err = datastore.NewDataStoreFromConfig(dsc) 353 if err != nil { 354 return types.InternalErrorf("failed to initialize data store: %v", err) 355 } 356 case discoverapi.EncryptionKeysConfig: 357 encrData, ok := data.(discoverapi.DriverEncryptionConfig) 358 if !ok { 359 return fmt.Errorf("invalid encryption key notification data") 360 } 361 keys := make([]*key, 0, len(encrData.Keys)) 362 for i := 0; i < len(encrData.Keys); i++ { 363 k := &key{ 364 value: encrData.Keys[i], 365 tag: uint32(encrData.Tags[i]), 366 } 367 keys = append(keys, k) 368 } 369 if err := d.setKeys(keys); err != nil { 370 logrus.Warn(err) 371 } 372 case discoverapi.EncryptionKeysUpdate: 373 var newKey, delKey, priKey *key 374 encrData, ok := data.(discoverapi.DriverEncryptionUpdate) 375 if !ok { 376 return fmt.Errorf("invalid encryption key notification data") 377 } 378 if encrData.Key != nil { 379 newKey = &key{ 380 value: encrData.Key, 381 tag: uint32(encrData.Tag), 382 } 383 } 384 if encrData.Primary != nil { 385 priKey = &key{ 386 value: encrData.Primary, 387 tag: uint32(encrData.PrimaryTag), 388 } 389 } 390 if encrData.Prune != nil { 391 delKey = &key{ 392 value: encrData.Prune, 393 tag: uint32(encrData.PruneTag), 394 } 395 } 396 if err := d.updateKeys(newKey, priKey, delKey); err != nil { 397 logrus.Warn(err) 398 } 399 default: 400 } 401 return nil 402} 403 404// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster 405func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error { 406 return nil 407} 408