1package stats // import "github.com/docker/docker/daemon/stats" 2 3import ( 4 "bufio" 5 "sync" 6 "time" 7 8 "github.com/docker/docker/api/types" 9 "github.com/docker/docker/container" 10 "github.com/docker/docker/pkg/pubsub" 11 "github.com/sirupsen/logrus" 12) 13 14// Collector manages and provides container resource stats 15type Collector struct { 16 m sync.Mutex 17 supervisor supervisor 18 interval time.Duration 19 publishers map[*container.Container]*pubsub.Publisher 20 bufReader *bufio.Reader 21 22 // The following fields are not set on Windows currently. 23 clockTicksPerSecond uint64 24} 25 26// NewCollector creates a stats collector that will poll the supervisor with the specified interval 27func NewCollector(supervisor supervisor, interval time.Duration) *Collector { 28 s := &Collector{ 29 interval: interval, 30 supervisor: supervisor, 31 publishers: make(map[*container.Container]*pubsub.Publisher), 32 bufReader: bufio.NewReaderSize(nil, 128), 33 } 34 35 platformNewStatsCollector(s) 36 37 return s 38} 39 40type supervisor interface { 41 // GetContainerStats collects all the stats related to a container 42 GetContainerStats(container *container.Container) (*types.StatsJSON, error) 43} 44 45// Collect registers the container with the collector and adds it to 46// the event loop for collection on the specified interval returning 47// a channel for the subscriber to receive on. 48func (s *Collector) Collect(c *container.Container) chan interface{} { 49 s.m.Lock() 50 defer s.m.Unlock() 51 publisher, exists := s.publishers[c] 52 if !exists { 53 publisher = pubsub.NewPublisher(100*time.Millisecond, 1024) 54 s.publishers[c] = publisher 55 } 56 return publisher.Subscribe() 57} 58 59// StopCollection closes the channels for all subscribers and removes 60// the container from metrics collection. 61func (s *Collector) StopCollection(c *container.Container) { 62 s.m.Lock() 63 if publisher, exists := s.publishers[c]; exists { 64 publisher.Close() 65 delete(s.publishers, c) 66 } 67 s.m.Unlock() 68} 69 70// Unsubscribe removes a specific subscriber from receiving updates for a container's stats. 71func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) { 72 s.m.Lock() 73 publisher := s.publishers[c] 74 if publisher != nil { 75 publisher.Evict(ch) 76 if publisher.Len() == 0 { 77 delete(s.publishers, c) 78 } 79 } 80 s.m.Unlock() 81} 82 83// Run starts the collectors and will indefinitely collect stats from the supervisor 84func (s *Collector) Run() { 85 type publishersPair struct { 86 container *container.Container 87 publisher *pubsub.Publisher 88 } 89 // we cannot determine the capacity here. 90 // it will grow enough in first iteration 91 var pairs []publishersPair 92 93 for { 94 // Put sleep at the start so that it will always be hit, 95 // preventing a tight loop if no stats are collected. 96 time.Sleep(s.interval) 97 98 // it does not make sense in the first iteration, 99 // but saves allocations in further iterations 100 pairs = pairs[:0] 101 102 s.m.Lock() 103 for container, publisher := range s.publishers { 104 // copy pointers here to release the lock ASAP 105 pairs = append(pairs, publishersPair{container, publisher}) 106 } 107 s.m.Unlock() 108 if len(pairs) == 0 { 109 continue 110 } 111 112 onlineCPUs, err := s.getNumberOnlineCPUs() 113 if err != nil { 114 logrus.Errorf("collecting system online cpu count: %v", err) 115 continue 116 } 117 118 for _, pair := range pairs { 119 stats, err := s.supervisor.GetContainerStats(pair.container) 120 121 switch err.(type) { 122 case nil: 123 // Sample system CPU usage close to container usage to avoid 124 // noise in metric calculations. 125 systemUsage, err := s.getSystemCPUUsage() 126 if err != nil { 127 logrus.WithError(err).WithField("container_id", pair.container.ID).Errorf("collecting system cpu usage") 128 continue 129 } 130 131 // FIXME: move to containerd on Linux (not Windows) 132 stats.CPUStats.SystemUsage = systemUsage 133 stats.CPUStats.OnlineCPUs = onlineCPUs 134 135 pair.publisher.Publish(*stats) 136 137 case notRunningErr, notFoundErr: 138 // publish empty stats containing only name and ID if not running or not found 139 pair.publisher.Publish(types.StatsJSON{ 140 Name: pair.container.Name, 141 ID: pair.container.ID, 142 }) 143 144 default: 145 logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err) 146 pair.publisher.Publish(types.StatsJSON{ 147 Name: pair.container.Name, 148 ID: pair.container.ID, 149 }) 150 } 151 } 152 } 153} 154 155type notRunningErr interface { 156 error 157 Conflict() 158} 159 160type notFoundErr interface { 161 error 162 NotFound() 163} 164