1package logger // import "github.com/docker/docker/daemon/logger" 2 3import ( 4 "errors" 5 "sync" 6 "sync/atomic" 7 8 "github.com/sirupsen/logrus" 9) 10 11const ( 12 defaultRingMaxSize = 1e6 // 1MB 13) 14 15// RingLogger is a ring buffer that implements the Logger interface. 16// This is used when lossy logging is OK. 17type RingLogger struct { 18 buffer *messageRing 19 l Logger 20 logInfo Info 21 closeFlag int32 22} 23 24var _ SizedLogger = &RingLogger{} 25 26type ringWithReader struct { 27 *RingLogger 28} 29 30func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { 31 reader, ok := r.l.(LogReader) 32 if !ok { 33 // something is wrong if we get here 34 panic("expected log reader") 35 } 36 return reader.ReadLogs(cfg) 37} 38 39func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { 40 l := &RingLogger{ 41 buffer: newRing(maxSize), 42 l: driver, 43 logInfo: logInfo, 44 } 45 go l.run() 46 return l 47} 48 49// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping 50// the passed in logger. 51func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { 52 if maxSize < 0 { 53 maxSize = defaultRingMaxSize 54 } 55 l := newRingLogger(driver, logInfo, maxSize) 56 if _, ok := driver.(LogReader); ok { 57 return &ringWithReader{l} 58 } 59 return l 60} 61 62// BufSize returns the buffer size of the underlying logger. 63// Returns -1 if the logger doesn't match SizedLogger interface. 64func (r *RingLogger) BufSize() int { 65 if sl, ok := r.l.(SizedLogger); ok { 66 return sl.BufSize() 67 } 68 return -1 69} 70 71// Log queues messages into the ring buffer 72func (r *RingLogger) Log(msg *Message) error { 73 if r.closed() { 74 return errClosed 75 } 76 return r.buffer.Enqueue(msg) 77} 78 79// Name returns the name of the underlying logger 80func (r *RingLogger) Name() string { 81 return r.l.Name() 82} 83 84func (r *RingLogger) closed() bool { 85 return atomic.LoadInt32(&r.closeFlag) == 1 86} 87 88func (r *RingLogger) setClosed() { 89 atomic.StoreInt32(&r.closeFlag, 1) 90} 91 92// Close closes the logger 93func (r *RingLogger) Close() error { 94 r.setClosed() 95 r.buffer.Close() 96 // empty out the queue 97 var logErr bool 98 for _, msg := range r.buffer.Drain() { 99 if logErr { 100 // some error logging a previous message, so re-insert to message pool 101 // and assume log driver is hosed 102 PutMessage(msg) 103 continue 104 } 105 106 if err := r.l.Log(msg); err != nil { 107 logrus.WithField("driver", r.l.Name()). 108 WithField("container", r.logInfo.ContainerID). 109 WithError(err). 110 Errorf("Error writing log message") 111 logErr = true 112 } 113 } 114 return r.l.Close() 115} 116 117// run consumes messages from the ring buffer and forwards them to the underling 118// logger. 119// This is run in a goroutine when the RingLogger is created 120func (r *RingLogger) run() { 121 for { 122 if r.closed() { 123 return 124 } 125 msg, err := r.buffer.Dequeue() 126 if err != nil { 127 // buffer is closed 128 return 129 } 130 if err := r.l.Log(msg); err != nil { 131 logrus.WithField("driver", r.l.Name()). 132 WithField("container", r.logInfo.ContainerID). 133 WithError(err). 134 Errorf("Error writing log message") 135 } 136 } 137} 138 139type messageRing struct { 140 mu sync.Mutex 141 // signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added 142 wait *sync.Cond 143 144 sizeBytes int64 // current buffer size 145 maxBytes int64 // max buffer size size 146 queue []*Message 147 closed bool 148} 149 150func newRing(maxBytes int64) *messageRing { 151 queueSize := 1000 152 if maxBytes == 0 || maxBytes == 1 { 153 // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1 154 // message long. 155 queueSize = 1 156 } 157 158 r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes} 159 r.wait = sync.NewCond(&r.mu) 160 return r 161} 162 163// Enqueue adds a message to the buffer queue 164// If the message is too big for the buffer it drops the new message. 165// If there are no messages in the queue and the message is still too big, it adds the message anyway. 166func (r *messageRing) Enqueue(m *Message) error { 167 mSize := int64(len(m.Line)) 168 169 r.mu.Lock() 170 if r.closed { 171 r.mu.Unlock() 172 return errClosed 173 } 174 if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 { 175 r.wait.Signal() 176 r.mu.Unlock() 177 return nil 178 } 179 180 r.queue = append(r.queue, m) 181 r.sizeBytes += mSize 182 r.wait.Signal() 183 r.mu.Unlock() 184 return nil 185} 186 187// Dequeue pulls a message off the queue 188// If there are no messages, it waits for one. 189// If the buffer is closed, it will return immediately. 190func (r *messageRing) Dequeue() (*Message, error) { 191 r.mu.Lock() 192 for len(r.queue) == 0 && !r.closed { 193 r.wait.Wait() 194 } 195 196 if r.closed { 197 r.mu.Unlock() 198 return nil, errClosed 199 } 200 201 msg := r.queue[0] 202 r.queue = r.queue[1:] 203 r.sizeBytes -= int64(len(msg.Line)) 204 r.mu.Unlock() 205 return msg, nil 206} 207 208var errClosed = errors.New("closed") 209 210// Close closes the buffer ensuring no new messages can be added. 211// Any callers waiting to dequeue a message will be woken up. 212func (r *messageRing) Close() { 213 r.mu.Lock() 214 if r.closed { 215 r.mu.Unlock() 216 return 217 } 218 219 r.closed = true 220 r.wait.Broadcast() 221 r.mu.Unlock() 222} 223 224// Drain drains all messages from the queue. 225// This can be used after `Close()` to get any remaining messages that were in queue. 226func (r *messageRing) Drain() []*Message { 227 r.mu.Lock() 228 ls := make([]*Message, 0, len(r.queue)) 229 ls = append(ls, r.queue...) 230 r.sizeBytes = 0 231 r.queue = r.queue[:0] 232 r.mu.Unlock() 233 return ls 234} 235