1package logger // import "github.com/docker/docker/daemon/logger"
2
3import (
4	"io"
5	"os"
6	"path/filepath"
7	"sync"
8	"time"
9
10	"github.com/docker/docker/api/types/plugins/logdriver"
11	"github.com/docker/docker/pkg/plugingetter"
12	"github.com/pkg/errors"
13	"github.com/sirupsen/logrus"
14)
15
16// pluginAdapter takes a plugin and implements the Logger interface for logger
17// instances
18type pluginAdapter struct {
19	driverName   string
20	id           string
21	plugin       logPlugin
22	fifoPath     string
23	capabilities Capability
24	logInfo      Info
25
26	// synchronize access to the log stream and shared buffer
27	mu     sync.Mutex
28	enc    logdriver.LogEntryEncoder
29	stream io.WriteCloser
30	// buf is shared for each `Log()` call to reduce allocations.
31	// buf must be protected by mutex
32	buf logdriver.LogEntry
33}
34
35func (a *pluginAdapter) Log(msg *Message) error {
36	a.mu.Lock()
37
38	a.buf.Line = msg.Line
39	a.buf.TimeNano = msg.Timestamp.UnixNano()
40	a.buf.Partial = msg.PLogMetaData != nil
41	a.buf.Source = msg.Source
42	if msg.PLogMetaData != nil {
43		a.buf.PartialLogMetadata = &logdriver.PartialLogEntryMetadata{
44			Id:      msg.PLogMetaData.ID,
45			Last:    msg.PLogMetaData.Last,
46			Ordinal: int32(msg.PLogMetaData.Ordinal),
47		}
48	}
49
50	err := a.enc.Encode(&a.buf)
51	a.buf.Reset()
52
53	a.mu.Unlock()
54
55	PutMessage(msg)
56	return err
57}
58
59func (a *pluginAdapter) Name() string {
60	return a.driverName
61}
62
63func (a *pluginAdapter) Close() error {
64	a.mu.Lock()
65	defer a.mu.Unlock()
66
67	if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
68		return err
69	}
70
71	if err := a.stream.Close(); err != nil {
72		logrus.WithError(err).Error("error closing plugin fifo")
73	}
74	if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
75		logrus.WithError(err).Error("error cleaning up plugin fifo")
76	}
77
78	// may be nil, especially for unit tests
79	if pluginGetter != nil {
80		pluginGetter.Get(a.Name(), extName, plugingetter.Release)
81	}
82	return nil
83}
84
85type pluginAdapterWithRead struct {
86	*pluginAdapter
87}
88
89func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
90	watcher := NewLogWatcher()
91
92	go func() {
93		defer close(watcher.Msg)
94		stream, err := a.plugin.ReadLogs(a.logInfo, config)
95		if err != nil {
96			watcher.Err <- errors.Wrap(err, "error getting log reader")
97			return
98		}
99		defer stream.Close()
100
101		dec := logdriver.NewLogEntryDecoder(stream)
102		for {
103			var buf logdriver.LogEntry
104			if err := dec.Decode(&buf); err != nil {
105				if err == io.EOF {
106					return
107				}
108				watcher.Err <- errors.Wrap(err, "error decoding log message")
109				return
110			}
111
112			msg := &Message{
113				Timestamp: time.Unix(0, buf.TimeNano),
114				Line:      buf.Line,
115				Source:    buf.Source,
116			}
117
118			// plugin should handle this, but check just in case
119			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
120				continue
121			}
122			if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
123				return
124			}
125
126			// send the message unless the consumer is gone
127			select {
128			case watcher.Msg <- msg:
129			case <-watcher.WatchConsumerGone():
130				return
131			}
132		}
133	}()
134
135	return watcher
136}
137