// Package tstorage provides a goroutine-safe time-series storage that keeps
// data in process memory by default and can optionally persist it to disk.
package tstorage

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"sync"
	"time"

	"github.com/nakabonne/tstorage/internal/cgroup"
	"github.com/nakabonne/tstorage/internal/timerpool"
)

var (
	// ErrNoDataPoints is returned when no data points are found, e.g. by Select
	// when nothing matches the query.
	ErrNoDataPoints = errors.New("no data points found")

	// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
	// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
	// goroutines on data ingestion path.
	// NOTE(review): the value comes from cgroup.AvailableCPUs(), presumably the
	// CPU quota visible to this process — confirm against that helper.
	defaultWorkersLimit = cgroup.AvailableCPUs()

	// partitionDirRegex matches names that start with "p-" followed by at least
	// one character; NewStorage uses it to pick partition directories out of the
	// data directory.
	partitionDirRegex = regexp.MustCompile(`^p-.+`)
)

// TimestampPrecision represents precision of timestamps. See WithTimestampPrecision
type TimestampPrecision string

const (
	// Supported timestamp precisions.
	Nanoseconds  TimestampPrecision = "ns"
	Microseconds TimestampPrecision = "us"
	Milliseconds TimestampPrecision = "ms"
	Seconds      TimestampPrecision = "s"

	// Defaults applied by NewStorage unless overridden by an Option.
	// The default retention of 336 hours equals 14 days.
	defaultPartitionDuration  = 1 * time.Hour
	defaultRetention          = 336 * time.Hour
	defaultTimestampPrecision = Nanoseconds
	defaultWriteTimeout       = 30 * time.Second
	defaultWALBufferedSize    = 4096

	// writablePartitionsNum is the number of newest partitions that still accept
	// writes; checkExpiredInterval is how often expired partitions are looked for.
	writablePartitionsNum = 2
	checkExpiredInterval  = time.Hour

	// walDirName is the name of the WAL directory placed under the data path.
	walDirName = "wal"
)

// Storage provides goroutine safe capabilities of insertion into and retrieval from the time-series storage.
type Storage interface {
	Reader
	// InsertRows ingests the given rows to the time-series storage.
	// If the timestamp is empty, it uses the machine's local timestamp in UTC.
	// The precision of timestamps is nanoseconds by default. It can be changed using WithTimestampPrecision.
	InsertRows(rows []Row) error
	// Close gracefully shutdowns by flushing any unwritten data to the underlying disk partition.
	Close() error
}

