1// Copyright 2017 VMware, Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package iolog
16
17import (
18	"bytes"
19	"encoding/base64"
20	"encoding/binary"
21	"io"
22	"sync"
23	"time"
24)
25
// Clock abstracts the source of the current time. It wraps time.Now() so
// that a different time source can be supplied to NewLogWriter.
type Clock interface {
	// Now returns the current time.
	Now() time.Time
}
30
// LogClock is an implementation of the Clock interface that
// returns time.Now(), for use in the iolog package.
type LogClock struct{}

// Now returns the current local time as reported by time.Now().
func (LogClock) Now() time.Time {
	return time.Now()
}
39
// LogWriter tags log entries with a descriptive header and writes
// them to the underlying io.Writer.
type LogWriter struct {
	// Clock supplies the timestamp stored in each entry header.
	Clock
	// w is the destination that receives the tagged entries.
	w    io.Writer
	// prev holds bytes from earlier Write calls that have not yet been
	// terminated by a newline. They are prepended to the next complete
	// entry, or flushed by Close.
	prev []byte

	// m guards the writer's state; it is held for the duration of Close.
	m      sync.Mutex
	// closed is set by Close; once set, further Close calls return immediately.
	closed bool
}
50
51// NewLogWriter wraps an io.WriteCloser in a LogWriter
52func NewLogWriter(w io.Writer, clock Clock) *LogWriter {
53	return &LogWriter{
54		w:     w,
55		Clock: clock,
56	}
57}
58
59// Write scans the supplied buffer, breaking off indvidual entries
60// and writing them to the underlying Writer, flushing any leftover bytes
61func (lw *LogWriter) Write(p []byte) (n int, err error) {
62	var (
63		start, end, i int
64		entry         []byte
65	)
66
67	for {
68		i = 0
69		if i = bytes.IndexByte(p[start:], '\n') + 1; i == 0 {
70			break
71		}
72		end = start + i
73		entry = p[start:end]
74
75		// do we have bytes left over from the last call?
76		if lw.prev != nil {
77			entry = append(lw.prev, entry...)
78			lw.prev = nil
79		}
80
81		// we have a complete entry, let's write it
82		n, err = lw.write(entry)
83		if err != nil {
84			return n, err
85		}
86
87		// advance starting index
88		start = end
89	}
90
91	if start < len(p) {
92		// save the rest of the buffer for the next call
93		lw.prev = append(lw.prev, p[start:]...)
94	}
95
96	return len(p), err
97}
98
99func (lw *LogWriter) write(b []byte) (int, error) {
100	var (
101		err  error
102		n, w int
103	)
104	for _, entry := range lw.split(b) {
105		w, err = lw.w.Write(entry)
106		n += w
107		if err != nil {
108			break
109		}
110	}
111	// if we have to return with error, we should let the caller know
112	// how many of the provided bytes were written, not including our header
113	return n - encodedHeaderLengthBytes, err
114}
115
116// split breaks a log entry into smaller entries if necessary, adding a header to each
117func (lw *LogWriter) split(b []byte) [][]byte {
118	// break the entry up into multiple entries if necessary
119	entries := [][]byte{}
120	for len(b) > maxEntrySizeBytes {
121		entries = append(entries, b[:maxEntrySizeBytes])
122		b = b[maxEntrySizeBytes:]
123	}
124	entries = append(entries, b)
125
126	for i := range entries {
127
128		entry := entries[i]
129
130		// Each entry has a 10-byte header that describes the entry as follows:
131		// The first 8 bytes are the timestamp in int64 unix epoch format
132		// The first 12 bits of the final two bytes represent the size of the entry
133		// 0x8 represents the stream - 0 for stdout, 1 for stderr
134		// 0x4 currently unused, reserved for a future flag
135		// 0x2 currently unused, reserved for a future flag
136		// 0x1 currently unused, reserved for a future flag
137		header := make([]byte, headerLengthBytes)
138		// prepare the header
139		size := len(entry)
140		s := uint16(size << 4) // make some room for stream and partial flags
141		stream := 0            // TODO(jzt): defaults to 0 (stdout) until we figure out how to add stream tag
142		s |= uint16(stream << 3)
143		binary.LittleEndian.PutUint64(header[:8], uint64(lw.Now().UTC().UnixNano()))
144		binary.LittleEndian.PutUint16(header[8:], s)
145
146		// base64 encode the header
147		encodedHeader := base64.StdEncoding.EncodeToString(header)
148		entries[i] = append([]byte(encodedHeader), entry...)
149	}
150	return entries
151}
152
153// Close will flush the remaining bytes that have not yet been written
154func (lw *LogWriter) Close() (err error) {
155	lw.m.Lock()
156	defer lw.m.Unlock()
157
158	if lw.closed {
159		return nil
160	}
161
162	// flush buffer if there are leftover bytes
163	if lw.prev != nil {
164		var n, w int
165		for n < len(lw.prev) {
166			w, err = lw.write(lw.prev[n:])
167			n += w
168			if err != nil {
169				break
170			}
171		}
172		// reset lw.prev
173		lw.prev = nil
174	}
175
176	if c, ok := lw.w.(io.Closer); ok {
177		c.Close()
178		lw.closed = true
179	}
180	return err
181}
182