1package tstorage 2 3import ( 4 "bufio" 5 "encoding/binary" 6 "errors" 7 "fmt" 8 "io" 9 "io/fs" 10 "math" 11 "os" 12 "path/filepath" 13 "strconv" 14 "sync" 15 "sync/atomic" 16) 17 18// diskWAL contains multiple segment files. One segment is responsible for one partition. 19// They can be easily sorted because they are named using the created timestamp. 20// Macro layout is like: 21/* 22 .wal/ 23 ├── 0 24 └── 1 25*/ 26type diskWAL struct { 27 dir string 28 bufferedSize int 29 // Buffered-writer to the active segment 30 w *bufio.Writer 31 // File descriptor to the active segment 32 fd *os.File 33 index uint32 34 mu sync.Mutex 35} 36 37func newDiskWAL(dir string, bufferedSize int) (wal, error) { 38 if err := os.MkdirAll(dir, fs.ModePerm); err != nil { 39 return nil, fmt.Errorf("failed to make WAL dir: %w", err) 40 } 41 w := &diskWAL{ 42 dir: dir, 43 bufferedSize: bufferedSize, 44 } 45 f, err := w.createSegmentFile(dir) 46 if err != nil { 47 return nil, err 48 } 49 w.fd = f 50 w.w = bufio.NewWriterSize(f, bufferedSize) 51 52 return w, nil 53} 54 55// append appends the given entry to the end of a file via the file descriptor it has. 56func (w *diskWAL) append(op walOperation, rows []Row) error { 57 w.mu.Lock() 58 defer w.mu.Unlock() 59 60 switch op { 61 case operationInsert: 62 for _, row := range rows { 63 // Write the operation type 64 if err := w.w.WriteByte(byte(op)); err != nil { 65 return fmt.Errorf("failed to write operation: %w", err) 66 } 67 name := marshalMetricName(row.Metric, row.Labels) 68 // Write the length of the metric name 69 lBuf := make([]byte, binary.MaxVarintLen64) 70 n := binary.PutUvarint(lBuf, uint64(len(name))) 71 if _, err := w.w.Write(lBuf[:n]); err != nil { 72 return fmt.Errorf("failed to write the length of the metric name: %w", err) 73 } 74 // Write the metric name 75 if _, err := w.w.WriteString(name); err != nil { 76 return fmt.Errorf("failed to write the metric name: %w", err) 77 } 78 // Write the timestamp 79 tsBuf := make([]byte, binary.MaxVarintLen64) 80 n = binary.PutVarint(tsBuf, row.DataPoint.Timestamp) 81 if _, err := w.w.Write(tsBuf[:n]); err != nil { 82 return fmt.Errorf("failed to write the timestamp: %w", err) 83 } 84 // Write the value 85 vBuf := make([]byte, binary.MaxVarintLen64) 86 n = binary.PutUvarint(vBuf, math.Float64bits(row.DataPoint.Value)) 87 if _, err := w.w.Write(vBuf[:n]); err != nil { 88 return fmt.Errorf("failed to write the value: %w", err) 89 } 90 } 91 default: 92 return fmt.Errorf("unknown operation %v given", op) 93 } 94 if w.bufferedSize == 0 { 95 return w.flush() 96 } 97 98 return nil 99} 100 101// flush flushes all buffered entries to the underlying file. 102func (w *diskWAL) flush() error { 103 if err := w.w.Flush(); err != nil { 104 return fmt.Errorf("failed to flush buffered-data into the underlying WAL file: %w", err) 105 } 106 return nil 107} 108 109// punctuate set boundary and creates a new segment. 110func (w *diskWAL) punctuate() error { 111 w.mu.Lock() 112 defer w.mu.Unlock() 113 if err := w.flush(); err != nil { 114 return err 115 } 116 if err := w.fd.Close(); err != nil { 117 return err 118 } 119 f, err := w.createSegmentFile(w.dir) 120 if err != nil { 121 return err 122 } 123 w.fd = f 124 w.w = bufio.NewWriterSize(f, w.bufferedSize) 125 return nil 126} 127 128// truncateOldest removes only the oldest segment. 129func (w *diskWAL) removeOldest() error { 130 w.mu.Lock() 131 defer w.mu.Unlock() 132 files, err := os.ReadDir(w.dir) 133 if err != nil { 134 return fmt.Errorf("failed to read WAL directory: %w", err) 135 } 136 if len(files) == 0 { 137 return fmt.Errorf("no segment found") 138 } 139 return os.RemoveAll(filepath.Join(w.dir, files[0].Name())) 140} 141 142// removeAll removes all segment files. 143func (w *diskWAL) removeAll() error { 144 w.mu.Lock() 145 defer w.mu.Unlock() 146 if err := w.fd.Close(); err != nil { 147 return err 148 } 149 if err := os.RemoveAll(w.dir); err != nil { 150 return fmt.Errorf("failed to remove files under %q: %w", w.dir, err) 151 } 152 return os.MkdirAll(w.dir, fs.ModePerm) 153} 154 155// refresh removes all segment files and make a new segment. 156func (w *diskWAL) refresh() error { 157 if err := w.removeAll(); err != nil { 158 return err 159 } 160 w.mu.Lock() 161 defer w.mu.Unlock() 162 163 f, err := w.createSegmentFile(w.dir) 164 if err != nil { 165 return err 166 } 167 w.fd = f 168 w.w = bufio.NewWriterSize(f, w.bufferedSize) 169 return nil 170} 171 172// createSegmentFile creates a new file with the name of the numbering index. 173func (w *diskWAL) createSegmentFile(dir string) (*os.File, error) { 174 name := strconv.Itoa(int(atomic.LoadUint32(&w.index))) 175 f, err := os.OpenFile(filepath.Join(dir, name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) 176 if err != nil { 177 return nil, fmt.Errorf("failed to create segment file: %w", err) 178 } 179 atomic.AddUint32(&w.index, 1) 180 return f, nil 181} 182 183type walRecord struct { 184 op walOperation 185 row Row 186} 187 188type diskWALReader struct { 189 dir string 190 files []os.DirEntry 191 rowsToInsert []Row 192} 193 194func newDiskWALReader(dir string) (*diskWALReader, error) { 195 files, err := os.ReadDir(dir) 196 if err != nil { 197 return nil, fmt.Errorf("failed to read the WAL dir: %w", err) 198 } 199 200 return &diskWALReader{ 201 dir: dir, 202 files: files, 203 rowsToInsert: make([]Row, 0), 204 }, nil 205} 206 207// readAll reads all segment files and caches the result for each operation. 208func (f *diskWALReader) readAll() error { 209 for _, file := range f.files { 210 if file.IsDir() { 211 return fmt.Errorf("unexpected directory found under the WAL directory: %s", file.Name()) 212 } 213 fd, err := os.Open(filepath.Join(f.dir, file.Name())) 214 if err != nil { 215 return fmt.Errorf("failed to open WAL segment file: %w", err) 216 } 217 segment := &segment{ 218 file: fd, 219 r: bufio.NewReader(fd), 220 } 221 for segment.next() { 222 rec := segment.record() 223 switch rec.op { 224 case operationInsert: 225 f.rowsToInsert = append(f.rowsToInsert, rec.row) 226 } 227 } 228 if err := segment.close(); err != nil { 229 return err 230 } 231 232 err = segment.error() 233 if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) { 234 // It is not unusual for a line to be invalid, as it may well terminate in the middle of writing to the WAL. 235 return nil 236 } 237 if err != nil { 238 return fmt.Errorf("encounter an error while reading WAL segment file %q: %w", file.Name(), segment.error()) 239 } 240 } 241 return nil 242} 243 244// segment represents a segment file. 245type segment struct { 246 file *os.File 247 r *bufio.Reader 248 // FIXME: Use interface to support other operation type 249 current walRecord 250 err error 251} 252 253func (f *segment) next() bool { 254 op, err := f.r.ReadByte() 255 if errors.Is(err, io.EOF) { 256 return false 257 } 258 if err != nil { 259 f.err = err 260 return false 261 } 262 switch walOperation(op) { 263 case operationInsert: 264 // Read the length of metric name. 265 metricLen, err := binary.ReadUvarint(f.r) 266 if err != nil { 267 f.err = fmt.Errorf("failed to read the length of metric name: %w", err) 268 return false 269 } 270 // Read the metric name. 271 metric := make([]byte, int(metricLen)) 272 if _, err := io.ReadFull(f.r, metric); err != nil { 273 f.err = fmt.Errorf("failed to read the metric name: %w", err) 274 return false 275 } 276 // Read timestamp. 277 ts, err := binary.ReadVarint(f.r) 278 if err != nil { 279 f.err = fmt.Errorf("failed to read timestamp: %w", err) 280 return false 281 } 282 // Read value. 283 val, err := binary.ReadUvarint(f.r) 284 if err != nil { 285 f.err = fmt.Errorf("failed to read value: %w", err) 286 return false 287 } 288 f.current = walRecord{ 289 op: walOperation(op), 290 row: Row{ 291 Metric: string(metric), 292 DataPoint: DataPoint{ 293 Timestamp: ts, 294 Value: math.Float64frombits(val), 295 }, 296 }, 297 } 298 default: 299 f.err = fmt.Errorf("unknown operation %v found", op) 300 return false 301 } 302 303 return true 304} 305 306// error gives back an error if it has been facing an error while reading. 307func (f *segment) error() error { 308 return f.err 309} 310 311func (f *segment) record() *walRecord { 312 return &f.current 313} 314 315func (f *segment) close() error { 316 return f.file.Close() 317} 318