// Reader provides reading access to time series data.
64type Reader interface { 65 // Select gives back a list of data points that matches a set of the given metric and 66 // labels within the given start-end range. Keep in mind that start is inclusive, end is exclusive, 67 // and both must be Unix timestamp. ErrNoDataPoints will be returned if no data points found. 68 Select(metric string, labels []Label, start, end int64) (points []*DataPoint, err error) 69} 70 71// Row includes a data point along with properties to identify a kind of metrics. 72type Row struct { 73 // The unique name of metric. 74 // This field must be set. 75 Metric string 76 // An optional key-value properties to further detailed identification. 77 Labels []Label 78 // This field must be set. 79 DataPoint 80} 81 82// DataPoint represents a data point, the smallest unit of time series data. 83type DataPoint struct { 84 // The actual value. This field must be set. 85 Value float64 86 // Unix timestamp. 87 Timestamp int64 88} 89 90// Option is an optional setting for NewStorage. 91type Option func(*storage) 92 93// WithDataPath specifies the path to directory that stores time-series data. 94// Use this to make time-series data persistent on disk. 95// 96// Defaults to empty string which means no data will get persisted. 97func WithDataPath(dataPath string) Option { 98 return func(s *storage) { 99 s.dataPath = dataPath 100 } 101} 102 103// WithPartitionDuration specifies the timestamp range of partitions. 104// Once it exceeds the given time range, the new partition gets inserted. 105// 106// A partition is a chunk of time-series data with the timestamp range. 107// It acts as a fully independent database containing all data 108// points for its time range. 109// 110// Defaults to 1h 111func WithPartitionDuration(duration time.Duration) Option { 112 return func(s *storage) { 113 s.partitionDuration = duration 114 } 115} 116 117// WithRetention specifies when to remove old data. 
118// Data points will get automatically removed from the disk after a 119// specified period of time after a disk partition was created. 120// Defaults to 14d. 121func WithRetention(retention time.Duration) Option { 122 return func(s *storage) { 123 s.retention = retention 124 } 125} 126 127// WithTimestampPrecision specifies the precision of timestamps to be used by all operations. 128// 129// Defaults to Nanoseconds 130func WithTimestampPrecision(precision TimestampPrecision) Option { 131 return func(s *storage) { 132 s.timestampPrecision = precision 133 } 134} 135 136// WithWriteTimeout specifies the timeout to wait when workers are busy. 137// 138// The storage limits the number of concurrent goroutines to prevent from out of memory 139// errors and CPU trashing even if too many goroutines attempt to write. 140// 141// Defaults to 30s. 142func WithWriteTimeout(timeout time.Duration) Option { 143 return func(s *storage) { 144 s.writeTimeout = timeout 145 } 146} 147 148// WithLogger specifies the logger to emit verbose output. 149// 150// Defaults to a logger implementation that does nothing. 151func WithLogger(logger Logger) Option { 152 return func(s *storage) { 153 s.logger = logger 154 } 155} 156 157// WithWAL specifies the buffered byte size before flushing a WAL file. 158// The larger the size, the less frequently the file is written and more write performance at the expense of durability. 159// Giving 0 means it writes to a file whenever data point comes in. 160// Giving -1 disables using WAL. 161// 162// Defaults to 4096. 163func WithWALBufferedSize(size int) Option { 164 return func(s *storage) { 165 s.walBufferedSize = size 166 } 167} 168 169// NewStorage gives back a new storage, which stores time-series data in the process memory by default. 170// 171// Give the WithDataPath option for running as a on-disk storage. Specify a directory with data already exists, 172// then it will be read as the initial data. 
173func NewStorage(opts ...Option) (Storage, error) { 174 s := &storage{ 175 partitionList: newPartitionList(), 176 workersLimitCh: make(chan struct{}, defaultWorkersLimit), 177 partitionDuration: defaultPartitionDuration, 178 retention: defaultRetention, 179 timestampPrecision: defaultTimestampPrecision, 180 writeTimeout: defaultWriteTimeout, 181 walBufferedSize: defaultWALBufferedSize, 182 wal: &nopWAL{}, 183 logger: &nopLogger{}, 184 doneCh: make(chan struct{}, 0), 185 } 186 for _, opt := range opts { 187 opt(s) 188 } 189 190 if s.inMemoryMode() { 191 s.newPartition(nil, false) 192 return s, nil 193 } 194 195 if err := os.MkdirAll(s.dataPath, fs.ModePerm); err != nil { 196 return nil, fmt.Errorf("failed to make data directory %s: %w", s.dataPath, err) 197 } 198 199 walDir := filepath.Join(s.dataPath, walDirName) 200 if s.walBufferedSize >= 0 { 201 wal, err := newDiskWAL(walDir, s.walBufferedSize) 202 if err != nil { 203 return nil, err 204 } 205 s.wal = wal 206 } 207 208 // Read existent partitions from the disk. 
209 dirs, err := os.ReadDir(s.dataPath) 210 if err != nil { 211 return nil, fmt.Errorf("failed to open data directory: %w", err) 212 } 213 if len(dirs) == 0 { 214 s.newPartition(nil, false) 215 return s, nil 216 } 217 isPartitionDir := func(f fs.DirEntry) bool { 218 return f.IsDir() && partitionDirRegex.MatchString(f.Name()) 219 } 220 partitions := make([]partition, 0, len(dirs)) 221 for _, e := range dirs { 222 if !isPartitionDir(e) { 223 continue 224 } 225 path := filepath.Join(s.dataPath, e.Name()) 226 part, err := openDiskPartition(path, s.retention) 227 if errors.Is(err, ErrNoDataPoints) { 228 continue 229 } 230 if errors.Is(err, errInvalidPartition) { 231 // It should be recovered by WAL 232 continue 233 } 234 if err != nil { 235 return nil, fmt.Errorf("failed to open disk partition for %s: %w", path, err) 236 } 237 partitions = append(partitions, part) 238 } 239 sort.Slice(partitions, func(i, j int) bool { 240 return partitions[i].minTimestamp() < partitions[j].minTimestamp() 241 }) 242 for _, p := range partitions { 243 s.newPartition(p, false) 244 } 245 // Start WAL recovery if there is. 246 if err := s.recoverWAL(walDir); err != nil { 247 return nil, fmt.Errorf("failed to recover WAL: %w", err) 248 } 249 s.newPartition(nil, false) 250 251 // periodically check and permanently remove expired partitions. 
252 go func() { 253 ticker := time.NewTicker(checkExpiredInterval) 254 defer ticker.Stop() 255 for { 256 select { 257 case <-s.doneCh: 258 return 259 case <-ticker.C: 260 err := s.removeExpiredPartitions() 261 if err != nil { 262 s.logger.Printf("%v\n", err) 263 } 264 } 265 } 266 }() 267 return s, nil 268} 269 270type storage struct { 271 partitionList partitionList 272 273 walBufferedSize int 274 wal wal 275 partitionDuration time.Duration 276 retention time.Duration 277 timestampPrecision TimestampPrecision 278 dataPath string 279 writeTimeout time.Duration 280 281 logger Logger 282 workersLimitCh chan struct{} 283 // wg must be incremented to guarantee all writes are done gracefully. 284 wg sync.WaitGroup 285 286 doneCh chan struct{} 287} 288 289func (s *storage) InsertRows(rows []Row) error { 290 s.wg.Add(1) 291 defer s.wg.Done() 292 293 insert := func() error { 294 defer func() { <-s.workersLimitCh }() 295 if err := s.ensureActiveHead(); err != nil { 296 return err 297 } 298 iterator := s.partitionList.newIterator() 299 n := s.partitionList.size() 300 rowsToInsert := rows 301 // Starting at the head partition, try to insert rows, and loop to insert outdated rows 302 // into older partitions. Any rows more than `writablePartitionsNum` partitions out 303 // of date are dropped. 304 for i := 0; i < n && i < writablePartitionsNum; i++ { 305 if len(rowsToInsert) == 0 { 306 break 307 } 308 if !iterator.next() { 309 break 310 } 311 outdatedRows, err := iterator.value().insertRows(rowsToInsert) 312 if err != nil { 313 return fmt.Errorf("failed to insert rows: %w", err) 314 } 315 rowsToInsert = outdatedRows 316 } 317 return nil 318 } 319 320 // Limit the number of concurrent goroutines to prevent from out of memory 321 // errors and CPU trashing even if too many goroutines attempt to write. 
322 select { 323 case s.workersLimitCh <- struct{}{}: 324 return insert() 325 default: 326 } 327 328 // Seems like all workers are busy; wait for up to writeTimeout 329 330 t := timerpool.Get(s.writeTimeout) 331 select { 332 case s.workersLimitCh <- struct{}{}: 333 timerpool.Put(t) 334 return insert() 335 case <-t.C: 336 timerpool.Put(t) 337 return fmt.Errorf("failed to write a data point in %s, since it is overloaded with %d concurrent writers", 338 s.writeTimeout, defaultWorkersLimit) 339 } 340} 341 342// ensureActiveHead ensures the head of partitionList is an active partition. 343// If none, it creates a new one. 344func (s *storage) ensureActiveHead() error { 345 head := s.partitionList.getHead() 346 if head != nil && head.active() { 347 return nil 348 } 349 350 // All partitions seems to be inactive so add a new partition to the list. 351 if err := s.newPartition(nil, true); err != nil { 352 return err 353 } 354 go func() { 355 if err := s.flushPartitions(); err != nil { 356 s.logger.Printf("failed to flush in-memory partitions: %v", err) 357 } 358 }() 359 return nil 360} 361 362func (s *storage) Select(metric string, labels []Label, start, end int64) ([]*DataPoint, error) { 363 if metric == "" { 364 return nil, fmt.Errorf("metric must be set") 365 } 366 if start >= end { 367 return nil, fmt.Errorf("the given start is greater than end") 368 } 369 points := make([]*DataPoint, 0) 370 371 // Iterate over all partitions from the newest one. 372 iterator := s.partitionList.newIterator() 373 for iterator.next() { 374 part := iterator.value() 375 if part == nil { 376 return nil, fmt.Errorf("unexpected empty partition found") 377 } 378 if part.minTimestamp() == 0 { 379 // Skip the partition that has no points. 
380 continue 381 } 382 if part.maxTimestamp() < start { 383 // No need to keep going anymore 384 break 385 } 386 if part.minTimestamp() > end { 387 continue 388 } 389 ps, err := part.selectDataPoints(metric, labels, start, end) 390 if errors.Is(err, ErrNoDataPoints) { 391 continue 392 } 393 if err != nil { 394 return nil, fmt.Errorf("failed to select data points: %w", err) 395 } 396 // in order to keep the order in ascending. 397 points = append(ps, points...) 398 } 399 if len(points) == 0 { 400 return nil, ErrNoDataPoints 401 } 402 return points, nil 403} 404 405func (s *storage) Close() error { 406 s.wg.Wait() 407 close(s.doneCh) 408 if err := s.wal.flush(); err != nil { 409 return fmt.Errorf("failed to flush buffered WAL: %w", err) 410 } 411 412 // TODO: Prevent from new goroutines calling InsertRows(), for graceful shutdown. 413 414 // Make all writable partitions read-only by inserting as same number of those. 415 for i := 0; i < writablePartitionsNum; i++ { 416 if err := s.newPartition(nil, true); err != nil { 417 return err 418 } 419 } 420 if err := s.flushPartitions(); err != nil { 421 return fmt.Errorf("failed to close storage: %w", err) 422 } 423 if err := s.removeExpiredPartitions(); err != nil { 424 return fmt.Errorf("failed to remove expired partitions: %w", err) 425 } 426 // All partitions have been flushed, so WAL isn't needed anymore. 427 if err := s.wal.removeAll(); err != nil { 428 return fmt.Errorf("failed to remove WAL: %w", err) 429 } 430 return nil 431} 432 433func (s *storage) newPartition(p partition, punctuateWal bool) error { 434 if p == nil { 435 p = newMemoryPartition(s.wal, s.partitionDuration, s.timestampPrecision) 436 } 437 s.partitionList.insert(p) 438 if punctuateWal { 439 return s.wal.punctuate() 440 } 441 return nil 442} 443 444// flushPartitions persists all in-memory partitions ready to persisted. 445// For the in-memory mode, just removes it from the partition list. 
446func (s *storage) flushPartitions() error { 447 // Keep the first two partitions as is even if they are inactive, 448 // to accept out-of-order data points. 449 i := 0 450 iterator := s.partitionList.newIterator() 451 for iterator.next() { 452 if i < writablePartitionsNum { 453 i++ 454 continue 455 } 456 part := iterator.value() 457 if part == nil { 458 return fmt.Errorf("unexpected empty partition found") 459 } 460 memPart, ok := part.(*memoryPartition) 461 if !ok { 462 continue 463 } 464 465 if s.inMemoryMode() { 466 if err := s.partitionList.remove(part); err != nil { 467 return fmt.Errorf("failed to remove partition: %w", err) 468 } 469 continue 470 } 471 472 // Start swapping in-memory partition for disk one. 473 // The disk partition will place at where in-memory one existed. 474 475 dir := filepath.Join(s.dataPath, fmt.Sprintf("p-%d-%d", memPart.minTimestamp(), memPart.maxTimestamp())) 476 if err := s.flush(dir, memPart); err != nil { 477 return fmt.Errorf("failed to compact memory partition into %s: %w", dir, err) 478 } 479 newPart, err := openDiskPartition(dir, s.retention) 480 if errors.Is(err, ErrNoDataPoints) { 481 if err := s.partitionList.remove(part); err != nil { 482 return fmt.Errorf("failed to remove partition: %w", err) 483 } 484 continue 485 } 486 if err != nil { 487 return fmt.Errorf("failed to generate disk partition for %s: %w", dir, err) 488 } 489 if err := s.partitionList.swap(part, newPart); err != nil { 490 return fmt.Errorf("failed to swap partitions: %w", err) 491 } 492 493 if err := s.wal.removeOldest(); err != nil { 494 return fmt.Errorf("failed to remove oldest WAL segment: %w", err) 495 } 496 } 497 return nil 498} 499 500// flush compacts the data points in the given partition and flushes them to the given directory. 
501func (s *storage) flush(dirPath string, m *memoryPartition) error { 502 if dirPath == "" { 503 return fmt.Errorf("dir path is required") 504 } 505 506 if err := os.MkdirAll(dirPath, fs.ModePerm); err != nil { 507 return fmt.Errorf("failed to make directory %q: %w", dirPath, err) 508 } 509 510 f, err := os.Create(filepath.Join(dirPath, dataFileName)) 511 if err != nil { 512 return fmt.Errorf("failed to create file %q: %w", dirPath, err) 513 } 514 defer f.Close() 515 encoder := newSeriesEncoder(f) 516 517 metrics := map[string]diskMetric{} 518 m.metrics.Range(func(key, value interface{}) bool { 519 mt, ok := value.(*memoryMetric) 520 if !ok { 521 s.logger.Printf("unknown value found\n") 522 return false 523 } 524 offset, err := f.Seek(0, io.SeekCurrent) 525 if err != nil { 526 s.logger.Printf("failed to set file offset of metric %q: %v\n", mt.name, err) 527 return false 528 } 529 530 if err := mt.encodeAllPoints(encoder); err != nil { 531 s.logger.Printf("failed to encode a data point that metric is %q: %v\n", mt.name, err) 532 return false 533 } 534 535 if err := encoder.flush(); err != nil { 536 s.logger.Printf("failed to flush data points that metric is %q: %v\n", mt.name, err) 537 return false 538 } 539 540 totalNumPoints := mt.size + int64(len(mt.outOfOrderPoints)) 541 metrics[mt.name] = diskMetric{ 542 Name: mt.name, 543 Offset: offset, 544 MinTimestamp: mt.minTimestamp, 545 MaxTimestamp: mt.maxTimestamp, 546 NumDataPoints: totalNumPoints, 547 } 548 return true 549 }) 550 551 b, err := json.Marshal(&meta{ 552 MinTimestamp: m.minTimestamp(), 553 MaxTimestamp: m.maxTimestamp(), 554 NumDataPoints: m.size(), 555 Metrics: metrics, 556 CreatedAt: time.Now(), 557 }) 558 if err != nil { 559 return fmt.Errorf("failed to encode metadata: %w", err) 560 } 561 562 // It should write the meta file at last because what valid meta file exists proves the disk partition is valid. 
563 metaPath := filepath.Join(dirPath, metaFileName) 564 if err := os.WriteFile(metaPath, b, fs.ModePerm); err != nil { 565 return fmt.Errorf("failed to write metadata to %s: %w", metaPath, err) 566 } 567 return nil 568} 569 570func (s *storage) removeExpiredPartitions() error { 571 expiredList := make([]partition, 0) 572 iterator := s.partitionList.newIterator() 573 for iterator.next() { 574 part := iterator.value() 575 if part == nil { 576 return fmt.Errorf("unexpected nil partition found") 577 } 578 if part.expired() { 579 expiredList = append(expiredList, part) 580 } 581 } 582 583 for i := range expiredList { 584 if err := s.partitionList.remove(expiredList[i]); err != nil { 585 return fmt.Errorf("failed to remove expired partition") 586 } 587 } 588 return nil 589} 590 591// recoverWAL inserts all records within the given wal, and then removes all WAL segment files. 592func (s *storage) recoverWAL(walDir string) error { 593 reader, err := newDiskWALReader(walDir) 594 if errors.Is(err, os.ErrNotExist) { 595 return nil 596 } 597 if err != nil { 598 return err 599 } 600 601 if err := reader.readAll(); err != nil { 602 return fmt.Errorf("failed to read WAL: %w", err) 603 } 604 605 if len(reader.rowsToInsert) == 0 { 606 return nil 607 } 608 if err := s.InsertRows(reader.rowsToInsert); err != nil { 609 return fmt.Errorf("failed to insert rows recovered from WAL: %w", err) 610 } 611 return s.wal.refresh() 612} 613 614func (s *storage) inMemoryMode() bool { 615 return s.dataPath == "" 616} 617