1package logger // import "github.com/docker/docker/daemon/logger"
2
3import (
4	"bytes"
5	"io"
6	"sync"
7	"time"
8
9	types "github.com/docker/docker/api/types/backend"
10	"github.com/docker/docker/pkg/stringid"
11	"github.com/sirupsen/logrus"
12)
13
const (
	// readSize is the maximum bytes read during a single read
	// operation (2 KiB per src.Read call in copySrc).
	readSize = 2 * 1024

	// defaultBufSize provides a reasonable default for loggers that do
	// not have an external limit to impose on log line size. Lines
	// longer than the buffer are split into partial messages.
	defaultBufSize = 16 * 1024
)
23
// Copier can copy logs from specified sources to Logger and attach Timestamp.
// Writes are concurrent, so you need implement some sync in your logger.
type Copier struct {
	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
	srcs      map[string]io.Reader
	dst       Logger         // destination logger every message is written to
	copyJobs  sync.WaitGroup // tracks one copySrc goroutine per entry in srcs
	closeOnce sync.Once      // makes Close idempotent (the channel may only be closed once)
	closed    chan struct{}  // closed to signal all copy goroutines to stop
}
34
35// NewCopier creates a new Copier
36func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
37	return &Copier{
38		srcs:   srcs,
39		dst:    dst,
40		closed: make(chan struct{}),
41	}
42}
43
44// Run starts logs copying
45func (c *Copier) Run() {
46	for src, w := range c.srcs {
47		c.copyJobs.Add(1)
48		go c.copySrc(src, w)
49	}
50}
51
// copySrc pumps log data from src into the destination logger until EOF,
// a read error, or the copier being closed. Complete lines (terminated
// by '\n') are logged one message each. A line that outgrows the buffer
// is emitted as a run of partial messages that share one ID and
// timestamp with increasing ordinals; the chunk that finally carries the
// newline is marked Last.
func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()

	// Let a size-aware destination dictate the maximum line length.
	bufSize := defaultBufSize
	if sizedLogger, ok := c.dst.(SizedLogger); ok {
		size := sizedLogger.BufSize()
		// Loggers that wrap another loggers would have BufSize(), but cannot return the size
		// when the wrapped loggers doesn't have BufSize().
		if size > 0 {
			bufSize = size
		}
	}
	buf := make([]byte, bufSize)

	n := 0       // bytes buffered in buf that have not been logged yet
	eof := false // set once src.Read returns io.EOF
	// Partial-line state, carried across loop iterations until the
	// terminating newline (or EOF) arrives.
	var partialid string
	var partialTS time.Time
	var ordinal int
	firstPartial := true
	hasMorePartial := false

	for {
		select {
		case <-c.closed:
			return
		default:
			// Work out how much more data we are okay with reading this time.
			upto := n + readSize
			if upto > cap(buf) {
				upto = cap(buf)
			}
			// Try to read that data.
			if upto > n {
				read, err := src.Read(buf[n:upto])
				if err != nil {
					if err != io.EOF {
						logReadsFailedCount.Inc(1)
						logrus.Errorf("Error scanning log stream: %s", err)
						return
					}
					eof = true
				}
				// Read may return n > 0 together with io.EOF; count the
				// bytes either way.
				n += read
			}
			// If we have no data to log, and there's no more coming, we're done.
			if n == 0 && eof {
				return
			}
			// Break up the data that we've buffered up into lines, and log each in turn.
			p := 0

			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
				select {
				case <-c.closed:
					return
				default:
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:p+q]...)

					if hasMorePartial {
						// This newline completes a line whose earlier
						// chunks were already logged as partials; mark
						// this chunk as the last of that run.
						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}

						// reset
						partialid = ""
						ordinal = 0
						firstPartial = true
						hasMorePartial = false
					}
					// Partials reuse the timestamp recorded for the first
					// chunk so all pieces of one line sort together.
					if msg.PLogMetaData == nil {
						msg.Timestamp = time.Now().UTC()
					} else {
						msg.Timestamp = partialTS
					}

					if logErr := c.dst.Log(msg); logErr != nil {
						logWritesFailedCount.Inc(1)
						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
					}
				}
				p += q + 1
			}
			// If there's no more coming, or the buffer is full but
			// has no newlines, log whatever we haven't logged yet,
			// noting that it's a partial log line.
			if eof || (p == 0 && n == len(buf)) {
				if p < n {
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:n]...)

					// Generate unique partialID for first partial. Use it across partials.
					// Record timestamp for first partial. Use it across partials.
					// Initialize Ordinal for first partial. Increment it across partials.
					if firstPartial {
						msg.Timestamp = time.Now().UTC()
						partialTS = msg.Timestamp
						partialid = stringid.GenerateRandomID()
						ordinal = 1
						firstPartial = false
						totalPartialLogs.Inc(1)
					} else {
						msg.Timestamp = partialTS
					}
					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
					ordinal++
					hasMorePartial = true

					if logErr := c.dst.Log(msg); logErr != nil {
						logWritesFailedCount.Inc(1)
						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
					}
					// The whole buffer has been flushed as a partial.
					p = 0
					n = 0
				}
				if eof {
					return
				}
			}
			// Move any unlogged data to the front of the buffer in preparation for another read.
			if p > 0 {
				copy(buf[0:], buf[p:n])
				n -= p
			}
		}
	}
}
180
// Wait waits until all copying is done: it blocks until every goroutine
// started by Run has returned, either because its source reached EOF or
// because the copier was closed.
func (c *Copier) Wait() {
	c.copyJobs.Wait()
}
185
// Close closes the copier, signalling all copy goroutines to stop.
// It is safe to call Close multiple times; the channel is closed only
// once via closeOnce.
func (c *Copier) Close() {
	c.closeOnce.Do(func() {
		close(c.closed)
	})
}
192