1package logger // import "github.com/docker/docker/daemon/logger"
2
3import (
4	"io"
5	"os"
6	"path/filepath"
7	"sync"
8	"time"
9
10	"github.com/docker/docker/api/types/plugins/logdriver"
11	"github.com/docker/docker/pkg/plugingetter"
12	"github.com/pkg/errors"
13	"github.com/sirupsen/logrus"
14)
15
16// pluginAdapter takes a plugin and implements the Logger interface for logger
17// instances
18type pluginAdapter struct {
19	driverName   string
20	id           string
21	plugin       logPlugin
22	fifoPath     string
23	capabilities Capability
24	logInfo      Info
25
26	// synchronize access to the log stream and shared buffer
27	mu     sync.Mutex
28	enc    logdriver.LogEntryEncoder
29	stream io.WriteCloser
30	// buf is shared for each `Log()` call to reduce allocations.
31	// buf must be protected by mutex
32	buf logdriver.LogEntry
33}
34
35func (a *pluginAdapter) Log(msg *Message) error {
36	a.mu.Lock()
37
38	a.buf.Line = msg.Line
39	a.buf.TimeNano = msg.Timestamp.UnixNano()
40	a.buf.Partial = msg.PLogMetaData != nil
41	a.buf.Source = msg.Source
42
43	err := a.enc.Encode(&a.buf)
44	a.buf.Reset()
45
46	a.mu.Unlock()
47
48	PutMessage(msg)
49	return err
50}
51
52func (a *pluginAdapter) Name() string {
53	return a.driverName
54}
55
56func (a *pluginAdapter) Close() error {
57	a.mu.Lock()
58	defer a.mu.Unlock()
59
60	if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
61		return err
62	}
63
64	if err := a.stream.Close(); err != nil {
65		logrus.WithError(err).Error("error closing plugin fifo")
66	}
67	if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
68		logrus.WithError(err).Error("error cleaning up plugin fifo")
69	}
70
71	// may be nil, especially for unit tests
72	if pluginGetter != nil {
73		pluginGetter.Get(a.Name(), extName, plugingetter.Release)
74	}
75	return nil
76}
77
78type pluginAdapterWithRead struct {
79	*pluginAdapter
80}
81
82func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
83	watcher := NewLogWatcher()
84
85	go func() {
86		defer close(watcher.Msg)
87		stream, err := a.plugin.ReadLogs(a.logInfo, config)
88		if err != nil {
89			watcher.Err <- errors.Wrap(err, "error getting log reader")
90			return
91		}
92		defer stream.Close()
93
94		dec := logdriver.NewLogEntryDecoder(stream)
95		for {
96			var buf logdriver.LogEntry
97			if err := dec.Decode(&buf); err != nil {
98				if err == io.EOF {
99					return
100				}
101				watcher.Err <- errors.Wrap(err, "error decoding log message")
102				return
103			}
104
105			msg := &Message{
106				Timestamp: time.Unix(0, buf.TimeNano),
107				Line:      buf.Line,
108				Source:    buf.Source,
109			}
110
111			// plugin should handle this, but check just in case
112			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
113				continue
114			}
115			if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
116				return
117			}
118
119			// send the message unless the consumer is gone
120			select {
121			case watcher.Msg <- msg:
122			case <-watcher.WatchConsumerGone():
123				return
124			}
125		}
126	}()
127
128	return watcher
129}
130