// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Manager of cAdvisor-monitored containers.
package manager

import (
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"path"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/google/cadvisor/cache/memory"
	"github.com/google/cadvisor/collector"
	"github.com/google/cadvisor/container"
	"github.com/google/cadvisor/container/docker"
	"github.com/google/cadvisor/container/raw"
	"github.com/google/cadvisor/events"
	"github.com/google/cadvisor/fs"
	info "github.com/google/cadvisor/info/v1"
	"github.com/google/cadvisor/info/v2"
	"github.com/google/cadvisor/utils/cpuload"
	"github.com/google/cadvisor/utils/oomparser"
	"github.com/google/cadvisor/utils/sysfs"

	"github.com/golang/glog"
	"github.com/opencontainers/runc/libcontainer/cgroups"
)

var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log the usage of the cAdvisor container")
var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader")
var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types")
var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types")
var applicationMetricsCountLimit = flag.Int("application_metrics_count_limit", 100, "Max number of application metrics to store (per container)")
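
// For example, -event_storage_age_limit=creation=24h,oom=48h,default=12h keeps
// creation events for a day, OOM events for two days, and every other event
// type for twelve hours; parseEventsStoragePolicy does the parsing.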

// The Manager interface defines operations for starting a manager and getting
// container and machine information.
type Manager interface {
	// Start the manager. Calling other manager methods before this returns
	// may produce undefined behavior.
	Start() error

	// Stop the manager.
	Stop() error

	// Get information about a container.
	GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error)

	// Get V2 information about a container.
	GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error)

	// Get information about all subcontainers of the specified container (includes self).
	SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error)

	// Gets all the Docker containers. Return is a map from full container name to ContainerInfo.
	AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error)

	// Gets information about a specific Docker container. The specified name is within the Docker namespace.
	DockerContainer(dockerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error)

	// Gets spec for all containers based on request options.
	GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error)

	// Gets summary stats for all containers based on request options.
	GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error)

	// Get info for all requested containers based on the request options.
	GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error)

	// Returns true if the named container exists.
	Exists(containerName string) bool

	// Get information about the machine.
	GetMachineInfo() (*info.MachineInfo, error)

	// Get version information about different components we depend on.
	GetVersionInfo() (*info.VersionInfo, error)

	// Get filesystem information for a given label.
	// Returns information for all global filesystems if label is empty.
	GetFsInfo(label string) ([]v2.FsInfo, error)

	// Get ps output for a container.
	GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error)

	// Stream events that fit the request through the returned channel.
	WatchForEvents(request *events.Request) (*events.EventChannel, error)

	// Get past events that have been detected and that fit the request.
	GetPastEvents(request *events.Request) ([]*info.Event, error)

	// Close the event channel associated with the given watch id.
	CloseEventChannel(watchID int)

	// Get status information about docker.
	DockerInfo() (DockerStatus, error)

	// Get details about interesting docker images.
	DockerImages() ([]DockerImage, error)

	// Returns debugging information. Map of lines per category.
	DebugInfo() map[string][]string
}
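
// A minimal sketch of how a caller is expected to wire a manager together.
// The cache constructor and the sysfs helper shown here are assumptions about
// other packages in this repository, not definitions from this file:
//
//	memoryCache := memory.New(2*time.Minute, nil) // assumed signature: (max age, optional backend)
//	sysFs, _ := sysfs.NewRealSysFs()              // assumed constructor
//	m, _ := manager.New(memoryCache, sysFs, 60*time.Second, true)
//	_ = m.Start()
//	defer m.Stop()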

// New takes a memory storage and returns a new manager.
func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool) (Manager, error) {
	if memoryCache == nil {
		return nil, fmt.Errorf("manager requires memory storage")
	}

	// Detect the container we are running on.
	selfContainer, err := cgroups.GetThisCgroupDir("cpu")
	if err != nil {
		return nil, err
	}
	glog.Infof("cAdvisor running in container: %q", selfContainer)

	dockerInfo, err := docker.DockerInfo()
	if err != nil {
		glog.Warningf("Unable to connect to Docker: %v", err)
	}
	context := fs.Context{DockerRoot: docker.RootDir(), DockerInfo: dockerInfo}
	fsInfo, err := fs.NewFsInfo(context)
	if err != nil {
		return nil, err
	}

	// If cAdvisor was started with the host's rootfs mounted at /rootfs, assume
	// it's running in its own namespaces; otherwise assume the host's namespace.
	inHostNamespace := false
	if _, err := os.Stat("/rootfs/proc"); os.IsNotExist(err) {
		inHostNamespace = true
	}
	newManager := &manager{
		containers:               make(map[namespacedContainerName]*containerData),
		quitChannels:             make([]chan error, 0, 2),
		memoryCache:              memoryCache,
		fsInfo:                   fsInfo,
		cadvisorContainer:        selfContainer,
		inHostNamespace:          inHostNamespace,
		startupTime:              time.Now(),
		maxHousekeepingInterval:  maxHousekeepingInterval,
		allowDynamicHousekeeping: allowDynamicHousekeeping,
	}

	machineInfo, err := getMachineInfo(sysfs, fsInfo, inHostNamespace)
	if err != nil {
		return nil, err
	}
	newManager.machineInfo = *machineInfo
	glog.Infof("Machine: %+v", newManager.machineInfo)

	versionInfo, err := getVersionInfo()
	if err != nil {
		return nil, err
	}
	glog.Infof("Version: %+v", *versionInfo)

	newManager.eventHandler = events.NewEventManager(parseEventsStoragePolicy())
	return newManager, nil
}

// A namespaced container name.
type namespacedContainerName struct {
	// The namespace of the container. Can be empty for the root namespace.
	Namespace string

	// The name of the container in this namespace.
	Name string
}

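// manager implements the Manager interface. The containers map is guarded by
// containersLock: take it in read mode for lookups and in write mode when
// adding or destroying containers.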
type manager struct {
	containers               map[namespacedContainerName]*containerData
	containersLock           sync.RWMutex
	memoryCache              *memory.InMemoryCache
	fsInfo                   fs.FsInfo
	machineInfo              info.MachineInfo
	quitChannels             []chan error
	cadvisorContainer        string
	inHostNamespace          bool
	loadReader               cpuload.CpuLoadReader
	eventHandler             events.EventManager
	startupTime              time.Time
	maxHousekeepingInterval  time.Duration
	allowDynamicHousekeeping bool
}

// Start the container manager.
func (self *manager) Start() error {
	// Register Docker container factory.
	err := docker.Register(self, self.fsInfo)
	if err != nil {
		glog.Errorf("Docker container factory registration failed: %v.", err)
	}

	// Register the raw driver.
	err = raw.Register(self, self.fsInfo)
	if err != nil {
		glog.Errorf("Registration of the raw container factory failed: %v", err)
	}

	self.DockerInfo()
	self.DockerImages()

	if *enableLoadReader {
		// Create cpu load reader.
		cpuLoadReader, err := cpuload.New()
		if err != nil {
			// TODO(rjnagal): Promote to warning once we support cpu load inside namespaces.
			glog.Infof("Could not initialize cpu load reader: %s", err)
		} else {
			err = cpuLoadReader.Start()
			if err != nil {
				glog.Warningf("Could not start cpu load stat collector: %s", err)
			} else {
				self.loadReader = cpuLoadReader
			}
		}
	}

	// Watch for OOMs.
	err = self.watchForNewOoms()
	if err != nil {
		glog.Warningf("Could not configure a source for OOM detection, disabling OOM events: %v", err)
	}

	// If there are no factories, don't start any housekeeping and serve the information we do have.
	if !container.HasFactories() {
		return nil
	}

	// Create root and then recover all containers.
	err = self.createContainer("/")
	if err != nil {
		return err
	}
	glog.Infof("Starting recovery of all containers")
	err = self.detectSubcontainers("/")
	if err != nil {
		return err
	}
	glog.Infof("Recovery completed")

	// Watch for new containers.
	quitWatcher := make(chan error)
	err = self.watchForNewContainers(quitWatcher)
	if err != nil {
		return err
	}
	self.quitChannels = append(self.quitChannels, quitWatcher)

	// Look for new containers in the main housekeeping thread.
	quitGlobalHousekeeping := make(chan error)
	self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping)
	go self.globalHousekeeping(quitGlobalHousekeeping)

	return nil
}

func (self *manager) Stop() error {
	// Stop and wait on all quit channels.
	for i, c := range self.quitChannels {
		// Send the exit signal and wait on the thread to exit (by closing the channel).
		c <- nil
		err := <-c
		if err != nil {
			// Remove the channels that quit successfully.
			self.quitChannels = self.quitChannels[i:]
			return err
		}
	}
	self.quitChannels = make([]chan error, 0, 2)
	if self.loadReader != nil {
		self.loadReader.Stop()
		self.loadReader = nil
	}
	return nil
}

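// globalHousekeeping re-detects subcontainers of the root container on every
// tick until asked to quit, and logs any pass that runs longer than the
// longHousekeeping threshold.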
func (self *manager) globalHousekeeping(quit chan error) {
	// Long housekeeping is either 100ms or half of the housekeeping interval.
	longHousekeeping := 100 * time.Millisecond
	if *globalHousekeepingInterval/2 < longHousekeeping {
		longHousekeeping = *globalHousekeepingInterval / 2
	}

	ticker := time.Tick(*globalHousekeepingInterval)
	for {
		select {
		case t := <-ticker:
			start := time.Now()

			// Check for new containers.
			err := self.detectSubcontainers("/")
			if err != nil {
				glog.Errorf("Failed to detect containers: %s", err)
			}

			// Log if housekeeping took too long.
			duration := time.Since(start)
			if duration >= longHousekeeping {
				glog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
			}
		case <-quit:
			// Quit if asked to do so.
			quit <- nil
			glog.Infof("Exiting global housekeeping thread")
			return
		}
	}
}

func (self *manager) getContainerData(containerName string) (*containerData, error) {
	var cont *containerData
	var ok bool
	func() {
		self.containersLock.RLock()
		defer self.containersLock.RUnlock()

		// Ensure we have the container.
		cont, ok = self.containers[namespacedContainerName{
			Name: containerName,
		}]
	}()
	if !ok {
		return nil, fmt.Errorf("unknown container %q", containerName)
	}
	return cont, nil
}

func (self *manager) GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error) {
	conts, err := self.getRequestedContainers(containerName, options)
	if err != nil {
		return nil, err
	}
	stats := make(map[string]v2.DerivedStats)
	for name, cont := range conts {
		d, err := cont.DerivedStats()
		if err != nil {
			return nil, err
		}
		stats[name] = d
	}
	return stats, nil
}

func (self *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) {
	conts, err := self.getRequestedContainers(containerName, options)
	if err != nil {
		return nil, err
	}
	specs := make(map[string]v2.ContainerSpec)
	for name, cont := range conts {
		cinfo, err := cont.GetInfo()
		if err != nil {
			return nil, err
		}
		spec := self.getV2Spec(cinfo)
		specs[name] = spec
	}
	return specs, nil
}

// Get V2 container spec from v1 container info.
func (self *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec {
	spec := self.getAdjustedSpec(cinfo)
	return v2.ContainerSpecFromV1(&spec, cinfo.Aliases, cinfo.Namespace)
}

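// getAdjustedSpec fills in machine-level defaults: a container without an
// explicit memory limit (Memory.Limit == 0) is reported with the machine's
// total memory capacity as its limit.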
func (self *manager) getAdjustedSpec(cinfo *containerInfo) info.ContainerSpec {
	spec := cinfo.Spec

	// Replace the default value with an actual one.
	if spec.HasMemory {
		// A Memory.Limit of 0 means there is no limit.
		if spec.Memory.Limit == 0 {
			spec.Memory.Limit = uint64(self.machineInfo.MemoryCapacity)
		}
	}
	return spec
}

func (self *manager) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
	cont, err := self.getContainerData(containerName)
	if err != nil {
		return nil, err
	}
	return self.containerDataToContainerInfo(cont, query)
}

func (self *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
	containers, err := self.getRequestedContainers(containerName, options)
	if err != nil {
		return nil, err
	}

	infos := make(map[string]v2.ContainerInfo, len(containers))
	for name, container := range containers {
		cinfo, err := container.GetInfo()
		if err != nil {
			return nil, err
		}

		var nilTime time.Time // Ignored.
		stats, err := self.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
		if err != nil {
			return nil, err
		}

		infos[name] = v2.ContainerInfo{
			Spec:  self.getV2Spec(cinfo),
			Stats: v2.ContainerStatsFromV1(&cinfo.Spec, stats),
		}
	}

	return infos, nil
}

func (self *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
	// Get the info from the container.
	cinfo, err := cont.GetInfo()
	if err != nil {
		return nil, err
	}

	stats, err := self.memoryCache.RecentStats(cinfo.Name, query.Start, query.End, query.NumStats)
	if err != nil {
		return nil, err
	}

	// Make a copy of the info for the user.
	ret := &info.ContainerInfo{
		ContainerReference: cinfo.ContainerReference,
		Subcontainers:      cinfo.Subcontainers,
		Spec:               self.getAdjustedSpec(cinfo),
		Stats:              stats,
	}
	return ret, nil
}

func (self *manager) getContainer(containerName string) (*containerData, error) {
	self.containersLock.RLock()
	defer self.containersLock.RUnlock()
	cont, ok := self.containers[namespacedContainerName{Name: containerName}]
	if !ok {
		return nil, fmt.Errorf("unknown container %q", containerName)
	}
	return cont, nil
}

func (self *manager) getSubcontainers(containerName string) map[string]*containerData {
	self.containersLock.RLock()
	defer self.containersLock.RUnlock()
	containersMap := make(map[string]*containerData, len(self.containers))

	// Get all the unique subcontainers of the specified container.
	matchedName := path.Join(containerName, "/")
	for i := range self.containers {
		name := self.containers[i].info.Name
		if name == containerName || strings.HasPrefix(name, matchedName) {
			containersMap[self.containers[i].info.Name] = self.containers[i]
		}
	}
	return containersMap
}

func (self *manager) SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
	containersMap := self.getSubcontainers(containerName)

	containers := make([]*containerData, 0, len(containersMap))
	for _, cont := range containersMap {
		containers = append(containers, cont)
	}
	return self.containerDataSliceToContainerInfoSlice(containers, query)
}

func (self *manager) getAllDockerContainers() map[string]*containerData {
	self.containersLock.RLock()
	defer self.containersLock.RUnlock()
	containers := make(map[string]*containerData, len(self.containers))

	// Get containers in the Docker namespace.
	for name, cont := range self.containers {
		if name.Namespace == docker.DockerNamespace {
			containers[cont.info.Name] = cont
		}
	}
	return containers
}

func (self *manager) AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error) {
	containers := self.getAllDockerContainers()

	output := make(map[string]info.ContainerInfo, len(containers))
	for name, cont := range containers {
		inf, err := self.containerDataToContainerInfo(cont, query)
		if err != nil {
			return nil, err
		}
		output[name] = *inf
	}
	return output, nil
}

func (self *manager) getDockerContainer(containerName string) (*containerData, error) {
	self.containersLock.RLock()
	defer self.containersLock.RUnlock()

	// Check for the container in the Docker container namespace.
	cont, ok := self.containers[namespacedContainerName{
		Namespace: docker.DockerNamespace,
		Name:      containerName,
	}]
	if !ok {
		return nil, fmt.Errorf("unable to find Docker container %q", containerName)
	}
	return cont, nil
}

func (self *manager) DockerContainer(containerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error) {
	container, err := self.getDockerContainer(containerName)
	if err != nil {
		return info.ContainerInfo{}, err
	}

	inf, err := self.containerDataToContainerInfo(container, query)
	if err != nil {
		return info.ContainerInfo{}, err
	}
	return *inf, nil
}

func (self *manager) containerDataSliceToContainerInfoSlice(containers []*containerData, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
	if len(containers) == 0 {
		return nil, fmt.Errorf("no containers found")
	}

	// Get the info for each container.
	output := make([]*info.ContainerInfo, 0, len(containers))
	for i := range containers {
		cinfo, err := self.containerDataToContainerInfo(containers[i], query)
		if err != nil {
			// Skip containers with errors; we try to degrade gracefully.
			continue
		}
		output = append(output, cinfo)
	}

	return output, nil
}

func (self *manager) GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) {
	containers, err := self.getRequestedContainers(containerName, options)
	if err != nil {
		return nil, err
	}
	containersMap := make(map[string]*info.ContainerInfo)
	query := info.ContainerInfoRequest{
		NumStats: options.Count,
	}
	for name, data := range containers {
		info, err := self.containerDataToContainerInfo(data, &query)
		if err != nil {
			// Skip containers with errors; we try to degrade gracefully.
			continue
		}
		containersMap[name] = info
	}
	return containersMap, nil
}

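// getRequestedContainers resolves request options to concrete containers. For
// example, options with IdType v2.TypeName and Recursive set return the named
// container and all of its subcontainers, while IdType v2.TypeDocker without
// Recursive looks up a single container in the Docker namespace.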
func (self *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) {
	containersMap := make(map[string]*containerData)
	switch options.IdType {
	case v2.TypeName:
		if !options.Recursive {
			cont, err := self.getContainer(containerName)
			if err != nil {
				return containersMap, err
			}
			containersMap[cont.info.Name] = cont
		} else {
			containersMap = self.getSubcontainers(containerName)
			if len(containersMap) == 0 {
				return containersMap, fmt.Errorf("unknown container: %q", containerName)
			}
		}
	case v2.TypeDocker:
		if !options.Recursive {
			containerName = strings.TrimPrefix(containerName, "/")
			cont, err := self.getDockerContainer(containerName)
			if err != nil {
				return containersMap, err
			}
			containersMap[cont.info.Name] = cont
		} else {
			if containerName != "/" {
				return containersMap, fmt.Errorf("invalid request for docker container %q with subcontainers", containerName)
			}
			containersMap = self.getAllDockerContainers()
		}
	default:
		return containersMap, fmt.Errorf("invalid request type %q", options.IdType)
	}
	return containersMap, nil
}

func (self *manager) GetFsInfo(label string) ([]v2.FsInfo, error) {
	var empty time.Time
	// Get latest data from filesystems hanging off root container.
	stats, err := self.memoryCache.RecentStats("/", empty, empty, 1)
	if err != nil {
		return nil, err
	}
	dev := ""
	if len(label) != 0 {
		dev, err = self.fsInfo.GetDeviceForLabel(label)
		if err != nil {
			return nil, err
		}
	}
	fsInfo := []v2.FsInfo{}
	for _, fs := range stats[0].Filesystem {
		if len(label) != 0 && fs.Device != dev {
			continue
		}
		mountpoint, err := self.fsInfo.GetMountpointForDevice(fs.Device)
		if err != nil {
			return nil, err
		}
		labels, err := self.fsInfo.GetLabelsForDevice(fs.Device)
		if err != nil {
			return nil, err
		}
		fi := v2.FsInfo{
			Device:     fs.Device,
			Mountpoint: mountpoint,
			Capacity:   fs.Limit,
			Usage:      fs.Usage,
			Available:  fs.Available,
			Labels:     labels,
		}
		fsInfo = append(fsInfo, fi)
	}
	return fsInfo, nil
}

func (m *manager) GetMachineInfo() (*info.MachineInfo, error) {
	// Copy and return the MachineInfo so callers cannot mutate manager state.
	machineInfo := m.machineInfo
	return &machineInfo, nil
}

func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
	// TODO: Consider caching this and periodically updating.  The VersionInfo may change if
	// the docker daemon is started after the cAdvisor client is created.  Caching the value
	// would be helpful so we would be able to return the last known docker version if
	// docker was down at the time of a query.
	return getVersionInfo()
}

func (m *manager) Exists(containerName string) bool {
	m.containersLock.RLock()
	defer m.containersLock.RUnlock()

	namespacedName := namespacedContainerName{
		Name: containerName,
	}

	_, ok := m.containers[namespacedName]
	return ok
}

func (m *manager) GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) {
	// Override Recursive: only single-container listings are supported.
	options.Recursive = false
	conts, err := m.getRequestedContainers(containerName, options)
	if err != nil {
		return nil, err
	}
	if len(conts) != 1 {
		return nil, fmt.Errorf("expected the request to match only one container")
	}
	// TODO(rjnagal): handle count? Only if we can do count by type (eg. top 5 cpu users)
	ps := []v2.ProcessInfo{}
	for _, cont := range conts {
		ps, err = cont.GetProcessList(m.cadvisorContainer, m.inHostNamespace)
		if err != nil {
			return nil, err
		}
	}
	return ps, nil
}

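// registerCollectors reads each collector config file from inside the
// container and registers one collector per entry: keys of collectorConfigs
// are collector names (a "prometheus"/"Prometheus" prefix selects the
// Prometheus collector), values are config file paths within the container.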
func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *containerData) error {
	for k, v := range collectorConfigs {
		configFile, err := cont.ReadFile(v, m.inHostNamespace)
		if err != nil {
			return fmt.Errorf("failed to read config file %q for config %q, container %q: %v", v, k, cont.info.Name, err)
		}
		glog.V(3).Infof("Got config from %q: %q", v, configFile)

		if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
			newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit)
			if err != nil {
				glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
				return err
			}
			err = cont.collectorManager.RegisterCollector(newCollector)
			if err != nil {
				glog.Infof("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err)
				return err
			}
		} else {
			newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit)
			if err != nil {
				glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
				return err
			}
			err = cont.collectorManager.RegisterCollector(newCollector)
			if err != nil {
				glog.Infof("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err)
				return err
			}
		}
	}
	return nil
}

// Create a container and begin tracking it: register it under its name and
// aliases, emit a creation event, and start its housekeeping.
func (m *manager) createContainer(containerName string) error {
	m.containersLock.Lock()
	defer m.containersLock.Unlock()

	namespacedName := namespacedContainerName{
		Name: containerName,
	}

	// Check that the container didn't already exist.
	if _, ok := m.containers[namespacedName]; ok {
		return nil
	}

	handler, accept, err := container.NewContainerHandler(containerName, m.inHostNamespace)
	if err != nil {
		return err
	}
	if !accept {
		// ignoring this container.
		glog.V(4).Infof("ignoring container %q", containerName)
		return nil
	}
	collectorManager, err := collector.NewCollectorManager()
	if err != nil {
		return err
	}

	logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
	cont, err := newContainerData(containerName, m.memoryCache, handler, m.loadReader, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping)
	if err != nil {
		return err
	}

	// Add collectors
	labels := handler.GetContainerLabels()
	collectorConfigs := collector.GetCollectorConfigs(labels)
	err = m.registerCollectors(collectorConfigs, cont)
	if err != nil {
		glog.Infof("failed to register collectors for %q: %v", containerName, err)
	}

	// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
	m.containers[namespacedName] = cont
	for _, alias := range cont.info.Aliases {
		m.containers[namespacedContainerName{
			Namespace: cont.info.Namespace,
			Name:      alias,
		}] = cont
	}

	glog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)

	contSpec, err := cont.handler.GetSpec()
	if err != nil {
		return err
	}

	contRef, err := cont.handler.ContainerReference()
	if err != nil {
		return err
	}

	newEvent := &info.Event{
		ContainerName: contRef.Name,
		Timestamp:     contSpec.CreationTime,
		EventType:     info.EventContainerCreation,
	}
	err = m.eventHandler.AddEvent(newEvent)
	if err != nil {
		return err
	}

	// Start the container's housekeeping.
	return cont.Start()
}

func (m *manager) destroyContainer(containerName string) error {
	m.containersLock.Lock()
	defer m.containersLock.Unlock()

	namespacedName := namespacedContainerName{
		Name: containerName,
	}
	cont, ok := m.containers[namespacedName]
	if !ok {
		// Already destroyed, done.
		return nil
	}

	// Tell the container to stop.
	err := cont.Stop()
	if err != nil {
		return err
	}

	// Remove the container from our records (and all its aliases).
	delete(m.containers, namespacedName)
	for _, alias := range cont.info.Aliases {
		delete(m.containers, namespacedContainerName{
			Namespace: cont.info.Namespace,
			Name:      alias,
		})
	}
	glog.V(3).Infof("Destroyed container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)

	contRef, err := cont.handler.ContainerReference()
	if err != nil {
		return err
	}

	newEvent := &info.Event{
		ContainerName: contRef.Name,
		Timestamp:     time.Now(),
		EventType:     info.EventContainerDeletion,
	}
	err = m.eventHandler.AddEvent(newEvent)
	if err != nil {
		return err
	}
	return nil
}

// Detect all containers that have been added to or deleted from the specified container.
func (m *manager) getContainersDiff(containerName string) (added []info.ContainerReference, removed []info.ContainerReference, err error) {
	m.containersLock.RLock()
	defer m.containersLock.RUnlock()

	// Get all subcontainers recursively.
	cont, ok := m.containers[namespacedContainerName{
		Name: containerName,
	}]
	if !ok {
		return nil, nil, fmt.Errorf("failed to find container %q while checking for new containers", containerName)
	}
	allContainers, err := cont.handler.ListContainers(container.ListRecursive)
	if err != nil {
		return nil, nil, err
	}
	allContainers = append(allContainers, info.ContainerReference{Name: containerName})

	// Determine which were added and which were removed.
	allContainersSet := make(map[string]*containerData)
	for name, d := range m.containers {
		// Only add the canonical name.
		if d.info.Name == name.Name {
			allContainersSet[name.Name] = d
		}
	}

	// Added containers
	for _, c := range allContainers {
		delete(allContainersSet, c.Name)
		_, ok := m.containers[namespacedContainerName{
			Name: c.Name,
		}]
		if !ok {
			added = append(added, c)
		}
	}

	// Removed ones are no longer in the container listing.
	for _, d := range allContainersSet {
		removed = append(removed, d.info.ContainerReference)
	}

	return
}

// Detect the existing subcontainers and reflect the setup here.
func (m *manager) detectSubcontainers(containerName string) error {
	added, removed, err := m.getContainersDiff(containerName)
	if err != nil {
		return err
	}

	// Add the new containers.
	for _, cont := range added {
		err = m.createContainer(cont.Name)
		if err != nil {
			glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
		}
	}

	// Remove the old containers.
	for _, cont := range removed {
		err = m.destroyContainer(cont.Name)
		if err != nil {
			glog.Errorf("Failed to destroy existing container: %s: %s", cont.Name, err)
		}
	}

	return nil
}

// Watches for new containers started in the system. Runs forever unless there is a setup error.
func (self *manager) watchForNewContainers(quit chan error) error {
	var root *containerData
	var ok bool
	func() {
		self.containersLock.RLock()
		defer self.containersLock.RUnlock()
		root, ok = self.containers[namespacedContainerName{
			Name: "/",
		}]
	}()
	if !ok {
		return fmt.Errorf("root container does not exist when watching for new containers")
	}

	// Register for new subcontainers.
	eventsChannel := make(chan container.SubcontainerEvent, 16)
	err := root.handler.WatchSubcontainers(eventsChannel)
	if err != nil {
		return err
	}

	// There is a race between starting the watch and new container creation so we do a detection before we read new containers.
	err = self.detectSubcontainers("/")
	if err != nil {
		return err
	}

	// Listen to events from the container handler.
	go func() {
		for {
			select {
			case event := <-eventsChannel:
				switch {
				case event.EventType == container.SubcontainerAdd:
					err = self.createContainer(event.Name)
				case event.EventType == container.SubcontainerDelete:
					err = self.destroyContainer(event.Name)
				}
				if err != nil {
					glog.Warningf("Failed to process watch event: %v", err)
				}
			case <-quit:
				// Stop processing events if asked to quit.
				err := root.handler.StopWatchingSubcontainers()
				quit <- err
				if err == nil {
					glog.Infof("Exiting thread watching subcontainers")
					return
				}
			}
		}
	}()
	return nil
}

func (self *manager) watchForNewOoms() error {
	glog.Infof("Started watching for new ooms in manager")
	outStream := make(chan *oomparser.OomInstance, 10)
	oomLog, err := oomparser.New()
	if err != nil {
		return err
	}
	go oomLog.StreamOoms(outStream)

	go func() {
		for oomInstance := range outStream {
			// Surface OOM and OOM kill events.
			newEvent := &info.Event{
				ContainerName: oomInstance.ContainerName,
				Timestamp:     oomInstance.TimeOfDeath,
				EventType:     info.EventOom,
			}
			err := self.eventHandler.AddEvent(newEvent)
			if err != nil {
				glog.Errorf("failed to add OOM event for %q: %v", oomInstance.ContainerName, err)
			}
			glog.V(3).Infof("Created an OOM event in container %q at %v", oomInstance.ContainerName, oomInstance.TimeOfDeath)

			newEvent = &info.Event{
				ContainerName: oomInstance.VictimContainerName,
				Timestamp:     oomInstance.TimeOfDeath,
				EventType:     info.EventOomKill,
				EventData: info.EventData{
					OomKill: &info.OomKillEventData{
						Pid:         oomInstance.Pid,
						ProcessName: oomInstance.ProcessName,
					},
				},
			}
			err = self.eventHandler.AddEvent(newEvent)
			if err != nil {
				glog.Errorf("failed to add OOM kill event for %q: %v", oomInstance.VictimContainerName, err)
			}
		}
	}()
	return nil
}

// WatchForEvents can be called by the API to stream events that fit the
// request through the returned channel.
func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) {
	return self.eventHandler.WatchEvents(request)
}

// GetPastEvents can be called by the API to return all recorded events that
// satisfy the request.
func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) {
	return self.eventHandler.GetEvents(request)
}

// CloseEventChannel is called by the API when a client is no longer listening
// on the channel.
func (self *manager) CloseEventChannel(watchID int) {
	self.eventHandler.StopWatch(watchID)
}

// Parses the events StoragePolicy from the flags.
func parseEventsStoragePolicy() events.StoragePolicy {
	policy := events.DefaultStoragePolicy()

	// Parse max age.
	parts := strings.Split(*eventStorageAgeLimit, ",")
	for _, part := range parts {
		items := strings.Split(part, "=")
		if len(items) != 2 {
			glog.Warningf("Unknown event storage policy %q when parsing max age", part)
			continue
		}
		dur, err := time.ParseDuration(items[1])
		if err != nil {
			glog.Warningf("Unable to parse event max age duration %q: %v", items[1], err)
			continue
		}
		if items[0] == "default" {
			policy.DefaultMaxAge = dur
			continue
		}
		policy.PerTypeMaxAge[info.EventType(items[0])] = dur
	}

	// Parse max number.
	parts = strings.Split(*eventStorageEventLimit, ",")
	for _, part := range parts {
		items := strings.Split(part, "=")
		if len(items) != 2 {
			glog.Warningf("Unknown event storage policy %q when parsing max event limit", part)
			continue
		}
		val, err := strconv.Atoi(items[1])
		if err != nil {
			glog.Warningf("Unable to parse integer from %q: %v", items[1], err)
			continue
		}
		if items[0] == "default" {
			policy.DefaultMaxNumEvents = val
			continue
		}
		policy.PerTypeMaxNumEvents[info.EventType(items[0])] = val
	}

	return policy
}

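// DockerStatus summarizes the state of the local Docker daemon as reported by
// the Docker info API.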
type DockerStatus struct {
	Version       string            `json:"version"`
	KernelVersion string            `json:"kernel_version"`
	OS            string            `json:"os"`
	Hostname      string            `json:"hostname"`
	RootDir       string            `json:"root_dir"`
	Driver        string            `json:"driver"`
	DriverStatus  map[string]string `json:"driver_status"`
	ExecDriver    string            `json:"exec_driver"`
	NumImages     int               `json:"num_images"`
	NumContainers int               `json:"num_containers"`
}

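// DockerImage describes a tagged Docker image present on the machine.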
type DockerImage struct {
	ID          string   `json:"id"`
	RepoTags    []string `json:"repo_tags"` // repository name and tags.
	Created     int64    `json:"created"`   // unix time since creation.
	VirtualSize int64    `json:"virtual_size"`
	Size        int64    `json:"size"`
}

func (m *manager) DockerImages() ([]DockerImage, error) {
	images, err := docker.DockerImages()
	if err != nil {
		return nil, err
	}
	out := []DockerImage{}
	const unknownTag = "<none>:<none>"
	for _, image := range images {
		if len(image.RepoTags) == 1 && image.RepoTags[0] == unknownTag {
			// Images without a repository name or tags are uninteresting.
			continue
		}
		di := DockerImage{
			ID:          image.ID,
			RepoTags:    image.RepoTags,
			Created:     image.Created,
			VirtualSize: image.VirtualSize,
			Size:        image.Size,
		}
		out = append(out, di)
	}
	return out, nil
}

func (m *manager) DockerInfo() (DockerStatus, error) {
	info, err := docker.DockerInfo()
	if err != nil {
		return DockerStatus{}, err
	}
	versionInfo, err := m.GetVersionInfo()
	if err != nil {
		return DockerStatus{}, err
	}
	out := DockerStatus{}
	out.Version = versionInfo.DockerVersion
	if val, ok := info["KernelVersion"]; ok {
		out.KernelVersion = val
	}
	if val, ok := info["OperatingSystem"]; ok {
		out.OS = val
	}
	if val, ok := info["Name"]; ok {
		out.Hostname = val
	}
	if val, ok := info["DockerRootDir"]; ok {
		out.RootDir = val
	}
	if val, ok := info["Driver"]; ok {
		out.Driver = val
	}
	if val, ok := info["ExecutionDriver"]; ok {
		out.ExecDriver = val
	}
	if val, ok := info["Images"]; ok {
		n, err := strconv.Atoi(val)
		if err == nil {
			out.NumImages = n
		}
	}
	if val, ok := info["Containers"]; ok {
		n, err := strconv.Atoi(val)
		if err == nil {
			out.NumContainers = n
		}
	}
	if val, ok := info["DriverStatus"]; ok {
		var driverStatus [][]string
		err := json.Unmarshal([]byte(val), &driverStatus)
		if err != nil {
			return DockerStatus{}, err
		}
		out.DriverStatus = make(map[string]string)
		for _, v := range driverStatus {
			if len(v) == 2 {
				out.DriverStatus[v[0]] = v[1]
			}
		}
	}
	return out, nil
}

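// DebugInfo returns debugging information grouped by category, including the
// list of managed containers with their namespaces and aliases.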
func (m *manager) DebugInfo() map[string][]string {
	debugInfo := container.DebugInfo()

	// Get unique containers.
	var conts map[*containerData]struct{}
	func() {
		m.containersLock.RLock()
		defer m.containersLock.RUnlock()

		conts = make(map[*containerData]struct{}, len(m.containers))
		for _, c := range m.containers {
			conts[c] = struct{}{}
		}
	}()

	// List containers.
	lines := make([]string, 0, len(conts))
	for cont := range conts {
		lines = append(lines, cont.info.Name)
		if cont.info.Namespace != "" {
			lines = append(lines, fmt.Sprintf("\tNamespace: %s", cont.info.Namespace))
		}

		if len(cont.info.Aliases) != 0 {
			lines = append(lines, "\tAliases:")
			for _, alias := range cont.info.Aliases {
				lines = append(lines, fmt.Sprintf("\t\t%s", alias))
			}
		}
	}

	debugInfo["Managed containers"] = lines
	return debugInfo
}