1// Copyright 2017 VMware, Inc. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package iolog 16 17import ( 18 "bytes" 19 "encoding/base64" 20 "encoding/binary" 21 "io" 22 "sync" 23 "time" 24) 25 26// Clock defines an interface that wraps time.Now() 27type Clock interface { 28 Now() time.Time 29} 30 31// LogClock is an implementation of the Clock interface that 32// returns time.Now() for use in the iolog package 33type LogClock struct{} 34 35// Now returns the local time time.Now() 36func (LogClock) Now() time.Time { 37 return time.Now() 38} 39 40// LogWriter tags log entries with a descriptive header and writes 41// them to the underlying io.Writer 42type LogWriter struct { 43 Clock 44 w io.Writer 45 prev []byte 46 47 m sync.Mutex 48 closed bool 49} 50 51// NewLogWriter wraps an io.WriteCloser in a LogWriter 52func NewLogWriter(w io.Writer, clock Clock) *LogWriter { 53 return &LogWriter{ 54 w: w, 55 Clock: clock, 56 } 57} 58 59// Write scans the supplied buffer, breaking off indvidual entries 60// and writing them to the underlying Writer, flushing any leftover bytes 61func (lw *LogWriter) Write(p []byte) (n int, err error) { 62 var ( 63 start, end, i int 64 entry []byte 65 ) 66 67 for { 68 i = 0 69 if i = bytes.IndexByte(p[start:], '\n') + 1; i == 0 { 70 break 71 } 72 end = start + i 73 entry = p[start:end] 74 75 // do we have bytes left over from the last call? 76 if lw.prev != nil { 77 entry = append(lw.prev, entry...) 78 lw.prev = nil 79 } 80 81 // we have a complete entry, let's write it 82 n, err = lw.write(entry) 83 if err != nil { 84 return n, err 85 } 86 87 // advance starting index 88 start = end 89 } 90 91 if start < len(p) { 92 // save the rest of the buffer for the next call 93 lw.prev = append(lw.prev, p[start:]...) 94 } 95 96 return len(p), err 97} 98 99func (lw *LogWriter) write(b []byte) (int, error) { 100 var ( 101 err error 102 n, w int 103 ) 104 for _, entry := range lw.split(b) { 105 w, err = lw.w.Write(entry) 106 n += w 107 if err != nil { 108 break 109 } 110 } 111 // if we have to return with error, we should let the caller know 112 // how many of the provided bytes were written, not including our header 113 return n - encodedHeaderLengthBytes, err 114} 115 116// split breaks a log entry into smaller entries if necessary, adding a header to each 117func (lw *LogWriter) split(b []byte) [][]byte { 118 // break the entry up into multiple entries if necessary 119 entries := [][]byte{} 120 for len(b) > maxEntrySizeBytes { 121 entries = append(entries, b[:maxEntrySizeBytes]) 122 b = b[maxEntrySizeBytes:] 123 } 124 entries = append(entries, b) 125 126 for i := range entries { 127 128 entry := entries[i] 129 130 // Each entry has a 10-byte header that describes the entry as follows: 131 // The first 8 bytes are the timestamp in int64 unix epoch format 132 // The first 12 bits of the final two bytes represent the size of the entry 133 // 0x8 represents the stream - 0 for stdout, 1 for stderr 134 // 0x4 currently unused, reserved for a future flag 135 // 0x2 currently unused, reserved for a future flag 136 // 0x1 currently unused, reserved for a future flag 137 header := make([]byte, headerLengthBytes) 138 // prepare the header 139 size := len(entry) 140 s := uint16(size << 4) // make some room for stream and partial flags 141 stream := 0 // TODO(jzt): defaults to 0 (stdout) until we figure out how to add stream tag 142 s |= uint16(stream << 3) 143 binary.LittleEndian.PutUint64(header[:8], uint64(lw.Now().UTC().UnixNano())) 144 binary.LittleEndian.PutUint16(header[8:], s) 145 146 // base64 encode the header 147 encodedHeader := base64.StdEncoding.EncodeToString(header) 148 entries[i] = append([]byte(encodedHeader), entry...) 149 } 150 return entries 151} 152 153// Close will flush the remaining bytes that have not yet been written 154func (lw *LogWriter) Close() (err error) { 155 lw.m.Lock() 156 defer lw.m.Unlock() 157 158 if lw.closed { 159 return nil 160 } 161 162 // flush buffer if there are leftover bytes 163 if lw.prev != nil { 164 var n, w int 165 for n < len(lw.prev) { 166 w, err = lw.write(lw.prev[n:]) 167 n += w 168 if err != nil { 169 break 170 } 171 } 172 // reset lw.prev 173 lw.prev = nil 174 } 175 176 if c, ok := lw.w.(io.Closer); ok { 177 c.Close() 178 lw.closed = true 179 } 180 return err 181} 182