1package tstorage
2
3import (
4	"bufio"
5	"encoding/binary"
6	"errors"
7	"fmt"
8	"io"
9	"io/fs"
10	"math"
11	"os"
12	"path/filepath"
13	"strconv"
14	"sync"
15	"sync/atomic"
16)
17
// diskWAL is a write-ahead log stored as a set of segment files in a single
// directory. One segment is responsible for one partition. Segment files are
// named after an increasing index (see createSegmentFile), and entries are
// always written to the most recently created segment, the active one.
//
// The directory looks like:
//
//	.wal/
//	├── 0
//	└── 1
type diskWAL struct {
	// dir is the directory that holds every segment file.
	dir string
	// bufferedSize is the size passed to bufio.NewWriterSize for each segment.
	// append flushes after every call when it is 0.
	bufferedSize int

	// mu is locked by append, punctuate, removeOldest, removeAll and refresh.
	mu sync.Mutex
	// fd is the file descriptor of the active segment.
	fd *os.File
	// w is the buffered writer wrapping the active segment.
	w *bufio.Writer
	// index is the number the next segment file will be named after.
	index uint32
}
36
37func newDiskWAL(dir string, bufferedSize int) (wal, error) {
38	if err := os.MkdirAll(dir, fs.ModePerm); err != nil {
39		return nil, fmt.Errorf("failed to make WAL dir: %w", err)
40	}
41	w := &diskWAL{
42		dir:          dir,
43		bufferedSize: bufferedSize,
44	}
45	f, err := w.createSegmentFile(dir)
46	if err != nil {
47		return nil, err
48	}
49	w.fd = f
50	w.w = bufio.NewWriterSize(f, bufferedSize)
51
52	return w, nil
53}
54
55// append appends the given entry to the end of a file via the file descriptor it has.
56func (w *diskWAL) append(op walOperation, rows []Row) error {
57	w.mu.Lock()
58	defer w.mu.Unlock()
59
60	switch op {
61	case operationInsert:
62		for _, row := range rows {
63			// Write the operation type
64			if err := w.w.WriteByte(byte(op)); err != nil {
65				return fmt.Errorf("failed to write operation: %w", err)
66			}
67			name := marshalMetricName(row.Metric, row.Labels)
68			// Write the length of the metric name
69			lBuf := make([]byte, binary.MaxVarintLen64)
70			n := binary.PutUvarint(lBuf, uint64(len(name)))
71			if _, err := w.w.Write(lBuf[:n]); err != nil {
72				return fmt.Errorf("failed to write the length of the metric name: %w", err)
73			}
74			// Write the metric name
75			if _, err := w.w.WriteString(name); err != nil {
76				return fmt.Errorf("failed to write the metric name: %w", err)
77			}
78			// Write the timestamp
79			tsBuf := make([]byte, binary.MaxVarintLen64)
80			n = binary.PutVarint(tsBuf, row.DataPoint.Timestamp)
81			if _, err := w.w.Write(tsBuf[:n]); err != nil {
82				return fmt.Errorf("failed to write the timestamp: %w", err)
83			}
84			// Write the value
85			vBuf := make([]byte, binary.MaxVarintLen64)
86			n = binary.PutUvarint(vBuf, math.Float64bits(row.DataPoint.Value))
87			if _, err := w.w.Write(vBuf[:n]); err != nil {
88				return fmt.Errorf("failed to write the value: %w", err)
89			}
90		}
91	default:
92		return fmt.Errorf("unknown operation %v given", op)
93	}
94	if w.bufferedSize == 0 {
95		return w.flush()
96	}
97
98	return nil
99}
100
101// flush flushes all buffered entries to the underlying file.
102func (w *diskWAL) flush() error {
103	if err := w.w.Flush(); err != nil {
104		return fmt.Errorf("failed to flush buffered-data into the underlying WAL file: %w", err)
105	}
106	return nil
107}
108
109// punctuate set boundary and creates a new segment.
110func (w *diskWAL) punctuate() error {
111	w.mu.Lock()
112	defer w.mu.Unlock()
113	if err := w.flush(); err != nil {
114		return err
115	}
116	if err := w.fd.Close(); err != nil {
117		return err
118	}
119	f, err := w.createSegmentFile(w.dir)
120	if err != nil {
121		return err
122	}
123	w.fd = f
124	w.w = bufio.NewWriterSize(f, w.bufferedSize)
125	return nil
126}
127
128// truncateOldest removes only the oldest segment.
129func (w *diskWAL) removeOldest() error {
130	w.mu.Lock()
131	defer w.mu.Unlock()
132	files, err := os.ReadDir(w.dir)
133	if err != nil {
134		return fmt.Errorf("failed to read WAL directory: %w", err)
135	}
136	if len(files) == 0 {
137		return fmt.Errorf("no segment found")
138	}
139	return os.RemoveAll(filepath.Join(w.dir, files[0].Name()))
140}
141
142// removeAll removes all segment files.
143func (w *diskWAL) removeAll() error {
144	w.mu.Lock()
145	defer w.mu.Unlock()
146	if err := w.fd.Close(); err != nil {
147		return err
148	}
149	if err := os.RemoveAll(w.dir); err != nil {
150		return fmt.Errorf("failed to remove files under %q: %w", w.dir, err)
151	}
152	return os.MkdirAll(w.dir, fs.ModePerm)
153}
154
155// refresh removes all segment files and make a new segment.
156func (w *diskWAL) refresh() error {
157	if err := w.removeAll(); err != nil {
158		return err
159	}
160	w.mu.Lock()
161	defer w.mu.Unlock()
162
163	f, err := w.createSegmentFile(w.dir)
164	if err != nil {
165		return err
166	}
167	w.fd = f
168	w.w = bufio.NewWriterSize(f, w.bufferedSize)
169	return nil
170}
171
172// createSegmentFile creates a new file with the name of the numbering index.
173func (w *diskWAL) createSegmentFile(dir string) (*os.File, error) {
174	name := strconv.Itoa(int(atomic.LoadUint32(&w.index)))
175	f, err := os.OpenFile(filepath.Join(dir, name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
176	if err != nil {
177		return nil, fmt.Errorf("failed to create segment file: %w", err)
178	}
179	atomic.AddUint32(&w.index, 1)
180	return f, nil
181}
182
183type walRecord struct {
184	op  walOperation
185	row Row
186}
187
188type diskWALReader struct {
189	dir          string
190	files        []os.DirEntry
191	rowsToInsert []Row
192}
193
194func newDiskWALReader(dir string) (*diskWALReader, error) {
195	files, err := os.ReadDir(dir)
196	if err != nil {
197		return nil, fmt.Errorf("failed to read the WAL dir: %w", err)
198	}
199
200	return &diskWALReader{
201		dir:          dir,
202		files:        files,
203		rowsToInsert: make([]Row, 0),
204	}, nil
205}
206
207// readAll reads all segment files and caches the result for each operation.
208func (f *diskWALReader) readAll() error {
209	for _, file := range f.files {
210		if file.IsDir() {
211			return fmt.Errorf("unexpected directory found under the WAL directory: %s", file.Name())
212		}
213		fd, err := os.Open(filepath.Join(f.dir, file.Name()))
214		if err != nil {
215			return fmt.Errorf("failed to open WAL segment file: %w", err)
216		}
217		segment := &segment{
218			file: fd,
219			r:    bufio.NewReader(fd),
220		}
221		for segment.next() {
222			rec := segment.record()
223			switch rec.op {
224			case operationInsert:
225				f.rowsToInsert = append(f.rowsToInsert, rec.row)
226			}
227		}
228		if err := segment.close(); err != nil {
229			return err
230		}
231
232		err = segment.error()
233		if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
234			// It is not unusual for a line to be invalid, as it may well terminate in the middle of writing to the WAL.
235			return nil
236		}
237		if err != nil {
238			return fmt.Errorf("encounter an error while reading WAL segment file %q: %w", file.Name(), segment.error())
239		}
240	}
241	return nil
242}
243
244// segment represents a segment file.
245type segment struct {
246	file *os.File
247	r    *bufio.Reader
248	// FIXME: Use interface to support other operation type
249	current walRecord
250	err     error
251}
252
253func (f *segment) next() bool {
254	op, err := f.r.ReadByte()
255	if errors.Is(err, io.EOF) {
256		return false
257	}
258	if err != nil {
259		f.err = err
260		return false
261	}
262	switch walOperation(op) {
263	case operationInsert:
264		// Read the length of metric name.
265		metricLen, err := binary.ReadUvarint(f.r)
266		if err != nil {
267			f.err = fmt.Errorf("failed to read the length of metric name: %w", err)
268			return false
269		}
270		// Read the metric name.
271		metric := make([]byte, int(metricLen))
272		if _, err := io.ReadFull(f.r, metric); err != nil {
273			f.err = fmt.Errorf("failed to read the metric name: %w", err)
274			return false
275		}
276		// Read timestamp.
277		ts, err := binary.ReadVarint(f.r)
278		if err != nil {
279			f.err = fmt.Errorf("failed to read timestamp: %w", err)
280			return false
281		}
282		// Read value.
283		val, err := binary.ReadUvarint(f.r)
284		if err != nil {
285			f.err = fmt.Errorf("failed to read value: %w", err)
286			return false
287		}
288		f.current = walRecord{
289			op: walOperation(op),
290			row: Row{
291				Metric: string(metric),
292				DataPoint: DataPoint{
293					Timestamp: ts,
294					Value:     math.Float64frombits(val),
295				},
296			},
297		}
298	default:
299		f.err = fmt.Errorf("unknown operation %v found", op)
300		return false
301	}
302
303	return true
304}
305
306// error gives back an error if it has been facing an error while reading.
307func (f *segment) error() error {
308	return f.err
309}
310
311func (f *segment) record() *walRecord {
312	return &f.current
313}
314
315func (f *segment) close() error {
316	return f.file.Close()
317}
318