1package tsm1_test
2
3import (
4	"fmt"
5	"math/rand"
6	"reflect"
7	"testing"
8	"time"
9
10	"github.com/davecgh/go-spew/spew"
11	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
12)
13
14func TestEncoding_FloatBlock(t *testing.T) {
15	valueCount := 1000
16	times := getTimes(valueCount, 60, time.Second)
17	values := make([]tsm1.Value, len(times))
18	for i, t := range times {
19		values[i] = tsm1.NewValue(t, float64(i))
20	}
21
22	b, err := tsm1.Values(values).Encode(nil)
23	if err != nil {
24		t.Fatalf("unexpected error: %v", err)
25	}
26
27	var decodedValues []tsm1.Value
28	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
29	if err != nil {
30		t.Fatalf("unexpected error decoding block: %v", err)
31	}
32
33	if !reflect.DeepEqual(decodedValues, values) {
34		t.Fatalf("unexpected results:\n\tgot: %s\n\texp: %s\n", spew.Sdump(decodedValues), spew.Sdump(values))
35	}
36}
37
38func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
39	values := make([]tsm1.Value, 3)
40	for i := 0; i < 3; i++ {
41		values[i] = tsm1.NewValue(0, float64(i))
42	}
43
44	b, err := tsm1.Values(values).Encode(nil)
45	if err != nil {
46		t.Fatalf("unexpected error: %v", err)
47	}
48
49	var decodedValues []tsm1.Value
50	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
51	if err != nil {
52		t.Fatalf("unexpected error decoding block: %v", err)
53	}
54
55	if !reflect.DeepEqual(decodedValues, values) {
56		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
57	}
58}
59
60func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) {
61	values := make([]tsm1.Value, 5)
62	values[0] = tsm1.NewValue(1444238178437870000, 6.00065e+06)
63	values[1] = tsm1.NewValue(1444238185286830000, 6.000656e+06)
64	values[2] = tsm1.NewValue(1444238188441501000, 6.000657e+06)
65	values[3] = tsm1.NewValue(1444238195286811000, 6.000659e+06)
66	values[4] = tsm1.NewValue(1444238198439917000, 6.000661e+06)
67
68	b, err := tsm1.Values(values).Encode(nil)
69	if err != nil {
70		t.Fatalf("unexpected error: %v", err)
71	}
72
73	var decodedValues []tsm1.Value
74	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
75	if err != nil {
76		t.Fatalf("unexpected error decoding block: %v", err)
77	}
78
79	if !reflect.DeepEqual(decodedValues, values) {
80		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
81	}
82}
83
84func TestEncoding_IntBlock_Basic(t *testing.T) {
85	valueCount := 1000
86	times := getTimes(valueCount, 60, time.Second)
87	values := make([]tsm1.Value, len(times))
88	for i, t := range times {
89		values[i] = tsm1.NewValue(t, int64(i))
90	}
91
92	b, err := tsm1.Values(values).Encode(nil)
93	if err != nil {
94		t.Fatalf("unexpected error: %v", err)
95	}
96
97	var decodedValues []tsm1.Value
98	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
99	if err != nil {
100		t.Fatalf("unexpected error decoding block: %v", err)
101	}
102
103	if len(decodedValues) != len(values) {
104		t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
105	}
106
107	for i := 0; i < len(decodedValues); i++ {
108		if decodedValues[i].UnixNano() != values[i].UnixNano() {
109			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].UnixNano(), values[i].UnixNano())
110		}
111
112		if decodedValues[i].Value() != values[i].Value() {
113			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Value(), values[i].Value())
114		}
115	}
116}
117
118func TestEncoding_IntBlock_Negatives(t *testing.T) {
119	valueCount := 1000
120	times := getTimes(valueCount, 60, time.Second)
121	values := make([]tsm1.Value, len(times))
122	for i, t := range times {
123		v := int64(i)
124		if i%2 == 0 {
125			v = -v
126		}
127		values[i] = tsm1.NewValue(t, int64(v))
128	}
129
130	b, err := tsm1.Values(values).Encode(nil)
131	if err != nil {
132		t.Fatalf("unexpected error: %v", err)
133	}
134
135	var decodedValues []tsm1.Value
136	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
137	if err != nil {
138		t.Fatalf("unexpected error decoding block: %v", err)
139	}
140
141	if !reflect.DeepEqual(decodedValues, values) {
142		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
143	}
144}
145
146func TestEncoding_UIntBlock_Basic(t *testing.T) {
147	valueCount := 1000
148	times := getTimes(valueCount, 60, time.Second)
149	values := make([]tsm1.Value, len(times))
150	for i, t := range times {
151		values[i] = tsm1.NewValue(t, uint64(i))
152	}
153
154	b, err := tsm1.Values(values).Encode(nil)
155	if err != nil {
156		t.Fatalf("unexpected error: %v", err)
157	}
158
159	var decodedValues []tsm1.Value
160	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
161	if err != nil {
162		t.Fatalf("unexpected error decoding block: %v", err)
163	}
164
165	if len(decodedValues) != len(values) {
166		t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
167	}
168
169	for i := 0; i < len(decodedValues); i++ {
170		if decodedValues[i].UnixNano() != values[i].UnixNano() {
171			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].UnixNano(), values[i].UnixNano())
172		}
173
174		if decodedValues[i].Value() != values[i].Value() {
175			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Value(), values[i].Value())
176		}
177	}
178}
179
180// TestEncoding_UIntBlock_MaxValues encodes uint64 numbers starting at max (18446744073709551615)
181// down to 18446744073709550616
182func TestEncoding_UIntBlock_MaxValues(t *testing.T) {
183	valueCount := 1000
184	times := getTimes(valueCount, 60, time.Second)
185	values := make([]tsm1.Value, len(times))
186	for i, t := range times {
187		values[i] = tsm1.NewValue(t, ^uint64(i))
188	}
189
190	b, err := tsm1.Values(values).Encode(nil)
191	if err != nil {
192		t.Fatalf("unexpected error: %v", err)
193	}
194
195	var decodedValues []tsm1.Value
196	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
197	if err != nil {
198		t.Fatalf("unexpected error decoding block: %v", err)
199	}
200
201	if !reflect.DeepEqual(decodedValues, values) {
202		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
203	}
204}
205
206func TestEncoding_BooleanBlock_Basic(t *testing.T) {
207	valueCount := 1000
208	times := getTimes(valueCount, 60, time.Second)
209	values := make([]tsm1.Value, len(times))
210	for i, t := range times {
211		v := true
212		if i%2 == 0 {
213			v = false
214		}
215		values[i] = tsm1.NewValue(t, v)
216	}
217
218	b, err := tsm1.Values(values).Encode(nil)
219	if err != nil {
220		t.Fatalf("unexpected error: %v", err)
221	}
222
223	var decodedValues []tsm1.Value
224	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
225	if err != nil {
226		t.Fatalf("unexpected error decoding block: %v", err)
227	}
228
229	if !reflect.DeepEqual(decodedValues, values) {
230		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
231	}
232}
233
234func TestEncoding_StringBlock_Basic(t *testing.T) {
235	valueCount := 1000
236	times := getTimes(valueCount, 60, time.Second)
237	values := make([]tsm1.Value, len(times))
238	for i, t := range times {
239		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
240	}
241
242	b, err := tsm1.Values(values).Encode(nil)
243	if err != nil {
244		t.Fatalf("unexpected error: %v", err)
245	}
246
247	var decodedValues []tsm1.Value
248	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
249	if err != nil {
250		t.Fatalf("unexpected error decoding block: %v", err)
251	}
252
253	if !reflect.DeepEqual(decodedValues, values) {
254		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
255	}
256}
257
258func TestEncoding_BlockType(t *testing.T) {
259	tests := []struct {
260		value     interface{}
261		blockType byte
262	}{
263		{value: float64(1.0), blockType: tsm1.BlockFloat64},
264		{value: int64(1), blockType: tsm1.BlockInteger},
265		{value: uint64(1), blockType: tsm1.BlockUnsigned},
266		{value: true, blockType: tsm1.BlockBoolean},
267		{value: "string", blockType: tsm1.BlockString},
268	}
269
270	for _, test := range tests {
271		var values []tsm1.Value
272		values = append(values, tsm1.NewValue(0, test.value))
273
274		b, err := tsm1.Values(values).Encode(nil)
275		if err != nil {
276			t.Fatalf("unexpected error: %v", err)
277		}
278
279		bt, err := tsm1.BlockType(b)
280		if err != nil {
281			t.Fatalf("unexpected error decoding block type: %v", err)
282		}
283
284		if got, exp := bt, test.blockType; got != exp {
285			t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
286		}
287	}
288
289	_, err := tsm1.BlockType([]byte{10})
290	if err == nil {
291		t.Fatalf("expected error decoding block type, got nil")
292	}
293}
294
295func TestEncoding_Count(t *testing.T) {
296	tests := []struct {
297		value     interface{}
298		blockType byte
299	}{
300		{value: float64(1.0), blockType: tsm1.BlockFloat64},
301		{value: int64(1), blockType: tsm1.BlockInteger},
302		{value: uint64(1), blockType: tsm1.BlockUnsigned},
303		{value: true, blockType: tsm1.BlockBoolean},
304		{value: "string", blockType: tsm1.BlockString},
305	}
306
307	for _, test := range tests {
308		var values []tsm1.Value
309		values = append(values, tsm1.NewValue(0, test.value))
310
311		b, err := tsm1.Values(values).Encode(nil)
312		if err != nil {
313			t.Fatalf("unexpected error: %v", err)
314		}
315
316		cnt, err := tsm1.BlockCount(b)
317		if err != nil {
318			t.Fatalf("Block is corrupted: %v", err)
319		}
320		if got, exp := cnt, 1; got != exp {
321			t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
322		}
323	}
324}
325
326func TestValues_MergeFloat(t *testing.T) {
327	tests := []struct {
328		a, b, exp []tsm1.Value
329	}{
330
331		{ // empty a
332			a: []tsm1.Value{},
333
334			b: []tsm1.Value{
335				tsm1.NewValue(1, 1.2),
336				tsm1.NewValue(2, 2.2),
337			},
338			exp: []tsm1.Value{
339				tsm1.NewValue(1, 1.2),
340				tsm1.NewValue(2, 2.2),
341			},
342		},
343		{ // empty b
344			a: []tsm1.Value{
345				tsm1.NewValue(1, 1.1),
346				tsm1.NewValue(2, 2.1),
347			},
348
349			b: []tsm1.Value{},
350			exp: []tsm1.Value{
351				tsm1.NewValue(1, 1.1),
352				tsm1.NewValue(2, 2.1),
353			},
354		},
355		{
356			a: []tsm1.Value{
357				tsm1.NewValue(0, 0.0),
358				tsm1.NewValue(1, 1.1),
359				tsm1.NewValue(2, 2.1),
360			},
361			b: []tsm1.Value{
362				tsm1.NewValue(2, 2.2),
363				tsm1.NewValue(2, 2.2), // duplicate data
364			},
365			exp: []tsm1.Value{
366				tsm1.NewValue(0, 0.0),
367				tsm1.NewValue(1, 1.1),
368				tsm1.NewValue(2, 2.2),
369			},
370		},
371		{
372			a: []tsm1.Value{
373				tsm1.NewValue(0, 0.0),
374				tsm1.NewValue(1, 1.1),
375				tsm1.NewValue(1, 1.1), // duplicate data
376				tsm1.NewValue(2, 2.1),
377			},
378			b: []tsm1.Value{
379				tsm1.NewValue(2, 2.2),
380				tsm1.NewValue(2, 2.2), // duplicate data
381			},
382			exp: []tsm1.Value{
383				tsm1.NewValue(0, 0.0),
384				tsm1.NewValue(1, 1.1),
385				tsm1.NewValue(2, 2.2),
386			},
387		},
388
389		{
390			a: []tsm1.Value{
391				tsm1.NewValue(1, 1.1),
392			},
393			b: []tsm1.Value{
394				tsm1.NewValue(0, 0.0),
395				tsm1.NewValue(1, 1.2), // overwrites a
396				tsm1.NewValue(2, 2.2),
397				tsm1.NewValue(3, 3.2),
398				tsm1.NewValue(4, 4.2),
399			},
400			exp: []tsm1.Value{
401				tsm1.NewValue(0, 0.0),
402				tsm1.NewValue(1, 1.2),
403				tsm1.NewValue(2, 2.2),
404				tsm1.NewValue(3, 3.2),
405				tsm1.NewValue(4, 4.2),
406			},
407		},
408		{
409			a: []tsm1.Value{
410				tsm1.NewValue(1, 1.1),
411				tsm1.NewValue(2, 2.1),
412				tsm1.NewValue(3, 3.1),
413				tsm1.NewValue(4, 4.1),
414			},
415
416			b: []tsm1.Value{
417				tsm1.NewValue(1, 1.2), // overwrites a
418				tsm1.NewValue(2, 2.2), // overwrites a
419			},
420			exp: []tsm1.Value{
421				tsm1.NewValue(1, 1.2),
422				tsm1.NewValue(2, 2.2),
423				tsm1.NewValue(3, 3.1),
424				tsm1.NewValue(4, 4.1),
425			},
426		},
427		{
428			a: []tsm1.Value{
429				tsm1.NewValue(1, 1.1),
430				tsm1.NewValue(2, 2.1),
431				tsm1.NewValue(3, 3.1),
432				tsm1.NewValue(4, 4.1),
433			},
434
435			b: []tsm1.Value{
436				tsm1.NewValue(1, 1.2), // overwrites a
437				tsm1.NewValue(2, 2.2), // overwrites a
438				tsm1.NewValue(3, 3.2),
439				tsm1.NewValue(4, 4.2),
440			},
441			exp: []tsm1.Value{
442				tsm1.NewValue(1, 1.2),
443				tsm1.NewValue(2, 2.2),
444				tsm1.NewValue(3, 3.2),
445				tsm1.NewValue(4, 4.2),
446			},
447		},
448		{
449			a: []tsm1.Value{
450				tsm1.NewValue(0, 0.0),
451				tsm1.NewValue(1, 1.1),
452				tsm1.NewValue(2, 2.1),
453				tsm1.NewValue(3, 3.1),
454				tsm1.NewValue(4, 4.1),
455			},
456			b: []tsm1.Value{
457				tsm1.NewValue(0, 0.0),
458				tsm1.NewValue(2, 2.2),
459				tsm1.NewValue(4, 4.2),
460			},
461			exp: []tsm1.Value{
462				tsm1.NewValue(0, 0.0),
463				tsm1.NewValue(1, 1.1),
464				tsm1.NewValue(2, 2.2),
465				tsm1.NewValue(3, 3.1),
466				tsm1.NewValue(4, 4.2),
467			},
468		},
469
470		{
471			a: []tsm1.Value{
472				tsm1.NewValue(1462498658242869207, 0.0),
473				tsm1.NewValue(1462498658288956853, 1.1),
474			},
475			b: []tsm1.Value{
476				tsm1.NewValue(1462498658242870810, 0.0),
477				tsm1.NewValue(1462498658262911238, 2.2),
478				tsm1.NewValue(1462498658282415038, 4.2),
479				tsm1.NewValue(1462498658282417760, 4.2),
480			},
481			exp: []tsm1.Value{
482				tsm1.NewValue(1462498658242869207, 0.0),
483				tsm1.NewValue(1462498658242870810, 0.0),
484				tsm1.NewValue(1462498658262911238, 2.2),
485				tsm1.NewValue(1462498658282415038, 4.2),
486				tsm1.NewValue(1462498658282417760, 4.2),
487				tsm1.NewValue(1462498658288956853, 1.1),
488			},
489		},
490		{
491			a: []tsm1.Value{
492				tsm1.NewValue(4, 4.0),
493				tsm1.NewValue(5, 5.0),
494				tsm1.NewValue(6, 6.0),
495			},
496			b: []tsm1.Value{
497				tsm1.NewValue(1, 1.0),
498				tsm1.NewValue(2, 2.0),
499				tsm1.NewValue(3, 3.0),
500			},
501			exp: []tsm1.Value{
502				tsm1.NewValue(1, 1.0),
503				tsm1.NewValue(2, 2.0),
504				tsm1.NewValue(3, 3.0),
505				tsm1.NewValue(4, 4.0),
506				tsm1.NewValue(5, 5.0),
507				tsm1.NewValue(6, 6.0),
508			},
509		},
510		{
511			a: []tsm1.Value{
512				tsm1.NewValue(5, 5.0),
513				tsm1.NewValue(6, 6.0),
514			},
515			b: []tsm1.Value{
516				tsm1.NewValue(1, 1.0),
517				tsm1.NewValue(2, 2.0),
518				tsm1.NewValue(3, 3.0),
519				tsm1.NewValue(4, 4.0),
520				tsm1.NewValue(7, 7.0),
521				tsm1.NewValue(8, 8.0),
522			},
523			exp: []tsm1.Value{
524				tsm1.NewValue(1, 1.0),
525				tsm1.NewValue(2, 2.0),
526				tsm1.NewValue(3, 3.0),
527				tsm1.NewValue(4, 4.0),
528				tsm1.NewValue(5, 5.0),
529				tsm1.NewValue(6, 6.0),
530				tsm1.NewValue(7, 7.0),
531				tsm1.NewValue(8, 8.0),
532			},
533		},
534		{
535			a: []tsm1.Value{
536				tsm1.NewValue(1, 1.0),
537				tsm1.NewValue(2, 2.0),
538				tsm1.NewValue(3, 3.0),
539			},
540			b: []tsm1.Value{
541				tsm1.NewValue(4, 4.0),
542				tsm1.NewValue(5, 5.0),
543				tsm1.NewValue(6, 6.0),
544			},
545			exp: []tsm1.Value{
546				tsm1.NewValue(1, 1.0),
547				tsm1.NewValue(2, 2.0),
548				tsm1.NewValue(3, 3.0),
549				tsm1.NewValue(4, 4.0),
550				tsm1.NewValue(5, 5.0),
551				tsm1.NewValue(6, 6.0),
552			},
553		},
554	}
555
556	for i, test := range tests {
557		got := tsm1.Values(test.a).Merge(test.b)
558		if exp, got := len(test.exp), len(got); exp != got {
559			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
560		}
561
562		dedup := tsm1.Values(append(test.a, test.b...)).Deduplicate()
563
564		for i := range test.exp {
565			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
566				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
567			}
568
569			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
570				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
571			}
572		}
573	}
574}
575
576func TestIntegerValues_Merge(t *testing.T) {
577	integerValue := func(t int64, f int64) tsm1.IntegerValue {
578		return tsm1.NewValue(t, f).(tsm1.IntegerValue)
579	}
580
581	tests := []struct {
582		a, b, exp []tsm1.IntegerValue
583	}{
584
585		{ // empty a
586			a: []tsm1.IntegerValue{},
587
588			b: []tsm1.IntegerValue{
589				integerValue(1, 10),
590				integerValue(2, 20),
591			},
592			exp: []tsm1.IntegerValue{
593				integerValue(1, 10),
594				integerValue(2, 20),
595			},
596		},
597		{ // empty b
598			a: []tsm1.IntegerValue{
599				integerValue(1, 1),
600				integerValue(2, 2),
601			},
602
603			b: []tsm1.IntegerValue{},
604			exp: []tsm1.IntegerValue{
605				integerValue(1, 1),
606				integerValue(2, 2),
607			},
608		},
609		{
610			a: []tsm1.IntegerValue{
611				integerValue(1, 1),
612			},
613			b: []tsm1.IntegerValue{
614				integerValue(0, 0),
615				integerValue(1, 10), // overwrites a
616				integerValue(2, 20),
617				integerValue(3, 30),
618				integerValue(4, 40),
619			},
620			exp: []tsm1.IntegerValue{
621				integerValue(0, 0),
622				integerValue(1, 10),
623				integerValue(2, 20),
624				integerValue(3, 30),
625				integerValue(4, 40),
626			},
627		},
628		{
629			a: []tsm1.IntegerValue{
630				integerValue(1, 1),
631				integerValue(2, 2),
632				integerValue(3, 3),
633				integerValue(4, 4),
634			},
635
636			b: []tsm1.IntegerValue{
637				integerValue(1, 10), // overwrites a
638				integerValue(2, 20), // overwrites a
639			},
640			exp: []tsm1.IntegerValue{
641				integerValue(1, 10),
642				integerValue(2, 20),
643				integerValue(3, 3),
644				integerValue(4, 4),
645			},
646		},
647		{
648			a: []tsm1.IntegerValue{
649				integerValue(1, 1),
650				integerValue(2, 2),
651				integerValue(3, 3),
652				integerValue(4, 4),
653			},
654
655			b: []tsm1.IntegerValue{
656				integerValue(1, 10), // overwrites a
657				integerValue(2, 20), // overwrites a
658				integerValue(3, 30),
659				integerValue(4, 40),
660			},
661			exp: []tsm1.IntegerValue{
662				integerValue(1, 10),
663				integerValue(2, 20),
664				integerValue(3, 30),
665				integerValue(4, 40),
666			},
667		},
668		{
669			a: []tsm1.IntegerValue{
670				integerValue(0, 0),
671				integerValue(1, 1),
672				integerValue(2, 2),
673				integerValue(3, 3),
674				integerValue(4, 4),
675			},
676			b: []tsm1.IntegerValue{
677				integerValue(0, 0),
678				integerValue(2, 20),
679				integerValue(4, 40),
680			},
681			exp: []tsm1.IntegerValue{
682				integerValue(0, 0.0),
683				integerValue(1, 1),
684				integerValue(2, 20),
685				integerValue(3, 3),
686				integerValue(4, 40),
687			},
688		},
689	}
690
691	for i, test := range tests {
692		if i != 2 {
693			continue
694		}
695
696		got := tsm1.IntegerValues(test.a).Merge(test.b)
697		if exp, got := len(test.exp), len(got); exp != got {
698			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
699		}
700
701		dedup := tsm1.IntegerValues(append(test.a, test.b...)).Deduplicate()
702
703		for i := range test.exp {
704			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
705				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
706			}
707
708			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
709				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
710			}
711		}
712	}
713}
714
715func TestUnsignedValues_Merge(t *testing.T) {
716	uintValue := func(t int64, f uint64) tsm1.UnsignedValue {
717		return tsm1.NewValue(t, f).(tsm1.UnsignedValue)
718	}
719
720	tests := []struct {
721		a, b, exp []tsm1.UnsignedValue
722	}{
723
724		{ // empty a
725			a: []tsm1.UnsignedValue{},
726
727			b: []tsm1.UnsignedValue{
728				uintValue(1, 10),
729				uintValue(2, 20),
730			},
731			exp: []tsm1.UnsignedValue{
732				uintValue(1, 10),
733				uintValue(2, 20),
734			},
735		},
736		{ // empty b
737			a: []tsm1.UnsignedValue{
738				uintValue(1, 1),
739				uintValue(2, 2),
740			},
741
742			b: []tsm1.UnsignedValue{},
743			exp: []tsm1.UnsignedValue{
744				uintValue(1, 1),
745				uintValue(2, 2),
746			},
747		},
748		{
749			a: []tsm1.UnsignedValue{
750				uintValue(1, 1),
751			},
752			b: []tsm1.UnsignedValue{
753				uintValue(0, 0),
754				uintValue(1, 10), // overwrites a
755				uintValue(2, 20),
756				uintValue(3, 30),
757				uintValue(4, 40),
758			},
759			exp: []tsm1.UnsignedValue{
760				uintValue(0, 0),
761				uintValue(1, 10),
762				uintValue(2, 20),
763				uintValue(3, 30),
764				uintValue(4, 40),
765			},
766		},
767		{
768			a: []tsm1.UnsignedValue{
769				uintValue(1, 1),
770				uintValue(2, 2),
771				uintValue(3, 3),
772				uintValue(4, 4),
773			},
774
775			b: []tsm1.UnsignedValue{
776				uintValue(1, ^uint64(0)), // overwrites a
777				uintValue(2, 20),         // overwrites a
778			},
779			exp: []tsm1.UnsignedValue{
780				uintValue(1, ^uint64(0)),
781				uintValue(2, 20),
782				uintValue(3, 3),
783				uintValue(4, 4),
784			},
785		},
786		{
787			a: []tsm1.UnsignedValue{
788				uintValue(1, 1),
789				uintValue(2, 2),
790				uintValue(3, 3),
791				uintValue(4, 4),
792			},
793
794			b: []tsm1.UnsignedValue{
795				uintValue(1, 10), // overwrites a
796				uintValue(2, 20), // overwrites a
797				uintValue(3, 30),
798				uintValue(4, 40),
799			},
800			exp: []tsm1.UnsignedValue{
801				uintValue(1, 10),
802				uintValue(2, 20),
803				uintValue(3, 30),
804				uintValue(4, 40),
805			},
806		},
807		{
808			a: []tsm1.UnsignedValue{
809				uintValue(0, 0),
810				uintValue(1, 1),
811				uintValue(2, 2),
812				uintValue(3, 3),
813				uintValue(4, 4),
814			},
815			b: []tsm1.UnsignedValue{
816				uintValue(0, 0),
817				uintValue(2, 20),
818				uintValue(4, 40),
819			},
820			exp: []tsm1.UnsignedValue{
821				uintValue(0, 0.0),
822				uintValue(1, 1),
823				uintValue(2, 20),
824				uintValue(3, 3),
825				uintValue(4, 40),
826			},
827		},
828	}
829
830	for i, test := range tests {
831		if i != 2 {
832			continue
833		}
834
835		got := tsm1.UnsignedValues(test.a).Merge(test.b)
836		if exp, got := len(test.exp), len(got); exp != got {
837			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
838		}
839
840		dedup := tsm1.UnsignedValues(append(test.a, test.b...)).Deduplicate()
841
842		for i := range test.exp {
843			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
844				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
845			}
846
847			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
848				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
849			}
850		}
851	}
852}
853
854func TestFloatValues_Merge(t *testing.T) {
855	floatValue := func(t int64, f float64) tsm1.FloatValue {
856		return tsm1.NewValue(t, f).(tsm1.FloatValue)
857	}
858
859	tests := []struct {
860		a, b, exp []tsm1.FloatValue
861	}{
862
863		{ // empty a
864			a: []tsm1.FloatValue{},
865
866			b: []tsm1.FloatValue{
867				floatValue(1, 1.2),
868				floatValue(2, 2.2),
869			},
870			exp: []tsm1.FloatValue{
871				floatValue(1, 1.2),
872				floatValue(2, 2.2),
873			},
874		},
875		{ // empty b
876			a: []tsm1.FloatValue{
877				floatValue(1, 1.1),
878				floatValue(2, 2.1),
879			},
880
881			b: []tsm1.FloatValue{},
882			exp: []tsm1.FloatValue{
883				floatValue(1, 1.1),
884				floatValue(2, 2.1),
885			},
886		},
887		{
888			a: []tsm1.FloatValue{
889				floatValue(1, 1.1),
890			},
891			b: []tsm1.FloatValue{
892				floatValue(0, 0.0),
893				floatValue(1, 1.2), // overwrites a
894				floatValue(2, 2.2),
895				floatValue(3, 3.2),
896				floatValue(4, 4.2),
897			},
898			exp: []tsm1.FloatValue{
899				floatValue(0, 0.0),
900				floatValue(1, 1.2),
901				floatValue(2, 2.2),
902				floatValue(3, 3.2),
903				floatValue(4, 4.2),
904			},
905		},
906		{
907			a: []tsm1.FloatValue{
908				floatValue(1, 1.1),
909				floatValue(2, 2.1),
910				floatValue(3, 3.1),
911				floatValue(4, 4.1),
912			},
913
914			b: []tsm1.FloatValue{
915				floatValue(1, 1.2), // overwrites a
916				floatValue(2, 2.2), // overwrites a
917			},
918			exp: []tsm1.FloatValue{
919				floatValue(1, 1.2),
920				floatValue(2, 2.2),
921				floatValue(3, 3.1),
922				floatValue(4, 4.1),
923			},
924		},
925		{
926			a: []tsm1.FloatValue{
927				floatValue(1, 1.1),
928				floatValue(2, 2.1),
929				floatValue(3, 3.1),
930				floatValue(4, 4.1),
931			},
932
933			b: []tsm1.FloatValue{
934				floatValue(1, 1.2), // overwrites a
935				floatValue(2, 2.2), // overwrites a
936				floatValue(3, 3.2),
937				floatValue(4, 4.2),
938			},
939			exp: []tsm1.FloatValue{
940				floatValue(1, 1.2),
941				floatValue(2, 2.2),
942				floatValue(3, 3.2),
943				floatValue(4, 4.2),
944			},
945		},
946		{
947			a: []tsm1.FloatValue{
948				floatValue(0, 0.0),
949				floatValue(1, 1.1),
950				floatValue(2, 2.1),
951				floatValue(3, 3.1),
952				floatValue(4, 4.1),
953			},
954			b: []tsm1.FloatValue{
955				floatValue(0, 0.0),
956				floatValue(2, 2.2),
957				floatValue(4, 4.2),
958			},
959			exp: []tsm1.FloatValue{
960				floatValue(0, 0.0),
961				floatValue(1, 1.1),
962				floatValue(2, 2.2),
963				floatValue(3, 3.1),
964				floatValue(4, 4.2),
965			},
966		},
967	}
968
969	for i, test := range tests {
970		got := tsm1.FloatValues(test.a).Merge(test.b)
971		if exp, got := len(test.exp), len(got); exp != got {
972			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
973		}
974
975		dedup := tsm1.FloatValues(append(test.a, test.b...)).Deduplicate()
976
977		for i := range test.exp {
978			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
979				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
980			}
981
982			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
983				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
984			}
985		}
986	}
987}
988
989func TestBooleanValues_Merge(t *testing.T) {
990	booleanValue := func(t int64, f bool) tsm1.BooleanValue {
991		return tsm1.NewValue(t, f).(tsm1.BooleanValue)
992	}
993
994	tests := []struct {
995		a, b, exp []tsm1.BooleanValue
996	}{
997
998		{ // empty a
999			a: []tsm1.BooleanValue{},
1000
1001			b: []tsm1.BooleanValue{
1002				booleanValue(1, true),
1003				booleanValue(2, true),
1004			},
1005			exp: []tsm1.BooleanValue{
1006				booleanValue(1, true),
1007				booleanValue(2, true),
1008			},
1009		},
1010		{ // empty b
1011			a: []tsm1.BooleanValue{
1012				booleanValue(1, true),
1013				booleanValue(2, true),
1014			},
1015
1016			b: []tsm1.BooleanValue{},
1017			exp: []tsm1.BooleanValue{
1018				booleanValue(1, true),
1019				booleanValue(2, true),
1020			},
1021		},
1022		{
1023			a: []tsm1.BooleanValue{
1024				booleanValue(1, true),
1025			},
1026			b: []tsm1.BooleanValue{
1027				booleanValue(0, false),
1028				booleanValue(1, false), // overwrites a
1029				booleanValue(2, false),
1030				booleanValue(3, false),
1031				booleanValue(4, false),
1032			},
1033			exp: []tsm1.BooleanValue{
1034				booleanValue(0, false),
1035				booleanValue(1, false),
1036				booleanValue(2, false),
1037				booleanValue(3, false),
1038				booleanValue(4, false),
1039			},
1040		},
1041		{
1042			a: []tsm1.BooleanValue{
1043				booleanValue(1, true),
1044				booleanValue(2, true),
1045				booleanValue(3, true),
1046				booleanValue(4, true),
1047			},
1048
1049			b: []tsm1.BooleanValue{
1050				booleanValue(1, false), // overwrites a
1051				booleanValue(2, false), // overwrites a
1052			},
1053			exp: []tsm1.BooleanValue{
1054				booleanValue(1, false), // overwrites a
1055				booleanValue(2, false), // overwrites a
1056				booleanValue(3, true),
1057				booleanValue(4, true),
1058			},
1059		},
1060		{
1061			a: []tsm1.BooleanValue{
1062				booleanValue(1, true),
1063				booleanValue(2, true),
1064				booleanValue(3, true),
1065				booleanValue(4, true),
1066			},
1067
1068			b: []tsm1.BooleanValue{
1069				booleanValue(1, false), // overwrites a
1070				booleanValue(2, false), // overwrites a
1071				booleanValue(3, false),
1072				booleanValue(4, false),
1073			},
1074			exp: []tsm1.BooleanValue{
1075				booleanValue(1, false),
1076				booleanValue(2, false),
1077				booleanValue(3, false),
1078				booleanValue(4, false),
1079			},
1080		},
1081		{
1082			a: []tsm1.BooleanValue{
1083				booleanValue(0, true),
1084				booleanValue(1, true),
1085				booleanValue(2, true),
1086				booleanValue(3, true),
1087				booleanValue(4, true),
1088			},
1089			b: []tsm1.BooleanValue{
1090				booleanValue(0, false),
1091				booleanValue(2, false),
1092				booleanValue(4, false),
1093			},
1094			exp: []tsm1.BooleanValue{
1095				booleanValue(0, false),
1096				booleanValue(1, true),
1097				booleanValue(2, false),
1098				booleanValue(3, true),
1099				booleanValue(4, false),
1100			},
1101		},
1102	}
1103
1104	for i, test := range tests {
1105		got := tsm1.BooleanValues(test.a).Merge(test.b)
1106		if exp, got := len(test.exp), len(got); exp != got {
1107			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
1108		}
1109
1110		dedup := tsm1.BooleanValues(append(test.a, test.b...)).Deduplicate()
1111
1112		for i := range test.exp {
1113			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
1114				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1115			}
1116
1117			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
1118				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1119			}
1120		}
1121	}
1122}
1123
1124func TestStringValues_Merge(t *testing.T) {
1125	stringValue := func(t int64, f string) tsm1.StringValue {
1126		return tsm1.NewValue(t, f).(tsm1.StringValue)
1127	}
1128
1129	tests := []struct {
1130		a, b, exp []tsm1.StringValue
1131	}{
1132
1133		{ // empty a
1134			a: []tsm1.StringValue{},
1135
1136			b: []tsm1.StringValue{
1137				stringValue(1, "10"),
1138				stringValue(2, "20"),
1139			},
1140			exp: []tsm1.StringValue{
1141				stringValue(1, "10"),
1142				stringValue(2, "20"),
1143			},
1144		},
1145		{ // empty b
1146			a: []tsm1.StringValue{
1147				stringValue(1, "1"),
1148				stringValue(2, "2"),
1149			},
1150
1151			b: []tsm1.StringValue{},
1152			exp: []tsm1.StringValue{
1153				stringValue(1, "1"),
1154				stringValue(2, "2"),
1155			},
1156		},
1157		{
1158			a: []tsm1.StringValue{
1159				stringValue(1, "1"),
1160			},
1161			b: []tsm1.StringValue{
1162				stringValue(0, "0"),
1163				stringValue(1, "10"), // overwrites a
1164				stringValue(2, "20"),
1165				stringValue(3, "30"),
1166				stringValue(4, "40"),
1167			},
1168			exp: []tsm1.StringValue{
1169				stringValue(0, "0"),
1170				stringValue(1, "10"),
1171				stringValue(2, "20"),
1172				stringValue(3, "30"),
1173				stringValue(4, "40"),
1174			},
1175		},
1176		{
1177			a: []tsm1.StringValue{
1178				stringValue(1, "1"),
1179				stringValue(2, "2"),
1180				stringValue(3, "3"),
1181				stringValue(4, "4"),
1182			},
1183
1184			b: []tsm1.StringValue{
1185				stringValue(1, "10"), // overwrites a
1186				stringValue(2, "20"), // overwrites a
1187			},
1188			exp: []tsm1.StringValue{
1189				stringValue(1, "10"),
1190				stringValue(2, "20"),
1191				stringValue(3, "3"),
1192				stringValue(4, "4"),
1193			},
1194		},
1195		{
1196			a: []tsm1.StringValue{
1197				stringValue(1, "1"),
1198				stringValue(2, "2"),
1199				stringValue(3, "3"),
1200				stringValue(4, "4"),
1201			},
1202
1203			b: []tsm1.StringValue{
1204				stringValue(1, "10"), // overwrites a
1205				stringValue(2, "20"), // overwrites a
1206				stringValue(3, "30"),
1207				stringValue(4, "40"),
1208			},
1209			exp: []tsm1.StringValue{
1210				stringValue(1, "10"),
1211				stringValue(2, "20"),
1212				stringValue(3, "30"),
1213				stringValue(4, "40"),
1214			},
1215		},
1216		{
1217			a: []tsm1.StringValue{
1218				stringValue(0, "0"),
1219				stringValue(1, "1"),
1220				stringValue(2, "2"),
1221				stringValue(3, "3"),
1222				stringValue(4, "4"),
1223			},
1224			b: []tsm1.StringValue{
1225				stringValue(0, "0"),
1226				stringValue(2, "20"),
1227				stringValue(4, "40"),
1228			},
1229			exp: []tsm1.StringValue{
1230				stringValue(0, "0.0"),
1231				stringValue(1, "1"),
1232				stringValue(2, "20"),
1233				stringValue(3, "3"),
1234				stringValue(4, "40"),
1235			},
1236		},
1237	}
1238
1239	for i, test := range tests {
1240		if i != 2 {
1241			continue
1242		}
1243
1244		got := tsm1.StringValues(test.a).Merge(test.b)
1245		if exp, got := len(test.exp), len(got); exp != got {
1246			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
1247		}
1248
1249		dedup := tsm1.StringValues(append(test.a, test.b...)).Deduplicate()
1250
1251		for i := range test.exp {
1252			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
1253				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1254			}
1255
1256			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
1257				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1258			}
1259		}
1260	}
1261}
// getTimes returns n timestamps, in nanoseconds since the Unix epoch, that
// begin at the current time rounded to precision and are spaced step units of
// precision apart (for example step=60 with time.Second yields one timestamp
// per minute).
func getTimes(n, step int, precision time.Duration) []int64 {
	start := time.Now().Round(precision).UnixNano()
	// Distance between two consecutive timestamps. The spacing honours the
	// step argument rather than a fixed multiplier of 60.
	interval := (time.Duration(step) * precision).Nanoseconds()

	a := make([]int64, n)
	for i := range a {
		a[i] = start + int64(i)*interval
	}
	return a
}
1270
1271func BenchmarkDecodeBlock_Float_Empty(b *testing.B) {
1272	valueCount := 1000
1273	times := getTimes(valueCount, 60, time.Second)
1274	values := make([]tsm1.Value, len(times))
1275	for i, t := range times {
1276		values[i] = tsm1.NewValue(t, float64(i))
1277	}
1278
1279	bytes, err := tsm1.Values(values).Encode(nil)
1280	if err != nil {
1281		b.Fatalf("unexpected error: %v", err)
1282	}
1283
1284	var decodedValues []tsm1.Value
1285	b.ResetTimer()
1286	for i := 0; i < b.N; i++ {
1287		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1288		if err != nil {
1289			b.Fatalf("unexpected error decoding block: %v", err)
1290		}
1291	}
1292}
1293
1294func BenchmarkDecodeBlock_Float_EqualSize(b *testing.B) {
1295	valueCount := 1000
1296	times := getTimes(valueCount, 60, time.Second)
1297	values := make([]tsm1.Value, len(times))
1298	for i, t := range times {
1299		values[i] = tsm1.NewValue(t, float64(i))
1300	}
1301
1302	bytes, err := tsm1.Values(values).Encode(nil)
1303	if err != nil {
1304		b.Fatalf("unexpected error: %v", err)
1305	}
1306
1307	decodedValues := make([]tsm1.Value, len(values))
1308	b.ResetTimer()
1309	for i := 0; i < b.N; i++ {
1310		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1311		if err != nil {
1312			b.Fatalf("unexpected error decoding block: %v", err)
1313		}
1314	}
1315}
1316
1317func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) {
1318	valueCount := 1000
1319	times := getTimes(valueCount, 60, time.Second)
1320	values := make([]tsm1.Value, len(times))
1321	for i, t := range times {
1322		values[i] = tsm1.NewValue(t, float64(i))
1323	}
1324
1325	bytes, err := tsm1.Values(values).Encode(nil)
1326	if err != nil {
1327		b.Fatalf("unexpected error: %v", err)
1328	}
1329
1330	decodedValues := make([]tsm1.FloatValue, len(values))
1331	b.ResetTimer()
1332	for i := 0; i < b.N; i++ {
1333		_, err = tsm1.DecodeFloatBlock(bytes, &decodedValues)
1334		if err != nil {
1335			b.Fatalf("unexpected error decoding block: %v", err)
1336		}
1337	}
1338}
1339
1340func BenchmarkDecodeBlock_Integer_Empty(b *testing.B) {
1341	valueCount := 1000
1342	times := getTimes(valueCount, 60, time.Second)
1343	values := make([]tsm1.Value, len(times))
1344	for i, t := range times {
1345		values[i] = tsm1.NewValue(t, int64(i))
1346	}
1347
1348	bytes, err := tsm1.Values(values).Encode(nil)
1349	if err != nil {
1350		b.Fatalf("unexpected error: %v", err)
1351	}
1352
1353	var decodedValues []tsm1.Value
1354	b.ResetTimer()
1355	for i := 0; i < b.N; i++ {
1356		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1357		if err != nil {
1358			b.Fatalf("unexpected error decoding block: %v", err)
1359		}
1360	}
1361}
1362
1363func BenchmarkDecodeBlock_Integer_EqualSize(b *testing.B) {
1364	valueCount := 1000
1365	times := getTimes(valueCount, 60, time.Second)
1366	values := make([]tsm1.Value, len(times))
1367	for i, t := range times {
1368		values[i] = tsm1.NewValue(t, int64(i))
1369	}
1370
1371	bytes, err := tsm1.Values(values).Encode(nil)
1372	if err != nil {
1373		b.Fatalf("unexpected error: %v", err)
1374	}
1375
1376	decodedValues := make([]tsm1.Value, len(values))
1377	b.ResetTimer()
1378	for i := 0; i < b.N; i++ {
1379		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1380		if err != nil {
1381			b.Fatalf("unexpected error decoding block: %v", err)
1382		}
1383	}
1384}
1385
1386func BenchmarkDecodeBlock_Integer_TypeSpecific(b *testing.B) {
1387	valueCount := 1000
1388	times := getTimes(valueCount, 60, time.Second)
1389	values := make([]tsm1.Value, len(times))
1390	for i, t := range times {
1391		values[i] = tsm1.NewValue(t, int64(i))
1392	}
1393
1394	bytes, err := tsm1.Values(values).Encode(nil)
1395	if err != nil {
1396		b.Fatalf("unexpected error: %v", err)
1397	}
1398
1399	decodedValues := make([]tsm1.IntegerValue, len(values))
1400	b.ResetTimer()
1401	for i := 0; i < b.N; i++ {
1402		_, err = tsm1.DecodeIntegerBlock(bytes, &decodedValues)
1403		if err != nil {
1404			b.Fatalf("unexpected error decoding block: %v", err)
1405		}
1406	}
1407}
1408
1409func BenchmarkDecodeBlock_Boolean_Empty(b *testing.B) {
1410	valueCount := 1000
1411	times := getTimes(valueCount, 60, time.Second)
1412	values := make([]tsm1.Value, len(times))
1413	for i, t := range times {
1414		values[i] = tsm1.NewValue(t, true)
1415	}
1416
1417	bytes, err := tsm1.Values(values).Encode(nil)
1418	if err != nil {
1419		b.Fatalf("unexpected error: %v", err)
1420	}
1421
1422	var decodedValues []tsm1.Value
1423	b.ResetTimer()
1424	for i := 0; i < b.N; i++ {
1425		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1426		if err != nil {
1427			b.Fatalf("unexpected error decoding block: %v", err)
1428		}
1429	}
1430}
1431
1432func BenchmarkDecodeBlock_Boolean_EqualSize(b *testing.B) {
1433	valueCount := 1000
1434	times := getTimes(valueCount, 60, time.Second)
1435	values := make([]tsm1.Value, len(times))
1436	for i, t := range times {
1437		values[i] = tsm1.NewValue(t, true)
1438	}
1439
1440	bytes, err := tsm1.Values(values).Encode(nil)
1441	if err != nil {
1442		b.Fatalf("unexpected error: %v", err)
1443	}
1444
1445	decodedValues := make([]tsm1.Value, len(values))
1446	b.ResetTimer()
1447	for i := 0; i < b.N; i++ {
1448		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1449		if err != nil {
1450			b.Fatalf("unexpected error decoding block: %v", err)
1451		}
1452	}
1453}
1454
1455func BenchmarkDecodeBlock_Boolean_TypeSpecific(b *testing.B) {
1456	valueCount := 1000
1457	times := getTimes(valueCount, 60, time.Second)
1458	values := make([]tsm1.Value, len(times))
1459	for i, t := range times {
1460		values[i] = tsm1.NewValue(t, true)
1461	}
1462
1463	bytes, err := tsm1.Values(values).Encode(nil)
1464	if err != nil {
1465		b.Fatalf("unexpected error: %v", err)
1466	}
1467
1468	decodedValues := make([]tsm1.BooleanValue, len(values))
1469	b.ResetTimer()
1470	for i := 0; i < b.N; i++ {
1471		_, err = tsm1.DecodeBooleanBlock(bytes, &decodedValues)
1472		if err != nil {
1473			b.Fatalf("unexpected error decoding block: %v", err)
1474		}
1475	}
1476}
1477
1478func BenchmarkDecodeBooleanBlock(b *testing.B) {
1479	cases := []int{
1480		5,
1481		55,
1482		555,
1483		1000,
1484	}
1485	for _, n := range cases {
1486		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
1487			valueCount := n
1488			times := getTimes(valueCount, 60, time.Second)
1489			values := make([]tsm1.Value, len(times))
1490			for i, t := range times {
1491				values[i] = tsm1.NewValue(t, true)
1492			}
1493
1494			bytes, err := tsm1.Values(values).Encode(nil)
1495			if err != nil {
1496				b.Fatalf("unexpected error: %v", err)
1497			}
1498
1499			b.ResetTimer()
1500			b.ReportAllocs()
1501			b.SetBytes(int64(tsm1.Values(values).Size()))
1502
1503			b.RunParallel(func(pb *testing.PB) {
1504				decodedValues := make([]tsm1.BooleanValue, len(values))
1505
1506				for pb.Next() {
1507					_, err = tsm1.DecodeBooleanBlock(bytes, &decodedValues)
1508					if err != nil {
1509						b.Fatalf("unexpected error decoding block: %v", err)
1510					}
1511				}
1512			})
1513		})
1514	}
1515}
1516
1517func BenchmarkDecodeFloatBlock(b *testing.B) {
1518	cases := []int{
1519		5,
1520		55,
1521		555,
1522		1000,
1523	}
1524	for _, n := range cases {
1525		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
1526			valueCount := n
1527			times := getTimes(valueCount, 60, time.Second)
1528			values := make([]tsm1.Value, len(times))
1529			for i, t := range times {
1530				values[i] = tsm1.NewValue(t, float64(i))
1531			}
1532
1533			bytes, err := tsm1.Values(values).Encode(nil)
1534			if err != nil {
1535				b.Fatalf("unexpected error: %v", err)
1536			}
1537
1538			b.ResetTimer()
1539			b.ReportAllocs()
1540			b.SetBytes(int64(tsm1.Values(values).Size()))
1541
1542			b.RunParallel(func(pb *testing.PB) {
1543				decodedValues := make([]tsm1.FloatValue, len(values))
1544
1545				for pb.Next() {
1546					_, err = tsm1.DecodeFloatBlock(bytes, &decodedValues)
1547					if err != nil {
1548						b.Fatalf("unexpected error decoding block: %v", err)
1549					}
1550				}
1551			})
1552		})
1553	}
1554}
1555
1556func BenchmarkDecodeIntegerBlock(b *testing.B) {
1557	rle := func(i int) int64 { return int64(i) }
1558	s8b := func(i int) int64 { return int64(i + int(rand.Int31n(10))) }
1559
1560	cases := []struct {
1561		enc string
1562		gen func(i int) int64
1563		n   int
1564	}{
1565		{enc: "rle", gen: rle, n: 5},
1566		{enc: "rle", gen: rle, n: 55},
1567		{enc: "rle", gen: rle, n: 555},
1568		{enc: "rle", gen: rle, n: 1000},
1569		{enc: "s8b", gen: s8b, n: 5},
1570		{enc: "s8b", gen: s8b, n: 55},
1571		{enc: "s8b", gen: s8b, n: 555},
1572		{enc: "s8b", gen: s8b, n: 1000},
1573	}
1574	for _, bm := range cases {
1575		b.Run(fmt.Sprintf("%s_%d", bm.enc, bm.n), func(b *testing.B) {
1576			rand.Seed(int64(bm.n * 1e3))
1577
1578			valueCount := bm.n
1579			times := getTimes(valueCount, 60, time.Second)
1580			values := make([]tsm1.Value, len(times))
1581			for i, t := range times {
1582				values[i] = tsm1.NewValue(t, bm.gen(i))
1583			}
1584
1585			bytes, err := tsm1.Values(values).Encode(nil)
1586			if err != nil {
1587				b.Fatalf("unexpected error: %v", err)
1588			}
1589
1590			b.ResetTimer()
1591			b.ReportAllocs()
1592			b.SetBytes(int64(tsm1.Values(values).Size()))
1593
1594			b.RunParallel(func(pb *testing.PB) {
1595				decodedValues := make([]tsm1.IntegerValue, len(values))
1596
1597				for pb.Next() {
1598					_, err = tsm1.DecodeIntegerBlock(bytes, &decodedValues)
1599					if err != nil {
1600						b.Fatalf("unexpected error decoding block: %v", err)
1601					}
1602				}
1603			})
1604		})
1605	}
1606}
1607
1608func BenchmarkDecodeStringBlock(b *testing.B) {
1609	cases := []int{
1610		5,
1611		55,
1612		555,
1613		1000,
1614	}
1615	for _, n := range cases {
1616		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
1617			valueCount := n
1618			times := getTimes(valueCount, 60, time.Second)
1619			values := make([]tsm1.Value, len(times))
1620			for i, t := range times {
1621				values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1622			}
1623
1624			bytes, err := tsm1.Values(values).Encode(nil)
1625			if err != nil {
1626				b.Fatalf("unexpected error: %v", err)
1627			}
1628
1629			b.ResetTimer()
1630			b.ReportAllocs()
1631			b.SetBytes(int64(tsm1.Values(values).Size()))
1632
1633			b.RunParallel(func(pb *testing.PB) {
1634				decodedValues := make([]tsm1.StringValue, len(values))
1635
1636				for pb.Next() {
1637					_, err = tsm1.DecodeStringBlock(bytes, &decodedValues)
1638					if err != nil {
1639						b.Fatalf("unexpected error decoding block: %v", err)
1640					}
1641				}
1642			})
1643		})
1644	}
1645}
1646
1647func BenchmarkDecodeBlock_String_Empty(b *testing.B) {
1648	valueCount := 1000
1649	times := getTimes(valueCount, 60, time.Second)
1650	values := make([]tsm1.Value, len(times))
1651	for i, t := range times {
1652		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1653	}
1654
1655	bytes, err := tsm1.Values(values).Encode(nil)
1656	if err != nil {
1657		b.Fatalf("unexpected error: %v", err)
1658	}
1659
1660	var decodedValues []tsm1.Value
1661	b.ResetTimer()
1662	for i := 0; i < b.N; i++ {
1663		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1664		if err != nil {
1665			b.Fatalf("unexpected error decoding block: %v", err)
1666		}
1667	}
1668}
1669
1670func BenchmarkDecodeBlock_String_EqualSize(b *testing.B) {
1671	valueCount := 1000
1672	times := getTimes(valueCount, 60, time.Second)
1673	values := make([]tsm1.Value, len(times))
1674	for i, t := range times {
1675		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1676	}
1677
1678	bytes, err := tsm1.Values(values).Encode(nil)
1679	if err != nil {
1680		b.Fatalf("unexpected error: %v", err)
1681	}
1682
1683	decodedValues := make([]tsm1.Value, len(values))
1684	b.ResetTimer()
1685	for i := 0; i < b.N; i++ {
1686		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1687		if err != nil {
1688			b.Fatalf("unexpected error decoding block: %v", err)
1689		}
1690	}
1691}
1692
1693func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) {
1694	valueCount := 1000
1695	times := getTimes(valueCount, 60, time.Second)
1696	values := make([]tsm1.Value, len(times))
1697	for i, t := range times {
1698		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1699	}
1700
1701	bytes, err := tsm1.Values(values).Encode(nil)
1702	if err != nil {
1703		b.Fatalf("unexpected error: %v", err)
1704	}
1705
1706	decodedValues := make([]tsm1.StringValue, len(values))
1707	b.ResetTimer()
1708	for i := 0; i < b.N; i++ {
1709		_, err = tsm1.DecodeStringBlock(bytes, &decodedValues)
1710		if err != nil {
1711			b.Fatalf("unexpected error decoding block: %v", err)
1712		}
1713	}
1714}
1715
1716func BenchmarkValues_Deduplicate(b *testing.B) {
1717	valueCount := 1000
1718	times := getTimes(valueCount, 60, time.Second)
1719	values := make([]tsm1.Value, len(times))
1720	for i, t := range times {
1721		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1722	}
1723	values = append(values, values...)
1724
1725	b.ResetTimer()
1726	for i := 0; i < b.N; i++ {
1727		tsm1.Values(values).Deduplicate()
1728	}
1729}
1730
1731func BenchmarkValues_Merge(b *testing.B) {
1732	valueCount := 1000
1733	times := getTimes(valueCount, 60, time.Second)
1734	a := make([]tsm1.Value, len(times))
1735	c := make([]tsm1.Value, len(times))
1736
1737	for i, t := range times {
1738		a[i] = tsm1.NewValue(t, float64(i))
1739		c[i] = tsm1.NewValue(t+1, float64(i))
1740	}
1741
1742	b.ResetTimer()
1743	benchmarkMerge(a, c, b)
1744}
1745
1746func BenchmarkValues_MergeDisjoint(b *testing.B) {
1747	valueCount := 1000
1748	times := getTimes(valueCount, 60, time.Second)
1749	a := make([]tsm1.Value, len(times))
1750	c := make([]tsm1.Value, len(times))
1751
1752	for i, t := range times {
1753		a[i] = tsm1.NewValue(t, float64(i))
1754		c[i] = tsm1.NewValue(times[len(times)-1]+int64((i+1)*1e9), float64(i))
1755	}
1756
1757	b.ResetTimer()
1758	benchmarkMerge(a, c, b)
1759}
1760
1761func BenchmarkValues_MergeSame(b *testing.B) {
1762	valueCount := 1000
1763	times := getTimes(valueCount, 60, time.Second)
1764	a := make([]tsm1.Value, len(times))
1765	c := make([]tsm1.Value, len(times))
1766
1767	for i, t := range times {
1768		a[i] = tsm1.NewValue(t, float64(i))
1769		c[i] = tsm1.NewValue(t, float64(i))
1770	}
1771
1772	b.ResetTimer()
1773	benchmarkMerge(a, c, b)
1774}
1775
1776func BenchmarkValues_MergeSimilar(b *testing.B) {
1777	valueCount := 1000
1778	times := getTimes(valueCount, 60, time.Second)
1779	a := make([]tsm1.Value, len(times))
1780	c := make([]tsm1.Value, len(times))
1781
1782	for i, t := range times {
1783		a[i] = tsm1.NewValue(t, float64(i))
1784		if i == 0 {
1785			t++
1786		}
1787		c[i] = tsm1.NewValue(t, float64(i))
1788	}
1789
1790	b.ResetTimer()
1791	benchmarkMerge(a, c, b)
1792}
1793
1794func BenchmarkValues_MergeUnevenA(b *testing.B) {
1795	valueCount := 1000
1796	times := getTimes(valueCount, 60, time.Second)
1797	a := make([]tsm1.Value, len(times))
1798	c := make([]tsm1.Value, len(times))
1799
1800	for i, t := range times {
1801		a[i] = tsm1.NewValue(t, float64(i))
1802		c[i] = tsm1.NewValue(t, float64(i))
1803	}
1804
1805	b.ResetTimer()
1806	benchmarkMerge(a[:700], c[:10], b)
1807}
1808
1809func BenchmarkValues_MergeUnevenB(b *testing.B) {
1810	valueCount := 1000
1811	times := getTimes(valueCount, 60, time.Second)
1812	a := make([]tsm1.Value, len(times))
1813	c := make([]tsm1.Value, len(times))
1814
1815	for i, t := range times {
1816		a[i] = tsm1.NewValue(t, float64(i))
1817		c[i] = tsm1.NewValue(t, float64(i))
1818	}
1819
1820	b.ResetTimer()
1821	benchmarkMerge(a[:10], c[:700], b)
1822}
1823
1824func benchmarkMerge(a, c tsm1.Values, b *testing.B) {
1825	for i := 0; i < b.N; i++ {
1826		b.StopTimer()
1827		aa := make(tsm1.Values, len(a))
1828		copy(aa, a)
1829		cc := make(tsm1.Values, len(c))
1830		copy(cc, c)
1831		b.StartTimer()
1832		tsm1.Values(aa).Merge(tsm1.Values(cc))
1833	}
1834}
1835
1836func BenchmarkValues_EncodeInteger(b *testing.B) {
1837	valueCount := 1024
1838	times := getTimes(valueCount, 60, time.Second)
1839	a := make([]tsm1.Value, len(times))
1840
1841	for i, t := range times {
1842		a[i] = tsm1.NewValue(t, int64(i))
1843	}
1844
1845	buf := make([]byte, 1024*8)
1846	b.ResetTimer()
1847	for i := 0; i < b.N; i++ {
1848		tsm1.Values(a).Encode(buf)
1849	}
1850}
1851
1852func BenchmarkValues_EncodeFloat(b *testing.B) {
1853	valueCount := 1024
1854	times := getTimes(valueCount, 60, time.Second)
1855	a := make([]tsm1.Value, len(times))
1856
1857	for i, t := range times {
1858		a[i] = tsm1.NewValue(t, float64(i))
1859	}
1860
1861	buf := make([]byte, 1024*8)
1862	b.ResetTimer()
1863	for i := 0; i < b.N; i++ {
1864		tsm1.Values(a).Encode(buf)
1865	}
1866}
1867func BenchmarkValues_EncodeString(b *testing.B) {
1868	valueCount := 1024
1869	times := getTimes(valueCount, 60, time.Second)
1870	a := make([]tsm1.Value, len(times))
1871
1872	for i, t := range times {
1873		a[i] = tsm1.NewValue(t, fmt.Sprintf("%d", i))
1874	}
1875
1876	buf := make([]byte, 1024*8)
1877	b.ResetTimer()
1878	for i := 0; i < b.N; i++ {
1879		tsm1.Values(a).Encode(buf)
1880	}
1881}
1882func BenchmarkValues_EncodeBool(b *testing.B) {
1883	valueCount := 1024
1884	times := getTimes(valueCount, 60, time.Second)
1885	a := make([]tsm1.Value, len(times))
1886
1887	for i, t := range times {
1888		if i%2 == 0 {
1889			a[i] = tsm1.NewValue(t, true)
1890		} else {
1891			a[i] = tsm1.NewValue(t, false)
1892		}
1893	}
1894
1895	buf := make([]byte, 1024*8)
1896	b.ResetTimer()
1897	for i := 0; i < b.N; i++ {
1898		tsm1.Values(a).Encode(buf)
1899	}
1900}
1901