1/*
2Copyright 2016 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"encoding/json"
21	"fmt"
22	"io/ioutil"
23	"strings"
24	"testing"
25
26	"cloud.google.com/go/internal/testutil"
27	"github.com/golang/protobuf/proto"
28	"github.com/golang/protobuf/ptypes/wrappers"
29	btspb "google.golang.org/genproto/googleapis/bigtable/v2"
30)
31
// nilStr is a sentinel argument for cc: passing it as the row key, family
// name, or qualifier leaves that field unset (nil) in the resulting chunk,
// rather than setting it to an empty value.
const nilStr = "<>"
35
36func TestSingleCell(t *testing.T) {
37	cr := newChunkReader()
38
39	// All in one cell
40	row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true, []string{}))
41	if err != nil {
42		t.Fatalf("Processing chunk: %v", err)
43	}
44	if row == nil {
45		t.Fatalf("Missing row")
46	}
47	if len(row["fm"]) != 1 {
48		t.Fatalf("Family name length mismatch %d, %d", 1, len(row["fm"]))
49	}
50	want := []ReadItem{ri("rk", "fm", "col", 1, "value", []string{})}
51	if !testutil.Equal(row["fm"], want) {
52		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm"], want)
53	}
54	if err := cr.Close(); err != nil {
55		t.Fatalf("Close: %v", err)
56	}
57}
58
59func TestMultipleCells(t *testing.T) {
60	cr := newChunkReader()
61
62	mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{}))
63	mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{}))
64	mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{}))
65	mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false, []string{}))
66	row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true, []string{}))
67	if err != nil {
68		t.Fatalf("Processing chunk: %v", err)
69	}
70	if row == nil {
71		t.Fatalf("Missing row")
72	}
73
74	want := []ReadItem{
75		ri("rs", "fm1", "col1", 0, "val1", []string{}),
76		ri("rs", "fm1", "col1", 1, "val2", []string{}),
77		ri("rs", "fm1", "col2", 0, "val3", []string{}),
78	}
79	if !testutil.Equal(row["fm1"], want) {
80		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
81	}
82	want = []ReadItem{
83		ri("rs", "fm2", "col1", 0, "val4", []string{}),
84		ri("rs", "fm2", "col2", 1, "extralongval5", []string{}),
85	}
86	if !testutil.Equal(row["fm2"], want) {
87		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
88	}
89	if err := cr.Close(); err != nil {
90		t.Fatalf("Close: %v", err)
91	}
92}
93
94func TestSplitCells(t *testing.T) {
95	cr := newChunkReader()
96
97	mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false, []string{}))
98	mustProcess(t, cr, ccData("world", 0, false))
99	row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true, []string{}))
100	if err != nil {
101		t.Fatalf("Processing chunk: %v", err)
102	}
103	if row == nil {
104		t.Fatalf("Missing row")
105	}
106
107	want := []ReadItem{
108		ri("rs", "fm1", "col1", 0, "hello world", []string{}),
109		ri("rs", "fm1", "col2", 0, "val2", []string{}),
110	}
111	if !testutil.Equal(row["fm1"], want) {
112		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
113	}
114	if err := cr.Close(); err != nil {
115		t.Fatalf("Close: %v", err)
116	}
117}
118
119func TestMultipleRows(t *testing.T) {
120	cr := newChunkReader()
121
122	row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{}))
123	if err != nil {
124		t.Fatalf("Processing chunk: %v", err)
125	}
126	want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})}
127	if !testutil.Equal(row["fm1"], want) {
128		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
129	}
130
131	row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{}))
132	if err != nil {
133		t.Fatalf("Processing chunk: %v", err)
134	}
135	want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})}
136	if !testutil.Equal(row["fm2"], want) {
137		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
138	}
139
140	if err := cr.Close(); err != nil {
141		t.Fatalf("Close: %v", err)
142	}
143}
144
145func TestBlankQualifier(t *testing.T) {
146	cr := newChunkReader()
147
148	row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true, []string{}))
149	if err != nil {
150		t.Fatalf("Processing chunk: %v", err)
151	}
152	want := []ReadItem{ri("rs1", "fm1", "", 1, "val1", []string{})}
153	if !testutil.Equal(row["fm1"], want) {
154		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
155	}
156
157	row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{}))
158	if err != nil {
159		t.Fatalf("Processing chunk: %v", err)
160	}
161	want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})}
162	if !testutil.Equal(row["fm2"], want) {
163		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
164	}
165
166	if err := cr.Close(); err != nil {
167		t.Fatalf("Close: %v", err)
168	}
169}
170
171func TestLabels(t *testing.T) {
172	cr := newChunkReader()
173
174	mustProcess(t, cr, cc("rs1", "fm1", "col1", 0, "hello ", 11, false, []string{"test-label"}))
175	row := mustProcess(t, cr, ccData("world", 0, true))
176	want := []ReadItem{
177		ri("rs1", "fm1", "col1", 0, "hello world", []string{"test-label"}),
178	}
179	if !testutil.Equal(row["fm1"], want) {
180		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
181	}
182
183	row, err := cr.Process(cc("rs2", "fm1", "", 1, "val1", 0, true, []string{"test-label2"}))
184	if err != nil {
185		t.Fatalf("Processing chunk: %v", err)
186	}
187	want = []ReadItem{ri("rs2", "fm1", "", 1, "val1", []string{"test-label2"})}
188	if !testutil.Equal(row["fm1"], want) {
189		t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
190	}
191
192	if err := cr.Close(); err != nil {
193		t.Fatalf("Close: %v", err)
194	}
195}
196
197func TestReset(t *testing.T) {
198	cr := newChunkReader()
199	mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{}))
200	mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{}))
201	mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{}))
202	mustProcess(t, cr, ccReset())
203	row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{}))
204	want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})}
205	if !testutil.Equal(row["fm1"], want) {
206		t.Fatalf("Reset: got: %v\nwant: %v\n", row["fm1"], want)
207	}
208	if err := cr.Close(); err != nil {
209		t.Fatalf("Close: %v", err)
210	}
211}
212
213func TestNewFamEmptyQualifier(t *testing.T) {
214	cr := newChunkReader()
215
216	mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{}))
217	_, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true, []string{}))
218	if err == nil {
219		t.Fatalf("Expected error on second chunk with no qualifier set")
220	}
221}
222
223func mustProcess(t *testing.T, cr *chunkReader, cc *btspb.ReadRowsResponse_CellChunk) Row {
224	row, err := cr.Process(cc)
225	if err != nil {
226		t.Fatal(err)
227	}
228	return row
229}
230
// AcceptanceTest is the top-level shape of the read rows acceptance test
// file. TestAcceptance unmarshals the JSON file into this type. The file
// specifies a number of tests, each consisting of one or more cell chunk text
// protos and one or more resulting cells or errors.
type AcceptanceTest struct {
	Tests []TestCase `json:"tests"`
}
237
// TestCase is a single acceptance test. Name identifies the case in failure
// messages, Chunks holds the cell chunks as text-format protos in the order
// they are fed to the chunk reader, and Results lists the cells and/or errors
// the case is expected to produce (runTestCase compares them as a set).
type TestCase struct {
	Name    string       `json:"name"`
	Chunks  []string     `json:"chunks"`
	Results []TestResult `json:"results"`
}
243
// TestResult is one expected or observed outcome of a TestCase: either a
// single cell (row key, family, qualifier, timestamp and value) or an error.
// The type is comparable, which lets toSet use it as a map key.
type TestResult struct {
	RK    string `json:"rk"`
	FM    string `json:"fm"`
	Qual  string `json:"qual"`
	TS    int64  `json:"ts"`
	Value string `json:"value"`
	Error bool   `json:"error"` // If true, expect an error. Ignore any other field.
}
252
253func TestAcceptance(t *testing.T) {
254	testJSON, err := ioutil.ReadFile("./testdata/read-rows-acceptance-test.json")
255	if err != nil {
256		t.Fatalf("could not open acceptance test file %v", err)
257	}
258
259	var accTest AcceptanceTest
260	err = json.Unmarshal(testJSON, &accTest)
261	if err != nil {
262		t.Fatalf("could not parse acceptance test file: %v", err)
263	}
264
265	for _, test := range accTest.Tests {
266		runTestCase(t, test)
267	}
268}
269
270func runTestCase(t *testing.T, test TestCase) {
271	// Increment an index into the result array as we get results
272	cr := newChunkReader()
273	var results []TestResult
274	var seenErr bool
275	for _, chunkText := range test.Chunks {
276		// Parse and pass each cell chunk to the ChunkReader
277		cc := &btspb.ReadRowsResponse_CellChunk{}
278		err := proto.UnmarshalText(chunkText, cc)
279		if err != nil {
280			t.Errorf("[%s] failed to unmarshal text proto: %s\n%s", test.Name, chunkText, err)
281			return
282		}
283		row, err := cr.Process(cc)
284		if err != nil {
285			results = append(results, TestResult{Error: true})
286			seenErr = true
287			break
288		} else {
289			// Turn the Row into TestResults
290			for fm, ris := range row {
291				for _, ri := range ris {
292					tr := TestResult{
293						RK:    ri.Row,
294						FM:    fm,
295						Qual:  strings.Split(ri.Column, ":")[1],
296						TS:    int64(ri.Timestamp),
297						Value: string(ri.Value),
298					}
299					results = append(results, tr)
300				}
301			}
302		}
303	}
304
305	// Only Close if we don't have an error yet, otherwise Close: is expected.
306	if !seenErr {
307		err := cr.Close()
308		if err != nil {
309			results = append(results, TestResult{Error: true})
310		}
311	}
312
313	got := toSet(results)
314	want := toSet(test.Results)
315	if !testutil.Equal(got, want) {
316		t.Fatalf("[%s]: got: %v\nwant: %v\n", test.Name, got, want)
317	}
318}
319
320func toSet(res []TestResult) map[TestResult]bool {
321	set := make(map[TestResult]bool)
322	for _, tr := range res {
323		set[tr] = true
324	}
325	return set
326}
327
328// ri returns a ReadItem for the given components
329func ri(rk string, fm string, qual string, ts int64, val string, labels []string) ReadItem {
330	return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts), Labels: labels}
331}
332
333// cc returns a CellChunk proto
334func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool, labels []string) *btspb.ReadRowsResponse_CellChunk {
335	// The components of the cell key are wrapped and can be null or empty
336	var rkWrapper []byte
337	if rk == nilStr {
338		rkWrapper = nil
339	} else {
340		rkWrapper = []byte(rk)
341	}
342
343	var fmWrapper *wrappers.StringValue
344	if fm != nilStr {
345		fmWrapper = &wrappers.StringValue{Value: fm}
346	} else {
347		fmWrapper = nil
348	}
349
350	var qualWrapper *wrappers.BytesValue
351	if qual != nilStr {
352		qualWrapper = &wrappers.BytesValue{Value: []byte(qual)}
353	} else {
354		qualWrapper = nil
355	}
356
357	return &btspb.ReadRowsResponse_CellChunk{
358		RowKey:          rkWrapper,
359		FamilyName:      fmWrapper,
360		Qualifier:       qualWrapper,
361		TimestampMicros: ts,
362		Value:           []byte(val),
363		ValueSize:       size,
364		RowStatus:       &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit},
365		Labels:          labels,
366	}
367}
368
369// ccData returns a CellChunk with only a value and size
370func ccData(val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk {
371	return cc(nilStr, nilStr, nilStr, 0, val, size, commit, []string{})
372}
373
374// ccReset returns a CellChunk with RestRow set to true
375func ccReset() *btspb.ReadRowsResponse_CellChunk {
376	return &btspb.ReadRowsResponse_CellChunk{
377		RowStatus: &btspb.ReadRowsResponse_CellChunk_ResetRow{ResetRow: true}}
378}
379