1package tsm1_test
2
3import (
4	"fmt"
5	"reflect"
6	"testing"
7	"time"
8
9	"github.com/davecgh/go-spew/spew"
10	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
11)
12
13func TestEncoding_FloatBlock(t *testing.T) {
14	valueCount := 1000
15	times := getTimes(valueCount, 60, time.Second)
16	values := make([]tsm1.Value, len(times))
17	for i, t := range times {
18		values[i] = tsm1.NewValue(t, float64(i))
19	}
20
21	b, err := tsm1.Values(values).Encode(nil)
22	if err != nil {
23		t.Fatalf("unexpected error: %v", err)
24	}
25
26	var decodedValues []tsm1.Value
27	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
28	if err != nil {
29		t.Fatalf("unexpected error decoding block: %v", err)
30	}
31
32	if !reflect.DeepEqual(decodedValues, values) {
33		t.Fatalf("unexpected results:\n\tgot: %s\n\texp: %s\n", spew.Sdump(decodedValues), spew.Sdump(values))
34	}
35}
36
37func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
38	values := make([]tsm1.Value, 3)
39	for i := 0; i < 3; i++ {
40		values[i] = tsm1.NewValue(0, float64(i))
41	}
42
43	b, err := tsm1.Values(values).Encode(nil)
44	if err != nil {
45		t.Fatalf("unexpected error: %v", err)
46	}
47
48	var decodedValues []tsm1.Value
49	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
50	if err != nil {
51		t.Fatalf("unexpected error decoding block: %v", err)
52	}
53
54	if !reflect.DeepEqual(decodedValues, values) {
55		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
56	}
57}
58
59func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) {
60	values := make([]tsm1.Value, 5)
61	values[0] = tsm1.NewValue(1444238178437870000, 6.00065e+06)
62	values[1] = tsm1.NewValue(1444238185286830000, 6.000656e+06)
63	values[2] = tsm1.NewValue(1444238188441501000, 6.000657e+06)
64	values[3] = tsm1.NewValue(1444238195286811000, 6.000659e+06)
65	values[4] = tsm1.NewValue(1444238198439917000, 6.000661e+06)
66
67	b, err := tsm1.Values(values).Encode(nil)
68	if err != nil {
69		t.Fatalf("unexpected error: %v", err)
70	}
71
72	var decodedValues []tsm1.Value
73	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
74	if err != nil {
75		t.Fatalf("unexpected error decoding block: %v", err)
76	}
77
78	if !reflect.DeepEqual(decodedValues, values) {
79		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
80	}
81}
82
83func TestEncoding_IntBlock_Basic(t *testing.T) {
84	valueCount := 1000
85	times := getTimes(valueCount, 60, time.Second)
86	values := make([]tsm1.Value, len(times))
87	for i, t := range times {
88		values[i] = tsm1.NewValue(t, int64(i))
89	}
90
91	b, err := tsm1.Values(values).Encode(nil)
92	if err != nil {
93		t.Fatalf("unexpected error: %v", err)
94	}
95
96	var decodedValues []tsm1.Value
97	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
98	if err != nil {
99		t.Fatalf("unexpected error decoding block: %v", err)
100	}
101
102	if len(decodedValues) != len(values) {
103		t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
104	}
105
106	for i := 0; i < len(decodedValues); i++ {
107		if decodedValues[i].UnixNano() != values[i].UnixNano() {
108			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].UnixNano(), values[i].UnixNano())
109		}
110
111		if decodedValues[i].Value() != values[i].Value() {
112			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Value(), values[i].Value())
113		}
114	}
115}
116
117func TestEncoding_IntBlock_Negatives(t *testing.T) {
118	valueCount := 1000
119	times := getTimes(valueCount, 60, time.Second)
120	values := make([]tsm1.Value, len(times))
121	for i, t := range times {
122		v := int64(i)
123		if i%2 == 0 {
124			v = -v
125		}
126		values[i] = tsm1.NewValue(t, int64(v))
127	}
128
129	b, err := tsm1.Values(values).Encode(nil)
130	if err != nil {
131		t.Fatalf("unexpected error: %v", err)
132	}
133
134	var decodedValues []tsm1.Value
135	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
136	if err != nil {
137		t.Fatalf("unexpected error decoding block: %v", err)
138	}
139
140	if !reflect.DeepEqual(decodedValues, values) {
141		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
142	}
143}
144
145func TestEncoding_UIntBlock_Basic(t *testing.T) {
146	valueCount := 1000
147	times := getTimes(valueCount, 60, time.Second)
148	values := make([]tsm1.Value, len(times))
149	for i, t := range times {
150		values[i] = tsm1.NewValue(t, uint64(i))
151	}
152
153	b, err := tsm1.Values(values).Encode(nil)
154	if err != nil {
155		t.Fatalf("unexpected error: %v", err)
156	}
157
158	var decodedValues []tsm1.Value
159	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
160	if err != nil {
161		t.Fatalf("unexpected error decoding block: %v", err)
162	}
163
164	if len(decodedValues) != len(values) {
165		t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
166	}
167
168	for i := 0; i < len(decodedValues); i++ {
169		if decodedValues[i].UnixNano() != values[i].UnixNano() {
170			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].UnixNano(), values[i].UnixNano())
171		}
172
173		if decodedValues[i].Value() != values[i].Value() {
174			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Value(), values[i].Value())
175		}
176	}
177}
178
179// TestEncoding_UIntBlock_MaxValues encodes uint64 numbers starting at max (18446744073709551615)
180// down to 18446744073709550616
181func TestEncoding_UIntBlock_MaxValues(t *testing.T) {
182	valueCount := 1000
183	times := getTimes(valueCount, 60, time.Second)
184	values := make([]tsm1.Value, len(times))
185	for i, t := range times {
186		values[i] = tsm1.NewValue(t, ^uint64(i))
187	}
188
189	b, err := tsm1.Values(values).Encode(nil)
190	if err != nil {
191		t.Fatalf("unexpected error: %v", err)
192	}
193
194	var decodedValues []tsm1.Value
195	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
196	if err != nil {
197		t.Fatalf("unexpected error decoding block: %v", err)
198	}
199
200	if !reflect.DeepEqual(decodedValues, values) {
201		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
202	}
203}
204
205func TestEncoding_BooleanBlock_Basic(t *testing.T) {
206	valueCount := 1000
207	times := getTimes(valueCount, 60, time.Second)
208	values := make([]tsm1.Value, len(times))
209	for i, t := range times {
210		v := true
211		if i%2 == 0 {
212			v = false
213		}
214		values[i] = tsm1.NewValue(t, v)
215	}
216
217	b, err := tsm1.Values(values).Encode(nil)
218	if err != nil {
219		t.Fatalf("unexpected error: %v", err)
220	}
221
222	var decodedValues []tsm1.Value
223	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
224	if err != nil {
225		t.Fatalf("unexpected error decoding block: %v", err)
226	}
227
228	if !reflect.DeepEqual(decodedValues, values) {
229		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
230	}
231}
232
233func TestEncoding_StringBlock_Basic(t *testing.T) {
234	valueCount := 1000
235	times := getTimes(valueCount, 60, time.Second)
236	values := make([]tsm1.Value, len(times))
237	for i, t := range times {
238		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
239	}
240
241	b, err := tsm1.Values(values).Encode(nil)
242	if err != nil {
243		t.Fatalf("unexpected error: %v", err)
244	}
245
246	var decodedValues []tsm1.Value
247	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
248	if err != nil {
249		t.Fatalf("unexpected error decoding block: %v", err)
250	}
251
252	if !reflect.DeepEqual(decodedValues, values) {
253		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
254	}
255}
256
257func TestEncoding_BlockType(t *testing.T) {
258	tests := []struct {
259		value     interface{}
260		blockType byte
261	}{
262		{value: float64(1.0), blockType: tsm1.BlockFloat64},
263		{value: int64(1), blockType: tsm1.BlockInteger},
264		{value: uint64(1), blockType: tsm1.BlockUnsigned},
265		{value: true, blockType: tsm1.BlockBoolean},
266		{value: "string", blockType: tsm1.BlockString},
267	}
268
269	for _, test := range tests {
270		var values []tsm1.Value
271		values = append(values, tsm1.NewValue(0, test.value))
272
273		b, err := tsm1.Values(values).Encode(nil)
274		if err != nil {
275			t.Fatalf("unexpected error: %v", err)
276		}
277
278		bt, err := tsm1.BlockType(b)
279		if err != nil {
280			t.Fatalf("unexpected error decoding block type: %v", err)
281		}
282
283		if got, exp := bt, test.blockType; got != exp {
284			t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
285		}
286	}
287
288	_, err := tsm1.BlockType([]byte{10})
289	if err == nil {
290		t.Fatalf("expected error decoding block type, got nil")
291	}
292}
293
294func TestEncoding_Count(t *testing.T) {
295	tests := []struct {
296		value     interface{}
297		blockType byte
298	}{
299		{value: float64(1.0), blockType: tsm1.BlockFloat64},
300		{value: int64(1), blockType: tsm1.BlockInteger},
301		{value: uint64(1), blockType: tsm1.BlockUnsigned},
302		{value: true, blockType: tsm1.BlockBoolean},
303		{value: "string", blockType: tsm1.BlockString},
304	}
305
306	for _, test := range tests {
307		var values []tsm1.Value
308		values = append(values, tsm1.NewValue(0, test.value))
309
310		b, err := tsm1.Values(values).Encode(nil)
311		if err != nil {
312			t.Fatalf("unexpected error: %v", err)
313		}
314
315		if got, exp := tsm1.BlockCount(b), 1; got != exp {
316			t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
317		}
318	}
319}
320
321func TestValues_MergeFloat(t *testing.T) {
322	tests := []struct {
323		a, b, exp []tsm1.Value
324	}{
325
326		{ // empty a
327			a: []tsm1.Value{},
328
329			b: []tsm1.Value{
330				tsm1.NewValue(1, 1.2),
331				tsm1.NewValue(2, 2.2),
332			},
333			exp: []tsm1.Value{
334				tsm1.NewValue(1, 1.2),
335				tsm1.NewValue(2, 2.2),
336			},
337		},
338		{ // empty b
339			a: []tsm1.Value{
340				tsm1.NewValue(1, 1.1),
341				tsm1.NewValue(2, 2.1),
342			},
343
344			b: []tsm1.Value{},
345			exp: []tsm1.Value{
346				tsm1.NewValue(1, 1.1),
347				tsm1.NewValue(2, 2.1),
348			},
349		},
350		{
351			a: []tsm1.Value{
352				tsm1.NewValue(0, 0.0),
353				tsm1.NewValue(1, 1.1),
354				tsm1.NewValue(2, 2.1),
355			},
356			b: []tsm1.Value{
357				tsm1.NewValue(2, 2.2),
358				tsm1.NewValue(2, 2.2), // duplicate data
359			},
360			exp: []tsm1.Value{
361				tsm1.NewValue(0, 0.0),
362				tsm1.NewValue(1, 1.1),
363				tsm1.NewValue(2, 2.2),
364			},
365		},
366		{
367			a: []tsm1.Value{
368				tsm1.NewValue(0, 0.0),
369				tsm1.NewValue(1, 1.1),
370				tsm1.NewValue(1, 1.1), // duplicate data
371				tsm1.NewValue(2, 2.1),
372			},
373			b: []tsm1.Value{
374				tsm1.NewValue(2, 2.2),
375				tsm1.NewValue(2, 2.2), // duplicate data
376			},
377			exp: []tsm1.Value{
378				tsm1.NewValue(0, 0.0),
379				tsm1.NewValue(1, 1.1),
380				tsm1.NewValue(2, 2.2),
381			},
382		},
383
384		{
385			a: []tsm1.Value{
386				tsm1.NewValue(1, 1.1),
387			},
388			b: []tsm1.Value{
389				tsm1.NewValue(0, 0.0),
390				tsm1.NewValue(1, 1.2), // overwrites a
391				tsm1.NewValue(2, 2.2),
392				tsm1.NewValue(3, 3.2),
393				tsm1.NewValue(4, 4.2),
394			},
395			exp: []tsm1.Value{
396				tsm1.NewValue(0, 0.0),
397				tsm1.NewValue(1, 1.2),
398				tsm1.NewValue(2, 2.2),
399				tsm1.NewValue(3, 3.2),
400				tsm1.NewValue(4, 4.2),
401			},
402		},
403		{
404			a: []tsm1.Value{
405				tsm1.NewValue(1, 1.1),
406				tsm1.NewValue(2, 2.1),
407				tsm1.NewValue(3, 3.1),
408				tsm1.NewValue(4, 4.1),
409			},
410
411			b: []tsm1.Value{
412				tsm1.NewValue(1, 1.2), // overwrites a
413				tsm1.NewValue(2, 2.2), // overwrites a
414			},
415			exp: []tsm1.Value{
416				tsm1.NewValue(1, 1.2),
417				tsm1.NewValue(2, 2.2),
418				tsm1.NewValue(3, 3.1),
419				tsm1.NewValue(4, 4.1),
420			},
421		},
422		{
423			a: []tsm1.Value{
424				tsm1.NewValue(1, 1.1),
425				tsm1.NewValue(2, 2.1),
426				tsm1.NewValue(3, 3.1),
427				tsm1.NewValue(4, 4.1),
428			},
429
430			b: []tsm1.Value{
431				tsm1.NewValue(1, 1.2), // overwrites a
432				tsm1.NewValue(2, 2.2), // overwrites a
433				tsm1.NewValue(3, 3.2),
434				tsm1.NewValue(4, 4.2),
435			},
436			exp: []tsm1.Value{
437				tsm1.NewValue(1, 1.2),
438				tsm1.NewValue(2, 2.2),
439				tsm1.NewValue(3, 3.2),
440				tsm1.NewValue(4, 4.2),
441			},
442		},
443		{
444			a: []tsm1.Value{
445				tsm1.NewValue(0, 0.0),
446				tsm1.NewValue(1, 1.1),
447				tsm1.NewValue(2, 2.1),
448				tsm1.NewValue(3, 3.1),
449				tsm1.NewValue(4, 4.1),
450			},
451			b: []tsm1.Value{
452				tsm1.NewValue(0, 0.0),
453				tsm1.NewValue(2, 2.2),
454				tsm1.NewValue(4, 4.2),
455			},
456			exp: []tsm1.Value{
457				tsm1.NewValue(0, 0.0),
458				tsm1.NewValue(1, 1.1),
459				tsm1.NewValue(2, 2.2),
460				tsm1.NewValue(3, 3.1),
461				tsm1.NewValue(4, 4.2),
462			},
463		},
464
465		{
466			a: []tsm1.Value{
467				tsm1.NewValue(1462498658242869207, 0.0),
468				tsm1.NewValue(1462498658288956853, 1.1),
469			},
470			b: []tsm1.Value{
471				tsm1.NewValue(1462498658242870810, 0.0),
472				tsm1.NewValue(1462498658262911238, 2.2),
473				tsm1.NewValue(1462498658282415038, 4.2),
474				tsm1.NewValue(1462498658282417760, 4.2),
475			},
476			exp: []tsm1.Value{
477				tsm1.NewValue(1462498658242869207, 0.0),
478				tsm1.NewValue(1462498658242870810, 0.0),
479				tsm1.NewValue(1462498658262911238, 2.2),
480				tsm1.NewValue(1462498658282415038, 4.2),
481				tsm1.NewValue(1462498658282417760, 4.2),
482				tsm1.NewValue(1462498658288956853, 1.1),
483			},
484		},
485		{
486			a: []tsm1.Value{
487				tsm1.NewValue(4, 4.0),
488				tsm1.NewValue(5, 5.0),
489				tsm1.NewValue(6, 6.0),
490			},
491			b: []tsm1.Value{
492				tsm1.NewValue(1, 1.0),
493				tsm1.NewValue(2, 2.0),
494				tsm1.NewValue(3, 3.0),
495			},
496			exp: []tsm1.Value{
497				tsm1.NewValue(1, 1.0),
498				tsm1.NewValue(2, 2.0),
499				tsm1.NewValue(3, 3.0),
500				tsm1.NewValue(4, 4.0),
501				tsm1.NewValue(5, 5.0),
502				tsm1.NewValue(6, 6.0),
503			},
504		},
505		{
506			a: []tsm1.Value{
507				tsm1.NewValue(5, 5.0),
508				tsm1.NewValue(6, 6.0),
509			},
510			b: []tsm1.Value{
511				tsm1.NewValue(1, 1.0),
512				tsm1.NewValue(2, 2.0),
513				tsm1.NewValue(3, 3.0),
514				tsm1.NewValue(4, 4.0),
515				tsm1.NewValue(7, 7.0),
516				tsm1.NewValue(8, 8.0),
517			},
518			exp: []tsm1.Value{
519				tsm1.NewValue(1, 1.0),
520				tsm1.NewValue(2, 2.0),
521				tsm1.NewValue(3, 3.0),
522				tsm1.NewValue(4, 4.0),
523				tsm1.NewValue(5, 5.0),
524				tsm1.NewValue(6, 6.0),
525				tsm1.NewValue(7, 7.0),
526				tsm1.NewValue(8, 8.0),
527			},
528		},
529		{
530			a: []tsm1.Value{
531				tsm1.NewValue(1, 1.0),
532				tsm1.NewValue(2, 2.0),
533				tsm1.NewValue(3, 3.0),
534			},
535			b: []tsm1.Value{
536				tsm1.NewValue(4, 4.0),
537				tsm1.NewValue(5, 5.0),
538				tsm1.NewValue(6, 6.0),
539			},
540			exp: []tsm1.Value{
541				tsm1.NewValue(1, 1.0),
542				tsm1.NewValue(2, 2.0),
543				tsm1.NewValue(3, 3.0),
544				tsm1.NewValue(4, 4.0),
545				tsm1.NewValue(5, 5.0),
546				tsm1.NewValue(6, 6.0),
547			},
548		},
549	}
550
551	for i, test := range tests {
552		got := tsm1.Values(test.a).Merge(test.b)
553		if exp, got := len(test.exp), len(got); exp != got {
554			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
555		}
556
557		dedup := tsm1.Values(append(test.a, test.b...)).Deduplicate()
558
559		for i := range test.exp {
560			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
561				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
562			}
563
564			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
565				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
566			}
567		}
568	}
569}
570
571func TestIntegerValues_Merge(t *testing.T) {
572	integerValue := func(t int64, f int64) tsm1.IntegerValue {
573		return tsm1.NewValue(t, f).(tsm1.IntegerValue)
574	}
575
576	tests := []struct {
577		a, b, exp []tsm1.IntegerValue
578	}{
579
580		{ // empty a
581			a: []tsm1.IntegerValue{},
582
583			b: []tsm1.IntegerValue{
584				integerValue(1, 10),
585				integerValue(2, 20),
586			},
587			exp: []tsm1.IntegerValue{
588				integerValue(1, 10),
589				integerValue(2, 20),
590			},
591		},
592		{ // empty b
593			a: []tsm1.IntegerValue{
594				integerValue(1, 1),
595				integerValue(2, 2),
596			},
597
598			b: []tsm1.IntegerValue{},
599			exp: []tsm1.IntegerValue{
600				integerValue(1, 1),
601				integerValue(2, 2),
602			},
603		},
604		{
605			a: []tsm1.IntegerValue{
606				integerValue(1, 1),
607			},
608			b: []tsm1.IntegerValue{
609				integerValue(0, 0),
610				integerValue(1, 10), // overwrites a
611				integerValue(2, 20),
612				integerValue(3, 30),
613				integerValue(4, 40),
614			},
615			exp: []tsm1.IntegerValue{
616				integerValue(0, 0),
617				integerValue(1, 10),
618				integerValue(2, 20),
619				integerValue(3, 30),
620				integerValue(4, 40),
621			},
622		},
623		{
624			a: []tsm1.IntegerValue{
625				integerValue(1, 1),
626				integerValue(2, 2),
627				integerValue(3, 3),
628				integerValue(4, 4),
629			},
630
631			b: []tsm1.IntegerValue{
632				integerValue(1, 10), // overwrites a
633				integerValue(2, 20), // overwrites a
634			},
635			exp: []tsm1.IntegerValue{
636				integerValue(1, 10),
637				integerValue(2, 20),
638				integerValue(3, 3),
639				integerValue(4, 4),
640			},
641		},
642		{
643			a: []tsm1.IntegerValue{
644				integerValue(1, 1),
645				integerValue(2, 2),
646				integerValue(3, 3),
647				integerValue(4, 4),
648			},
649
650			b: []tsm1.IntegerValue{
651				integerValue(1, 10), // overwrites a
652				integerValue(2, 20), // overwrites a
653				integerValue(3, 30),
654				integerValue(4, 40),
655			},
656			exp: []tsm1.IntegerValue{
657				integerValue(1, 10),
658				integerValue(2, 20),
659				integerValue(3, 30),
660				integerValue(4, 40),
661			},
662		},
663		{
664			a: []tsm1.IntegerValue{
665				integerValue(0, 0),
666				integerValue(1, 1),
667				integerValue(2, 2),
668				integerValue(3, 3),
669				integerValue(4, 4),
670			},
671			b: []tsm1.IntegerValue{
672				integerValue(0, 0),
673				integerValue(2, 20),
674				integerValue(4, 40),
675			},
676			exp: []tsm1.IntegerValue{
677				integerValue(0, 0.0),
678				integerValue(1, 1),
679				integerValue(2, 20),
680				integerValue(3, 3),
681				integerValue(4, 40),
682			},
683		},
684	}
685
686	for i, test := range tests {
687		if i != 2 {
688			continue
689		}
690
691		got := tsm1.IntegerValues(test.a).Merge(test.b)
692		if exp, got := len(test.exp), len(got); exp != got {
693			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
694		}
695
696		dedup := tsm1.IntegerValues(append(test.a, test.b...)).Deduplicate()
697
698		for i := range test.exp {
699			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
700				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
701			}
702
703			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
704				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
705			}
706		}
707	}
708}
709
710func TestUnsignedValues_Merge(t *testing.T) {
711	uintValue := func(t int64, f uint64) tsm1.UnsignedValue {
712		return tsm1.NewValue(t, f).(tsm1.UnsignedValue)
713	}
714
715	tests := []struct {
716		a, b, exp []tsm1.UnsignedValue
717	}{
718
719		{ // empty a
720			a: []tsm1.UnsignedValue{},
721
722			b: []tsm1.UnsignedValue{
723				uintValue(1, 10),
724				uintValue(2, 20),
725			},
726			exp: []tsm1.UnsignedValue{
727				uintValue(1, 10),
728				uintValue(2, 20),
729			},
730		},
731		{ // empty b
732			a: []tsm1.UnsignedValue{
733				uintValue(1, 1),
734				uintValue(2, 2),
735			},
736
737			b: []tsm1.UnsignedValue{},
738			exp: []tsm1.UnsignedValue{
739				uintValue(1, 1),
740				uintValue(2, 2),
741			},
742		},
743		{
744			a: []tsm1.UnsignedValue{
745				uintValue(1, 1),
746			},
747			b: []tsm1.UnsignedValue{
748				uintValue(0, 0),
749				uintValue(1, 10), // overwrites a
750				uintValue(2, 20),
751				uintValue(3, 30),
752				uintValue(4, 40),
753			},
754			exp: []tsm1.UnsignedValue{
755				uintValue(0, 0),
756				uintValue(1, 10),
757				uintValue(2, 20),
758				uintValue(3, 30),
759				uintValue(4, 40),
760			},
761		},
762		{
763			a: []tsm1.UnsignedValue{
764				uintValue(1, 1),
765				uintValue(2, 2),
766				uintValue(3, 3),
767				uintValue(4, 4),
768			},
769
770			b: []tsm1.UnsignedValue{
771				uintValue(1, ^uint64(0)), // overwrites a
772				uintValue(2, 20),         // overwrites a
773			},
774			exp: []tsm1.UnsignedValue{
775				uintValue(1, ^uint64(0)),
776				uintValue(2, 20),
777				uintValue(3, 3),
778				uintValue(4, 4),
779			},
780		},
781		{
782			a: []tsm1.UnsignedValue{
783				uintValue(1, 1),
784				uintValue(2, 2),
785				uintValue(3, 3),
786				uintValue(4, 4),
787			},
788
789			b: []tsm1.UnsignedValue{
790				uintValue(1, 10), // overwrites a
791				uintValue(2, 20), // overwrites a
792				uintValue(3, 30),
793				uintValue(4, 40),
794			},
795			exp: []tsm1.UnsignedValue{
796				uintValue(1, 10),
797				uintValue(2, 20),
798				uintValue(3, 30),
799				uintValue(4, 40),
800			},
801		},
802		{
803			a: []tsm1.UnsignedValue{
804				uintValue(0, 0),
805				uintValue(1, 1),
806				uintValue(2, 2),
807				uintValue(3, 3),
808				uintValue(4, 4),
809			},
810			b: []tsm1.UnsignedValue{
811				uintValue(0, 0),
812				uintValue(2, 20),
813				uintValue(4, 40),
814			},
815			exp: []tsm1.UnsignedValue{
816				uintValue(0, 0.0),
817				uintValue(1, 1),
818				uintValue(2, 20),
819				uintValue(3, 3),
820				uintValue(4, 40),
821			},
822		},
823	}
824
825	for i, test := range tests {
826		if i != 2 {
827			continue
828		}
829
830		got := tsm1.UnsignedValues(test.a).Merge(test.b)
831		if exp, got := len(test.exp), len(got); exp != got {
832			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
833		}
834
835		dedup := tsm1.UnsignedValues(append(test.a, test.b...)).Deduplicate()
836
837		for i := range test.exp {
838			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
839				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
840			}
841
842			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
843				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
844			}
845		}
846	}
847}
848
849func TestFloatValues_Merge(t *testing.T) {
850	floatValue := func(t int64, f float64) tsm1.FloatValue {
851		return tsm1.NewValue(t, f).(tsm1.FloatValue)
852	}
853
854	tests := []struct {
855		a, b, exp []tsm1.FloatValue
856	}{
857
858		{ // empty a
859			a: []tsm1.FloatValue{},
860
861			b: []tsm1.FloatValue{
862				floatValue(1, 1.2),
863				floatValue(2, 2.2),
864			},
865			exp: []tsm1.FloatValue{
866				floatValue(1, 1.2),
867				floatValue(2, 2.2),
868			},
869		},
870		{ // empty b
871			a: []tsm1.FloatValue{
872				floatValue(1, 1.1),
873				floatValue(2, 2.1),
874			},
875
876			b: []tsm1.FloatValue{},
877			exp: []tsm1.FloatValue{
878				floatValue(1, 1.1),
879				floatValue(2, 2.1),
880			},
881		},
882		{
883			a: []tsm1.FloatValue{
884				floatValue(1, 1.1),
885			},
886			b: []tsm1.FloatValue{
887				floatValue(0, 0.0),
888				floatValue(1, 1.2), // overwrites a
889				floatValue(2, 2.2),
890				floatValue(3, 3.2),
891				floatValue(4, 4.2),
892			},
893			exp: []tsm1.FloatValue{
894				floatValue(0, 0.0),
895				floatValue(1, 1.2),
896				floatValue(2, 2.2),
897				floatValue(3, 3.2),
898				floatValue(4, 4.2),
899			},
900		},
901		{
902			a: []tsm1.FloatValue{
903				floatValue(1, 1.1),
904				floatValue(2, 2.1),
905				floatValue(3, 3.1),
906				floatValue(4, 4.1),
907			},
908
909			b: []tsm1.FloatValue{
910				floatValue(1, 1.2), // overwrites a
911				floatValue(2, 2.2), // overwrites a
912			},
913			exp: []tsm1.FloatValue{
914				floatValue(1, 1.2),
915				floatValue(2, 2.2),
916				floatValue(3, 3.1),
917				floatValue(4, 4.1),
918			},
919		},
920		{
921			a: []tsm1.FloatValue{
922				floatValue(1, 1.1),
923				floatValue(2, 2.1),
924				floatValue(3, 3.1),
925				floatValue(4, 4.1),
926			},
927
928			b: []tsm1.FloatValue{
929				floatValue(1, 1.2), // overwrites a
930				floatValue(2, 2.2), // overwrites a
931				floatValue(3, 3.2),
932				floatValue(4, 4.2),
933			},
934			exp: []tsm1.FloatValue{
935				floatValue(1, 1.2),
936				floatValue(2, 2.2),
937				floatValue(3, 3.2),
938				floatValue(4, 4.2),
939			},
940		},
941		{
942			a: []tsm1.FloatValue{
943				floatValue(0, 0.0),
944				floatValue(1, 1.1),
945				floatValue(2, 2.1),
946				floatValue(3, 3.1),
947				floatValue(4, 4.1),
948			},
949			b: []tsm1.FloatValue{
950				floatValue(0, 0.0),
951				floatValue(2, 2.2),
952				floatValue(4, 4.2),
953			},
954			exp: []tsm1.FloatValue{
955				floatValue(0, 0.0),
956				floatValue(1, 1.1),
957				floatValue(2, 2.2),
958				floatValue(3, 3.1),
959				floatValue(4, 4.2),
960			},
961		},
962	}
963
964	for i, test := range tests {
965		got := tsm1.FloatValues(test.a).Merge(test.b)
966		if exp, got := len(test.exp), len(got); exp != got {
967			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
968		}
969
970		dedup := tsm1.FloatValues(append(test.a, test.b...)).Deduplicate()
971
972		for i := range test.exp {
973			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
974				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
975			}
976
977			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
978				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
979			}
980		}
981	}
982}
983
984func TestBooleanValues_Merge(t *testing.T) {
985	booleanValue := func(t int64, f bool) tsm1.BooleanValue {
986		return tsm1.NewValue(t, f).(tsm1.BooleanValue)
987	}
988
989	tests := []struct {
990		a, b, exp []tsm1.BooleanValue
991	}{
992
993		{ // empty a
994			a: []tsm1.BooleanValue{},
995
996			b: []tsm1.BooleanValue{
997				booleanValue(1, true),
998				booleanValue(2, true),
999			},
1000			exp: []tsm1.BooleanValue{
1001				booleanValue(1, true),
1002				booleanValue(2, true),
1003			},
1004		},
1005		{ // empty b
1006			a: []tsm1.BooleanValue{
1007				booleanValue(1, true),
1008				booleanValue(2, true),
1009			},
1010
1011			b: []tsm1.BooleanValue{},
1012			exp: []tsm1.BooleanValue{
1013				booleanValue(1, true),
1014				booleanValue(2, true),
1015			},
1016		},
1017		{
1018			a: []tsm1.BooleanValue{
1019				booleanValue(1, true),
1020			},
1021			b: []tsm1.BooleanValue{
1022				booleanValue(0, false),
1023				booleanValue(1, false), // overwrites a
1024				booleanValue(2, false),
1025				booleanValue(3, false),
1026				booleanValue(4, false),
1027			},
1028			exp: []tsm1.BooleanValue{
1029				booleanValue(0, false),
1030				booleanValue(1, false),
1031				booleanValue(2, false),
1032				booleanValue(3, false),
1033				booleanValue(4, false),
1034			},
1035		},
1036		{
1037			a: []tsm1.BooleanValue{
1038				booleanValue(1, true),
1039				booleanValue(2, true),
1040				booleanValue(3, true),
1041				booleanValue(4, true),
1042			},
1043
1044			b: []tsm1.BooleanValue{
1045				booleanValue(1, false), // overwrites a
1046				booleanValue(2, false), // overwrites a
1047			},
1048			exp: []tsm1.BooleanValue{
1049				booleanValue(1, false), // overwrites a
1050				booleanValue(2, false), // overwrites a
1051				booleanValue(3, true),
1052				booleanValue(4, true),
1053			},
1054		},
1055		{
1056			a: []tsm1.BooleanValue{
1057				booleanValue(1, true),
1058				booleanValue(2, true),
1059				booleanValue(3, true),
1060				booleanValue(4, true),
1061			},
1062
1063			b: []tsm1.BooleanValue{
1064				booleanValue(1, false), // overwrites a
1065				booleanValue(2, false), // overwrites a
1066				booleanValue(3, false),
1067				booleanValue(4, false),
1068			},
1069			exp: []tsm1.BooleanValue{
1070				booleanValue(1, false),
1071				booleanValue(2, false),
1072				booleanValue(3, false),
1073				booleanValue(4, false),
1074			},
1075		},
1076		{
1077			a: []tsm1.BooleanValue{
1078				booleanValue(0, true),
1079				booleanValue(1, true),
1080				booleanValue(2, true),
1081				booleanValue(3, true),
1082				booleanValue(4, true),
1083			},
1084			b: []tsm1.BooleanValue{
1085				booleanValue(0, false),
1086				booleanValue(2, false),
1087				booleanValue(4, false),
1088			},
1089			exp: []tsm1.BooleanValue{
1090				booleanValue(0, false),
1091				booleanValue(1, true),
1092				booleanValue(2, false),
1093				booleanValue(3, true),
1094				booleanValue(4, false),
1095			},
1096		},
1097	}
1098
1099	for i, test := range tests {
1100		got := tsm1.BooleanValues(test.a).Merge(test.b)
1101		if exp, got := len(test.exp), len(got); exp != got {
1102			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
1103		}
1104
1105		dedup := tsm1.BooleanValues(append(test.a, test.b...)).Deduplicate()
1106
1107		for i := range test.exp {
1108			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
1109				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1110			}
1111
1112			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
1113				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1114			}
1115		}
1116	}
1117}
1118
1119func TestStringValues_Merge(t *testing.T) {
1120	stringValue := func(t int64, f string) tsm1.StringValue {
1121		return tsm1.NewValue(t, f).(tsm1.StringValue)
1122	}
1123
1124	tests := []struct {
1125		a, b, exp []tsm1.StringValue
1126	}{
1127
1128		{ // empty a
1129			a: []tsm1.StringValue{},
1130
1131			b: []tsm1.StringValue{
1132				stringValue(1, "10"),
1133				stringValue(2, "20"),
1134			},
1135			exp: []tsm1.StringValue{
1136				stringValue(1, "10"),
1137				stringValue(2, "20"),
1138			},
1139		},
1140		{ // empty b
1141			a: []tsm1.StringValue{
1142				stringValue(1, "1"),
1143				stringValue(2, "2"),
1144			},
1145
1146			b: []tsm1.StringValue{},
1147			exp: []tsm1.StringValue{
1148				stringValue(1, "1"),
1149				stringValue(2, "2"),
1150			},
1151		},
1152		{
1153			a: []tsm1.StringValue{
1154				stringValue(1, "1"),
1155			},
1156			b: []tsm1.StringValue{
1157				stringValue(0, "0"),
1158				stringValue(1, "10"), // overwrites a
1159				stringValue(2, "20"),
1160				stringValue(3, "30"),
1161				stringValue(4, "40"),
1162			},
1163			exp: []tsm1.StringValue{
1164				stringValue(0, "0"),
1165				stringValue(1, "10"),
1166				stringValue(2, "20"),
1167				stringValue(3, "30"),
1168				stringValue(4, "40"),
1169			},
1170		},
1171		{
1172			a: []tsm1.StringValue{
1173				stringValue(1, "1"),
1174				stringValue(2, "2"),
1175				stringValue(3, "3"),
1176				stringValue(4, "4"),
1177			},
1178
1179			b: []tsm1.StringValue{
1180				stringValue(1, "10"), // overwrites a
1181				stringValue(2, "20"), // overwrites a
1182			},
1183			exp: []tsm1.StringValue{
1184				stringValue(1, "10"),
1185				stringValue(2, "20"),
1186				stringValue(3, "3"),
1187				stringValue(4, "4"),
1188			},
1189		},
1190		{
1191			a: []tsm1.StringValue{
1192				stringValue(1, "1"),
1193				stringValue(2, "2"),
1194				stringValue(3, "3"),
1195				stringValue(4, "4"),
1196			},
1197
1198			b: []tsm1.StringValue{
1199				stringValue(1, "10"), // overwrites a
1200				stringValue(2, "20"), // overwrites a
1201				stringValue(3, "30"),
1202				stringValue(4, "40"),
1203			},
1204			exp: []tsm1.StringValue{
1205				stringValue(1, "10"),
1206				stringValue(2, "20"),
1207				stringValue(3, "30"),
1208				stringValue(4, "40"),
1209			},
1210		},
1211		{
1212			a: []tsm1.StringValue{
1213				stringValue(0, "0"),
1214				stringValue(1, "1"),
1215				stringValue(2, "2"),
1216				stringValue(3, "3"),
1217				stringValue(4, "4"),
1218			},
1219			b: []tsm1.StringValue{
1220				stringValue(0, "0"),
1221				stringValue(2, "20"),
1222				stringValue(4, "40"),
1223			},
1224			exp: []tsm1.StringValue{
1225				stringValue(0, "0.0"),
1226				stringValue(1, "1"),
1227				stringValue(2, "20"),
1228				stringValue(3, "3"),
1229				stringValue(4, "40"),
1230			},
1231		},
1232	}
1233
1234	for i, test := range tests {
1235		if i != 2 {
1236			continue
1237		}
1238
1239		got := tsm1.StringValues(test.a).Merge(test.b)
1240		if exp, got := len(test.exp), len(got); exp != got {
1241			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
1242		}
1243
1244		dedup := tsm1.StringValues(append(test.a, test.b...)).Deduplicate()
1245
1246		for i := range test.exp {
1247			if exp, got := test.exp[i].String(), got[i].String(); exp != got {
1248				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1249			}
1250
1251			if exp, got := test.exp[i].String(), dedup[i].String(); exp != got {
1252				t.Fatalf("value mismatch:\n exp %v\n got %v", exp, got)
1253			}
1254		}
1255	}
1256}
// getTimes returns n timestamps expressed as Unix nanoseconds. The first
// timestamp is the current time rounded to precision, and each following
// timestamp is step*precision later than the previous one.
func getTimes(n, step int, precision time.Duration) []int64 {
	t := time.Now().Round(precision).UnixNano()
	// Spacing between consecutive timestamps, in nanoseconds. Previously the
	// step argument was ignored and a spacing of 60 was hard-coded.
	interval := (time.Duration(step) * precision).Nanoseconds()
	a := make([]int64, n)
	for i := range a {
		a[i] = t + int64(i)*interval
	}
	return a
}
1265
1266func BenchmarkDecodeBlock_Float_Empty(b *testing.B) {
1267	valueCount := 1000
1268	times := getTimes(valueCount, 60, time.Second)
1269	values := make([]tsm1.Value, len(times))
1270	for i, t := range times {
1271		values[i] = tsm1.NewValue(t, float64(i))
1272	}
1273
1274	bytes, err := tsm1.Values(values).Encode(nil)
1275	if err != nil {
1276		b.Fatalf("unexpected error: %v", err)
1277	}
1278
1279	var decodedValues []tsm1.Value
1280	b.ResetTimer()
1281	for i := 0; i < b.N; i++ {
1282		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1283		if err != nil {
1284			b.Fatalf("unexpected error decoding block: %v", err)
1285		}
1286	}
1287}
1288
1289func BenchmarkDecodeBlock_Float_EqualSize(b *testing.B) {
1290	valueCount := 1000
1291	times := getTimes(valueCount, 60, time.Second)
1292	values := make([]tsm1.Value, len(times))
1293	for i, t := range times {
1294		values[i] = tsm1.NewValue(t, float64(i))
1295	}
1296
1297	bytes, err := tsm1.Values(values).Encode(nil)
1298	if err != nil {
1299		b.Fatalf("unexpected error: %v", err)
1300	}
1301
1302	decodedValues := make([]tsm1.Value, len(values))
1303	b.ResetTimer()
1304	for i := 0; i < b.N; i++ {
1305		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1306		if err != nil {
1307			b.Fatalf("unexpected error decoding block: %v", err)
1308		}
1309	}
1310}
1311
1312func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) {
1313	valueCount := 1000
1314	times := getTimes(valueCount, 60, time.Second)
1315	values := make([]tsm1.Value, len(times))
1316	for i, t := range times {
1317		values[i] = tsm1.NewValue(t, float64(i))
1318	}
1319
1320	bytes, err := tsm1.Values(values).Encode(nil)
1321	if err != nil {
1322		b.Fatalf("unexpected error: %v", err)
1323	}
1324
1325	decodedValues := make([]tsm1.FloatValue, len(values))
1326	b.ResetTimer()
1327	for i := 0; i < b.N; i++ {
1328		_, err = tsm1.DecodeFloatBlock(bytes, &decodedValues)
1329		if err != nil {
1330			b.Fatalf("unexpected error decoding block: %v", err)
1331		}
1332	}
1333}
1334
1335func BenchmarkDecodeBlock_Integer_Empty(b *testing.B) {
1336	valueCount := 1000
1337	times := getTimes(valueCount, 60, time.Second)
1338	values := make([]tsm1.Value, len(times))
1339	for i, t := range times {
1340		values[i] = tsm1.NewValue(t, int64(i))
1341	}
1342
1343	bytes, err := tsm1.Values(values).Encode(nil)
1344	if err != nil {
1345		b.Fatalf("unexpected error: %v", err)
1346	}
1347
1348	var decodedValues []tsm1.Value
1349	b.ResetTimer()
1350	for i := 0; i < b.N; i++ {
1351		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1352		if err != nil {
1353			b.Fatalf("unexpected error decoding block: %v", err)
1354		}
1355	}
1356}
1357
1358func BenchmarkDecodeBlock_Integer_EqualSize(b *testing.B) {
1359	valueCount := 1000
1360	times := getTimes(valueCount, 60, time.Second)
1361	values := make([]tsm1.Value, len(times))
1362	for i, t := range times {
1363		values[i] = tsm1.NewValue(t, int64(i))
1364	}
1365
1366	bytes, err := tsm1.Values(values).Encode(nil)
1367	if err != nil {
1368		b.Fatalf("unexpected error: %v", err)
1369	}
1370
1371	decodedValues := make([]tsm1.Value, len(values))
1372	b.ResetTimer()
1373	for i := 0; i < b.N; i++ {
1374		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1375		if err != nil {
1376			b.Fatalf("unexpected error decoding block: %v", err)
1377		}
1378	}
1379}
1380
1381func BenchmarkDecodeBlock_Integer_TypeSpecific(b *testing.B) {
1382	valueCount := 1000
1383	times := getTimes(valueCount, 60, time.Second)
1384	values := make([]tsm1.Value, len(times))
1385	for i, t := range times {
1386		values[i] = tsm1.NewValue(t, int64(i))
1387	}
1388
1389	bytes, err := tsm1.Values(values).Encode(nil)
1390	if err != nil {
1391		b.Fatalf("unexpected error: %v", err)
1392	}
1393
1394	decodedValues := make([]tsm1.IntegerValue, len(values))
1395	b.ResetTimer()
1396	for i := 0; i < b.N; i++ {
1397		_, err = tsm1.DecodeIntegerBlock(bytes, &decodedValues)
1398		if err != nil {
1399			b.Fatalf("unexpected error decoding block: %v", err)
1400		}
1401	}
1402}
1403
1404func BenchmarkDecodeBlock_Boolean_Empty(b *testing.B) {
1405	valueCount := 1000
1406	times := getTimes(valueCount, 60, time.Second)
1407	values := make([]tsm1.Value, len(times))
1408	for i, t := range times {
1409		values[i] = tsm1.NewValue(t, true)
1410	}
1411
1412	bytes, err := tsm1.Values(values).Encode(nil)
1413	if err != nil {
1414		b.Fatalf("unexpected error: %v", err)
1415	}
1416
1417	var decodedValues []tsm1.Value
1418	b.ResetTimer()
1419	for i := 0; i < b.N; i++ {
1420		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1421		if err != nil {
1422			b.Fatalf("unexpected error decoding block: %v", err)
1423		}
1424	}
1425}
1426
1427func BenchmarkDecodeBlock_Boolean_EqualSize(b *testing.B) {
1428	valueCount := 1000
1429	times := getTimes(valueCount, 60, time.Second)
1430	values := make([]tsm1.Value, len(times))
1431	for i, t := range times {
1432		values[i] = tsm1.NewValue(t, true)
1433	}
1434
1435	bytes, err := tsm1.Values(values).Encode(nil)
1436	if err != nil {
1437		b.Fatalf("unexpected error: %v", err)
1438	}
1439
1440	decodedValues := make([]tsm1.Value, len(values))
1441	b.ResetTimer()
1442	for i := 0; i < b.N; i++ {
1443		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1444		if err != nil {
1445			b.Fatalf("unexpected error decoding block: %v", err)
1446		}
1447	}
1448}
1449
1450func BenchmarkDecodeBlock_Boolean_TypeSpecific(b *testing.B) {
1451	valueCount := 1000
1452	times := getTimes(valueCount, 60, time.Second)
1453	values := make([]tsm1.Value, len(times))
1454	for i, t := range times {
1455		values[i] = tsm1.NewValue(t, true)
1456	}
1457
1458	bytes, err := tsm1.Values(values).Encode(nil)
1459	if err != nil {
1460		b.Fatalf("unexpected error: %v", err)
1461	}
1462
1463	decodedValues := make([]tsm1.BooleanValue, len(values))
1464	b.ResetTimer()
1465	for i := 0; i < b.N; i++ {
1466		_, err = tsm1.DecodeBooleanBlock(bytes, &decodedValues)
1467		if err != nil {
1468			b.Fatalf("unexpected error decoding block: %v", err)
1469		}
1470	}
1471}
1472
1473func BenchmarkDecodeBlock_String_Empty(b *testing.B) {
1474	valueCount := 1000
1475	times := getTimes(valueCount, 60, time.Second)
1476	values := make([]tsm1.Value, len(times))
1477	for i, t := range times {
1478		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1479	}
1480
1481	bytes, err := tsm1.Values(values).Encode(nil)
1482	if err != nil {
1483		b.Fatalf("unexpected error: %v", err)
1484	}
1485
1486	var decodedValues []tsm1.Value
1487	b.ResetTimer()
1488	for i := 0; i < b.N; i++ {
1489		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1490		if err != nil {
1491			b.Fatalf("unexpected error decoding block: %v", err)
1492		}
1493	}
1494}
1495
1496func BenchmarkDecodeBlock_String_EqualSize(b *testing.B) {
1497	valueCount := 1000
1498	times := getTimes(valueCount, 60, time.Second)
1499	values := make([]tsm1.Value, len(times))
1500	for i, t := range times {
1501		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1502	}
1503
1504	bytes, err := tsm1.Values(values).Encode(nil)
1505	if err != nil {
1506		b.Fatalf("unexpected error: %v", err)
1507	}
1508
1509	decodedValues := make([]tsm1.Value, len(values))
1510	b.ResetTimer()
1511	for i := 0; i < b.N; i++ {
1512		_, err = tsm1.DecodeBlock(bytes, decodedValues)
1513		if err != nil {
1514			b.Fatalf("unexpected error decoding block: %v", err)
1515		}
1516	}
1517}
1518
1519func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) {
1520	valueCount := 1000
1521	times := getTimes(valueCount, 60, time.Second)
1522	values := make([]tsm1.Value, len(times))
1523	for i, t := range times {
1524		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1525	}
1526
1527	bytes, err := tsm1.Values(values).Encode(nil)
1528	if err != nil {
1529		b.Fatalf("unexpected error: %v", err)
1530	}
1531
1532	decodedValues := make([]tsm1.StringValue, len(values))
1533	b.ResetTimer()
1534	for i := 0; i < b.N; i++ {
1535		_, err = tsm1.DecodeStringBlock(bytes, &decodedValues)
1536		if err != nil {
1537			b.Fatalf("unexpected error decoding block: %v", err)
1538		}
1539	}
1540}
1541
1542func BenchmarkValues_Deduplicate(b *testing.B) {
1543	valueCount := 1000
1544	times := getTimes(valueCount, 60, time.Second)
1545	values := make([]tsm1.Value, len(times))
1546	for i, t := range times {
1547		values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
1548	}
1549	values = append(values, values...)
1550
1551	b.ResetTimer()
1552	for i := 0; i < b.N; i++ {
1553		tsm1.Values(values).Deduplicate()
1554	}
1555}
1556
1557func BenchmarkValues_Merge(b *testing.B) {
1558	valueCount := 1000
1559	times := getTimes(valueCount, 60, time.Second)
1560	a := make([]tsm1.Value, len(times))
1561	c := make([]tsm1.Value, len(times))
1562
1563	for i, t := range times {
1564		a[i] = tsm1.NewValue(t, float64(i))
1565		c[i] = tsm1.NewValue(t+1, float64(i))
1566	}
1567
1568	b.ResetTimer()
1569	benchmarkMerge(a, c, b)
1570}
1571
1572func BenchmarkValues_MergeDisjoint(b *testing.B) {
1573	valueCount := 1000
1574	times := getTimes(valueCount, 60, time.Second)
1575	a := make([]tsm1.Value, len(times))
1576	c := make([]tsm1.Value, len(times))
1577
1578	for i, t := range times {
1579		a[i] = tsm1.NewValue(t, float64(i))
1580		c[i] = tsm1.NewValue(times[len(times)-1]+int64((i+1)*1e9), float64(i))
1581	}
1582
1583	b.ResetTimer()
1584	benchmarkMerge(a, c, b)
1585}
1586
1587func BenchmarkValues_MergeSame(b *testing.B) {
1588	valueCount := 1000
1589	times := getTimes(valueCount, 60, time.Second)
1590	a := make([]tsm1.Value, len(times))
1591	c := make([]tsm1.Value, len(times))
1592
1593	for i, t := range times {
1594		a[i] = tsm1.NewValue(t, float64(i))
1595		c[i] = tsm1.NewValue(t, float64(i))
1596	}
1597
1598	b.ResetTimer()
1599	benchmarkMerge(a, c, b)
1600}
1601
1602func BenchmarkValues_MergeSimilar(b *testing.B) {
1603	valueCount := 1000
1604	times := getTimes(valueCount, 60, time.Second)
1605	a := make([]tsm1.Value, len(times))
1606	c := make([]tsm1.Value, len(times))
1607
1608	for i, t := range times {
1609		a[i] = tsm1.NewValue(t, float64(i))
1610		if i == 0 {
1611			t++
1612		}
1613		c[i] = tsm1.NewValue(t, float64(i))
1614	}
1615
1616	b.ResetTimer()
1617	benchmarkMerge(a, c, b)
1618}
1619
1620func BenchmarkValues_MergeUnevenA(b *testing.B) {
1621	valueCount := 1000
1622	times := getTimes(valueCount, 60, time.Second)
1623	a := make([]tsm1.Value, len(times))
1624	c := make([]tsm1.Value, len(times))
1625
1626	for i, t := range times {
1627		a[i] = tsm1.NewValue(t, float64(i))
1628		c[i] = tsm1.NewValue(t, float64(i))
1629	}
1630
1631	b.ResetTimer()
1632	benchmarkMerge(a[:700], c[:10], b)
1633}
1634
1635func BenchmarkValues_MergeUnevenB(b *testing.B) {
1636	valueCount := 1000
1637	times := getTimes(valueCount, 60, time.Second)
1638	a := make([]tsm1.Value, len(times))
1639	c := make([]tsm1.Value, len(times))
1640
1641	for i, t := range times {
1642		a[i] = tsm1.NewValue(t, float64(i))
1643		c[i] = tsm1.NewValue(t, float64(i))
1644	}
1645
1646	b.ResetTimer()
1647	benchmarkMerge(a[:10], c[:700], b)
1648}
1649
1650func benchmarkMerge(a, c tsm1.Values, b *testing.B) {
1651	for i := 0; i < b.N; i++ {
1652		b.StopTimer()
1653		aa := make(tsm1.Values, len(a))
1654		copy(aa, a)
1655		cc := make(tsm1.Values, len(c))
1656		copy(cc, c)
1657		b.StartTimer()
1658		tsm1.Values(aa).Merge(tsm1.Values(cc))
1659	}
1660}
1661
1662func BenchmarkValues_EncodeInteger(b *testing.B) {
1663	valueCount := 1024
1664	times := getTimes(valueCount, 60, time.Second)
1665	a := make([]tsm1.Value, len(times))
1666
1667	for i, t := range times {
1668		a[i] = tsm1.NewValue(t, int64(i))
1669	}
1670
1671	buf := make([]byte, 1024*8)
1672	b.ResetTimer()
1673	for i := 0; i < b.N; i++ {
1674		tsm1.Values(a).Encode(buf)
1675	}
1676}
1677
1678func BenchmarkValues_EncodeFloat(b *testing.B) {
1679	valueCount := 1024
1680	times := getTimes(valueCount, 60, time.Second)
1681	a := make([]tsm1.Value, len(times))
1682
1683	for i, t := range times {
1684		a[i] = tsm1.NewValue(t, float64(i))
1685	}
1686
1687	buf := make([]byte, 1024*8)
1688	b.ResetTimer()
1689	for i := 0; i < b.N; i++ {
1690		tsm1.Values(a).Encode(buf)
1691	}
1692}
1693func BenchmarkValues_EncodeString(b *testing.B) {
1694	valueCount := 1024
1695	times := getTimes(valueCount, 60, time.Second)
1696	a := make([]tsm1.Value, len(times))
1697
1698	for i, t := range times {
1699		a[i] = tsm1.NewValue(t, fmt.Sprintf("%d", i))
1700	}
1701
1702	buf := make([]byte, 1024*8)
1703	b.ResetTimer()
1704	for i := 0; i < b.N; i++ {
1705		tsm1.Values(a).Encode(buf)
1706	}
1707}
1708func BenchmarkValues_EncodeBool(b *testing.B) {
1709	valueCount := 1024
1710	times := getTimes(valueCount, 60, time.Second)
1711	a := make([]tsm1.Value, len(times))
1712
1713	for i, t := range times {
1714		if i%2 == 0 {
1715			a[i] = tsm1.NewValue(t, true)
1716		} else {
1717			a[i] = tsm1.NewValue(t, false)
1718		}
1719	}
1720
1721	buf := make([]byte, 1024*8)
1722	b.ResetTimer()
1723	for i := 0; i < b.N; i++ {
1724		tsm1.Values(a).Encode(buf)
1725	}
1726}
1727