1package logger // import "github.com/docker/docker/daemon/logger" 2 3import ( 4 "bytes" 5 "io" 6 "sync" 7 "time" 8 9 types "github.com/docker/docker/api/types/backend" 10 "github.com/docker/docker/pkg/stringid" 11 "github.com/sirupsen/logrus" 12) 13 14const ( 15 // readSize is the maximum bytes read during a single read 16 // operation. 17 readSize = 2 * 1024 18 19 // defaultBufSize provides a reasonable default for loggers that do 20 // not have an external limit to impose on log line size. 21 defaultBufSize = 16 * 1024 22) 23 24// Copier can copy logs from specified sources to Logger and attach Timestamp. 25// Writes are concurrent, so you need implement some sync in your logger. 26type Copier struct { 27 // srcs is map of name -> reader pairs, for example "stdout", "stderr" 28 srcs map[string]io.Reader 29 dst Logger 30 copyJobs sync.WaitGroup 31 closeOnce sync.Once 32 closed chan struct{} 33} 34 35// NewCopier creates a new Copier 36func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier { 37 return &Copier{ 38 srcs: srcs, 39 dst: dst, 40 closed: make(chan struct{}), 41 } 42} 43 44// Run starts logs copying 45func (c *Copier) Run() { 46 for src, w := range c.srcs { 47 c.copyJobs.Add(1) 48 go c.copySrc(src, w) 49 } 50} 51 52func (c *Copier) copySrc(name string, src io.Reader) { 53 defer c.copyJobs.Done() 54 55 bufSize := defaultBufSize 56 if sizedLogger, ok := c.dst.(SizedLogger); ok { 57 size := sizedLogger.BufSize() 58 // Loggers that wrap another loggers would have BufSize(), but cannot return the size 59 // when the wrapped loggers doesn't have BufSize(). 60 if size > 0 { 61 bufSize = size 62 } 63 } 64 buf := make([]byte, bufSize) 65 66 n := 0 67 eof := false 68 var partialid string 69 var partialTS time.Time 70 var ordinal int 71 firstPartial := true 72 hasMorePartial := false 73 74 for { 75 select { 76 case <-c.closed: 77 return 78 default: 79 // Work out how much more data we are okay with reading this time. 80 upto := n + readSize 81 if upto > cap(buf) { 82 upto = cap(buf) 83 } 84 // Try to read that data. 85 if upto > n { 86 read, err := src.Read(buf[n:upto]) 87 if err != nil { 88 if err != io.EOF { 89 logReadsFailedCount.Inc(1) 90 logrus.Errorf("Error scanning log stream: %s", err) 91 return 92 } 93 eof = true 94 } 95 n += read 96 } 97 // If we have no data to log, and there's no more coming, we're done. 98 if n == 0 && eof { 99 return 100 } 101 // Break up the data that we've buffered up into lines, and log each in turn. 102 p := 0 103 104 for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') { 105 select { 106 case <-c.closed: 107 return 108 default: 109 msg := NewMessage() 110 msg.Source = name 111 msg.Line = append(msg.Line, buf[p:p+q]...) 112 113 if hasMorePartial { 114 msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true} 115 116 // reset 117 partialid = "" 118 ordinal = 0 119 firstPartial = true 120 hasMorePartial = false 121 } 122 if msg.PLogMetaData == nil { 123 msg.Timestamp = time.Now().UTC() 124 } else { 125 msg.Timestamp = partialTS 126 } 127 128 if logErr := c.dst.Log(msg); logErr != nil { 129 logWritesFailedCount.Inc(1) 130 logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) 131 } 132 } 133 p += q + 1 134 } 135 // If there's no more coming, or the buffer is full but 136 // has no newlines, log whatever we haven't logged yet, 137 // noting that it's a partial log line. 138 if eof || (p == 0 && n == len(buf)) { 139 if p < n { 140 msg := NewMessage() 141 msg.Source = name 142 msg.Line = append(msg.Line, buf[p:n]...) 143 144 // Generate unique partialID for first partial. Use it across partials. 145 // Record timestamp for first partial. Use it across partials. 146 // Initialize Ordinal for first partial. Increment it across partials. 147 if firstPartial { 148 msg.Timestamp = time.Now().UTC() 149 partialTS = msg.Timestamp 150 partialid = stringid.GenerateRandomID() 151 ordinal = 1 152 firstPartial = false 153 totalPartialLogs.Inc(1) 154 } else { 155 msg.Timestamp = partialTS 156 } 157 msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false} 158 ordinal++ 159 hasMorePartial = true 160 161 if logErr := c.dst.Log(msg); logErr != nil { 162 logWritesFailedCount.Inc(1) 163 logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) 164 } 165 p = 0 166 n = 0 167 } 168 if eof { 169 return 170 } 171 } 172 // Move any unlogged data to the front of the buffer in preparation for another read. 173 if p > 0 { 174 copy(buf[0:], buf[p:n]) 175 n -= p 176 } 177 } 178 } 179} 180 181// Wait waits until all copying is done 182func (c *Copier) Wait() { 183 c.copyJobs.Wait() 184} 185 186// Close closes the copier 187func (c *Copier) Close() { 188 c.closeOnce.Do(func() { 189 close(c.closed) 190 }) 191} 192