1package stats // import "github.com/docker/docker/daemon/stats" 2 3import ( 4 "bufio" 5 "sync" 6 "time" 7 8 "github.com/docker/docker/api/types" 9 "github.com/docker/docker/container" 10 "github.com/docker/docker/pkg/pubsub" 11 "github.com/sirupsen/logrus" 12) 13 14// Collector manages and provides container resource stats 15type Collector struct { 16 m sync.Mutex 17 cond *sync.Cond 18 supervisor supervisor 19 interval time.Duration 20 publishers map[*container.Container]*pubsub.Publisher 21 bufReader *bufio.Reader 22} 23 24// NewCollector creates a stats collector that will poll the supervisor with the specified interval 25func NewCollector(supervisor supervisor, interval time.Duration) *Collector { 26 s := &Collector{ 27 interval: interval, 28 supervisor: supervisor, 29 publishers: make(map[*container.Container]*pubsub.Publisher), 30 bufReader: bufio.NewReaderSize(nil, 128), 31 } 32 s.cond = sync.NewCond(&s.m) 33 return s 34} 35 36type supervisor interface { 37 // GetContainerStats collects all the stats related to a container 38 GetContainerStats(container *container.Container) (*types.StatsJSON, error) 39} 40 41// Collect registers the container with the collector and adds it to 42// the event loop for collection on the specified interval returning 43// a channel for the subscriber to receive on. 44func (s *Collector) Collect(c *container.Container) chan interface{} { 45 s.cond.L.Lock() 46 defer s.cond.L.Unlock() 47 48 publisher, exists := s.publishers[c] 49 if !exists { 50 publisher = pubsub.NewPublisher(100*time.Millisecond, 1024) 51 s.publishers[c] = publisher 52 } 53 54 s.cond.Broadcast() 55 return publisher.Subscribe() 56} 57 58// StopCollection closes the channels for all subscribers and removes 59// the container from metrics collection. 60func (s *Collector) StopCollection(c *container.Container) { 61 s.m.Lock() 62 if publisher, exists := s.publishers[c]; exists { 63 publisher.Close() 64 delete(s.publishers, c) 65 } 66 s.m.Unlock() 67} 68 69// Unsubscribe removes a specific subscriber from receiving updates for a container's stats. 70func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) { 71 s.m.Lock() 72 publisher := s.publishers[c] 73 if publisher != nil { 74 publisher.Evict(ch) 75 if publisher.Len() == 0 { 76 delete(s.publishers, c) 77 } 78 } 79 s.m.Unlock() 80} 81 82// Run starts the collectors and will indefinitely collect stats from the supervisor 83func (s *Collector) Run() { 84 type publishersPair struct { 85 container *container.Container 86 publisher *pubsub.Publisher 87 } 88 // we cannot determine the capacity here. 89 // it will grow enough in first iteration 90 var pairs []publishersPair 91 92 for { 93 s.cond.L.Lock() 94 for len(s.publishers) == 0 { 95 s.cond.Wait() 96 } 97 98 // it does not make sense in the first iteration, 99 // but saves allocations in further iterations 100 pairs = pairs[:0] 101 102 for container, publisher := range s.publishers { 103 // copy pointers here to release the lock ASAP 104 pairs = append(pairs, publishersPair{container, publisher}) 105 } 106 107 s.cond.L.Unlock() 108 109 onlineCPUs, err := s.getNumberOnlineCPUs() 110 if err != nil { 111 logrus.Errorf("collecting system online cpu count: %v", err) 112 continue 113 } 114 115 for _, pair := range pairs { 116 stats, err := s.supervisor.GetContainerStats(pair.container) 117 118 switch err.(type) { 119 case nil: 120 // Sample system CPU usage close to container usage to avoid 121 // noise in metric calculations. 122 systemUsage, err := s.getSystemCPUUsage() 123 if err != nil { 124 logrus.WithError(err).WithField("container_id", pair.container.ID).Errorf("collecting system cpu usage") 125 continue 126 } 127 128 // FIXME: move to containerd on Linux (not Windows) 129 stats.CPUStats.SystemUsage = systemUsage 130 stats.CPUStats.OnlineCPUs = onlineCPUs 131 132 pair.publisher.Publish(*stats) 133 134 case notRunningErr, notFoundErr: 135 // publish empty stats containing only name and ID if not running or not found 136 pair.publisher.Publish(types.StatsJSON{ 137 Name: pair.container.Name, 138 ID: pair.container.ID, 139 }) 140 141 default: 142 logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err) 143 pair.publisher.Publish(types.StatsJSON{ 144 Name: pair.container.Name, 145 ID: pair.container.ID, 146 }) 147 } 148 } 149 150 time.Sleep(s.interval) 151 } 152} 153 154type notRunningErr interface { 155 error 156 Conflict() 157} 158 159type notFoundErr interface { 160 error 161 NotFound() 162} 163