1// Copyright 2014 Google Inc. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Manager of cAdvisor-monitored containers. 16package manager 17 18import ( 19 "encoding/json" 20 "flag" 21 "fmt" 22 "os" 23 "path" 24 "strconv" 25 "strings" 26 "sync" 27 "time" 28 29 "github.com/google/cadvisor/cache/memory" 30 "github.com/google/cadvisor/collector" 31 "github.com/google/cadvisor/container" 32 "github.com/google/cadvisor/container/docker" 33 "github.com/google/cadvisor/container/raw" 34 "github.com/google/cadvisor/events" 35 "github.com/google/cadvisor/fs" 36 info "github.com/google/cadvisor/info/v1" 37 "github.com/google/cadvisor/info/v2" 38 "github.com/google/cadvisor/utils/cpuload" 39 "github.com/google/cadvisor/utils/oomparser" 40 "github.com/google/cadvisor/utils/sysfs" 41 42 "github.com/golang/glog" 43 "github.com/opencontainers/runc/libcontainer/cgroups" 44) 45 46var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings") 47var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log the usage of the cAdvisor container") 48var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader") 49var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types") 50var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types") 51var applicationMetricsCountLimit = flag.Int("application_metrics_count_limit", 100, "Max number of application metrics to store (per container)") 52 53// The Manager interface defines operations for starting a manager and getting 54// container and machine information. 55type Manager interface { 56 // Start the manager. Calling other manager methods before this returns 57 // may produce undefined behavior. 58 Start() error 59 60 // Stops the manager. 61 Stop() error 62 63 // Get information about a container. 64 GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) 65 66 // Get V2 information about a container. 67 GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) 68 69 // Get information about all subcontainers of the specified container (includes self). 70 SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) 71 72 // Gets all the Docker containers. Return is a map from full container name to ContainerInfo. 73 AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error) 74 75 // Gets information about a specific Docker container. The specified name is within the Docker namespace. 76 DockerContainer(dockerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error) 77 78 // Gets spec for all containers based on request options. 79 GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) 80 81 // Gets summary stats for all containers based on request options. 82 GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error) 83 84 // Get info for all requested containers based on the request options. 85 GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) 86 87 // Returns true if the named container exists. 88 Exists(containerName string) bool 89 90 // Get information about the machine. 91 GetMachineInfo() (*info.MachineInfo, error) 92 93 // Get version information about different components we depend on. 94 GetVersionInfo() (*info.VersionInfo, error) 95 96 // Get filesystem information for a given label. 97 // Returns information for all global filesystems if label is empty. 98 GetFsInfo(label string) ([]v2.FsInfo, error) 99 100 // Get ps output for a container. 101 GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) 102 103 // Get events streamed through passedChannel that fit the request. 104 WatchForEvents(request *events.Request) (*events.EventChannel, error) 105 106 // Get past events that have been detected and that fit the request. 107 GetPastEvents(request *events.Request) ([]*info.Event, error) 108 109 CloseEventChannel(watch_id int) 110 111 // Get status information about docker. 112 DockerInfo() (DockerStatus, error) 113 114 // Get details about interesting docker images. 115 DockerImages() ([]DockerImage, error) 116 117 // Returns debugging information. Map of lines per category. 118 DebugInfo() map[string][]string 119} 120 121// New takes a memory storage and returns a new manager. 122func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool) (Manager, error) { 123 if memoryCache == nil { 124 return nil, fmt.Errorf("manager requires memory storage") 125 } 126 127 // Detect the container we are running on. 128 selfContainer, err := cgroups.GetThisCgroupDir("cpu") 129 if err != nil { 130 return nil, err 131 } 132 glog.Infof("cAdvisor running in container: %q", selfContainer) 133 134 dockerInfo, err := docker.DockerInfo() 135 if err != nil { 136 glog.Warningf("Unable to connect to Docker: %v", err) 137 } 138 context := fs.Context{DockerRoot: docker.RootDir(), DockerInfo: dockerInfo} 139 fsInfo, err := fs.NewFsInfo(context) 140 if err != nil { 141 return nil, err 142 } 143 144 // If cAdvisor was started with host's rootfs mounted, assume that its running 145 // in its own namespaces. 146 inHostNamespace := false 147 if _, err := os.Stat("/rootfs/proc"); os.IsNotExist(err) { 148 inHostNamespace = true 149 } 150 newManager := &manager{ 151 containers: make(map[namespacedContainerName]*containerData), 152 quitChannels: make([]chan error, 0, 2), 153 memoryCache: memoryCache, 154 fsInfo: fsInfo, 155 cadvisorContainer: selfContainer, 156 inHostNamespace: inHostNamespace, 157 startupTime: time.Now(), 158 maxHousekeepingInterval: maxHousekeepingInterval, 159 allowDynamicHousekeeping: allowDynamicHousekeeping, 160 } 161 162 machineInfo, err := getMachineInfo(sysfs, fsInfo, inHostNamespace) 163 if err != nil { 164 return nil, err 165 } 166 newManager.machineInfo = *machineInfo 167 glog.Infof("Machine: %+v", newManager.machineInfo) 168 169 versionInfo, err := getVersionInfo() 170 if err != nil { 171 return nil, err 172 } 173 glog.Infof("Version: %+v", *versionInfo) 174 175 newManager.eventHandler = events.NewEventManager(parseEventsStoragePolicy()) 176 return newManager, nil 177} 178 179// A namespaced container name. 180type namespacedContainerName struct { 181 // The namespace of the container. Can be empty for the root namespace. 182 Namespace string 183 184 // The name of the container in this namespace. 185 Name string 186} 187 188type manager struct { 189 containers map[namespacedContainerName]*containerData 190 containersLock sync.RWMutex 191 memoryCache *memory.InMemoryCache 192 fsInfo fs.FsInfo 193 machineInfo info.MachineInfo 194 quitChannels []chan error 195 cadvisorContainer string 196 inHostNamespace bool 197 loadReader cpuload.CpuLoadReader 198 eventHandler events.EventManager 199 startupTime time.Time 200 maxHousekeepingInterval time.Duration 201 allowDynamicHousekeeping bool 202} 203 204// Start the container manager. 205func (self *manager) Start() error { 206 // Register Docker container factory. 207 err := docker.Register(self, self.fsInfo) 208 if err != nil { 209 glog.Errorf("Docker container factory registration failed: %v.", err) 210 } 211 212 // Register the raw driver. 213 err = raw.Register(self, self.fsInfo) 214 if err != nil { 215 glog.Errorf("Registration of the raw container factory failed: %v", err) 216 } 217 218 self.DockerInfo() 219 self.DockerImages() 220 221 if *enableLoadReader { 222 // Create cpu load reader. 223 cpuLoadReader, err := cpuload.New() 224 if err != nil { 225 // TODO(rjnagal): Promote to warning once we support cpu load inside namespaces. 226 glog.Infof("Could not initialize cpu load reader: %s", err) 227 } else { 228 err = cpuLoadReader.Start() 229 if err != nil { 230 glog.Warningf("Could not start cpu load stat collector: %s", err) 231 } else { 232 self.loadReader = cpuLoadReader 233 } 234 } 235 } 236 237 // Watch for OOMs. 238 err = self.watchForNewOoms() 239 if err != nil { 240 glog.Warningf("Could not configure a source for OOM detection, disabling OOM events: %v", err) 241 } 242 243 // If there are no factories, don't start any housekeeping and serve the information we do have. 244 if !container.HasFactories() { 245 return nil 246 } 247 248 // Create root and then recover all containers. 249 err = self.createContainer("/") 250 if err != nil { 251 return err 252 } 253 glog.Infof("Starting recovery of all containers") 254 err = self.detectSubcontainers("/") 255 if err != nil { 256 return err 257 } 258 glog.Infof("Recovery completed") 259 260 // Watch for new container. 261 quitWatcher := make(chan error) 262 err = self.watchForNewContainers(quitWatcher) 263 if err != nil { 264 return err 265 } 266 self.quitChannels = append(self.quitChannels, quitWatcher) 267 268 // Look for new containers in the main housekeeping thread. 269 quitGlobalHousekeeping := make(chan error) 270 self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping) 271 go self.globalHousekeeping(quitGlobalHousekeeping) 272 273 return nil 274} 275 276func (self *manager) Stop() error { 277 // Stop and wait on all quit channels. 278 for i, c := range self.quitChannels { 279 // Send the exit signal and wait on the thread to exit (by closing the channel). 280 c <- nil 281 err := <-c 282 if err != nil { 283 // Remove the channels that quit successfully. 284 self.quitChannels = self.quitChannels[i:] 285 return err 286 } 287 } 288 self.quitChannels = make([]chan error, 0, 2) 289 if self.loadReader != nil { 290 self.loadReader.Stop() 291 self.loadReader = nil 292 } 293 return nil 294} 295 296func (self *manager) globalHousekeeping(quit chan error) { 297 // Long housekeeping is either 100ms or half of the housekeeping interval. 298 longHousekeeping := 100 * time.Millisecond 299 if *globalHousekeepingInterval/2 < longHousekeeping { 300 longHousekeeping = *globalHousekeepingInterval / 2 301 } 302 303 ticker := time.Tick(*globalHousekeepingInterval) 304 for { 305 select { 306 case t := <-ticker: 307 start := time.Now() 308 309 // Check for new containers. 310 err := self.detectSubcontainers("/") 311 if err != nil { 312 glog.Errorf("Failed to detect containers: %s", err) 313 } 314 315 // Log if housekeeping took too long. 316 duration := time.Since(start) 317 if duration >= longHousekeeping { 318 glog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration) 319 } 320 case <-quit: 321 // Quit if asked to do so. 322 quit <- nil 323 glog.Infof("Exiting global housekeeping thread") 324 return 325 } 326 } 327} 328 329func (self *manager) getContainerData(containerName string) (*containerData, error) { 330 var cont *containerData 331 var ok bool 332 func() { 333 self.containersLock.RLock() 334 defer self.containersLock.RUnlock() 335 336 // Ensure we have the container. 337 cont, ok = self.containers[namespacedContainerName{ 338 Name: containerName, 339 }] 340 }() 341 if !ok { 342 return nil, fmt.Errorf("unknown container %q", containerName) 343 } 344 return cont, nil 345} 346 347func (self *manager) GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error) { 348 conts, err := self.getRequestedContainers(containerName, options) 349 if err != nil { 350 return nil, err 351 } 352 stats := make(map[string]v2.DerivedStats) 353 for name, cont := range conts { 354 d, err := cont.DerivedStats() 355 if err != nil { 356 return nil, err 357 } 358 stats[name] = d 359 } 360 return stats, nil 361} 362 363func (self *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) { 364 conts, err := self.getRequestedContainers(containerName, options) 365 if err != nil { 366 return nil, err 367 } 368 specs := make(map[string]v2.ContainerSpec) 369 for name, cont := range conts { 370 cinfo, err := cont.GetInfo() 371 if err != nil { 372 return nil, err 373 } 374 spec := self.getV2Spec(cinfo) 375 specs[name] = spec 376 } 377 return specs, nil 378} 379 380// Get V2 container spec from v1 container info. 381func (self *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec { 382 spec := self.getAdjustedSpec(cinfo) 383 return v2.ContainerSpecFromV1(&spec, cinfo.Aliases, cinfo.Namespace) 384} 385 386func (self *manager) getAdjustedSpec(cinfo *containerInfo) info.ContainerSpec { 387 spec := cinfo.Spec 388 389 // Set default value to an actual value 390 if spec.HasMemory { 391 // Memory.Limit is 0 means there's no limit 392 if spec.Memory.Limit == 0 { 393 spec.Memory.Limit = uint64(self.machineInfo.MemoryCapacity) 394 } 395 } 396 return spec 397} 398 399func (self *manager) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) { 400 cont, err := self.getContainerData(containerName) 401 if err != nil { 402 return nil, err 403 } 404 return self.containerDataToContainerInfo(cont, query) 405} 406 407func (self *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) { 408 containers, err := self.getRequestedContainers(containerName, options) 409 if err != nil { 410 return nil, err 411 } 412 413 infos := make(map[string]v2.ContainerInfo, len(containers)) 414 for name, container := range containers { 415 cinfo, err := container.GetInfo() 416 if err != nil { 417 return nil, err 418 } 419 420 var nilTime time.Time // Ignored. 421 stats, err := self.memoryCache.RecentStats(name, nilTime, nilTime, options.Count) 422 if err != nil { 423 return nil, err 424 } 425 426 infos[name] = v2.ContainerInfo{ 427 Spec: self.getV2Spec(cinfo), 428 Stats: v2.ContainerStatsFromV1(&cinfo.Spec, stats), 429 } 430 } 431 432 return infos, nil 433} 434 435func (self *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) { 436 // Get the info from the container. 437 cinfo, err := cont.GetInfo() 438 if err != nil { 439 return nil, err 440 } 441 442 stats, err := self.memoryCache.RecentStats(cinfo.Name, query.Start, query.End, query.NumStats) 443 if err != nil { 444 return nil, err 445 } 446 447 // Make a copy of the info for the user. 448 ret := &info.ContainerInfo{ 449 ContainerReference: cinfo.ContainerReference, 450 Subcontainers: cinfo.Subcontainers, 451 Spec: self.getAdjustedSpec(cinfo), 452 Stats: stats, 453 } 454 return ret, nil 455} 456 457func (self *manager) getContainer(containerName string) (*containerData, error) { 458 self.containersLock.RLock() 459 defer self.containersLock.RUnlock() 460 cont, ok := self.containers[namespacedContainerName{Name: containerName}] 461 if !ok { 462 return nil, fmt.Errorf("unknown container %q", containerName) 463 } 464 return cont, nil 465} 466 467func (self *manager) getSubcontainers(containerName string) map[string]*containerData { 468 self.containersLock.RLock() 469 defer self.containersLock.RUnlock() 470 containersMap := make(map[string]*containerData, len(self.containers)) 471 472 // Get all the unique subcontainers of the specified container 473 matchedName := path.Join(containerName, "/") 474 for i := range self.containers { 475 name := self.containers[i].info.Name 476 if name == containerName || strings.HasPrefix(name, matchedName) { 477 containersMap[self.containers[i].info.Name] = self.containers[i] 478 } 479 } 480 return containersMap 481} 482 483func (self *manager) SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) { 484 containersMap := self.getSubcontainers(containerName) 485 486 containers := make([]*containerData, 0, len(containersMap)) 487 for _, cont := range containersMap { 488 containers = append(containers, cont) 489 } 490 return self.containerDataSliceToContainerInfoSlice(containers, query) 491} 492 493func (self *manager) getAllDockerContainers() map[string]*containerData { 494 self.containersLock.RLock() 495 defer self.containersLock.RUnlock() 496 containers := make(map[string]*containerData, len(self.containers)) 497 498 // Get containers in the Docker namespace. 499 for name, cont := range self.containers { 500 if name.Namespace == docker.DockerNamespace { 501 containers[cont.info.Name] = cont 502 } 503 } 504 return containers 505} 506 507func (self *manager) AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error) { 508 containers := self.getAllDockerContainers() 509 510 output := make(map[string]info.ContainerInfo, len(containers)) 511 for name, cont := range containers { 512 inf, err := self.containerDataToContainerInfo(cont, query) 513 if err != nil { 514 return nil, err 515 } 516 output[name] = *inf 517 } 518 return output, nil 519} 520 521func (self *manager) getDockerContainer(containerName string) (*containerData, error) { 522 self.containersLock.RLock() 523 defer self.containersLock.RUnlock() 524 525 // Check for the container in the Docker container namespace. 526 cont, ok := self.containers[namespacedContainerName{ 527 Namespace: docker.DockerNamespace, 528 Name: containerName, 529 }] 530 if !ok { 531 return nil, fmt.Errorf("unable to find Docker container %q", containerName) 532 } 533 return cont, nil 534} 535 536func (self *manager) DockerContainer(containerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error) { 537 container, err := self.getDockerContainer(containerName) 538 if err != nil { 539 return info.ContainerInfo{}, err 540 } 541 542 inf, err := self.containerDataToContainerInfo(container, query) 543 if err != nil { 544 return info.ContainerInfo{}, err 545 } 546 return *inf, nil 547} 548 549func (self *manager) containerDataSliceToContainerInfoSlice(containers []*containerData, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) { 550 if len(containers) == 0 { 551 return nil, fmt.Errorf("no containers found") 552 } 553 554 // Get the info for each container. 555 output := make([]*info.ContainerInfo, 0, len(containers)) 556 for i := range containers { 557 cinfo, err := self.containerDataToContainerInfo(containers[i], query) 558 if err != nil { 559 // Skip containers with errors, we try to degrade gracefully. 560 continue 561 } 562 output = append(output, cinfo) 563 } 564 565 return output, nil 566} 567 568func (self *manager) GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) { 569 containers, err := self.getRequestedContainers(containerName, options) 570 if err != nil { 571 return nil, err 572 } 573 containersMap := make(map[string]*info.ContainerInfo) 574 query := info.ContainerInfoRequest{ 575 NumStats: options.Count, 576 } 577 for name, data := range containers { 578 info, err := self.containerDataToContainerInfo(data, &query) 579 if err != nil { 580 // Skip containers with errors, we try to degrade gracefully. 581 continue 582 } 583 containersMap[name] = info 584 } 585 return containersMap, nil 586} 587 588func (self *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) { 589 containersMap := make(map[string]*containerData) 590 switch options.IdType { 591 case v2.TypeName: 592 if options.Recursive == false { 593 cont, err := self.getContainer(containerName) 594 if err != nil { 595 return containersMap, err 596 } 597 containersMap[cont.info.Name] = cont 598 } else { 599 containersMap = self.getSubcontainers(containerName) 600 if len(containersMap) == 0 { 601 return containersMap, fmt.Errorf("unknown container: %q", containerName) 602 } 603 } 604 case v2.TypeDocker: 605 if options.Recursive == false { 606 containerName = strings.TrimPrefix(containerName, "/") 607 cont, err := self.getDockerContainer(containerName) 608 if err != nil { 609 return containersMap, err 610 } 611 containersMap[cont.info.Name] = cont 612 } else { 613 if containerName != "/" { 614 return containersMap, fmt.Errorf("invalid request for docker container %q with subcontainers", containerName) 615 } 616 containersMap = self.getAllDockerContainers() 617 } 618 default: 619 return containersMap, fmt.Errorf("invalid request type %q", options.IdType) 620 } 621 return containersMap, nil 622} 623 624func (self *manager) GetFsInfo(label string) ([]v2.FsInfo, error) { 625 var empty time.Time 626 // Get latest data from filesystems hanging off root container. 627 stats, err := self.memoryCache.RecentStats("/", empty, empty, 1) 628 if err != nil { 629 return nil, err 630 } 631 dev := "" 632 if len(label) != 0 { 633 dev, err = self.fsInfo.GetDeviceForLabel(label) 634 if err != nil { 635 return nil, err 636 } 637 } 638 fsInfo := []v2.FsInfo{} 639 for _, fs := range stats[0].Filesystem { 640 if len(label) != 0 && fs.Device != dev { 641 continue 642 } 643 mountpoint, err := self.fsInfo.GetMountpointForDevice(fs.Device) 644 if err != nil { 645 return nil, err 646 } 647 labels, err := self.fsInfo.GetLabelsForDevice(fs.Device) 648 if err != nil { 649 return nil, err 650 } 651 fi := v2.FsInfo{ 652 Device: fs.Device, 653 Mountpoint: mountpoint, 654 Capacity: fs.Limit, 655 Usage: fs.Usage, 656 Available: fs.Available, 657 Labels: labels, 658 } 659 fsInfo = append(fsInfo, fi) 660 } 661 return fsInfo, nil 662} 663 664func (m *manager) GetMachineInfo() (*info.MachineInfo, error) { 665 // Copy and return the MachineInfo. 666 return &m.machineInfo, nil 667} 668 669func (m *manager) GetVersionInfo() (*info.VersionInfo, error) { 670 // TODO: Consider caching this and periodically updating. The VersionInfo may change if 671 // the docker daemon is started after the cAdvisor client is created. Caching the value 672 // would be helpful so we would be able to return the last known docker version if 673 // docker was down at the time of a query. 674 return getVersionInfo() 675} 676 677func (m *manager) Exists(containerName string) bool { 678 m.containersLock.Lock() 679 defer m.containersLock.Unlock() 680 681 namespacedName := namespacedContainerName{ 682 Name: containerName, 683 } 684 685 _, ok := m.containers[namespacedName] 686 if ok { 687 return true 688 } 689 return false 690} 691 692func (m *manager) GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) { 693 // override recursive. Only support single container listing. 694 options.Recursive = false 695 conts, err := m.getRequestedContainers(containerName, options) 696 if err != nil { 697 return nil, err 698 } 699 if len(conts) != 1 { 700 return nil, fmt.Errorf("Expected the request to match only one container") 701 } 702 // TODO(rjnagal): handle count? Only if we can do count by type (eg. top 5 cpu users) 703 ps := []v2.ProcessInfo{} 704 for _, cont := range conts { 705 ps, err = cont.GetProcessList(m.cadvisorContainer, m.inHostNamespace) 706 if err != nil { 707 return nil, err 708 } 709 } 710 return ps, nil 711} 712 713func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *containerData) error { 714 for k, v := range collectorConfigs { 715 configFile, err := cont.ReadFile(v, m.inHostNamespace) 716 if err != nil { 717 return fmt.Errorf("failed to read config file %q for config %q, container %q: %v", k, v, cont.info.Name, err) 718 } 719 glog.V(3).Infof("Got config from %q: %q", v, configFile) 720 721 if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") { 722 newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit) 723 if err != nil { 724 glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err) 725 return err 726 } 727 err = cont.collectorManager.RegisterCollector(newCollector) 728 if err != nil { 729 glog.Infof("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err) 730 return err 731 } 732 } else { 733 newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit) 734 if err != nil { 735 glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err) 736 return err 737 } 738 err = cont.collectorManager.RegisterCollector(newCollector) 739 if err != nil { 740 glog.Infof("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err) 741 return err 742 } 743 } 744 } 745 return nil 746} 747 748// Create a container. 749func (m *manager) createContainer(containerName string) error { 750 m.containersLock.Lock() 751 defer m.containersLock.Unlock() 752 753 namespacedName := namespacedContainerName{ 754 Name: containerName, 755 } 756 757 // Check that the container didn't already exist. 758 if _, ok := m.containers[namespacedName]; ok { 759 return nil 760 } 761 762 handler, accept, err := container.NewContainerHandler(containerName, m.inHostNamespace) 763 if err != nil { 764 return err 765 } 766 if !accept { 767 // ignoring this container. 768 glog.V(4).Infof("ignoring container %q", containerName) 769 return nil 770 } 771 collectorManager, err := collector.NewCollectorManager() 772 if err != nil { 773 return err 774 } 775 776 logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer 777 cont, err := newContainerData(containerName, m.memoryCache, handler, m.loadReader, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping) 778 if err != nil { 779 return err 780 } 781 782 // Add collectors 783 labels := handler.GetContainerLabels() 784 collectorConfigs := collector.GetCollectorConfigs(labels) 785 err = m.registerCollectors(collectorConfigs, cont) 786 if err != nil { 787 glog.Infof("failed to register collectors for %q: %v", containerName, err) 788 } 789 790 // Add the container name and all its aliases. The aliases must be within the namespace of the factory. 791 m.containers[namespacedName] = cont 792 for _, alias := range cont.info.Aliases { 793 m.containers[namespacedContainerName{ 794 Namespace: cont.info.Namespace, 795 Name: alias, 796 }] = cont 797 } 798 799 glog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace) 800 801 contSpec, err := cont.handler.GetSpec() 802 if err != nil { 803 return err 804 } 805 806 contRef, err := cont.handler.ContainerReference() 807 if err != nil { 808 return err 809 } 810 811 newEvent := &info.Event{ 812 ContainerName: contRef.Name, 813 Timestamp: contSpec.CreationTime, 814 EventType: info.EventContainerCreation, 815 } 816 err = m.eventHandler.AddEvent(newEvent) 817 if err != nil { 818 return err 819 } 820 821 // Start the container's housekeeping. 822 return cont.Start() 823} 824 825func (m *manager) destroyContainer(containerName string) error { 826 m.containersLock.Lock() 827 defer m.containersLock.Unlock() 828 829 namespacedName := namespacedContainerName{ 830 Name: containerName, 831 } 832 cont, ok := m.containers[namespacedName] 833 if !ok { 834 // Already destroyed, done. 835 return nil 836 } 837 838 // Tell the container to stop. 839 err := cont.Stop() 840 if err != nil { 841 return err 842 } 843 844 // Remove the container from our records (and all its aliases). 845 delete(m.containers, namespacedName) 846 for _, alias := range cont.info.Aliases { 847 delete(m.containers, namespacedContainerName{ 848 Namespace: cont.info.Namespace, 849 Name: alias, 850 }) 851 } 852 glog.V(3).Infof("Destroyed container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace) 853 854 contRef, err := cont.handler.ContainerReference() 855 if err != nil { 856 return err 857 } 858 859 newEvent := &info.Event{ 860 ContainerName: contRef.Name, 861 Timestamp: time.Now(), 862 EventType: info.EventContainerDeletion, 863 } 864 err = m.eventHandler.AddEvent(newEvent) 865 if err != nil { 866 return err 867 } 868 return nil 869} 870 871// Detect all containers that have been added or deleted from the specified container. 872func (m *manager) getContainersDiff(containerName string) (added []info.ContainerReference, removed []info.ContainerReference, err error) { 873 m.containersLock.RLock() 874 defer m.containersLock.RUnlock() 875 876 // Get all subcontainers recursively. 877 cont, ok := m.containers[namespacedContainerName{ 878 Name: containerName, 879 }] 880 if !ok { 881 return nil, nil, fmt.Errorf("failed to find container %q while checking for new containers", containerName) 882 } 883 allContainers, err := cont.handler.ListContainers(container.ListRecursive) 884 if err != nil { 885 return nil, nil, err 886 } 887 allContainers = append(allContainers, info.ContainerReference{Name: containerName}) 888 889 // Determine which were added and which were removed. 890 allContainersSet := make(map[string]*containerData) 891 for name, d := range m.containers { 892 // Only add the canonical name. 893 if d.info.Name == name.Name { 894 allContainersSet[name.Name] = d 895 } 896 } 897 898 // Added containers 899 for _, c := range allContainers { 900 delete(allContainersSet, c.Name) 901 _, ok := m.containers[namespacedContainerName{ 902 Name: c.Name, 903 }] 904 if !ok { 905 added = append(added, c) 906 } 907 } 908 909 // Removed ones are no longer in the container listing. 910 for _, d := range allContainersSet { 911 removed = append(removed, d.info.ContainerReference) 912 } 913 914 return 915} 916 917// Detect the existing subcontainers and reflect the setup here. 918func (m *manager) detectSubcontainers(containerName string) error { 919 added, removed, err := m.getContainersDiff(containerName) 920 if err != nil { 921 return err 922 } 923 924 // Add the new containers. 925 for _, cont := range added { 926 err = m.createContainer(cont.Name) 927 if err != nil { 928 glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err) 929 } 930 } 931 932 // Remove the old containers. 933 for _, cont := range removed { 934 err = m.destroyContainer(cont.Name) 935 if err != nil { 936 glog.Errorf("Failed to destroy existing container: %s: %s", cont.Name, err) 937 } 938 } 939 940 return nil 941} 942 943// Watches for new containers started in the system. Runs forever unless there is a setup error. 944func (self *manager) watchForNewContainers(quit chan error) error { 945 var root *containerData 946 var ok bool 947 func() { 948 self.containersLock.RLock() 949 defer self.containersLock.RUnlock() 950 root, ok = self.containers[namespacedContainerName{ 951 Name: "/", 952 }] 953 }() 954 if !ok { 955 return fmt.Errorf("root container does not exist when watching for new containers") 956 } 957 958 // Register for new subcontainers. 959 eventsChannel := make(chan container.SubcontainerEvent, 16) 960 err := root.handler.WatchSubcontainers(eventsChannel) 961 if err != nil { 962 return err 963 } 964 965 // There is a race between starting the watch and new container creation so we do a detection before we read new containers. 966 err = self.detectSubcontainers("/") 967 if err != nil { 968 return err 969 } 970 971 // Listen to events from the container handler. 972 go func() { 973 for { 974 select { 975 case event := <-eventsChannel: 976 switch { 977 case event.EventType == container.SubcontainerAdd: 978 err = self.createContainer(event.Name) 979 case event.EventType == container.SubcontainerDelete: 980 err = self.destroyContainer(event.Name) 981 } 982 if err != nil { 983 glog.Warningf("Failed to process watch event: %v", err) 984 } 985 case <-quit: 986 // Stop processing events if asked to quit. 987 err := root.handler.StopWatchingSubcontainers() 988 quit <- err 989 if err == nil { 990 glog.Infof("Exiting thread watching subcontainers") 991 return 992 } 993 } 994 } 995 }() 996 return nil 997} 998 999func (self *manager) watchForNewOoms() error { 1000 glog.Infof("Started watching for new ooms in manager") 1001 outStream := make(chan *oomparser.OomInstance, 10) 1002 oomLog, err := oomparser.New() 1003 if err != nil { 1004 return err 1005 } 1006 go oomLog.StreamOoms(outStream) 1007 1008 go func() { 1009 for oomInstance := range outStream { 1010 // Surface OOM and OOM kill events. 1011 newEvent := &info.Event{ 1012 ContainerName: oomInstance.ContainerName, 1013 Timestamp: oomInstance.TimeOfDeath, 1014 EventType: info.EventOom, 1015 } 1016 err := self.eventHandler.AddEvent(newEvent) 1017 if err != nil { 1018 glog.Errorf("failed to add OOM event for %q: %v", oomInstance.ContainerName, err) 1019 } 1020 glog.V(3).Infof("Created an OOM event in container %q at %v", oomInstance.ContainerName, oomInstance.TimeOfDeath) 1021 1022 newEvent = &info.Event{ 1023 ContainerName: oomInstance.VictimContainerName, 1024 Timestamp: oomInstance.TimeOfDeath, 1025 EventType: info.EventOomKill, 1026 EventData: info.EventData{ 1027 OomKill: &info.OomKillEventData{ 1028 Pid: oomInstance.Pid, 1029 ProcessName: oomInstance.ProcessName, 1030 }, 1031 }, 1032 } 1033 err = self.eventHandler.AddEvent(newEvent) 1034 if err != nil { 1035 glog.Errorf("failed to add OOM kill event for %q: %v", oomInstance.ContainerName, err) 1036 } 1037 } 1038 }() 1039 return nil 1040} 1041 1042// can be called by the api which will take events returned on the channel 1043func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) { 1044 return self.eventHandler.WatchEvents(request) 1045} 1046 1047// can be called by the api which will return all events satisfying the request 1048func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) { 1049 return self.eventHandler.GetEvents(request) 1050} 1051 1052// called by the api when a client is no longer listening to the channel 1053func (self *manager) CloseEventChannel(watch_id int) { 1054 self.eventHandler.StopWatch(watch_id) 1055} 1056 1057// Parses the events StoragePolicy from the flags. 1058func parseEventsStoragePolicy() events.StoragePolicy { 1059 policy := events.DefaultStoragePolicy() 1060 1061 // Parse max age. 1062 parts := strings.Split(*eventStorageAgeLimit, ",") 1063 for _, part := range parts { 1064 items := strings.Split(part, "=") 1065 if len(items) != 2 { 1066 glog.Warningf("Unknown event storage policy %q when parsing max age", part) 1067 continue 1068 } 1069 dur, err := time.ParseDuration(items[1]) 1070 if err != nil { 1071 glog.Warningf("Unable to parse event max age duration %q: %v", items[1], err) 1072 continue 1073 } 1074 if items[0] == "default" { 1075 policy.DefaultMaxAge = dur 1076 continue 1077 } 1078 policy.PerTypeMaxAge[info.EventType(items[0])] = dur 1079 } 1080 1081 // Parse max number. 1082 parts = strings.Split(*eventStorageEventLimit, ",") 1083 for _, part := range parts { 1084 items := strings.Split(part, "=") 1085 if len(items) != 2 { 1086 glog.Warningf("Unknown event storage policy %q when parsing max event limit", part) 1087 continue 1088 } 1089 val, err := strconv.Atoi(items[1]) 1090 if err != nil { 1091 glog.Warningf("Unable to parse integer from %q: %v", items[1], err) 1092 continue 1093 } 1094 if items[0] == "default" { 1095 policy.DefaultMaxNumEvents = val 1096 continue 1097 } 1098 policy.PerTypeMaxNumEvents[info.EventType(items[0])] = val 1099 } 1100 1101 return policy 1102} 1103 1104type DockerStatus struct { 1105 Version string `json:"version"` 1106 KernelVersion string `json:"kernel_version"` 1107 OS string `json:"os"` 1108 Hostname string `json:"hostname"` 1109 RootDir string `json:"root_dir"` 1110 Driver string `json:"driver"` 1111 DriverStatus map[string]string `json:"driver_status"` 1112 ExecDriver string `json:"exec_driver"` 1113 NumImages int `json:"num_images"` 1114 NumContainers int `json:"num_containers"` 1115} 1116 1117type DockerImage struct { 1118 ID string `json:"id"` 1119 RepoTags []string `json:"repo_tags"` // repository name and tags. 1120 Created int64 `json:"created"` // unix time since creation. 1121 VirtualSize int64 `json:"virtual_size"` 1122 Size int64 `json:"size"` 1123} 1124 1125func (m *manager) DockerImages() ([]DockerImage, error) { 1126 images, err := docker.DockerImages() 1127 if err != nil { 1128 return nil, err 1129 } 1130 out := []DockerImage{} 1131 const unknownTag = "<none>:<none>" 1132 for _, image := range images { 1133 if len(image.RepoTags) == 1 && image.RepoTags[0] == unknownTag { 1134 // images with repo or tags are uninteresting. 1135 continue 1136 } 1137 di := DockerImage{ 1138 ID: image.ID, 1139 RepoTags: image.RepoTags, 1140 Created: image.Created, 1141 VirtualSize: image.VirtualSize, 1142 Size: image.Size, 1143 } 1144 out = append(out, di) 1145 } 1146 return out, nil 1147} 1148 1149func (m *manager) DockerInfo() (DockerStatus, error) { 1150 info, err := docker.DockerInfo() 1151 if err != nil { 1152 return DockerStatus{}, err 1153 } 1154 versionInfo, err := m.GetVersionInfo() 1155 if err != nil { 1156 return DockerStatus{}, err 1157 } 1158 out := DockerStatus{} 1159 out.Version = versionInfo.DockerVersion 1160 if val, ok := info["KernelVersion"]; ok { 1161 out.KernelVersion = val 1162 } 1163 if val, ok := info["OperatingSystem"]; ok { 1164 out.OS = val 1165 } 1166 if val, ok := info["Name"]; ok { 1167 out.Hostname = val 1168 } 1169 if val, ok := info["DockerRootDir"]; ok { 1170 out.RootDir = val 1171 } 1172 if val, ok := info["Driver"]; ok { 1173 out.Driver = val 1174 } 1175 if val, ok := info["ExecutionDriver"]; ok { 1176 out.ExecDriver = val 1177 } 1178 if val, ok := info["Images"]; ok { 1179 n, err := strconv.Atoi(val) 1180 if err == nil { 1181 out.NumImages = n 1182 } 1183 } 1184 if val, ok := info["Containers"]; ok { 1185 n, err := strconv.Atoi(val) 1186 if err == nil { 1187 out.NumContainers = n 1188 } 1189 } 1190 if val, ok := info["DriverStatus"]; ok { 1191 var driverStatus [][]string 1192 err := json.Unmarshal([]byte(val), &driverStatus) 1193 if err != nil { 1194 return DockerStatus{}, err 1195 } 1196 out.DriverStatus = make(map[string]string) 1197 for _, v := range driverStatus { 1198 if len(v) == 2 { 1199 out.DriverStatus[v[0]] = v[1] 1200 } 1201 } 1202 } 1203 return out, nil 1204} 1205 1206func (m *manager) DebugInfo() map[string][]string { 1207 debugInfo := container.DebugInfo() 1208 1209 // Get unique containers. 1210 var conts map[*containerData]struct{} 1211 func() { 1212 m.containersLock.RLock() 1213 defer m.containersLock.RUnlock() 1214 1215 conts = make(map[*containerData]struct{}, len(m.containers)) 1216 for _, c := range m.containers { 1217 conts[c] = struct{}{} 1218 } 1219 }() 1220 1221 // List containers. 1222 lines := make([]string, 0, len(conts)) 1223 for cont := range conts { 1224 lines = append(lines, cont.info.Name) 1225 if cont.info.Namespace != "" { 1226 lines = append(lines, fmt.Sprintf("\tNamespace: %s", cont.info.Namespace)) 1227 } 1228 1229 if len(cont.info.Aliases) != 0 { 1230 lines = append(lines, "\tAliases:") 1231 for _, alias := range cont.info.Aliases { 1232 lines = append(lines, fmt.Sprintf("\t\t%s", alias)) 1233 } 1234 } 1235 } 1236 1237 debugInfo["Managed containers"] = lines 1238 return debugInfo 1239} 1240