1package logger // import "github.com/docker/docker/daemon/logger" 2 3import ( 4 "io" 5 "os" 6 "path/filepath" 7 "sync" 8 "time" 9 10 "github.com/docker/docker/api/types/plugins/logdriver" 11 "github.com/docker/docker/pkg/plugingetter" 12 "github.com/pkg/errors" 13 "github.com/sirupsen/logrus" 14) 15 16// pluginAdapter takes a plugin and implements the Logger interface for logger 17// instances 18type pluginAdapter struct { 19 driverName string 20 id string 21 plugin logPlugin 22 fifoPath string 23 capabilities Capability 24 logInfo Info 25 26 // synchronize access to the log stream and shared buffer 27 mu sync.Mutex 28 enc logdriver.LogEntryEncoder 29 stream io.WriteCloser 30 // buf is shared for each `Log()` call to reduce allocations. 31 // buf must be protected by mutex 32 buf logdriver.LogEntry 33} 34 35func (a *pluginAdapter) Log(msg *Message) error { 36 a.mu.Lock() 37 38 a.buf.Line = msg.Line 39 a.buf.TimeNano = msg.Timestamp.UnixNano() 40 a.buf.Partial = msg.PLogMetaData != nil 41 a.buf.Source = msg.Source 42 43 err := a.enc.Encode(&a.buf) 44 a.buf.Reset() 45 46 a.mu.Unlock() 47 48 PutMessage(msg) 49 return err 50} 51 52func (a *pluginAdapter) Name() string { 53 return a.driverName 54} 55 56func (a *pluginAdapter) Close() error { 57 a.mu.Lock() 58 defer a.mu.Unlock() 59 60 if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil { 61 return err 62 } 63 64 if err := a.stream.Close(); err != nil { 65 logrus.WithError(err).Error("error closing plugin fifo") 66 } 67 if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) { 68 logrus.WithError(err).Error("error cleaning up plugin fifo") 69 } 70 71 // may be nil, especially for unit tests 72 if pluginGetter != nil { 73 pluginGetter.Get(a.Name(), extName, plugingetter.Release) 74 } 75 return nil 76} 77 78type pluginAdapterWithRead struct { 79 *pluginAdapter 80} 81 82func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { 83 watcher := NewLogWatcher() 84 85 go func() { 86 defer close(watcher.Msg) 87 stream, err := a.plugin.ReadLogs(a.logInfo, config) 88 if err != nil { 89 watcher.Err <- errors.Wrap(err, "error getting log reader") 90 return 91 } 92 defer stream.Close() 93 94 dec := logdriver.NewLogEntryDecoder(stream) 95 for { 96 var buf logdriver.LogEntry 97 if err := dec.Decode(&buf); err != nil { 98 if err == io.EOF { 99 return 100 } 101 watcher.Err <- errors.Wrap(err, "error decoding log message") 102 return 103 } 104 105 msg := &Message{ 106 Timestamp: time.Unix(0, buf.TimeNano), 107 Line: buf.Line, 108 Source: buf.Source, 109 } 110 111 // plugin should handle this, but check just in case 112 if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { 113 continue 114 } 115 if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { 116 return 117 } 118 119 // send the message unless the consumer is gone 120 select { 121 case watcher.Msg <- msg: 122 case <-watcher.WatchConsumerGone(): 123 return 124 } 125 } 126 }() 127 128 return watcher 129} 130