1package monitor
2
3import (
4	"fmt"
5	"sync"
6	"time"
7
8	log "github.com/hashicorp/go-hclog"
9)
10
11// Monitor provides a mechanism to stream logs using go-hclog
12// InterceptLogger and SinkAdapter. It allows streaming of logs
13// at a different log level than what is set on the logger.
14type Monitor interface {
15	// Start returns a channel of log messages which are sent
16	// ever time a log message occurs
17	Start() <-chan []byte
18
19	// Stop de-registers the sink from the InterceptLogger
20	// and closes the log channels
21	Stop()
22}
23
24// monitor implements the Monitor interface
25type monitor struct {
26	// protects droppedCount and logCh
27	sync.Mutex
28
29	sink log.SinkAdapter
30
31	// logger is the logger we will be monitoring
32	logger log.InterceptLogger
33
34	// logCh is a buffered chan where we send logs when streaming
35	logCh chan []byte
36
37	// doneCh coordinates the shutdown of logCh
38	doneCh chan struct{}
39
40	// droppedCount is the current count of messages
41	// that were dropped from the logCh buffer.
42	// only access under lock
43	droppedCount int
44	bufSize      int
45	// droppedDuration is the amount of time we should
46	// wait to check for dropped messages. Defaults
47	// to 3 seconds
48	droppedDuration time.Duration
49}
50
51// New creates a new Monitor. Start must be called in order to actually start
52// streaming logs
53func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor {
54	return new(buf, logger, opts)
55}
56
57func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor {
58	sw := &monitor{
59		logger:          logger,
60		logCh:           make(chan []byte, buf),
61		doneCh:          make(chan struct{}, 1),
62		bufSize:         buf,
63		droppedDuration: 3 * time.Second,
64	}
65
66	opts.Output = sw
67	sink := log.NewSinkAdapter(opts)
68	sw.sink = sink
69
70	return sw
71}
72
73// Stop deregisters the sink and stops the monitoring process
74func (d *monitor) Stop() {
75	d.logger.DeregisterSink(d.sink)
76	close(d.doneCh)
77}
78
79// Start registers a sink on the monitor's logger and starts sending
80// received log messages over the returned channel.
81func (d *monitor) Start() <-chan []byte {
82	// register our sink with the logger
83	d.logger.RegisterSink(d.sink)
84
85	streamCh := make(chan []byte, d.bufSize)
86
87	// run a go routine that listens for streamed
88	// log messages and sends them to streamCh
89	go func() {
90		defer close(streamCh)
91
92		for {
93			select {
94			case log := <-d.logCh:
95				select {
96				case <-d.doneCh:
97					return
98				case streamCh <- log:
99				}
100			case <-d.doneCh:
101				return
102			}
103		}
104	}()
105
106	// run a go routine that periodically checks for
107	// dropped messages and makes room on the logCh
108	// to add a dropped message count warning
109	go func() {
110		// loop and check for dropped messages
111		for {
112			select {
113			case <-d.doneCh:
114				return
115			case <-time.After(d.droppedDuration):
116				d.Lock()
117
118				// Check if there have been any dropped messages.
119				if d.droppedCount > 0 {
120					dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
121					select {
122					case <-d.doneCh:
123						d.Unlock()
124						return
125					// Try sending dropped message count to logCh in case
126					// there is room in the buffer now.
127					case d.logCh <- []byte(dropped):
128					default:
129						// Drop a log message to make room for "Monitor dropped.." message
130						select {
131						case <-d.logCh:
132							d.droppedCount++
133							dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
134						default:
135						}
136						d.logCh <- []byte(dropped)
137					}
138					d.droppedCount = 0
139				}
140				// unlock after handling dropped message
141				d.Unlock()
142			}
143		}
144	}()
145
146	return streamCh
147}
148
149// Write attempts to send latest log to logCh
150// it drops the log if channel is unavailable to receive
151func (d *monitor) Write(p []byte) (n int, err error) {
152	d.Lock()
153	defer d.Unlock()
154
155	// ensure logCh is still open
156	select {
157	case <-d.doneCh:
158		return
159	default:
160	}
161
162	bytes := make([]byte, len(p))
163	copy(bytes, p)
164
165	select {
166	case d.logCh <- bytes:
167	default:
168		d.droppedCount++
169	}
170
171	return len(p), nil
172}
173