1package logger // import "github.com/docker/docker/daemon/logger"
2
3import (
4	"errors"
5	"sync"
6	"sync/atomic"
7
8	"github.com/sirupsen/logrus"
9)
10
11const (
12	defaultRingMaxSize = 1e6 // 1MB
13)
14
15// RingLogger is a ring buffer that implements the Logger interface.
16// This is used when lossy logging is OK.
17type RingLogger struct {
18	buffer    *messageRing
19	l         Logger
20	logInfo   Info
21	closeFlag int32
22}
23
24var _ SizedLogger = &RingLogger{}
25
26type ringWithReader struct {
27	*RingLogger
28}
29
30func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
31	reader, ok := r.l.(LogReader)
32	if !ok {
33		// something is wrong if we get here
34		panic("expected log reader")
35	}
36	return reader.ReadLogs(cfg)
37}
38
39func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
40	l := &RingLogger{
41		buffer:  newRing(maxSize),
42		l:       driver,
43		logInfo: logInfo,
44	}
45	go l.run()
46	return l
47}
48
49// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
50// the passed in logger.
51func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
52	if maxSize < 0 {
53		maxSize = defaultRingMaxSize
54	}
55	l := newRingLogger(driver, logInfo, maxSize)
56	if _, ok := driver.(LogReader); ok {
57		return &ringWithReader{l}
58	}
59	return l
60}
61
62// BufSize returns the buffer size of the underlying logger.
63// Returns -1 if the logger doesn't match SizedLogger interface.
64func (r *RingLogger) BufSize() int {
65	if sl, ok := r.l.(SizedLogger); ok {
66		return sl.BufSize()
67	}
68	return -1
69}
70
71// Log queues messages into the ring buffer
72func (r *RingLogger) Log(msg *Message) error {
73	if r.closed() {
74		return errClosed
75	}
76	return r.buffer.Enqueue(msg)
77}
78
79// Name returns the name of the underlying logger
80func (r *RingLogger) Name() string {
81	return r.l.Name()
82}
83
84func (r *RingLogger) closed() bool {
85	return atomic.LoadInt32(&r.closeFlag) == 1
86}
87
88func (r *RingLogger) setClosed() {
89	atomic.StoreInt32(&r.closeFlag, 1)
90}
91
92// Close closes the logger
93func (r *RingLogger) Close() error {
94	r.setClosed()
95	r.buffer.Close()
96	// empty out the queue
97	var logErr bool
98	for _, msg := range r.buffer.Drain() {
99		if logErr {
100			// some error logging a previous message, so re-insert to message pool
101			// and assume log driver is hosed
102			PutMessage(msg)
103			continue
104		}
105
106		if err := r.l.Log(msg); err != nil {
107			logrus.WithField("driver", r.l.Name()).
108				WithField("container", r.logInfo.ContainerID).
109				WithError(err).
110				Errorf("Error writing log message")
111			logErr = true
112		}
113	}
114	return r.l.Close()
115}
116
117// run consumes messages from the ring buffer and forwards them to the underling
118// logger.
119// This is run in a goroutine when the RingLogger is created
120func (r *RingLogger) run() {
121	for {
122		if r.closed() {
123			return
124		}
125		msg, err := r.buffer.Dequeue()
126		if err != nil {
127			// buffer is closed
128			return
129		}
130		if err := r.l.Log(msg); err != nil {
131			logrus.WithField("driver", r.l.Name()).
132				WithField("container", r.logInfo.ContainerID).
133				WithError(err).
134				Errorf("Error writing log message")
135		}
136	}
137}
138
139type messageRing struct {
140	mu sync.Mutex
141	// signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
142	wait *sync.Cond
143
144	sizeBytes int64 // current buffer size
145	maxBytes  int64 // max buffer size size
146	queue     []*Message
147	closed    bool
148}
149
150func newRing(maxBytes int64) *messageRing {
151	queueSize := 1000
152	if maxBytes == 0 || maxBytes == 1 {
153		// With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
154		// message long.
155		queueSize = 1
156	}
157
158	r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
159	r.wait = sync.NewCond(&r.mu)
160	return r
161}
162
163// Enqueue adds a message to the buffer queue
164// If the message is too big for the buffer it drops the new message.
165// If there are no messages in the queue and the message is still too big, it adds the message anyway.
166func (r *messageRing) Enqueue(m *Message) error {
167	mSize := int64(len(m.Line))
168
169	r.mu.Lock()
170	if r.closed {
171		r.mu.Unlock()
172		return errClosed
173	}
174	if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
175		r.wait.Signal()
176		r.mu.Unlock()
177		return nil
178	}
179
180	r.queue = append(r.queue, m)
181	r.sizeBytes += mSize
182	r.wait.Signal()
183	r.mu.Unlock()
184	return nil
185}
186
187// Dequeue pulls a message off the queue
188// If there are no messages, it waits for one.
189// If the buffer is closed, it will return immediately.
190func (r *messageRing) Dequeue() (*Message, error) {
191	r.mu.Lock()
192	for len(r.queue) == 0 && !r.closed {
193		r.wait.Wait()
194	}
195
196	if r.closed {
197		r.mu.Unlock()
198		return nil, errClosed
199	}
200
201	msg := r.queue[0]
202	r.queue = r.queue[1:]
203	r.sizeBytes -= int64(len(msg.Line))
204	r.mu.Unlock()
205	return msg, nil
206}
207
208var errClosed = errors.New("closed")
209
210// Close closes the buffer ensuring no new messages can be added.
211// Any callers waiting to dequeue a message will be woken up.
212func (r *messageRing) Close() {
213	r.mu.Lock()
214	if r.closed {
215		r.mu.Unlock()
216		return
217	}
218
219	r.closed = true
220	r.wait.Broadcast()
221	r.mu.Unlock()
222}
223
224// Drain drains all messages from the queue.
225// This can be used after `Close()` to get any remaining messages that were in queue.
226func (r *messageRing) Drain() []*Message {
227	r.mu.Lock()
228	ls := make([]*Message, 0, len(r.queue))
229	ls = append(ls, r.queue...)
230	r.sizeBytes = 0
231	r.queue = r.queue[:0]
232	r.mu.Unlock()
233	return ls
234}
235