1package stats // import "github.com/docker/docker/daemon/stats"
2
3import (
4	"bufio"
5	"sync"
6	"time"
7
8	"github.com/docker/docker/api/types"
9	"github.com/docker/docker/container"
10	"github.com/docker/docker/pkg/pubsub"
11	"github.com/sirupsen/logrus"
12)
13
14// Collector manages and provides container resource stats
15type Collector struct {
16	m          sync.Mutex
17	supervisor supervisor
18	interval   time.Duration
19	publishers map[*container.Container]*pubsub.Publisher
20	bufReader  *bufio.Reader
21
22	// The following fields are not set on Windows currently.
23	clockTicksPerSecond uint64
24}
25
26// NewCollector creates a stats collector that will poll the supervisor with the specified interval
27func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
28	s := &Collector{
29		interval:   interval,
30		supervisor: supervisor,
31		publishers: make(map[*container.Container]*pubsub.Publisher),
32		bufReader:  bufio.NewReaderSize(nil, 128),
33	}
34
35	platformNewStatsCollector(s)
36
37	return s
38}
39
40type supervisor interface {
41	// GetContainerStats collects all the stats related to a container
42	GetContainerStats(container *container.Container) (*types.StatsJSON, error)
43}
44
45// Collect registers the container with the collector and adds it to
46// the event loop for collection on the specified interval returning
47// a channel for the subscriber to receive on.
48func (s *Collector) Collect(c *container.Container) chan interface{} {
49	s.m.Lock()
50	defer s.m.Unlock()
51	publisher, exists := s.publishers[c]
52	if !exists {
53		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
54		s.publishers[c] = publisher
55	}
56	return publisher.Subscribe()
57}
58
59// StopCollection closes the channels for all subscribers and removes
60// the container from metrics collection.
61func (s *Collector) StopCollection(c *container.Container) {
62	s.m.Lock()
63	if publisher, exists := s.publishers[c]; exists {
64		publisher.Close()
65		delete(s.publishers, c)
66	}
67	s.m.Unlock()
68}
69
70// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
71func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
72	s.m.Lock()
73	publisher := s.publishers[c]
74	if publisher != nil {
75		publisher.Evict(ch)
76		if publisher.Len() == 0 {
77			delete(s.publishers, c)
78		}
79	}
80	s.m.Unlock()
81}
82
83// Run starts the collectors and will indefinitely collect stats from the supervisor
84func (s *Collector) Run() {
85	type publishersPair struct {
86		container *container.Container
87		publisher *pubsub.Publisher
88	}
89	// we cannot determine the capacity here.
90	// it will grow enough in first iteration
91	var pairs []publishersPair
92
93	for {
94		// Put sleep at the start so that it will always be hit,
95		// preventing a tight loop if no stats are collected.
96		time.Sleep(s.interval)
97
98		// it does not make sense in the first iteration,
99		// but saves allocations in further iterations
100		pairs = pairs[:0]
101
102		s.m.Lock()
103		for container, publisher := range s.publishers {
104			// copy pointers here to release the lock ASAP
105			pairs = append(pairs, publishersPair{container, publisher})
106		}
107		s.m.Unlock()
108		if len(pairs) == 0 {
109			continue
110		}
111
112		onlineCPUs, err := s.getNumberOnlineCPUs()
113		if err != nil {
114			logrus.Errorf("collecting system online cpu count: %v", err)
115			continue
116		}
117
118		for _, pair := range pairs {
119			stats, err := s.supervisor.GetContainerStats(pair.container)
120
121			switch err.(type) {
122			case nil:
123				// Sample system CPU usage close to container usage to avoid
124				// noise in metric calculations.
125				systemUsage, err := s.getSystemCPUUsage()
126				if err != nil {
127					logrus.WithError(err).WithField("container_id", pair.container.ID).Errorf("collecting system cpu usage")
128					continue
129				}
130
131				// FIXME: move to containerd on Linux (not Windows)
132				stats.CPUStats.SystemUsage = systemUsage
133				stats.CPUStats.OnlineCPUs = onlineCPUs
134
135				pair.publisher.Publish(*stats)
136
137			case notRunningErr, notFoundErr:
138				// publish empty stats containing only name and ID if not running or not found
139				pair.publisher.Publish(types.StatsJSON{
140					Name: pair.container.Name,
141					ID:   pair.container.ID,
142				})
143
144			default:
145				logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
146				pair.publisher.Publish(types.StatsJSON{
147					Name: pair.container.Name,
148					ID:   pair.container.ID,
149				})
150			}
151		}
152	}
153}
154
// notRunningErr is matched by any error that also has a Conflict method.
// Run uses it to recognise a container that is not running, in which case
// empty stats are published without logging the error.
type notRunningErr interface {
	error
	// Conflict is a marker method; Run never calls it.
	Conflict()
}
159
// notFoundErr is matched by any error that also has a NotFound method.
// Run uses it to recognise a container that could not be found, in which
// case empty stats are published without logging the error.
type notFoundErr interface {
	error
	// NotFound is a marker method; Run never calls it.
	NotFound()
}
164