1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "testing" 22 23 "cloud.google.com/go/internal/testutil" 24 "github.com/google/go-cmp/cmp" 25 bq "google.golang.org/api/bigquery/v2" 26 "google.golang.org/api/iterator" 27) 28 29type fetchResponse struct { 30 result *fetchPageResult // The result to return. 31 err error // The error to return. 32} 33 34// pageFetcherStub services fetch requests by returning data from an in-memory list of values. 35type pageFetcherStub struct { 36 fetchResponses map[string]fetchResponse 37 err error 38} 39 40func (pf *pageFetcherStub) fetchPage(ctx context.Context, _ *rowSource, _ Schema, _ uint64, _ int64, pageToken string) (*fetchPageResult, error) { 41 call, ok := pf.fetchResponses[pageToken] 42 if !ok { 43 pf.err = fmt.Errorf("Unexpected page token: %q", pageToken) 44 } 45 return call.result, call.err 46} 47 48func TestRowIteratorCacheBehavior(t *testing.T) { 49 50 testSchema := &bq.TableSchema{ 51 Fields: []*bq.TableFieldSchema{ 52 {Type: "INTEGER", Name: "field1"}, 53 {Type: "STRING", Name: "field2"}, 54 }, 55 } 56 testRows := []*bq.TableRow{ 57 {F: []*bq.TableCell{ 58 {V: "1"}, 59 {V: "foo"}, 60 }, 61 }, 62 } 63 convertedSchema := bqToSchema(testSchema) 64 65 convertedRows, _ := convertRows(testRows, convertedSchema) 66 67 testCases := []struct { 68 inSource *rowSource 69 inSchema Schema 70 inStartIndex uint64 71 inPageSize int64 72 inPageToken string 73 wantErr error 74 wantResult *fetchPageResult 75 }{ 76 { 77 inSource: &rowSource{}, 78 wantErr: errNoCacheData, 79 }, 80 { 81 // primary success case: schema in cache 82 inSource: &rowSource{ 83 cachedSchema: testSchema, 84 cachedRows: testRows, 85 }, 86 wantResult: &fetchPageResult{ 87 totalRows: uint64(len(convertedRows)), 88 schema: convertedSchema, 89 rows: convertedRows, 90 }, 91 }, 92 { 93 // secondary success case: schema provided 94 inSource: &rowSource{ 95 cachedRows: testRows, 96 cachedNextToken: "foo", 97 }, 98 inSchema: convertedSchema, 99 wantResult: &fetchPageResult{ 100 totalRows: uint64(len(convertedRows)), 101 schema: convertedSchema, 102 rows: convertedRows, 103 pageToken: "foo", 104 }, 105 }, 106 { 107 // misaligned page size. 108 inSource: &rowSource{ 109 cachedSchema: testSchema, 110 cachedRows: testRows, 111 }, 112 inPageSize: 99, 113 wantErr: errNoCacheData, 114 }, 115 { 116 // nonzero start. 117 inSource: &rowSource{ 118 cachedSchema: testSchema, 119 cachedRows: testRows, 120 }, 121 inStartIndex: 1, 122 wantErr: errNoCacheData, 123 }, 124 { 125 // data without schema 126 inSource: &rowSource{ 127 cachedSchema: testSchema, 128 cachedRows: testRows, 129 }, 130 inStartIndex: 1, 131 wantErr: errNoCacheData, 132 }, 133 { 134 // data without schema 135 inSource: &rowSource{ 136 cachedSchema: testSchema, 137 cachedRows: testRows, 138 }, 139 inStartIndex: 1, 140 wantErr: errNoCacheData, 141 }, 142 } 143 for _, tc := range testCases { 144 gotResp, gotErr := fetchCachedPage(context.Background(), tc.inSource, tc.inSchema, tc.inStartIndex, tc.inPageSize, tc.inPageToken) 145 if gotErr != tc.wantErr { 146 t.Errorf("err mismatch. got %v, want %v", gotErr, tc.wantErr) 147 } else { 148 if diff := testutil.Diff(gotResp, tc.wantResult, 149 cmp.AllowUnexported(fetchPageResult{}, rowSource{}, Job{}, Client{}, Table{})); diff != "" { 150 t.Errorf("response diff (got=-, want=+):\n%s", diff) 151 } 152 } 153 } 154 155} 156 157func TestIterator(t *testing.T) { 158 var ( 159 iiSchema = Schema{ 160 {Type: IntegerFieldType}, 161 {Type: IntegerFieldType}, 162 } 163 siSchema = Schema{ 164 {Type: StringFieldType}, 165 {Type: IntegerFieldType}, 166 } 167 ) 168 fetchFailure := errors.New("fetch failure") 169 170 testCases := []struct { 171 desc string 172 pageToken string 173 fetchResponses map[string]fetchResponse 174 want [][]Value 175 wantErr error 176 wantSchema Schema 177 wantTotalRows uint64 178 }{ 179 { 180 desc: "Iteration over single empty page", 181 fetchResponses: map[string]fetchResponse{ 182 "": { 183 result: &fetchPageResult{ 184 pageToken: "", 185 rows: [][]Value{}, 186 schema: Schema{}, 187 }, 188 }, 189 }, 190 want: [][]Value{}, 191 wantSchema: Schema{}, 192 }, 193 { 194 desc: "Iteration over single page", 195 fetchResponses: map[string]fetchResponse{ 196 "": { 197 result: &fetchPageResult{ 198 pageToken: "", 199 rows: [][]Value{{1, 2}, {11, 12}}, 200 schema: iiSchema, 201 totalRows: 4, 202 }, 203 }, 204 }, 205 want: [][]Value{{1, 2}, {11, 12}}, 206 wantSchema: iiSchema, 207 wantTotalRows: 4, 208 }, 209 { 210 desc: "Iteration over single page with different schema", 211 fetchResponses: map[string]fetchResponse{ 212 "": { 213 result: &fetchPageResult{ 214 pageToken: "", 215 rows: [][]Value{{"1", 2}, {"11", 12}}, 216 schema: siSchema, 217 }, 218 }, 219 }, 220 want: [][]Value{{"1", 2}, {"11", 12}}, 221 wantSchema: siSchema, 222 }, 223 { 224 desc: "Iteration over two pages", 225 fetchResponses: map[string]fetchResponse{ 226 "": { 227 result: &fetchPageResult{ 228 pageToken: "a", 229 rows: [][]Value{{1, 2}, {11, 12}}, 230 schema: iiSchema, 231 totalRows: 4, 232 }, 233 }, 234 "a": { 235 result: &fetchPageResult{ 236 pageToken: "", 237 rows: [][]Value{{101, 102}, {111, 112}}, 238 schema: iiSchema, 239 totalRows: 4, 240 }, 241 }, 242 }, 243 want: [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}}, 244 wantSchema: iiSchema, 245 wantTotalRows: 4, 246 }, 247 { 248 desc: "Server response includes empty page", 249 fetchResponses: map[string]fetchResponse{ 250 "": { 251 result: &fetchPageResult{ 252 pageToken: "a", 253 rows: [][]Value{{1, 2}, {11, 12}}, 254 schema: iiSchema, 255 }, 256 }, 257 "a": { 258 result: &fetchPageResult{ 259 pageToken: "b", 260 rows: [][]Value{}, 261 schema: iiSchema, 262 }, 263 }, 264 "b": { 265 result: &fetchPageResult{ 266 pageToken: "", 267 rows: [][]Value{{101, 102}, {111, 112}}, 268 schema: iiSchema, 269 }, 270 }, 271 }, 272 want: [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}}, 273 wantSchema: iiSchema, 274 }, 275 { 276 desc: "Fetch error", 277 fetchResponses: map[string]fetchResponse{ 278 "": { 279 result: &fetchPageResult{ 280 pageToken: "a", 281 rows: [][]Value{{1, 2}, {11, 12}}, 282 schema: iiSchema, 283 }, 284 }, 285 "a": { 286 // We returns some data from this fetch, but also an error. 287 // So the end result should include only data from the previous fetch. 288 err: fetchFailure, 289 result: &fetchPageResult{ 290 pageToken: "b", 291 rows: [][]Value{{101, 102}, {111, 112}}, 292 schema: iiSchema, 293 }, 294 }, 295 }, 296 want: [][]Value{{1, 2}, {11, 12}}, 297 wantErr: fetchFailure, 298 wantSchema: iiSchema, 299 }, 300 301 { 302 desc: "Skip over an entire page", 303 pageToken: "a", 304 fetchResponses: map[string]fetchResponse{ 305 "": { 306 result: &fetchPageResult{ 307 pageToken: "a", 308 rows: [][]Value{{1, 2}, {11, 12}}, 309 schema: iiSchema, 310 }, 311 }, 312 "a": { 313 result: &fetchPageResult{ 314 pageToken: "", 315 rows: [][]Value{{101, 102}, {111, 112}}, 316 schema: iiSchema, 317 }, 318 }, 319 }, 320 want: [][]Value{{101, 102}, {111, 112}}, 321 wantSchema: iiSchema, 322 }, 323 324 { 325 desc: "Skip beyond all data", 326 pageToken: "b", 327 fetchResponses: map[string]fetchResponse{ 328 "": { 329 result: &fetchPageResult{ 330 pageToken: "a", 331 rows: [][]Value{{1, 2}, {11, 12}}, 332 schema: iiSchema, 333 }, 334 }, 335 "a": { 336 result: &fetchPageResult{ 337 pageToken: "b", 338 rows: [][]Value{{101, 102}, {111, 112}}, 339 schema: iiSchema, 340 }, 341 }, 342 "b": { 343 result: &fetchPageResult{}, 344 }, 345 }, 346 // In this test case, Next will return false on its first call, 347 // so we won't even attempt to call Get. 348 want: [][]Value{}, 349 wantSchema: Schema{}, 350 }, 351 } 352 353 for _, tc := range testCases { 354 pf := &pageFetcherStub{ 355 fetchResponses: tc.fetchResponses, 356 } 357 it := newRowIterator(context.Background(), nil, pf.fetchPage) 358 it.PageInfo().Token = tc.pageToken 359 values, schema, totalRows, err := consumeRowIterator(it) 360 if err != tc.wantErr { 361 t.Fatalf("%s: got %v, want %v", tc.desc, err, tc.wantErr) 362 } 363 if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) { 364 t.Errorf("%s: values:\ngot: %v\nwant:%v", tc.desc, values, tc.want) 365 } 366 if (len(schema) != 0 || len(tc.wantSchema) != 0) && !testutil.Equal(schema, tc.wantSchema) { 367 t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema) 368 } 369 if totalRows != tc.wantTotalRows { 370 t.Errorf("%s: totalRows: got %d, want %d", tc.desc, totalRows, tc.wantTotalRows) 371 } 372 } 373} 374 375// consumeRowIterator reads the schema and all values from a RowIterator and returns them. 376func consumeRowIterator(it *RowIterator) ([][]Value, Schema, uint64, error) { 377 var ( 378 got [][]Value 379 schema Schema 380 totalRows uint64 381 ) 382 for { 383 var vls []Value 384 err := it.Next(&vls) 385 if err == iterator.Done { 386 return got, schema, totalRows, nil 387 } 388 if err != nil { 389 return got, schema, totalRows, err 390 } 391 got = append(got, vls) 392 schema = it.Schema 393 totalRows = it.TotalRows 394 } 395} 396 397func TestNextDuringErrorState(t *testing.T) { 398 pf := &pageFetcherStub{ 399 fetchResponses: map[string]fetchResponse{ 400 "": {err: errors.New("bang")}, 401 }, 402 } 403 it := newRowIterator(context.Background(), nil, pf.fetchPage) 404 var vals []Value 405 if err := it.Next(&vals); err == nil { 406 t.Errorf("Expected error after calling Next") 407 } 408 if err := it.Next(&vals); err == nil { 409 t.Errorf("Expected error calling Next again when iterator has a non-nil error.") 410 } 411} 412 413func TestNextAfterFinished(t *testing.T) { 414 testCases := []struct { 415 fetchResponses map[string]fetchResponse 416 want [][]Value 417 }{ 418 { 419 fetchResponses: map[string]fetchResponse{ 420 "": { 421 result: &fetchPageResult{ 422 pageToken: "", 423 rows: [][]Value{{1, 2}, {11, 12}}, 424 }, 425 }, 426 }, 427 want: [][]Value{{1, 2}, {11, 12}}, 428 }, 429 { 430 fetchResponses: map[string]fetchResponse{ 431 "": { 432 result: &fetchPageResult{ 433 pageToken: "", 434 rows: [][]Value{}, 435 }, 436 }, 437 }, 438 want: [][]Value{}, 439 }, 440 } 441 442 for _, tc := range testCases { 443 pf := &pageFetcherStub{ 444 fetchResponses: tc.fetchResponses, 445 } 446 it := newRowIterator(context.Background(), nil, pf.fetchPage) 447 448 values, _, _, err := consumeRowIterator(it) 449 if err != nil { 450 t.Fatal(err) 451 } 452 if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) { 453 t.Errorf("values: got:\n%v\nwant:\n%v", values, tc.want) 454 } 455 // Try calling Get again. 456 var vals []Value 457 if err := it.Next(&vals); err != iterator.Done { 458 t.Errorf("Expected Done calling Next when there are no more values") 459 } 460 } 461} 462 463func TestIteratorNextTypes(t *testing.T) { 464 it := newRowIterator(context.Background(), nil, nil) 465 for _, v := range []interface{}{3, "s", []int{}, &[]int{}, 466 map[string]Value{}, &map[string]interface{}{}, 467 struct{}{}, 468 } { 469 if err := it.Next(v); err == nil { 470 t.Errorf("%v: want error, got nil", v) 471 } 472 } 473} 474