1package tstorage
2
3import (
4	"encoding/json"
5	"errors"
6	"fmt"
7	"io"
8	"io/fs"
9	"os"
10	"path/filepath"
11	"regexp"
12	"sort"
13	"sync"
14	"time"
15
16	"github.com/nakabonne/tstorage/internal/cgroup"
17	"github.com/nakabonne/tstorage/internal/timerpool"
18)
19
var (
	// ErrNoDataPoints is the sentinel error reported when no data points are
	// found. Compare against it with errors.Is.
	ErrNoDataPoints = errors.New("no data points found")

	// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
	// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
	// goroutines on data ingestion path.
	defaultWorkersLimit = cgroup.AvailableCPUs()

	// partitionDirRegex matches the names of directories that hold a disk
	// partition: any name starting with "p-" followed by at least one character.
	partitionDirRegex = regexp.MustCompile(`^p-.+`)
)
30
// TimestampPrecision represents the precision of timestamps, expressed as a
// short unit string such as "ns" or "ms". See WithTimestampPrecision.
type TimestampPrecision string
33
const (
	// Supported timestamp precisions.
	Nanoseconds  TimestampPrecision = "ns"
	Microseconds TimestampPrecision = "us"
	Milliseconds TimestampPrecision = "ms"
	Seconds      TimestampPrecision = "s"

	// Defaults applied by NewStorage when the corresponding option is not given.
	// The default retention of 336 hours equals 14 days.
	defaultPartitionDuration  = 1 * time.Hour
	defaultRetention          = 336 * time.Hour
	defaultTimestampPrecision = Nanoseconds
	defaultWriteTimeout       = 30 * time.Second
	defaultWALBufferedSize    = 4096

	// writablePartitionsNum is how many partitions, counted from the head of the
	// partition list, still accept rows and are left untouched by flushing.
	// checkExpiredInterval is how often the background goroutine started by
	// NewStorage looks for expired partitions.
	writablePartitionsNum = 2
	checkExpiredInterval  = time.Hour

	// walDirName is the name of the WAL directory located under the data path.
	walDirName = "wal"
)
51
// Storage provides goroutine-safe insertion into and retrieval from the time-series storage.
type Storage interface {
	Reader
	// InsertRows ingests the given rows to the time-series storage.
	// If the timestamp is empty, it uses the machine's local timestamp in UTC.
	// The precision of timestamps is nanoseconds by default. It can be changed using WithTimestampPrecision.
	InsertRows(rows []Row) error
	// Close shuts the storage down gracefully by flushing any unwritten data
	// to the underlying disk partition.
	Close() error
}
62
// Reader provides read access to time series data.
type Reader interface {
	// Select gives back a list of data points that match the given metric and
	// labels within the given start-end range. Keep in mind that start is inclusive, end is exclusive,
	// and both must be Unix timestamps. ErrNoDataPoints is returned if no data points are found.
	Select(metric string, labels []Label, start, end int64) (points []*DataPoint, err error)
}
70
// Row includes a data point along with the properties that identify which
// metric it belongs to.
type Row struct {
	// The unique name of the metric.
	// This field must be set.
	Metric string
	// Optional key-value properties for more detailed identification.
	Labels []Label
	// The data point itself (embedded).
	// This field must be set.
	DataPoint
}
81
// DataPoint represents a data point, the smallest unit of time series data.
type DataPoint struct {
	// The actual value. This field must be set.
	Value float64
	// Unix timestamp. Its unit presumably follows the precision configured
	// with WithTimestampPrecision (nanoseconds by default).
	Timestamp int64
}
89
// Option is an optional setting for NewStorage. Each Option mutates the
// storage under construction after the defaults have been applied.
type Option func(*storage)
92
93// WithDataPath specifies the path to directory that stores time-series data.
94// Use this to make time-series data persistent on disk.
95//
96// Defaults to empty string which means no data will get persisted.
97func WithDataPath(dataPath string) Option {
98	return func(s *storage) {
99		s.dataPath = dataPath
100	}
101}
102
103// WithPartitionDuration specifies the timestamp range of partitions.
104// Once it exceeds the given time range, the new partition gets inserted.
105//
106// A partition is a chunk of time-series data with the timestamp range.
107// It acts as a fully independent database containing all data
108// points for its time range.
109//
110// Defaults to 1h
111func WithPartitionDuration(duration time.Duration) Option {
112	return func(s *storage) {
113		s.partitionDuration = duration
114	}
115}
116
117// WithRetention specifies when to remove old data.
118// Data points will get automatically removed from the disk after a
119// specified period of time after a disk partition was created.
120// Defaults to 14d.
121func WithRetention(retention time.Duration) Option {
122	return func(s *storage) {
123		s.retention = retention
124	}
125}
126
127// WithTimestampPrecision specifies the precision of timestamps to be used by all operations.
128//
129// Defaults to Nanoseconds
130func WithTimestampPrecision(precision TimestampPrecision) Option {
131	return func(s *storage) {
132		s.timestampPrecision = precision
133	}
134}
135
136// WithWriteTimeout specifies the timeout to wait when workers are busy.
137//
138// The storage limits the number of concurrent goroutines to prevent from out of memory
139// errors and CPU trashing even if too many goroutines attempt to write.
140//
141// Defaults to 30s.
142func WithWriteTimeout(timeout time.Duration) Option {
143	return func(s *storage) {
144		s.writeTimeout = timeout
145	}
146}
147
148// WithLogger specifies the logger to emit verbose output.
149//
150// Defaults to a logger implementation that does nothing.
151func WithLogger(logger Logger) Option {
152	return func(s *storage) {
153		s.logger = logger
154	}
155}
156
157// WithWAL specifies the buffered byte size before flushing a WAL file.
158// The larger the size, the less frequently the file is written and more write performance at the expense of durability.
159// Giving 0 means it writes to a file whenever data point comes in.
160// Giving -1 disables using WAL.
161//
162// Defaults to 4096.
163func WithWALBufferedSize(size int) Option {
164	return func(s *storage) {
165		s.walBufferedSize = size
166	}
167}
168
169// NewStorage gives back a new storage, which stores time-series data in the process memory by default.
170//
171// Give the WithDataPath option for running as a on-disk storage. Specify a directory with data already exists,
172// then it will be read as the initial data.
173func NewStorage(opts ...Option) (Storage, error) {
174	s := &storage{
175		partitionList:      newPartitionList(),
176		workersLimitCh:     make(chan struct{}, defaultWorkersLimit),
177		partitionDuration:  defaultPartitionDuration,
178		retention:          defaultRetention,
179		timestampPrecision: defaultTimestampPrecision,
180		writeTimeout:       defaultWriteTimeout,
181		walBufferedSize:    defaultWALBufferedSize,
182		wal:                &nopWAL{},
183		logger:             &nopLogger{},
184		doneCh:             make(chan struct{}, 0),
185	}
186	for _, opt := range opts {
187		opt(s)
188	}
189
190	if s.inMemoryMode() {
191		s.newPartition(nil, false)
192		return s, nil
193	}
194
195	if err := os.MkdirAll(s.dataPath, fs.ModePerm); err != nil {
196		return nil, fmt.Errorf("failed to make data directory %s: %w", s.dataPath, err)
197	}
198
199	walDir := filepath.Join(s.dataPath, walDirName)
200	if s.walBufferedSize >= 0 {
201		wal, err := newDiskWAL(walDir, s.walBufferedSize)
202		if err != nil {
203			return nil, err
204		}
205		s.wal = wal
206	}
207
208	// Read existent partitions from the disk.
209	dirs, err := os.ReadDir(s.dataPath)
210	if err != nil {
211		return nil, fmt.Errorf("failed to open data directory: %w", err)
212	}
213	if len(dirs) == 0 {
214		s.newPartition(nil, false)
215		return s, nil
216	}
217	isPartitionDir := func(f fs.DirEntry) bool {
218		return f.IsDir() && partitionDirRegex.MatchString(f.Name())
219	}
220	partitions := make([]partition, 0, len(dirs))
221	for _, e := range dirs {
222		if !isPartitionDir(e) {
223			continue
224		}
225		path := filepath.Join(s.dataPath, e.Name())
226		part, err := openDiskPartition(path, s.retention)
227		if errors.Is(err, ErrNoDataPoints) {
228			continue
229		}
230		if errors.Is(err, errInvalidPartition) {
231			// It should be recovered by WAL
232			continue
233		}
234		if err != nil {
235			return nil, fmt.Errorf("failed to open disk partition for %s: %w", path, err)
236		}
237		partitions = append(partitions, part)
238	}
239	sort.Slice(partitions, func(i, j int) bool {
240		return partitions[i].minTimestamp() < partitions[j].minTimestamp()
241	})
242	for _, p := range partitions {
243		s.newPartition(p, false)
244	}
245	// Start WAL recovery if there is.
246	if err := s.recoverWAL(walDir); err != nil {
247		return nil, fmt.Errorf("failed to recover WAL: %w", err)
248	}
249	s.newPartition(nil, false)
250
251	// periodically check and permanently remove expired partitions.
252	go func() {
253		ticker := time.NewTicker(checkExpiredInterval)
254		defer ticker.Stop()
255		for {
256			select {
257			case <-s.doneCh:
258				return
259			case <-ticker.C:
260				err := s.removeExpiredPartitions()
261				if err != nil {
262					s.logger.Printf("%v\n", err)
263				}
264			}
265		}
266	}()
267	return s, nil
268}
269
// storage is the Storage implementation built by NewStorage. Its settings are
// initialized from the package defaults and then overridden by the Options.
type storage struct {
	// partitionList holds the partitions; writes and reads iterate over it.
	partitionList partitionList

	// Settings and collaborators populated by NewStorage.
	// A negative walBufferedSize leaves wal as the no-op implementation, and an
	// empty dataPath means the storage runs in in-memory mode.
	walBufferedSize    int
	wal                wal
	partitionDuration  time.Duration
	retention          time.Duration
	timestampPrecision TimestampPrecision
	dataPath           string
	writeTimeout       time.Duration

	// logger receives diagnostic output. workersLimitCh is a buffered channel
	// used as a semaphore that bounds the number of concurrent InsertRows calls.
	logger         Logger
	workersLimitCh chan struct{}
	// wg must be incremented to guarantee all writes are done gracefully.
	wg sync.WaitGroup

	// doneCh is closed by Close to stop the background goroutine started by NewStorage.
	doneCh chan struct{}
}
288
289func (s *storage) InsertRows(rows []Row) error {
290	s.wg.Add(1)
291	defer s.wg.Done()
292
293	insert := func() error {
294		defer func() { <-s.workersLimitCh }()
295		if err := s.ensureActiveHead(); err != nil {
296			return err
297		}
298		iterator := s.partitionList.newIterator()
299		n := s.partitionList.size()
300		rowsToInsert := rows
301		// Starting at the head partition, try to insert rows, and loop to insert outdated rows
302		// into older partitions. Any rows more than `writablePartitionsNum` partitions out
303		// of date are dropped.
304		for i := 0; i < n && i < writablePartitionsNum; i++ {
305			if len(rowsToInsert) == 0 {
306				break
307			}
308			if !iterator.next() {
309				break
310			}
311			outdatedRows, err := iterator.value().insertRows(rowsToInsert)
312			if err != nil {
313				return fmt.Errorf("failed to insert rows: %w", err)
314			}
315			rowsToInsert = outdatedRows
316		}
317		return nil
318	}
319
320	// Limit the number of concurrent goroutines to prevent from out of memory
321	// errors and CPU trashing even if too many goroutines attempt to write.
322	select {
323	case s.workersLimitCh <- struct{}{}:
324		return insert()
325	default:
326	}
327
328	// Seems like all workers are busy; wait for up to writeTimeout
329
330	t := timerpool.Get(s.writeTimeout)
331	select {
332	case s.workersLimitCh <- struct{}{}:
333		timerpool.Put(t)
334		return insert()
335	case <-t.C:
336		timerpool.Put(t)
337		return fmt.Errorf("failed to write a data point in %s, since it is overloaded with %d concurrent writers",
338			s.writeTimeout, defaultWorkersLimit)
339	}
340}
341
342// ensureActiveHead ensures the head of partitionList is an active partition.
343// If none, it creates a new one.
344func (s *storage) ensureActiveHead() error {
345	head := s.partitionList.getHead()
346	if head != nil && head.active() {
347		return nil
348	}
349
350	// All partitions seems to be inactive so add a new partition to the list.
351	if err := s.newPartition(nil, true); err != nil {
352		return err
353	}
354	go func() {
355		if err := s.flushPartitions(); err != nil {
356			s.logger.Printf("failed to flush in-memory partitions: %v", err)
357		}
358	}()
359	return nil
360}
361
362func (s *storage) Select(metric string, labels []Label, start, end int64) ([]*DataPoint, error) {
363	if metric == "" {
364		return nil, fmt.Errorf("metric must be set")
365	}
366	if start >= end {
367		return nil, fmt.Errorf("the given start is greater than end")
368	}
369	points := make([]*DataPoint, 0)
370
371	// Iterate over all partitions from the newest one.
372	iterator := s.partitionList.newIterator()
373	for iterator.next() {
374		part := iterator.value()
375		if part == nil {
376			return nil, fmt.Errorf("unexpected empty partition found")
377		}
378		if part.minTimestamp() == 0 {
379			// Skip the partition that has no points.
380			continue
381		}
382		if part.maxTimestamp() < start {
383			// No need to keep going anymore
384			break
385		}
386		if part.minTimestamp() > end {
387			continue
388		}
389		ps, err := part.selectDataPoints(metric, labels, start, end)
390		if errors.Is(err, ErrNoDataPoints) {
391			continue
392		}
393		if err != nil {
394			return nil, fmt.Errorf("failed to select data points: %w", err)
395		}
396		// in order to keep the order in ascending.
397		points = append(ps, points...)
398	}
399	if len(points) == 0 {
400		return nil, ErrNoDataPoints
401	}
402	return points, nil
403}
404
405func (s *storage) Close() error {
406	s.wg.Wait()
407	close(s.doneCh)
408	if err := s.wal.flush(); err != nil {
409		return fmt.Errorf("failed to flush buffered WAL: %w", err)
410	}
411
412	// TODO: Prevent from new goroutines calling InsertRows(), for graceful shutdown.
413
414	// Make all writable partitions read-only by inserting as same number of those.
415	for i := 0; i < writablePartitionsNum; i++ {
416		if err := s.newPartition(nil, true); err != nil {
417			return err
418		}
419	}
420	if err := s.flushPartitions(); err != nil {
421		return fmt.Errorf("failed to close storage: %w", err)
422	}
423	if err := s.removeExpiredPartitions(); err != nil {
424		return fmt.Errorf("failed to remove expired partitions: %w", err)
425	}
426	// All partitions have been flushed, so WAL isn't needed anymore.
427	if err := s.wal.removeAll(); err != nil {
428		return fmt.Errorf("failed to remove WAL: %w", err)
429	}
430	return nil
431}
432
433func (s *storage) newPartition(p partition, punctuateWal bool) error {
434	if p == nil {
435		p = newMemoryPartition(s.wal, s.partitionDuration, s.timestampPrecision)
436	}
437	s.partitionList.insert(p)
438	if punctuateWal {
439		return s.wal.punctuate()
440	}
441	return nil
442}
443
444// flushPartitions persists all in-memory partitions ready to persisted.
445// For the in-memory mode, just removes it from the partition list.
446func (s *storage) flushPartitions() error {
447	// Keep the first two partitions as is even if they are inactive,
448	// to accept out-of-order data points.
449	i := 0
450	iterator := s.partitionList.newIterator()
451	for iterator.next() {
452		if i < writablePartitionsNum {
453			i++
454			continue
455		}
456		part := iterator.value()
457		if part == nil {
458			return fmt.Errorf("unexpected empty partition found")
459		}
460		memPart, ok := part.(*memoryPartition)
461		if !ok {
462			continue
463		}
464
465		if s.inMemoryMode() {
466			if err := s.partitionList.remove(part); err != nil {
467				return fmt.Errorf("failed to remove partition: %w", err)
468			}
469			continue
470		}
471
472		// Start swapping in-memory partition for disk one.
473		// The disk partition will place at where in-memory one existed.
474
475		dir := filepath.Join(s.dataPath, fmt.Sprintf("p-%d-%d", memPart.minTimestamp(), memPart.maxTimestamp()))
476		if err := s.flush(dir, memPart); err != nil {
477			return fmt.Errorf("failed to compact memory partition into %s: %w", dir, err)
478		}
479		newPart, err := openDiskPartition(dir, s.retention)
480		if errors.Is(err, ErrNoDataPoints) {
481			if err := s.partitionList.remove(part); err != nil {
482				return fmt.Errorf("failed to remove partition: %w", err)
483			}
484			continue
485		}
486		if err != nil {
487			return fmt.Errorf("failed to generate disk partition for %s: %w", dir, err)
488		}
489		if err := s.partitionList.swap(part, newPart); err != nil {
490			return fmt.Errorf("failed to swap partitions: %w", err)
491		}
492
493		if err := s.wal.removeOldest(); err != nil {
494			return fmt.Errorf("failed to remove oldest WAL segment: %w", err)
495		}
496	}
497	return nil
498}
499
500// flush compacts the data points in the given partition and flushes them to the given directory.
501func (s *storage) flush(dirPath string, m *memoryPartition) error {
502	if dirPath == "" {
503		return fmt.Errorf("dir path is required")
504	}
505
506	if err := os.MkdirAll(dirPath, fs.ModePerm); err != nil {
507		return fmt.Errorf("failed to make directory %q: %w", dirPath, err)
508	}
509
510	f, err := os.Create(filepath.Join(dirPath, dataFileName))
511	if err != nil {
512		return fmt.Errorf("failed to create file %q: %w", dirPath, err)
513	}
514	defer f.Close()
515	encoder := newSeriesEncoder(f)
516
517	metrics := map[string]diskMetric{}
518	m.metrics.Range(func(key, value interface{}) bool {
519		mt, ok := value.(*memoryMetric)
520		if !ok {
521			s.logger.Printf("unknown value found\n")
522			return false
523		}
524		offset, err := f.Seek(0, io.SeekCurrent)
525		if err != nil {
526			s.logger.Printf("failed to set file offset of metric %q: %v\n", mt.name, err)
527			return false
528		}
529
530		if err := mt.encodeAllPoints(encoder); err != nil {
531			s.logger.Printf("failed to encode a data point that metric is %q: %v\n", mt.name, err)
532			return false
533		}
534
535		if err := encoder.flush(); err != nil {
536			s.logger.Printf("failed to flush data points that metric is %q: %v\n", mt.name, err)
537			return false
538		}
539
540		totalNumPoints := mt.size + int64(len(mt.outOfOrderPoints))
541		metrics[mt.name] = diskMetric{
542			Name:          mt.name,
543			Offset:        offset,
544			MinTimestamp:  mt.minTimestamp,
545			MaxTimestamp:  mt.maxTimestamp,
546			NumDataPoints: totalNumPoints,
547		}
548		return true
549	})
550
551	b, err := json.Marshal(&meta{
552		MinTimestamp:  m.minTimestamp(),
553		MaxTimestamp:  m.maxTimestamp(),
554		NumDataPoints: m.size(),
555		Metrics:       metrics,
556		CreatedAt:     time.Now(),
557	})
558	if err != nil {
559		return fmt.Errorf("failed to encode metadata: %w", err)
560	}
561
562	// It should write the meta file at last because what valid meta file exists proves the disk partition is valid.
563	metaPath := filepath.Join(dirPath, metaFileName)
564	if err := os.WriteFile(metaPath, b, fs.ModePerm); err != nil {
565		return fmt.Errorf("failed to write metadata to %s: %w", metaPath, err)
566	}
567	return nil
568}
569
570func (s *storage) removeExpiredPartitions() error {
571	expiredList := make([]partition, 0)
572	iterator := s.partitionList.newIterator()
573	for iterator.next() {
574		part := iterator.value()
575		if part == nil {
576			return fmt.Errorf("unexpected nil partition found")
577		}
578		if part.expired() {
579			expiredList = append(expiredList, part)
580		}
581	}
582
583	for i := range expiredList {
584		if err := s.partitionList.remove(expiredList[i]); err != nil {
585			return fmt.Errorf("failed to remove expired partition")
586		}
587	}
588	return nil
589}
590
591// recoverWAL inserts all records within the given wal, and then removes all WAL segment files.
592func (s *storage) recoverWAL(walDir string) error {
593	reader, err := newDiskWALReader(walDir)
594	if errors.Is(err, os.ErrNotExist) {
595		return nil
596	}
597	if err != nil {
598		return err
599	}
600
601	if err := reader.readAll(); err != nil {
602		return fmt.Errorf("failed to read WAL: %w", err)
603	}
604
605	if len(reader.rowsToInsert) == 0 {
606		return nil
607	}
608	if err := s.InsertRows(reader.rowsToInsert); err != nil {
609		return fmt.Errorf("failed to insert rows recovered from WAL: %w", err)
610	}
611	return s.wal.refresh()
612}
613
614func (s *storage) inMemoryMode() bool {
615	return s.dataPath == ""
616}
617