1package libnetwork 2 3import ( 4 "fmt" 5 "strings" 6 7 "github.com/docker/libkv/store/boltdb" 8 "github.com/docker/libkv/store/consul" 9 "github.com/docker/libkv/store/etcd" 10 "github.com/docker/libkv/store/zookeeper" 11 "github.com/docker/libnetwork/datastore" 12 "github.com/sirupsen/logrus" 13) 14 15func registerKVStores() { 16 consul.Register() 17 zookeeper.Register() 18 etcd.Register() 19 boltdb.Register() 20} 21 22func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error { 23 store, err := datastore.NewDataStore(scope, scfg) 24 if err != nil { 25 return err 26 } 27 c.Lock() 28 c.stores = append(c.stores, store) 29 c.Unlock() 30 31 return nil 32} 33 34func (c *controller) initStores() error { 35 registerKVStores() 36 37 c.Lock() 38 if c.cfg == nil { 39 c.Unlock() 40 return nil 41 } 42 scopeConfigs := c.cfg.Scopes 43 c.stores = nil 44 c.Unlock() 45 46 for scope, scfg := range scopeConfigs { 47 if err := c.initScopedStore(scope, scfg); err != nil { 48 return err 49 } 50 } 51 52 c.startWatch() 53 return nil 54} 55 56func (c *controller) closeStores() { 57 for _, store := range c.getStores() { 58 store.Close() 59 } 60} 61 62func (c *controller) getStore(scope string) datastore.DataStore { 63 c.Lock() 64 defer c.Unlock() 65 66 for _, store := range c.stores { 67 if store.Scope() == scope { 68 return store 69 } 70 } 71 72 return nil 73} 74 75func (c *controller) getStores() []datastore.DataStore { 76 c.Lock() 77 defer c.Unlock() 78 79 return c.stores 80} 81 82func (c *controller) getNetworkFromStore(nid string) (*network, error) { 83 for _, store := range c.getStores() { 84 n := &network{id: nid, ctrlr: c} 85 err := store.GetObject(datastore.Key(n.Key()...), n) 86 // Continue searching in the next store if the key is not found in this store 87 if err != nil { 88 if err != datastore.ErrKeyNotFound { 89 logrus.Debugf("could not find network %s: %v", nid, err) 90 } 91 continue 92 } 93 94 ec := &endpointCnt{n: n} 95 err = store.GetObject(datastore.Key(ec.Key()...), ec) 96 if err != nil && !n.inDelete { 97 return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err) 98 } 99 100 n.epCnt = ec 101 if n.scope == "" { 102 n.scope = store.Scope() 103 } 104 return n, nil 105 } 106 107 return nil, fmt.Errorf("network %s not found", nid) 108} 109 110func (c *controller) getNetworksForScope(scope string) ([]*network, error) { 111 var nl []*network 112 113 store := c.getStore(scope) 114 if store == nil { 115 return nil, nil 116 } 117 118 kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), 119 &network{ctrlr: c}) 120 if err != nil && err != datastore.ErrKeyNotFound { 121 return nil, fmt.Errorf("failed to get networks for scope %s: %v", 122 scope, err) 123 } 124 125 for _, kvo := range kvol { 126 n := kvo.(*network) 127 n.ctrlr = c 128 129 ec := &endpointCnt{n: n} 130 err = store.GetObject(datastore.Key(ec.Key()...), ec) 131 if err != nil && !n.inDelete { 132 logrus.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err) 133 continue 134 } 135 136 n.epCnt = ec 137 if n.scope == "" { 138 n.scope = scope 139 } 140 nl = append(nl, n) 141 } 142 143 return nl, nil 144} 145 146func (c *controller) getNetworksFromStore() ([]*network, error) { 147 var nl []*network 148 149 for _, store := range c.getStores() { 150 kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), 151 &network{ctrlr: c}) 152 // Continue searching in the next store if no keys found in this store 153 if err != nil { 154 if err != datastore.ErrKeyNotFound { 155 logrus.Debugf("failed to get networks for scope %s: %v", store.Scope(), err) 156 } 157 continue 158 } 159 160 kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{}) 161 if err != nil { 162 if err != datastore.ErrKeyNotFound { 163 logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err) 164 } 165 } 166 167 for _, kvo := range kvol { 168 n := kvo.(*network) 169 n.Lock() 170 n.ctrlr = c 171 ec := &endpointCnt{n: n} 172 // Trim the leading & trailing "/" to make it consistent across all stores 173 if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok { 174 ec = val.(*endpointCnt) 175 ec.n = n 176 n.epCnt = ec 177 } 178 if n.scope == "" { 179 n.scope = store.Scope() 180 } 181 n.Unlock() 182 nl = append(nl, n) 183 } 184 } 185 186 return nl, nil 187} 188 189func (n *network) getEndpointFromStore(eid string) (*endpoint, error) { 190 var errors []string 191 for _, store := range n.ctrlr.getStores() { 192 ep := &endpoint{id: eid, network: n} 193 err := store.GetObject(datastore.Key(ep.Key()...), ep) 194 // Continue searching in the next store if the key is not found in this store 195 if err != nil { 196 if err != datastore.ErrKeyNotFound { 197 errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err)) 198 logrus.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err) 199 } 200 continue 201 } 202 return ep, nil 203 } 204 return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors) 205} 206 207func (n *network) getEndpointsFromStore() ([]*endpoint, error) { 208 var epl []*endpoint 209 210 tmp := endpoint{network: n} 211 for _, store := range n.getController().getStores() { 212 kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n}) 213 // Continue searching in the next store if no keys found in this store 214 if err != nil { 215 if err != datastore.ErrKeyNotFound { 216 logrus.Debugf("failed to get endpoints for network %s scope %s: %v", 217 n.Name(), store.Scope(), err) 218 } 219 continue 220 } 221 222 for _, kvo := range kvol { 223 ep := kvo.(*endpoint) 224 epl = append(epl, ep) 225 } 226 } 227 228 return epl, nil 229} 230 231func (c *controller) updateToStore(kvObject datastore.KVObject) error { 232 cs := c.getStore(kvObject.DataScope()) 233 if cs == nil { 234 return ErrDataStoreNotInitialized(kvObject.DataScope()) 235 } 236 237 if err := cs.PutObjectAtomic(kvObject); err != nil { 238 if err == datastore.ErrKeyModified { 239 return err 240 } 241 return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err) 242 } 243 244 return nil 245} 246 247func (c *controller) deleteFromStore(kvObject datastore.KVObject) error { 248 cs := c.getStore(kvObject.DataScope()) 249 if cs == nil { 250 return ErrDataStoreNotInitialized(kvObject.DataScope()) 251 } 252 253retry: 254 if err := cs.DeleteObjectAtomic(kvObject); err != nil { 255 if err == datastore.ErrKeyModified { 256 if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil { 257 return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err) 258 } 259 logrus.Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key()) 260 goto retry 261 } 262 return err 263 } 264 265 return nil 266} 267 268type netWatch struct { 269 localEps map[string]*endpoint 270 remoteEps map[string]*endpoint 271 stopCh chan struct{} 272} 273 274func (c *controller) getLocalEps(nw *netWatch) []*endpoint { 275 c.Lock() 276 defer c.Unlock() 277 278 var epl []*endpoint 279 for _, ep := range nw.localEps { 280 epl = append(epl, ep) 281 } 282 283 return epl 284} 285 286func (c *controller) watchSvcRecord(ep *endpoint) { 287 c.watchCh <- ep 288} 289 290func (c *controller) unWatchSvcRecord(ep *endpoint) { 291 c.unWatchCh <- ep 292} 293 294func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) { 295 for { 296 select { 297 case <-nw.stopCh: 298 return 299 case o := <-ecCh: 300 ec := o.(*endpointCnt) 301 302 epl, err := ec.n.getEndpointsFromStore() 303 if err != nil { 304 break 305 } 306 307 c.Lock() 308 var addEp []*endpoint 309 310 delEpMap := make(map[string]*endpoint) 311 renameEpMap := make(map[string]bool) 312 for k, v := range nw.remoteEps { 313 delEpMap[k] = v 314 } 315 316 for _, lEp := range epl { 317 if _, ok := nw.localEps[lEp.ID()]; ok { 318 continue 319 } 320 321 if ep, ok := nw.remoteEps[lEp.ID()]; ok { 322 // On a container rename EP ID will remain 323 // the same but the name will change. service 324 // records should reflect the change. 325 // Keep old EP entry in the delEpMap and add 326 // EP from the store (which has the new name) 327 // into the new list 328 if lEp.name == ep.name { 329 delete(delEpMap, lEp.ID()) 330 continue 331 } 332 renameEpMap[lEp.ID()] = true 333 } 334 nw.remoteEps[lEp.ID()] = lEp 335 addEp = append(addEp, lEp) 336 } 337 338 // EPs whose name are to be deleted from the svc records 339 // should also be removed from nw's remote EP list, except 340 // the ones that are getting renamed. 341 for _, lEp := range delEpMap { 342 if !renameEpMap[lEp.ID()] { 343 delete(nw.remoteEps, lEp.ID()) 344 } 345 } 346 c.Unlock() 347 348 for _, lEp := range delEpMap { 349 ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false) 350 351 } 352 for _, lEp := range addEp { 353 ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true) 354 } 355 } 356 } 357} 358 359func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) { 360 n := ep.getNetwork() 361 if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() { 362 return 363 } 364 365 c.Lock() 366 nw, ok := nmap[n.ID()] 367 c.Unlock() 368 369 if ok { 370 // Update the svc db for the local endpoint join right away 371 n.updateSvcRecord(ep, c.getLocalEps(nw), true) 372 373 c.Lock() 374 nw.localEps[ep.ID()] = ep 375 376 // If we had learned that from the kv store remove it 377 // from remote ep list now that we know that this is 378 // indeed a local endpoint 379 delete(nw.remoteEps, ep.ID()) 380 c.Unlock() 381 return 382 } 383 384 nw = &netWatch{ 385 localEps: make(map[string]*endpoint), 386 remoteEps: make(map[string]*endpoint), 387 } 388 389 // Update the svc db for the local endpoint join right away 390 // Do this before adding this ep to localEps so that we don't 391 // try to update this ep's container's svc records 392 n.updateSvcRecord(ep, c.getLocalEps(nw), true) 393 394 c.Lock() 395 nw.localEps[ep.ID()] = ep 396 nmap[n.ID()] = nw 397 nw.stopCh = make(chan struct{}) 398 c.Unlock() 399 400 store := c.getStore(n.DataScope()) 401 if store == nil { 402 return 403 } 404 405 if !store.Watchable() { 406 return 407 } 408 409 ch, err := store.Watch(n.getEpCnt(), nw.stopCh) 410 if err != nil { 411 logrus.Warnf("Error creating watch for network: %v", err) 412 return 413 } 414 415 go c.networkWatchLoop(nw, ep, ch) 416} 417 418func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) { 419 n := ep.getNetwork() 420 if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() { 421 return 422 } 423 424 c.Lock() 425 nw, ok := nmap[n.ID()] 426 427 if ok { 428 delete(nw.localEps, ep.ID()) 429 c.Unlock() 430 431 // Update the svc db about local endpoint leave right away 432 // Do this after we remove this ep from localEps so that we 433 // don't try to remove this svc record from this ep's container. 434 n.updateSvcRecord(ep, c.getLocalEps(nw), false) 435 436 c.Lock() 437 if len(nw.localEps) == 0 { 438 close(nw.stopCh) 439 440 // This is the last container going away for the network. Destroy 441 // this network's svc db entry 442 delete(c.svcRecords, n.ID()) 443 444 delete(nmap, n.ID()) 445 } 446 } 447 c.Unlock() 448} 449 450func (c *controller) watchLoop() { 451 for { 452 select { 453 case ep := <-c.watchCh: 454 c.processEndpointCreate(c.nmap, ep) 455 case ep := <-c.unWatchCh: 456 c.processEndpointDelete(c.nmap, ep) 457 } 458 } 459} 460 461func (c *controller) startWatch() { 462 if c.watchCh != nil { 463 return 464 } 465 c.watchCh = make(chan *endpoint) 466 c.unWatchCh = make(chan *endpoint) 467 c.nmap = make(map[string]*netWatch) 468 469 go c.watchLoop() 470} 471 472func (c *controller) networkCleanup() { 473 networks, err := c.getNetworksFromStore() 474 if err != nil { 475 logrus.Warnf("Could not retrieve networks from store(s) during network cleanup: %v", err) 476 return 477 } 478 479 for _, n := range networks { 480 if n.inDelete { 481 logrus.Infof("Removing stale network %s (%s)", n.Name(), n.ID()) 482 if err := n.delete(true, true); err != nil { 483 logrus.Debugf("Error while removing stale network: %v", err) 484 } 485 } 486 } 487} 488 489var populateSpecial NetworkWalker = func(nw Network) bool { 490 if n := nw.(*network); n.hasSpecialDriver() && !n.ConfigOnly() { 491 if err := n.getController().addNetwork(n); err != nil { 492 logrus.Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type()) 493 } 494 } 495 return false 496} 497