1package logging 2 3import ( 4 "bufio" 5 "bytes" 6 "fmt" 7 "io/ioutil" 8 "os" 9 "path/filepath" 10 "sort" 11 "strconv" 12 "strings" 13 "sync" 14 "time" 15 16 hclog "github.com/hashicorp/go-hclog" 17) 18 19const ( 20 // logBufferSize is the size of the buffer. 21 logBufferSize = 64 * 1024 22 23 // bufferFlushDuration is the duration at which we flush the buffer. 24 bufferFlushDuration = 100 * time.Millisecond 25 26 // lineScanLimit is the number of bytes we will attempt to scan for new 27 // lines when approaching the end of the file to avoid a log line being 28 // split between two files. Any single line that is greater than this limit 29 // may be split. 30 lineScanLimit = 32 * 1024 31 32 // newLineDelimiter is the delimiter used for new lines. 33 newLineDelimiter = '\n' 34) 35 36// FileRotator writes bytes to a rotated set of files 37type FileRotator struct { 38 MaxFiles int // MaxFiles is the maximum number of rotated files allowed in a path 39 FileSize int64 // FileSize is the size a rotated file is allowed to grow 40 41 path string // path is the path on the file system where the rotated set of files are opened 42 baseFileName string // baseFileName is the base file name of the rotated files 43 logFileIdx int // logFileIdx is the current index of the rotated files 44 oldestLogFileIdx int // oldestLogFileIdx is the index of the oldest log file in a path 45 46 currentFile *os.File // currentFile is the file that is currently getting written 47 currentWr int64 // currentWr is the number of bytes written to the current file 48 bufw *bufio.Writer 49 bufLock sync.Mutex 50 51 flushTicker *time.Ticker 52 logger hclog.Logger 53 purgeCh chan struct{} 54 doneCh chan struct{} 55 56 closed bool 57 closedLock sync.Mutex 58} 59 60// NewFileRotator returns a new file rotator 61func NewFileRotator(path string, baseFile string, maxFiles int, 62 fileSize int64, logger hclog.Logger) (*FileRotator, error) { 63 logger = logger.Named("rotator") 64 rotator := &FileRotator{ 65 MaxFiles: maxFiles, 66 FileSize: fileSize, 67 68 path: path, 69 baseFileName: baseFile, 70 71 flushTicker: time.NewTicker(bufferFlushDuration), 72 logger: logger, 73 purgeCh: make(chan struct{}, 1), 74 doneCh: make(chan struct{}, 1), 75 } 76 77 if err := rotator.lastFile(); err != nil { 78 return nil, err 79 } 80 go rotator.purgeOldFiles() 81 go rotator.flushPeriodically() 82 return rotator, nil 83} 84 85// Write writes a byte array to a file and rotates the file if it's size becomes 86// equal to the maximum size the user has defined. 87func (f *FileRotator) Write(p []byte) (n int, err error) { 88 n = 0 89 var forceRotate bool 90 91 for n < len(p) { 92 // Check if we still have space in the current file, otherwise close and 93 // open the next file 94 if forceRotate || f.currentWr >= f.FileSize { 95 forceRotate = false 96 f.flushBuffer() 97 f.currentFile.Close() 98 if err := f.nextFile(); err != nil { 99 f.logger.Error("error creating next file", "err", err) 100 return 0, err 101 } 102 } 103 // Calculate the remaining size on this file and how much we have left 104 // to write 105 remainingSpace := f.FileSize - f.currentWr 106 remainingToWrite := int64(len(p[n:])) 107 108 // Check if we are near the end of the file. If we are we attempt to 109 // avoid a log line being split between two files. 110 var nw int 111 if (remainingSpace - lineScanLimit) < remainingToWrite { 112 // Scan for new line and if the data up to new line fits in current 113 // file, write to buffer 114 idx := bytes.IndexByte(p[n:], newLineDelimiter) 115 if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 { 116 // We have space so write it to buffer 117 nw, err = f.writeToBuffer(p[n : n+idx+1]) 118 } else if idx >= 0 { 119 // We found a new line but don't have space so just force rotate 120 forceRotate = true 121 } else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 { 122 // There is no new line remaining but there is no point in 123 // rotating since the remaining data will not even fit in the 124 // next file either so just fill this one up. 125 li := int64(n) + remainingSpace 126 if remainingSpace > remainingToWrite { 127 li = int64(n) + remainingToWrite 128 } 129 nw, err = f.writeToBuffer(p[n:li]) 130 } else { 131 // There is no new line in the data remaining for us to write 132 // and it will fit in the next file so rotate. 133 forceRotate = true 134 } 135 } else { 136 // Write all the bytes in the current file 137 nw, err = f.writeToBuffer(p[n:]) 138 } 139 140 // Increment the number of bytes written so far in this method 141 // invocation 142 n += nw 143 144 // Increment the total number of bytes in the file 145 f.currentWr += int64(n) 146 if err != nil { 147 f.logger.Error("error writing to file", "err", err) 148 149 // As bufio writer does not automatically recover in case of any 150 // io error, we need to recover from it manually resetting the 151 // writter. 152 f.createOrResetBuffer() 153 154 return 155 } 156 } 157 return 158} 159 160// nextFile opens the next file and purges older files if the number of rotated 161// files is larger than the maximum files configured by the user 162func (f *FileRotator) nextFile() error { 163 nextFileIdx := f.logFileIdx 164 for { 165 nextFileIdx += 1 166 logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, nextFileIdx)) 167 if fi, err := os.Stat(logFileName); err == nil { 168 if fi.IsDir() || fi.Size() >= f.FileSize { 169 continue 170 } 171 } 172 f.logFileIdx = nextFileIdx 173 if err := f.createFile(); err != nil { 174 return err 175 } 176 break 177 } 178 // Purge old files if we have more files than MaxFiles 179 f.closedLock.Lock() 180 defer f.closedLock.Unlock() 181 if f.logFileIdx-f.oldestLogFileIdx >= f.MaxFiles && !f.closed { 182 select { 183 case f.purgeCh <- struct{}{}: 184 default: 185 } 186 } 187 return nil 188} 189 190// lastFile finds out the rotated file with the largest index in a path. 191func (f *FileRotator) lastFile() error { 192 finfos, err := ioutil.ReadDir(f.path) 193 if err != nil { 194 return err 195 } 196 197 prefix := fmt.Sprintf("%s.", f.baseFileName) 198 for _, fi := range finfos { 199 if fi.IsDir() { 200 continue 201 } 202 if strings.HasPrefix(fi.Name(), prefix) { 203 fileIdx := strings.TrimPrefix(fi.Name(), prefix) 204 n, err := strconv.Atoi(fileIdx) 205 if err != nil { 206 continue 207 } 208 if n > f.logFileIdx { 209 f.logFileIdx = n 210 } 211 } 212 } 213 if err := f.createFile(); err != nil { 214 return err 215 } 216 return nil 217} 218 219// createFile opens a new or existing file for writing 220func (f *FileRotator) createFile() error { 221 logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, f.logFileIdx)) 222 cFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) 223 if err != nil { 224 return err 225 } 226 227 f.currentFile = cFile 228 fi, err := f.currentFile.Stat() 229 if err != nil { 230 return err 231 } 232 f.currentWr = fi.Size() 233 f.createOrResetBuffer() 234 return nil 235} 236 237// flushPeriodically flushes the buffered writer every 100ms to the underlying 238// file 239func (f *FileRotator) flushPeriodically() { 240 for range f.flushTicker.C { 241 f.flushBuffer() 242 } 243} 244 245// Close flushes and closes the rotator. It never returns an error. 246func (f *FileRotator) Close() error { 247 f.closedLock.Lock() 248 defer f.closedLock.Unlock() 249 250 // Stop the ticker and flush for one last time 251 f.flushTicker.Stop() 252 f.flushBuffer() 253 254 // Stop the purge go routine 255 if !f.closed { 256 f.doneCh <- struct{}{} 257 close(f.purgeCh) 258 f.closed = true 259 f.currentFile.Close() 260 } 261 262 return nil 263} 264 265// purgeOldFiles removes older files and keeps only the last N files rotated for 266// a file 267func (f *FileRotator) purgeOldFiles() { 268 for { 269 select { 270 case <-f.purgeCh: 271 var fIndexes []int 272 files, err := ioutil.ReadDir(f.path) 273 if err != nil { 274 f.logger.Error("error getting directory listing", "err", err) 275 return 276 } 277 // Inserting all the rotated files in a slice 278 for _, fi := range files { 279 if strings.HasPrefix(fi.Name(), f.baseFileName) { 280 fileIdx := strings.TrimPrefix(fi.Name(), fmt.Sprintf("%s.", f.baseFileName)) 281 n, err := strconv.Atoi(fileIdx) 282 if err != nil { 283 f.logger.Error("error extracting file index", "err", err) 284 continue 285 } 286 fIndexes = append(fIndexes, n) 287 } 288 } 289 290 // Not continuing to delete files if the number of files is not more 291 // than MaxFiles 292 if len(fIndexes) <= f.MaxFiles { 293 continue 294 } 295 296 // Sorting the file indexes so that we can purge the older files and keep 297 // only the number of files as configured by the user 298 sort.Ints(fIndexes) 299 toDelete := fIndexes[0 : len(fIndexes)-f.MaxFiles] 300 for _, fIndex := range toDelete { 301 fname := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, fIndex)) 302 err := os.RemoveAll(fname) 303 if err != nil { 304 f.logger.Error("error removing file", "filename", fname, "err", err) 305 } 306 } 307 f.oldestLogFileIdx = fIndexes[0] 308 case <-f.doneCh: 309 return 310 } 311 } 312} 313 314// flushBuffer flushes the buffer 315func (f *FileRotator) flushBuffer() error { 316 f.bufLock.Lock() 317 defer f.bufLock.Unlock() 318 if f.bufw != nil { 319 return f.bufw.Flush() 320 } 321 return nil 322} 323 324// writeToBuffer writes the byte array to buffer 325func (f *FileRotator) writeToBuffer(p []byte) (int, error) { 326 f.bufLock.Lock() 327 defer f.bufLock.Unlock() 328 return f.bufw.Write(p) 329} 330 331// createOrResetBuffer creates a new buffer if we don't have one otherwise 332// resets the buffer 333func (f *FileRotator) createOrResetBuffer() { 334 f.bufLock.Lock() 335 defer f.bufLock.Unlock() 336 if f.bufw == nil { 337 f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize) 338 } else { 339 f.bufw.Reset(f.currentFile) 340 } 341} 342