1/* 2Copyright 2016 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "encoding/json" 21 "fmt" 22 "io/ioutil" 23 "strings" 24 "testing" 25 26 "cloud.google.com/go/internal/testutil" 27 "github.com/golang/protobuf/proto" 28 "github.com/golang/protobuf/ptypes/wrappers" 29 btspb "google.golang.org/genproto/googleapis/bigtable/v2" 30) 31 32// Indicates that a field in the proto should be omitted, rather than included 33// as a wrapped empty string. 34const nilStr = "<>" 35 36func TestSingleCell(t *testing.T) { 37 cr := newChunkReader() 38 39 // All in one cell 40 row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true, []string{})) 41 if err != nil { 42 t.Fatalf("Processing chunk: %v", err) 43 } 44 if row == nil { 45 t.Fatalf("Missing row") 46 } 47 if len(row["fm"]) != 1 { 48 t.Fatalf("Family name length mismatch %d, %d", 1, len(row["fm"])) 49 } 50 want := []ReadItem{ri("rk", "fm", "col", 1, "value", []string{})} 51 if !testutil.Equal(row["fm"], want) { 52 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm"], want) 53 } 54 if err := cr.Close(); err != nil { 55 t.Fatalf("Close: %v", err) 56 } 57} 58 59func TestMultipleCells(t *testing.T) { 60 cr := newChunkReader() 61 62 mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{})) 63 mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{})) 64 mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{})) 65 mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false, []string{})) 66 row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true, []string{})) 67 if err != nil { 68 t.Fatalf("Processing chunk: %v", err) 69 } 70 if row == nil { 71 t.Fatalf("Missing row") 72 } 73 74 want := []ReadItem{ 75 ri("rs", "fm1", "col1", 0, "val1", []string{}), 76 ri("rs", "fm1", "col1", 1, "val2", []string{}), 77 ri("rs", "fm1", "col2", 0, "val3", []string{}), 78 } 79 if !testutil.Equal(row["fm1"], want) { 80 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) 81 } 82 want = []ReadItem{ 83 ri("rs", "fm2", "col1", 0, "val4", []string{}), 84 ri("rs", "fm2", "col2", 1, "extralongval5", []string{}), 85 } 86 if !testutil.Equal(row["fm2"], want) { 87 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want) 88 } 89 if err := cr.Close(); err != nil { 90 t.Fatalf("Close: %v", err) 91 } 92} 93 94func TestSplitCells(t *testing.T) { 95 cr := newChunkReader() 96 97 mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false, []string{})) 98 mustProcess(t, cr, ccData("world", 0, false)) 99 row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true, []string{})) 100 if err != nil { 101 t.Fatalf("Processing chunk: %v", err) 102 } 103 if row == nil { 104 t.Fatalf("Missing row") 105 } 106 107 want := []ReadItem{ 108 ri("rs", "fm1", "col1", 0, "hello world", []string{}), 109 ri("rs", "fm1", "col2", 0, "val2", []string{}), 110 } 111 if !testutil.Equal(row["fm1"], want) { 112 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) 113 } 114 if err := cr.Close(); err != nil { 115 t.Fatalf("Close: %v", err) 116 } 117} 118 119func TestMultipleRows(t *testing.T) { 120 cr := newChunkReader() 121 122 row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{})) 123 if err != nil { 124 t.Fatalf("Processing chunk: %v", err) 125 } 126 want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})} 127 if !testutil.Equal(row["fm1"], want) { 128 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) 129 } 130 131 row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{})) 132 if err != nil { 133 t.Fatalf("Processing chunk: %v", err) 134 } 135 want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})} 136 if !testutil.Equal(row["fm2"], want) { 137 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want) 138 } 139 140 if err := cr.Close(); err != nil { 141 t.Fatalf("Close: %v", err) 142 } 143} 144 145func TestBlankQualifier(t *testing.T) { 146 cr := newChunkReader() 147 148 row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true, []string{})) 149 if err != nil { 150 t.Fatalf("Processing chunk: %v", err) 151 } 152 want := []ReadItem{ri("rs1", "fm1", "", 1, "val1", []string{})} 153 if !testutil.Equal(row["fm1"], want) { 154 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) 155 } 156 157 row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{})) 158 if err != nil { 159 t.Fatalf("Processing chunk: %v", err) 160 } 161 want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})} 162 if !testutil.Equal(row["fm2"], want) { 163 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want) 164 } 165 166 if err := cr.Close(); err != nil { 167 t.Fatalf("Close: %v", err) 168 } 169} 170 171func TestLabels(t *testing.T) { 172 cr := newChunkReader() 173 174 mustProcess(t, cr, cc("rs1", "fm1", "col1", 0, "hello ", 11, false, []string{"test-label"})) 175 row := mustProcess(t, cr, ccData("world", 0, true)) 176 want := []ReadItem{ 177 ri("rs1", "fm1", "col1", 0, "hello world", []string{"test-label"}), 178 } 179 if !testutil.Equal(row["fm1"], want) { 180 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) 181 } 182 183 row, err := cr.Process(cc("rs2", "fm1", "", 1, "val1", 0, true, []string{"test-label2"})) 184 if err != nil { 185 t.Fatalf("Processing chunk: %v", err) 186 } 187 want = []ReadItem{ri("rs2", "fm1", "", 1, "val1", []string{"test-label2"})} 188 if !testutil.Equal(row["fm1"], want) { 189 t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) 190 } 191 192 if err := cr.Close(); err != nil { 193 t.Fatalf("Close: %v", err) 194 } 195} 196 197func TestReset(t *testing.T) { 198 cr := newChunkReader() 199 mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{})) 200 mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{})) 201 mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{})) 202 mustProcess(t, cr, ccReset()) 203 row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{})) 204 want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})} 205 if !testutil.Equal(row["fm1"], want) { 206 t.Fatalf("Reset: got: %v\nwant: %v\n", row["fm1"], want) 207 } 208 if err := cr.Close(); err != nil { 209 t.Fatalf("Close: %v", err) 210 } 211} 212 213func TestNewFamEmptyQualifier(t *testing.T) { 214 cr := newChunkReader() 215 216 mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{})) 217 _, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true, []string{})) 218 if err == nil { 219 t.Fatalf("Expected error on second chunk with no qualifier set") 220 } 221} 222 223func mustProcess(t *testing.T, cr *chunkReader, cc *btspb.ReadRowsResponse_CellChunk) Row { 224 row, err := cr.Process(cc) 225 if err != nil { 226 t.Fatal(err) 227 } 228 return row 229} 230 231// The read rows acceptance test reads a json file specifying a number of tests, 232// each consisting of one or more cell chunk text protos and one or more resulting 233// cells or errors. 234type AcceptanceTest struct { 235 Tests []TestCase `json:"tests"` 236} 237 238type TestCase struct { 239 Name string `json:"name"` 240 Chunks []string `json:"chunks"` 241 Results []TestResult `json:"results"` 242} 243 244type TestResult struct { 245 RK string `json:"rk"` 246 FM string `json:"fm"` 247 Qual string `json:"qual"` 248 TS int64 `json:"ts"` 249 Value string `json:"value"` 250 Error bool `json:"error"` // If true, expect an error. Ignore any other field. 251} 252 253func TestAcceptance(t *testing.T) { 254 testJSON, err := ioutil.ReadFile("./testdata/read-rows-acceptance-test.json") 255 if err != nil { 256 t.Fatalf("could not open acceptance test file %v", err) 257 } 258 259 var accTest AcceptanceTest 260 err = json.Unmarshal(testJSON, &accTest) 261 if err != nil { 262 t.Fatalf("could not parse acceptance test file: %v", err) 263 } 264 265 for _, test := range accTest.Tests { 266 runTestCase(t, test) 267 } 268} 269 270func runTestCase(t *testing.T, test TestCase) { 271 // Increment an index into the result array as we get results 272 cr := newChunkReader() 273 var results []TestResult 274 var seenErr bool 275 for _, chunkText := range test.Chunks { 276 // Parse and pass each cell chunk to the ChunkReader 277 cc := &btspb.ReadRowsResponse_CellChunk{} 278 err := proto.UnmarshalText(chunkText, cc) 279 if err != nil { 280 t.Errorf("[%s] failed to unmarshal text proto: %s\n%s", test.Name, chunkText, err) 281 return 282 } 283 row, err := cr.Process(cc) 284 if err != nil { 285 results = append(results, TestResult{Error: true}) 286 seenErr = true 287 break 288 } else { 289 // Turn the Row into TestResults 290 for fm, ris := range row { 291 for _, ri := range ris { 292 tr := TestResult{ 293 RK: ri.Row, 294 FM: fm, 295 Qual: strings.Split(ri.Column, ":")[1], 296 TS: int64(ri.Timestamp), 297 Value: string(ri.Value), 298 } 299 results = append(results, tr) 300 } 301 } 302 } 303 } 304 305 // Only Close if we don't have an error yet, otherwise Close: is expected. 306 if !seenErr { 307 err := cr.Close() 308 if err != nil { 309 results = append(results, TestResult{Error: true}) 310 } 311 } 312 313 got := toSet(results) 314 want := toSet(test.Results) 315 if !testutil.Equal(got, want) { 316 t.Fatalf("[%s]: got: %v\nwant: %v\n", test.Name, got, want) 317 } 318} 319 320func toSet(res []TestResult) map[TestResult]bool { 321 set := make(map[TestResult]bool) 322 for _, tr := range res { 323 set[tr] = true 324 } 325 return set 326} 327 328// ri returns a ReadItem for the given components 329func ri(rk string, fm string, qual string, ts int64, val string, labels []string) ReadItem { 330 return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts), Labels: labels} 331} 332 333// cc returns a CellChunk proto 334func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool, labels []string) *btspb.ReadRowsResponse_CellChunk { 335 // The components of the cell key are wrapped and can be null or empty 336 var rkWrapper []byte 337 if rk == nilStr { 338 rkWrapper = nil 339 } else { 340 rkWrapper = []byte(rk) 341 } 342 343 var fmWrapper *wrappers.StringValue 344 if fm != nilStr { 345 fmWrapper = &wrappers.StringValue{Value: fm} 346 } else { 347 fmWrapper = nil 348 } 349 350 var qualWrapper *wrappers.BytesValue 351 if qual != nilStr { 352 qualWrapper = &wrappers.BytesValue{Value: []byte(qual)} 353 } else { 354 qualWrapper = nil 355 } 356 357 return &btspb.ReadRowsResponse_CellChunk{ 358 RowKey: rkWrapper, 359 FamilyName: fmWrapper, 360 Qualifier: qualWrapper, 361 TimestampMicros: ts, 362 Value: []byte(val), 363 ValueSize: size, 364 RowStatus: &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit}, 365 Labels: labels, 366 } 367} 368 369// ccData returns a CellChunk with only a value and size 370func ccData(val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk { 371 return cc(nilStr, nilStr, nilStr, 0, val, size, commit, []string{}) 372} 373 374// ccReset returns a CellChunk with RestRow set to true 375func ccReset() *btspb.ReadRowsResponse_CellChunk { 376 return &btspb.ReadRowsResponse_CellChunk{ 377 RowStatus: &btspb.ReadRowsResponse_CellChunk_ResetRow{ResetRow: true}} 378} 379