1package stats
2
3import (
4	"time"
5
6	"github.com/docker/docker/api/types"
7	"github.com/docker/docker/container"
8	"github.com/docker/docker/pkg/pubsub"
9	"github.com/sirupsen/logrus"
10)
11
12// Collect registers the container with the collector and adds it to
13// the event loop for collection on the specified interval returning
14// a channel for the subscriber to receive on.
15func (s *Collector) Collect(c *container.Container) chan interface{} {
16	s.m.Lock()
17	defer s.m.Unlock()
18	publisher, exists := s.publishers[c]
19	if !exists {
20		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
21		s.publishers[c] = publisher
22	}
23	return publisher.Subscribe()
24}
25
26// StopCollection closes the channels for all subscribers and removes
27// the container from metrics collection.
28func (s *Collector) StopCollection(c *container.Container) {
29	s.m.Lock()
30	if publisher, exists := s.publishers[c]; exists {
31		publisher.Close()
32		delete(s.publishers, c)
33	}
34	s.m.Unlock()
35}
36
37// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
38func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
39	s.m.Lock()
40	publisher := s.publishers[c]
41	if publisher != nil {
42		publisher.Evict(ch)
43		if publisher.Len() == 0 {
44			delete(s.publishers, c)
45		}
46	}
47	s.m.Unlock()
48}
49
50// Run starts the collectors and will indefinitely collect stats from the supervisor
51func (s *Collector) Run() {
52	type publishersPair struct {
53		container *container.Container
54		publisher *pubsub.Publisher
55	}
56	// we cannot determine the capacity here.
57	// it will grow enough in first iteration
58	var pairs []publishersPair
59
60	for range time.Tick(s.interval) {
61		// it does not make sense in the first iteration,
62		// but saves allocations in further iterations
63		pairs = pairs[:0]
64
65		s.m.Lock()
66		for container, publisher := range s.publishers {
67			// copy pointers here to release the lock ASAP
68			pairs = append(pairs, publishersPair{container, publisher})
69		}
70		s.m.Unlock()
71		if len(pairs) == 0 {
72			continue
73		}
74
75		systemUsage, err := s.getSystemCPUUsage()
76		if err != nil {
77			logrus.Errorf("collecting system cpu usage: %v", err)
78			continue
79		}
80
81		onlineCPUs, err := s.getNumberOnlineCPUs()
82		if err != nil {
83			logrus.Errorf("collecting system online cpu count: %v", err)
84			continue
85		}
86
87		for _, pair := range pairs {
88			stats, err := s.supervisor.GetContainerStats(pair.container)
89
90			switch err.(type) {
91			case nil:
92				// FIXME: move to containerd on Linux (not Windows)
93				stats.CPUStats.SystemUsage = systemUsage
94				stats.CPUStats.OnlineCPUs = onlineCPUs
95
96				pair.publisher.Publish(*stats)
97
98			case notRunningErr, notFoundErr:
99				// publish empty stats containing only name and ID if not running or not found
100				pair.publisher.Publish(types.StatsJSON{
101					Name: pair.container.Name,
102					ID:   pair.container.ID,
103				})
104
105			default:
106				logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
107			}
108		}
109	}
110}
111
112type notRunningErr interface {
113	error
114	Conflict()
115}
116
117type notFoundErr interface {
118	error
119	NotFound()
120}
121