1package stats // import "github.com/docker/docker/daemon/stats"
2
3import (
4	"bufio"
5	"sync"
6	"time"
7
8	"github.com/docker/docker/api/types"
9	"github.com/docker/docker/container"
10	"github.com/docker/docker/pkg/pubsub"
11	"github.com/sirupsen/logrus"
12)
13
14// Collector manages and provides container resource stats
15type Collector struct {
16	m          sync.Mutex
17	cond       *sync.Cond
18	supervisor supervisor
19	interval   time.Duration
20	publishers map[*container.Container]*pubsub.Publisher
21	bufReader  *bufio.Reader
22}
23
24// NewCollector creates a stats collector that will poll the supervisor with the specified interval
25func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
26	s := &Collector{
27		interval:   interval,
28		supervisor: supervisor,
29		publishers: make(map[*container.Container]*pubsub.Publisher),
30		bufReader:  bufio.NewReaderSize(nil, 128),
31	}
32	s.cond = sync.NewCond(&s.m)
33	return s
34}
35
36type supervisor interface {
37	// GetContainerStats collects all the stats related to a container
38	GetContainerStats(container *container.Container) (*types.StatsJSON, error)
39}
40
41// Collect registers the container with the collector and adds it to
42// the event loop for collection on the specified interval returning
43// a channel for the subscriber to receive on.
44func (s *Collector) Collect(c *container.Container) chan interface{} {
45	s.cond.L.Lock()
46	defer s.cond.L.Unlock()
47
48	publisher, exists := s.publishers[c]
49	if !exists {
50		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
51		s.publishers[c] = publisher
52	}
53
54	s.cond.Broadcast()
55	return publisher.Subscribe()
56}
57
58// StopCollection closes the channels for all subscribers and removes
59// the container from metrics collection.
60func (s *Collector) StopCollection(c *container.Container) {
61	s.m.Lock()
62	if publisher, exists := s.publishers[c]; exists {
63		publisher.Close()
64		delete(s.publishers, c)
65	}
66	s.m.Unlock()
67}
68
69// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
70func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
71	s.m.Lock()
72	publisher := s.publishers[c]
73	if publisher != nil {
74		publisher.Evict(ch)
75		if publisher.Len() == 0 {
76			delete(s.publishers, c)
77		}
78	}
79	s.m.Unlock()
80}
81
82// Run starts the collectors and will indefinitely collect stats from the supervisor
83func (s *Collector) Run() {
84	type publishersPair struct {
85		container *container.Container
86		publisher *pubsub.Publisher
87	}
88	// we cannot determine the capacity here.
89	// it will grow enough in first iteration
90	var pairs []publishersPair
91
92	for {
93		s.cond.L.Lock()
94		for len(s.publishers) == 0 {
95			s.cond.Wait()
96		}
97
98		// it does not make sense in the first iteration,
99		// but saves allocations in further iterations
100		pairs = pairs[:0]
101
102		for container, publisher := range s.publishers {
103			// copy pointers here to release the lock ASAP
104			pairs = append(pairs, publishersPair{container, publisher})
105		}
106
107		s.cond.L.Unlock()
108
109		onlineCPUs, err := s.getNumberOnlineCPUs()
110		if err != nil {
111			logrus.Errorf("collecting system online cpu count: %v", err)
112			continue
113		}
114
115		for _, pair := range pairs {
116			stats, err := s.supervisor.GetContainerStats(pair.container)
117
118			switch err.(type) {
119			case nil:
120				// Sample system CPU usage close to container usage to avoid
121				// noise in metric calculations.
122				systemUsage, err := s.getSystemCPUUsage()
123				if err != nil {
124					logrus.WithError(err).WithField("container_id", pair.container.ID).Errorf("collecting system cpu usage")
125					continue
126				}
127
128				// FIXME: move to containerd on Linux (not Windows)
129				stats.CPUStats.SystemUsage = systemUsage
130				stats.CPUStats.OnlineCPUs = onlineCPUs
131
132				pair.publisher.Publish(*stats)
133
134			case notRunningErr, notFoundErr:
135				// publish empty stats containing only name and ID if not running or not found
136				pair.publisher.Publish(types.StatsJSON{
137					Name: pair.container.Name,
138					ID:   pair.container.ID,
139				})
140
141			default:
142				logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
143				pair.publisher.Publish(types.StatsJSON{
144					Name: pair.container.Name,
145					ID:   pair.container.ID,
146				})
147			}
148		}
149
150		time.Sleep(s.interval)
151	}
152}
153
type (
	// notRunningErr matches errors that expose a Conflict method. Run
	// treats such an error from the supervisor as "container not running"
	// and publishes stats holding only the container name and ID.
	notRunningErr interface {
		error
		Conflict()
	}

	// notFoundErr matches errors that expose a NotFound method. Run treats
	// such an error from the supervisor as "container not found" and
	// publishes stats holding only the container name and ID.
	notFoundErr interface {
		error
		NotFound()
	}
)
163