1package tsm1_test
2
3import (
4	"fmt"
5	"io"
6	"os"
7	"reflect"
8	"testing"
9
10	"github.com/golang/snappy"
11	"github.com/influxdata/influxdb/pkg/slices"
12	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
13)
14
15func TestWALWriter_WriteMulti_Single(t *testing.T) {
16	dir := MustTempDir()
17	defer os.RemoveAll(dir)
18	f := MustTempFile(dir)
19	w := tsm1.NewWALSegmentWriter(f)
20
21	p1 := tsm1.NewValue(1, 1.1)
22	p2 := tsm1.NewValue(1, int64(1))
23	p3 := tsm1.NewValue(1, true)
24	p4 := tsm1.NewValue(1, "string")
25	p5 := tsm1.NewValue(1, ^uint64(0))
26
27	values := map[string][]tsm1.Value{
28		"cpu,host=A#!~#float":    []tsm1.Value{p1},
29		"cpu,host=A#!~#int":      []tsm1.Value{p2},
30		"cpu,host=A#!~#bool":     []tsm1.Value{p3},
31		"cpu,host=A#!~#string":   []tsm1.Value{p4},
32		"cpu,host=A#!~#unsigned": []tsm1.Value{p5},
33	}
34
35	entry := &tsm1.WriteWALEntry{
36		Values: values,
37	}
38
39	if err := w.Write(mustMarshalEntry(entry)); err != nil {
40		fatal(t, "write points", err)
41	}
42
43	if err := w.Flush(); err != nil {
44		fatal(t, "flush", err)
45	}
46
47	if _, err := f.Seek(0, io.SeekStart); err != nil {
48		fatal(t, "seek", err)
49	}
50
51	r := tsm1.NewWALSegmentReader(f)
52
53	if !r.Next() {
54		t.Fatalf("expected next, got false")
55	}
56
57	we, err := r.Read()
58	if err != nil {
59		fatal(t, "read entry", err)
60	}
61
62	e, ok := we.(*tsm1.WriteWALEntry)
63	if !ok {
64		t.Fatalf("expected WriteWALEntry: got %#v", e)
65	}
66
67	for k, v := range e.Values {
68		for i, vv := range v {
69			if got, exp := vv.String(), values[k][i].String(); got != exp {
70				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
71			}
72		}
73	}
74
75	if n := r.Count(); n != MustReadFileSize(f) {
76		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
77	}
78}
79
80func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) {
81	dir := MustTempDir()
82	defer os.RemoveAll(dir)
83	f := MustTempFile(dir)
84	w := tsm1.NewWALSegmentWriter(f)
85
86	var points []tsm1.Value
87	for i := 0; i < 100000; i++ {
88		points = append(points, tsm1.NewValue(int64(i), int64(1)))
89	}
90
91	values := map[string][]tsm1.Value{
92		"cpu,host=A,server=01,foo=bar,tag=really-long#!~#float": points,
93		"mem,host=A,server=01,foo=bar,tag=really-long#!~#float": points,
94	}
95
96	entry := &tsm1.WriteWALEntry{
97		Values: values,
98	}
99
100	if err := w.Write(mustMarshalEntry(entry)); err != nil {
101		fatal(t, "write points", err)
102	}
103
104	if err := w.Flush(); err != nil {
105		fatal(t, "flush", err)
106	}
107
108	if _, err := f.Seek(0, io.SeekStart); err != nil {
109		fatal(t, "seek", err)
110	}
111
112	r := tsm1.NewWALSegmentReader(f)
113
114	if !r.Next() {
115		t.Fatalf("expected next, got false")
116	}
117
118	we, err := r.Read()
119	if err != nil {
120		fatal(t, "read entry", err)
121	}
122
123	e, ok := we.(*tsm1.WriteWALEntry)
124	if !ok {
125		t.Fatalf("expected WriteWALEntry: got %#v", e)
126	}
127
128	for k, v := range e.Values {
129		for i, vv := range v {
130			if got, exp := vv.String(), values[k][i].String(); got != exp {
131				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
132			}
133		}
134	}
135
136	if n := r.Count(); n != MustReadFileSize(f) {
137		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
138	}
139}
140func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
141	dir := MustTempDir()
142	defer os.RemoveAll(dir)
143	f := MustTempFile(dir)
144	w := tsm1.NewWALSegmentWriter(f)
145
146	p1 := tsm1.NewValue(1, int64(1))
147	p2 := tsm1.NewValue(1, int64(2))
148
149	exp := []struct {
150		key    string
151		values []tsm1.Value
152	}{
153		{"cpu,host=A#!~#value", []tsm1.Value{p1}},
154		{"cpu,host=B#!~#value", []tsm1.Value{p2}},
155	}
156
157	for _, v := range exp {
158		entry := &tsm1.WriteWALEntry{
159			Values: map[string][]tsm1.Value{v.key: v.values},
160		}
161
162		if err := w.Write(mustMarshalEntry(entry)); err != nil {
163			fatal(t, "write points", err)
164		}
165		if err := w.Flush(); err != nil {
166			fatal(t, "flush", err)
167		}
168	}
169
170	// Seek back to the beinning of the file for reading
171	if _, err := f.Seek(0, io.SeekStart); err != nil {
172		fatal(t, "seek", err)
173	}
174
175	r := tsm1.NewWALSegmentReader(f)
176
177	for _, ep := range exp {
178		if !r.Next() {
179			t.Fatalf("expected next, got false")
180		}
181
182		we, err := r.Read()
183		if err != nil {
184			fatal(t, "read entry", err)
185		}
186
187		e, ok := we.(*tsm1.WriteWALEntry)
188		if !ok {
189			t.Fatalf("expected WriteWALEntry: got %#v", e)
190		}
191
192		for k, v := range e.Values {
193			if got, exp := k, ep.key; got != exp {
194				t.Fatalf("key mismatch. got %v, exp %v", got, exp)
195			}
196
197			if got, exp := len(v), len(ep.values); got != exp {
198				t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
199			}
200
201			for i, vv := range v {
202				if got, exp := vv.String(), ep.values[i].String(); got != exp {
203					t.Fatalf("points mismatch: got %v, exp %v", got, exp)
204				}
205			}
206		}
207	}
208
209	if n := r.Count(); n != MustReadFileSize(f) {
210		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
211	}
212}
213
214func TestWALWriter_WriteDelete_Single(t *testing.T) {
215	dir := MustTempDir()
216	defer os.RemoveAll(dir)
217	f := MustTempFile(dir)
218	w := tsm1.NewWALSegmentWriter(f)
219
220	entry := &tsm1.DeleteWALEntry{
221		Keys: [][]byte{[]byte("cpu")},
222	}
223
224	if err := w.Write(mustMarshalEntry(entry)); err != nil {
225		fatal(t, "write points", err)
226	}
227
228	if err := w.Flush(); err != nil {
229		fatal(t, "flush", err)
230	}
231
232	if _, err := f.Seek(0, io.SeekStart); err != nil {
233		fatal(t, "seek", err)
234	}
235
236	r := tsm1.NewWALSegmentReader(f)
237
238	if !r.Next() {
239		t.Fatalf("expected next, got false")
240	}
241
242	we, err := r.Read()
243	if err != nil {
244		fatal(t, "read entry", err)
245	}
246
247	e, ok := we.(*tsm1.DeleteWALEntry)
248	if !ok {
249		t.Fatalf("expected WriteWALEntry: got %#v", e)
250	}
251
252	if got, exp := len(e.Keys), len(entry.Keys); got != exp {
253		t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
254	}
255
256	if got, exp := string(e.Keys[0]), string(entry.Keys[0]); got != exp {
257		t.Fatalf("key mismatch: got %v, exp %v", got, exp)
258	}
259}
260
261func TestWALWriter_WriteMultiDelete_Multiple(t *testing.T) {
262	dir := MustTempDir()
263	defer os.RemoveAll(dir)
264	f := MustTempFile(dir)
265	w := tsm1.NewWALSegmentWriter(f)
266
267	p1 := tsm1.NewValue(1, true)
268	values := map[string][]tsm1.Value{
269		"cpu,host=A#!~#value": []tsm1.Value{p1},
270	}
271
272	writeEntry := &tsm1.WriteWALEntry{
273		Values: values,
274	}
275
276	if err := w.Write(mustMarshalEntry(writeEntry)); err != nil {
277		fatal(t, "write points", err)
278	}
279
280	if err := w.Flush(); err != nil {
281		fatal(t, "flush", err)
282	}
283
284	// Write the delete entry
285	deleteEntry := &tsm1.DeleteWALEntry{
286		Keys: [][]byte{[]byte("cpu,host=A#!~value")},
287	}
288
289	if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil {
290		fatal(t, "write points", err)
291	}
292
293	if err := w.Flush(); err != nil {
294		fatal(t, "flush", err)
295	}
296
297	// Seek back to the beinning of the file for reading
298	if _, err := f.Seek(0, io.SeekStart); err != nil {
299		fatal(t, "seek", err)
300	}
301
302	r := tsm1.NewWALSegmentReader(f)
303
304	// Read the write points first
305	if !r.Next() {
306		t.Fatalf("expected next, got false")
307	}
308
309	we, err := r.Read()
310	if err != nil {
311		fatal(t, "read entry", err)
312	}
313
314	e, ok := we.(*tsm1.WriteWALEntry)
315	if !ok {
316		t.Fatalf("expected WriteWALEntry: got %#v", e)
317	}
318
319	for k, v := range e.Values {
320		if got, exp := len(v), len(values[k]); got != exp {
321			t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
322		}
323
324		for i, vv := range v {
325			if got, exp := vv.String(), values[k][i].String(); got != exp {
326				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
327			}
328		}
329	}
330
331	// Read the delete second
332	if !r.Next() {
333		t.Fatalf("expected next, got false")
334	}
335
336	we, err = r.Read()
337	if err != nil {
338		fatal(t, "read entry", err)
339	}
340
341	de, ok := we.(*tsm1.DeleteWALEntry)
342	if !ok {
343		t.Fatalf("expected DeleteWALEntry: got %#v", e)
344	}
345
346	if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp {
347		t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
348	}
349
350	if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp {
351		t.Fatalf("key mismatch: got %v, exp %v", got, exp)
352	}
353}
354
355func TestWALWriter_WriteMultiDeleteRange_Multiple(t *testing.T) {
356	dir := MustTempDir()
357	defer os.RemoveAll(dir)
358	f := MustTempFile(dir)
359	w := tsm1.NewWALSegmentWriter(f)
360
361	p1 := tsm1.NewValue(1, 1.0)
362	p2 := tsm1.NewValue(2, 2.0)
363	p3 := tsm1.NewValue(3, 3.0)
364
365	values := map[string][]tsm1.Value{
366		"cpu,host=A#!~#value": []tsm1.Value{p1, p2, p3},
367	}
368
369	writeEntry := &tsm1.WriteWALEntry{
370		Values: values,
371	}
372
373	if err := w.Write(mustMarshalEntry(writeEntry)); err != nil {
374		fatal(t, "write points", err)
375	}
376
377	if err := w.Flush(); err != nil {
378		fatal(t, "flush", err)
379	}
380
381	// Write the delete entry
382	deleteEntry := &tsm1.DeleteRangeWALEntry{
383		Keys: [][]byte{[]byte("cpu,host=A#!~value")},
384		Min:  2,
385		Max:  3,
386	}
387
388	if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil {
389		fatal(t, "write points", err)
390	}
391
392	if err := w.Flush(); err != nil {
393		fatal(t, "flush", err)
394	}
395
396	// Seek back to the beinning of the file for reading
397	if _, err := f.Seek(0, io.SeekStart); err != nil {
398		fatal(t, "seek", err)
399	}
400
401	r := tsm1.NewWALSegmentReader(f)
402
403	// Read the write points first
404	if !r.Next() {
405		t.Fatalf("expected next, got false")
406	}
407
408	we, err := r.Read()
409	if err != nil {
410		fatal(t, "read entry", err)
411	}
412
413	e, ok := we.(*tsm1.WriteWALEntry)
414	if !ok {
415		t.Fatalf("expected WriteWALEntry: got %#v", e)
416	}
417
418	for k, v := range e.Values {
419		if got, exp := len(v), len(values[k]); got != exp {
420			t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
421		}
422
423		for i, vv := range v {
424			if got, exp := vv.String(), values[k][i].String(); got != exp {
425				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
426			}
427		}
428	}
429
430	// Read the delete second
431	if !r.Next() {
432		t.Fatalf("expected next, got false")
433	}
434
435	we, err = r.Read()
436	if err != nil {
437		fatal(t, "read entry", err)
438	}
439
440	de, ok := we.(*tsm1.DeleteRangeWALEntry)
441	if !ok {
442		t.Fatalf("expected DeleteWALEntry: got %#v", e)
443	}
444
445	if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp {
446		t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
447	}
448
449	if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp {
450		t.Fatalf("key mismatch: got %v, exp %v", got, exp)
451	}
452
453	if got, exp := de.Min, int64(2); got != exp {
454		t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
455	}
456
457	if got, exp := de.Max, int64(3); got != exp {
458		t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
459	}
460
461}
462
463func TestWAL_ClosedSegments(t *testing.T) {
464	dir := MustTempDir()
465	defer os.RemoveAll(dir)
466
467	w := tsm1.NewWAL(dir)
468	if err := w.Open(); err != nil {
469		t.Fatalf("error opening WAL: %v", err)
470	}
471
472	files, err := w.ClosedSegments()
473	if err != nil {
474		t.Fatalf("error getting closed segments: %v", err)
475	}
476
477	if got, exp := len(files), 0; got != exp {
478		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
479	}
480
481	if _, err := w.WriteMulti(map[string][]tsm1.Value{
482		"cpu,host=A#!~#value": []tsm1.Value{
483			tsm1.NewValue(1, 1.1),
484		},
485	}); err != nil {
486		t.Fatalf("error writing points: %v", err)
487	}
488
489	if err := w.Close(); err != nil {
490		t.Fatalf("error closing wal: %v", err)
491	}
492
493	// Re-open the WAL
494	w = tsm1.NewWAL(dir)
495	defer w.Close()
496	if err := w.Open(); err != nil {
497		t.Fatalf("error opening WAL: %v", err)
498	}
499
500	files, err = w.ClosedSegments()
501	if err != nil {
502		t.Fatalf("error getting closed segments: %v", err)
503	}
504	if got, exp := len(files), 0; got != exp {
505		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
506	}
507}
508
509func TestWAL_Delete(t *testing.T) {
510	dir := MustTempDir()
511	defer os.RemoveAll(dir)
512
513	w := tsm1.NewWAL(dir)
514	if err := w.Open(); err != nil {
515		t.Fatalf("error opening WAL: %v", err)
516	}
517
518	files, err := w.ClosedSegments()
519	if err != nil {
520		t.Fatalf("error getting closed segments: %v", err)
521	}
522
523	if got, exp := len(files), 0; got != exp {
524		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
525	}
526
527	if _, err := w.Delete([][]byte{[]byte("cpu")}); err != nil {
528		t.Fatalf("error writing points: %v", err)
529	}
530
531	if err := w.Close(); err != nil {
532		t.Fatalf("error closing wal: %v", err)
533	}
534
535	// Re-open the WAL
536	w = tsm1.NewWAL(dir)
537	defer w.Close()
538	if err := w.Open(); err != nil {
539		t.Fatalf("error opening WAL: %v", err)
540	}
541
542	files, err = w.ClosedSegments()
543	if err != nil {
544		t.Fatalf("error getting closed segments: %v", err)
545	}
546	if got, exp := len(files), 0; got != exp {
547		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
548	}
549}
550
551func TestWALWriter_Corrupt(t *testing.T) {
552	dir := MustTempDir()
553	defer os.RemoveAll(dir)
554	f := MustTempFile(dir)
555	w := tsm1.NewWALSegmentWriter(f)
556	corruption := []byte{1, 4, 0, 0, 0}
557
558	p1 := tsm1.NewValue(1, 1.1)
559	values := map[string][]tsm1.Value{
560		"cpu,host=A#!~#float": []tsm1.Value{p1},
561	}
562
563	entry := &tsm1.WriteWALEntry{
564		Values: values,
565	}
566	if err := w.Write(mustMarshalEntry(entry)); err != nil {
567		fatal(t, "write points", err)
568	}
569
570	if err := w.Flush(); err != nil {
571		fatal(t, "flush", err)
572	}
573
574	// Write some random bytes to the file to simulate corruption.
575	if _, err := f.Write(corruption); err != nil {
576		fatal(t, "corrupt WAL segment", err)
577	}
578
579	// Create the WAL segment reader.
580	if _, err := f.Seek(0, io.SeekStart); err != nil {
581		fatal(t, "seek", err)
582	}
583	r := tsm1.NewWALSegmentReader(f)
584
585	// Try to decode two entries.
586
587	if !r.Next() {
588		t.Fatalf("expected next, got false")
589	}
590	if _, err := r.Read(); err != nil {
591		fatal(t, "read entry", err)
592	}
593
594	if !r.Next() {
595		t.Fatalf("expected next, got false")
596	}
597	if _, err := r.Read(); err == nil {
598		fatal(t, "read entry did not return err", nil)
599	}
600
601	// Count should only return size of valid data.
602	expCount := MustReadFileSize(f) - int64(len(corruption))
603	if n := r.Count(); n != expCount {
604		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, expCount)
605	}
606}
607
608// Reproduces a `panic: runtime error: makeslice: cap out of range` when run with
609// GOARCH=386 go test -run TestWALSegmentReader_Corrupt -v ./tsdb/engine/tsm1/
610func TestWALSegmentReader_Corrupt(t *testing.T) {
611	dir := MustTempDir()
612	defer os.RemoveAll(dir)
613	f := MustTempFile(dir)
614	w := tsm1.NewWALSegmentWriter(f)
615
616	p4 := tsm1.NewValue(1, "string")
617
618	values := map[string][]tsm1.Value{
619		"cpu,host=A#!~#string": []tsm1.Value{p4, p4},
620	}
621
622	entry := &tsm1.WriteWALEntry{
623		Values: values,
624	}
625
626	typ, b := mustMarshalEntry(entry)
627
628	// This causes the nvals field to overflow on 32 bit systems which produces a
629	// negative count and a panic when reading the segment.
630	b[25] = 255
631
632	if err := w.Write(typ, b); err != nil {
633		fatal(t, "write points", err)
634	}
635
636	if err := w.Flush(); err != nil {
637		fatal(t, "flush", err)
638	}
639
640	// Create the WAL segment reader.
641	if _, err := f.Seek(0, io.SeekStart); err != nil {
642		fatal(t, "seek", err)
643	}
644
645	r := tsm1.NewWALSegmentReader(f)
646	defer r.Close()
647
648	// Try to decode two entries.
649	for r.Next() {
650		r.Read()
651	}
652}
653
654func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
655	p1 := tsm1.NewValue(1, 1.1)
656	p2 := tsm1.NewValue(1, int64(1))
657	p3 := tsm1.NewValue(1, true)
658	p4 := tsm1.NewValue(1, "string")
659	p5 := tsm1.NewValue(1, uint64(1))
660
661	values := map[string][]tsm1.Value{
662		"cpu,host=A#!~#float":    []tsm1.Value{p1, p1},
663		"cpu,host=A#!~#int":      []tsm1.Value{p2, p2},
664		"cpu,host=A#!~#bool":     []tsm1.Value{p3, p3},
665		"cpu,host=A#!~#string":   []tsm1.Value{p4, p4},
666		"cpu,host=A#!~#unsigned": []tsm1.Value{p5, p5},
667	}
668
669	w := &tsm1.WriteWALEntry{
670		Values: values,
671	}
672
673	b, err := w.MarshalBinary()
674	if err != nil {
675		t.Fatalf("unexpected error, got %v", err)
676	}
677
678	// Test every possible truncation of a write WAL entry
679	for i := 0; i < len(b); i++ {
680		// re-allocated to ensure capacity would be exceed if slicing
681		truncated := make([]byte, i)
682		copy(truncated, b[:i])
683		err := w.UnmarshalBinary(truncated)
684		if err != nil && err != tsm1.ErrWALCorrupt {
685			t.Fatalf("unexpected error: %v", err)
686		}
687	}
688}
689
690func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) {
691	examples := []struct {
692		In  []string
693		Out [][]byte
694	}{
695		{
696			In:  []string{""},
697			Out: nil,
698		},
699		{
700			In:  []string{"foo"},
701			Out: [][]byte{[]byte("foo")},
702		},
703		{
704			In:  []string{"foo", "bar"},
705			Out: [][]byte{[]byte("foo"), []byte("bar")},
706		},
707		{
708			In:  []string{"foo", "bar", "z", "abc"},
709			Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("abc")},
710		},
711		{
712			In:  []string{"foo", "bar", "z", "a"},
713			Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("a")},
714		},
715	}
716
717	for i, example := range examples {
718		w := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(example.In...)}
719		b, err := w.MarshalBinary()
720		if err != nil {
721			t.Fatalf("[example %d] unexpected error, got %v", i, err)
722		}
723
724		out := &tsm1.DeleteWALEntry{}
725		if err := out.UnmarshalBinary(b); err != nil {
726			t.Fatalf("[example %d] %v", i, err)
727		}
728
729		if !reflect.DeepEqual(example.Out, out.Keys) {
730			t.Errorf("[example %d] got %v, expected %v", i, out.Keys, example.Out)
731		}
732	}
733}
734
735func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) {
736	w := &tsm1.DeleteWALEntry{
737		Keys: [][]byte{[]byte("foo"), []byte("bar")},
738	}
739
740	b, err := w.MarshalBinary()
741	if err != nil {
742		t.Fatalf("unexpected error, got %v", err)
743	}
744
745	// Test every possible truncation of a write WAL entry
746	for i := 0; i < len(b); i++ {
747		// re-allocated to ensure capacity would be exceed if slicing
748		truncated := make([]byte, i)
749		copy(truncated, b[:i])
750		err := w.UnmarshalBinary(truncated)
751		if err != nil && err != tsm1.ErrWALCorrupt {
752			t.Fatalf("unexpected error: %v", err)
753		}
754	}
755}
756
757func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) {
758	w := &tsm1.DeleteRangeWALEntry{
759		Keys: [][]byte{[]byte("foo"), []byte("bar")},
760		Min:  1,
761		Max:  2,
762	}
763
764	b, err := w.MarshalBinary()
765	if err != nil {
766		t.Fatalf("unexpected error, got %v", err)
767	}
768
769	// Test every possible truncation of a write WAL entry
770	for i := 0; i < len(b); i++ {
771		// re-allocated to ensure capacity would be exceed if slicing
772		truncated := make([]byte, i)
773		copy(truncated, b[:i])
774		err := w.UnmarshalBinary(truncated)
775		if err != nil && err != tsm1.ErrWALCorrupt {
776			t.Fatalf("unexpected error: %v", err)
777		}
778	}
779}
780
781func BenchmarkWALSegmentWriter(b *testing.B) {
782	points := map[string][]tsm1.Value{}
783	for i := 0; i < 5000; i++ {
784		k := "cpu,host=A#!~#value"
785		points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1))
786	}
787
788	dir := MustTempDir()
789	defer os.RemoveAll(dir)
790
791	f := MustTempFile(dir)
792	w := tsm1.NewWALSegmentWriter(f)
793
794	write := &tsm1.WriteWALEntry{
795		Values: points,
796	}
797
798	b.ResetTimer()
799	for i := 0; i < b.N; i++ {
800		if err := w.Write(mustMarshalEntry(write)); err != nil {
801			b.Fatalf("unexpected error writing entry: %v", err)
802		}
803	}
804}
805
806func BenchmarkWALSegmentReader(b *testing.B) {
807	points := map[string][]tsm1.Value{}
808	for i := 0; i < 5000; i++ {
809		k := "cpu,host=A#!~#value"
810		points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1))
811	}
812
813	dir := MustTempDir()
814	defer os.RemoveAll(dir)
815
816	f := MustTempFile(dir)
817	w := tsm1.NewWALSegmentWriter(f)
818
819	write := &tsm1.WriteWALEntry{
820		Values: points,
821	}
822
823	for i := 0; i < 100; i++ {
824		if err := w.Write(mustMarshalEntry(write)); err != nil {
825			b.Fatalf("unexpected error writing entry: %v", err)
826		}
827	}
828
829	r := tsm1.NewWALSegmentReader(f)
830	b.ResetTimer()
831
832	for i := 0; i < b.N; i++ {
833		b.StopTimer()
834		f.Seek(0, io.SeekStart)
835		b.StartTimer()
836
837		for r.Next() {
838			_, err := r.Read()
839			if err != nil {
840				b.Fatalf("unexpected error reading entry: %v", err)
841			}
842		}
843	}
844}
845
846// MustReadFileSize returns the size of the file, or panics.
847func MustReadFileSize(f *os.File) int64 {
848	stat, err := os.Stat(f.Name())
849	if err != nil {
850		panic(fmt.Sprintf("failed to get size of file at %s: %s", f.Name(), err.Error()))
851	}
852	return stat.Size()
853}
854
855func mustMarshalEntry(entry tsm1.WALEntry) (tsm1.WalEntryType, []byte) {
856	bytes := make([]byte, 1024<<2)
857
858	b, err := entry.Encode(bytes)
859	if err != nil {
860		panic(fmt.Sprintf("error encoding: %v", err))
861	}
862
863	return entry.Type(), snappy.Encode(b, b)
864}
865