1package monitor 2 3import ( 4 "fmt" 5 "sync" 6 "time" 7 8 log "github.com/hashicorp/go-hclog" 9) 10 11// Monitor provides a mechanism to stream logs using go-hclog 12// InterceptLogger and SinkAdapter. It allows streaming of logs 13// at a different log level than what is set on the logger. 14type Monitor interface { 15 // Start returns a channel of log messages which are sent 16 // ever time a log message occurs 17 Start() <-chan []byte 18 19 // Stop de-registers the sink from the InterceptLogger 20 // and closes the log channels 21 Stop() 22} 23 24// monitor implements the Monitor interface 25type monitor struct { 26 // protects droppedCount and logCh 27 sync.Mutex 28 29 sink log.SinkAdapter 30 31 // logger is the logger we will be monitoring 32 logger log.InterceptLogger 33 34 // logCh is a buffered chan where we send logs when streaming 35 logCh chan []byte 36 37 // doneCh coordinates the shutdown of logCh 38 doneCh chan struct{} 39 40 // droppedCount is the current count of messages 41 // that were dropped from the logCh buffer. 42 // only access under lock 43 droppedCount int 44 bufSize int 45 // droppedDuration is the amount of time we should 46 // wait to check for dropped messages. Defaults 47 // to 3 seconds 48 droppedDuration time.Duration 49} 50 51// New creates a new Monitor. Start must be called in order to actually start 52// streaming logs 53func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor { 54 return new(buf, logger, opts) 55} 56 57func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor { 58 sw := &monitor{ 59 logger: logger, 60 logCh: make(chan []byte, buf), 61 doneCh: make(chan struct{}, 1), 62 bufSize: buf, 63 droppedDuration: 3 * time.Second, 64 } 65 66 opts.Output = sw 67 sink := log.NewSinkAdapter(opts) 68 sw.sink = sink 69 70 return sw 71} 72 73// Stop deregisters the sink and stops the monitoring process 74func (d *monitor) Stop() { 75 d.logger.DeregisterSink(d.sink) 76 close(d.doneCh) 77} 78 79// Start registers a sink on the monitor's logger and starts sending 80// received log messages over the returned channel. 81func (d *monitor) Start() <-chan []byte { 82 // register our sink with the logger 83 d.logger.RegisterSink(d.sink) 84 85 streamCh := make(chan []byte, d.bufSize) 86 87 // run a go routine that listens for streamed 88 // log messages and sends them to streamCh 89 go func() { 90 defer close(streamCh) 91 92 for { 93 select { 94 case log := <-d.logCh: 95 select { 96 case <-d.doneCh: 97 return 98 case streamCh <- log: 99 } 100 case <-d.doneCh: 101 return 102 } 103 } 104 }() 105 106 // run a go routine that periodically checks for 107 // dropped messages and makes room on the logCh 108 // to add a dropped message count warning 109 go func() { 110 // loop and check for dropped messages 111 for { 112 select { 113 case <-d.doneCh: 114 return 115 case <-time.After(d.droppedDuration): 116 d.Lock() 117 118 // Check if there have been any dropped messages. 119 if d.droppedCount > 0 { 120 dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) 121 select { 122 case <-d.doneCh: 123 d.Unlock() 124 return 125 // Try sending dropped message count to logCh in case 126 // there is room in the buffer now. 127 case d.logCh <- []byte(dropped): 128 default: 129 // Drop a log message to make room for "Monitor dropped.." message 130 select { 131 case <-d.logCh: 132 d.droppedCount++ 133 dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) 134 default: 135 } 136 d.logCh <- []byte(dropped) 137 } 138 d.droppedCount = 0 139 } 140 // unlock after handling dropped message 141 d.Unlock() 142 } 143 } 144 }() 145 146 return streamCh 147} 148 149// Write attempts to send latest log to logCh 150// it drops the log if channel is unavailable to receive 151func (d *monitor) Write(p []byte) (n int, err error) { 152 d.Lock() 153 defer d.Unlock() 154 155 // ensure logCh is still open 156 select { 157 case <-d.doneCh: 158 return 159 default: 160 } 161 162 bytes := make([]byte, len(p)) 163 copy(bytes, p) 164 165 select { 166 case d.logCh <- bytes: 167 default: 168 d.droppedCount++ 169 } 170 171 return len(p), nil 172} 173