1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"testing"
22
23	"cloud.google.com/go/internal/testutil"
24	"github.com/google/go-cmp/cmp"
25	bq "google.golang.org/api/bigquery/v2"
26	"google.golang.org/api/iterator"
27)
28
// fetchResponse is one canned reply for pageFetcherStub: the result and the
// error that the stub's fetchPage returns for a given page token.
type fetchResponse struct {
	result *fetchPageResult // The result to return.
	err    error            // The error to return.
}
33
// pageFetcherStub services fetch requests by returning canned responses held
// in an in-memory map keyed by page token.
type pageFetcherStub struct {
	// fetchResponses maps the page token passed to fetchPage to the response
	// that fetchPage returns for it.
	fetchResponses map[string]fetchResponse
	// err is set by fetchPage when it is asked for a page token that has no
	// entry in fetchResponses.
	err error
}
39
40func (pf *pageFetcherStub) fetchPage(ctx context.Context, _ *rowSource, _ Schema, _ uint64, _ int64, pageToken string) (*fetchPageResult, error) {
41	call, ok := pf.fetchResponses[pageToken]
42	if !ok {
43		pf.err = fmt.Errorf("Unexpected page token: %q", pageToken)
44	}
45	return call.result, call.err
46}
47
48func TestRowIteratorCacheBehavior(t *testing.T) {
49
50	testSchema := &bq.TableSchema{
51		Fields: []*bq.TableFieldSchema{
52			{Type: "INTEGER", Name: "field1"},
53			{Type: "STRING", Name: "field2"},
54		},
55	}
56	testRows := []*bq.TableRow{
57		{F: []*bq.TableCell{
58			{V: "1"},
59			{V: "foo"},
60		},
61		},
62	}
63	convertedSchema := bqToSchema(testSchema)
64
65	convertedRows, _ := convertRows(testRows, convertedSchema)
66
67	testCases := []struct {
68		inSource     *rowSource
69		inSchema     Schema
70		inStartIndex uint64
71		inPageSize   int64
72		inPageToken  string
73		wantErr      error
74		wantResult   *fetchPageResult
75	}{
76		{
77			inSource: &rowSource{},
78			wantErr:  errNoCacheData,
79		},
80		{
81			// primary success case: schema in cache
82			inSource: &rowSource{
83				cachedSchema: testSchema,
84				cachedRows:   testRows,
85			},
86			wantResult: &fetchPageResult{
87				totalRows: uint64(len(convertedRows)),
88				schema:    convertedSchema,
89				rows:      convertedRows,
90			},
91		},
92		{
93			// secondary success case: schema provided
94			inSource: &rowSource{
95				cachedRows:      testRows,
96				cachedNextToken: "foo",
97			},
98			inSchema: convertedSchema,
99			wantResult: &fetchPageResult{
100				totalRows: uint64(len(convertedRows)),
101				schema:    convertedSchema,
102				rows:      convertedRows,
103				pageToken: "foo",
104			},
105		},
106		{
107			// misaligned page size.
108			inSource: &rowSource{
109				cachedSchema: testSchema,
110				cachedRows:   testRows,
111			},
112			inPageSize: 99,
113			wantErr:    errNoCacheData,
114		},
115		{
116			// nonzero start.
117			inSource: &rowSource{
118				cachedSchema: testSchema,
119				cachedRows:   testRows,
120			},
121			inStartIndex: 1,
122			wantErr:      errNoCacheData,
123		},
124		{
125			// data without schema
126			inSource: &rowSource{
127				cachedSchema: testSchema,
128				cachedRows:   testRows,
129			},
130			inStartIndex: 1,
131			wantErr:      errNoCacheData,
132		},
133		{
134			// data without schema
135			inSource: &rowSource{
136				cachedSchema: testSchema,
137				cachedRows:   testRows,
138			},
139			inStartIndex: 1,
140			wantErr:      errNoCacheData,
141		},
142	}
143	for _, tc := range testCases {
144		gotResp, gotErr := fetchCachedPage(context.Background(), tc.inSource, tc.inSchema, tc.inStartIndex, tc.inPageSize, tc.inPageToken)
145		if gotErr != tc.wantErr {
146			t.Errorf("err mismatch.  got %v, want %v", gotErr, tc.wantErr)
147		} else {
148			if diff := testutil.Diff(gotResp, tc.wantResult,
149				cmp.AllowUnexported(fetchPageResult{}, rowSource{}, Job{}, Client{}, Table{})); diff != "" {
150				t.Errorf("response diff (got=-, want=+):\n%s", diff)
151			}
152		}
153	}
154
155}
156
157func TestIterator(t *testing.T) {
158	var (
159		iiSchema = Schema{
160			{Type: IntegerFieldType},
161			{Type: IntegerFieldType},
162		}
163		siSchema = Schema{
164			{Type: StringFieldType},
165			{Type: IntegerFieldType},
166		}
167	)
168	fetchFailure := errors.New("fetch failure")
169
170	testCases := []struct {
171		desc           string
172		pageToken      string
173		fetchResponses map[string]fetchResponse
174		want           [][]Value
175		wantErr        error
176		wantSchema     Schema
177		wantTotalRows  uint64
178	}{
179		{
180			desc: "Iteration over single empty page",
181			fetchResponses: map[string]fetchResponse{
182				"": {
183					result: &fetchPageResult{
184						pageToken: "",
185						rows:      [][]Value{},
186						schema:    Schema{},
187					},
188				},
189			},
190			want:       [][]Value{},
191			wantSchema: Schema{},
192		},
193		{
194			desc: "Iteration over single page",
195			fetchResponses: map[string]fetchResponse{
196				"": {
197					result: &fetchPageResult{
198						pageToken: "",
199						rows:      [][]Value{{1, 2}, {11, 12}},
200						schema:    iiSchema,
201						totalRows: 4,
202					},
203				},
204			},
205			want:          [][]Value{{1, 2}, {11, 12}},
206			wantSchema:    iiSchema,
207			wantTotalRows: 4,
208		},
209		{
210			desc: "Iteration over single page with different schema",
211			fetchResponses: map[string]fetchResponse{
212				"": {
213					result: &fetchPageResult{
214						pageToken: "",
215						rows:      [][]Value{{"1", 2}, {"11", 12}},
216						schema:    siSchema,
217					},
218				},
219			},
220			want:       [][]Value{{"1", 2}, {"11", 12}},
221			wantSchema: siSchema,
222		},
223		{
224			desc: "Iteration over two pages",
225			fetchResponses: map[string]fetchResponse{
226				"": {
227					result: &fetchPageResult{
228						pageToken: "a",
229						rows:      [][]Value{{1, 2}, {11, 12}},
230						schema:    iiSchema,
231						totalRows: 4,
232					},
233				},
234				"a": {
235					result: &fetchPageResult{
236						pageToken: "",
237						rows:      [][]Value{{101, 102}, {111, 112}},
238						schema:    iiSchema,
239						totalRows: 4,
240					},
241				},
242			},
243			want:          [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
244			wantSchema:    iiSchema,
245			wantTotalRows: 4,
246		},
247		{
248			desc: "Server response includes empty page",
249			fetchResponses: map[string]fetchResponse{
250				"": {
251					result: &fetchPageResult{
252						pageToken: "a",
253						rows:      [][]Value{{1, 2}, {11, 12}},
254						schema:    iiSchema,
255					},
256				},
257				"a": {
258					result: &fetchPageResult{
259						pageToken: "b",
260						rows:      [][]Value{},
261						schema:    iiSchema,
262					},
263				},
264				"b": {
265					result: &fetchPageResult{
266						pageToken: "",
267						rows:      [][]Value{{101, 102}, {111, 112}},
268						schema:    iiSchema,
269					},
270				},
271			},
272			want:       [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
273			wantSchema: iiSchema,
274		},
275		{
276			desc: "Fetch error",
277			fetchResponses: map[string]fetchResponse{
278				"": {
279					result: &fetchPageResult{
280						pageToken: "a",
281						rows:      [][]Value{{1, 2}, {11, 12}},
282						schema:    iiSchema,
283					},
284				},
285				"a": {
286					// We returns some data from this fetch, but also an error.
287					// So the end result should include only data from the previous fetch.
288					err: fetchFailure,
289					result: &fetchPageResult{
290						pageToken: "b",
291						rows:      [][]Value{{101, 102}, {111, 112}},
292						schema:    iiSchema,
293					},
294				},
295			},
296			want:       [][]Value{{1, 2}, {11, 12}},
297			wantErr:    fetchFailure,
298			wantSchema: iiSchema,
299		},
300
301		{
302			desc:      "Skip over an entire page",
303			pageToken: "a",
304			fetchResponses: map[string]fetchResponse{
305				"": {
306					result: &fetchPageResult{
307						pageToken: "a",
308						rows:      [][]Value{{1, 2}, {11, 12}},
309						schema:    iiSchema,
310					},
311				},
312				"a": {
313					result: &fetchPageResult{
314						pageToken: "",
315						rows:      [][]Value{{101, 102}, {111, 112}},
316						schema:    iiSchema,
317					},
318				},
319			},
320			want:       [][]Value{{101, 102}, {111, 112}},
321			wantSchema: iiSchema,
322		},
323
324		{
325			desc:      "Skip beyond all data",
326			pageToken: "b",
327			fetchResponses: map[string]fetchResponse{
328				"": {
329					result: &fetchPageResult{
330						pageToken: "a",
331						rows:      [][]Value{{1, 2}, {11, 12}},
332						schema:    iiSchema,
333					},
334				},
335				"a": {
336					result: &fetchPageResult{
337						pageToken: "b",
338						rows:      [][]Value{{101, 102}, {111, 112}},
339						schema:    iiSchema,
340					},
341				},
342				"b": {
343					result: &fetchPageResult{},
344				},
345			},
346			// In this test case, Next will return false on its first call,
347			// so we won't even attempt to call Get.
348			want:       [][]Value{},
349			wantSchema: Schema{},
350		},
351	}
352
353	for _, tc := range testCases {
354		pf := &pageFetcherStub{
355			fetchResponses: tc.fetchResponses,
356		}
357		it := newRowIterator(context.Background(), nil, pf.fetchPage)
358		it.PageInfo().Token = tc.pageToken
359		values, schema, totalRows, err := consumeRowIterator(it)
360		if err != tc.wantErr {
361			t.Fatalf("%s: got %v, want %v", tc.desc, err, tc.wantErr)
362		}
363		if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) {
364			t.Errorf("%s: values:\ngot: %v\nwant:%v", tc.desc, values, tc.want)
365		}
366		if (len(schema) != 0 || len(tc.wantSchema) != 0) && !testutil.Equal(schema, tc.wantSchema) {
367			t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema)
368		}
369		if totalRows != tc.wantTotalRows {
370			t.Errorf("%s: totalRows: got %d, want %d", tc.desc, totalRows, tc.wantTotalRows)
371		}
372	}
373}
374
375// consumeRowIterator reads the schema and all values from a RowIterator and returns them.
376func consumeRowIterator(it *RowIterator) ([][]Value, Schema, uint64, error) {
377	var (
378		got       [][]Value
379		schema    Schema
380		totalRows uint64
381	)
382	for {
383		var vls []Value
384		err := it.Next(&vls)
385		if err == iterator.Done {
386			return got, schema, totalRows, nil
387		}
388		if err != nil {
389			return got, schema, totalRows, err
390		}
391		got = append(got, vls)
392		schema = it.Schema
393		totalRows = it.TotalRows
394	}
395}
396
397func TestNextDuringErrorState(t *testing.T) {
398	pf := &pageFetcherStub{
399		fetchResponses: map[string]fetchResponse{
400			"": {err: errors.New("bang")},
401		},
402	}
403	it := newRowIterator(context.Background(), nil, pf.fetchPage)
404	var vals []Value
405	if err := it.Next(&vals); err == nil {
406		t.Errorf("Expected error after calling Next")
407	}
408	if err := it.Next(&vals); err == nil {
409		t.Errorf("Expected error calling Next again when iterator has a non-nil error.")
410	}
411}
412
413func TestNextAfterFinished(t *testing.T) {
414	testCases := []struct {
415		fetchResponses map[string]fetchResponse
416		want           [][]Value
417	}{
418		{
419			fetchResponses: map[string]fetchResponse{
420				"": {
421					result: &fetchPageResult{
422						pageToken: "",
423						rows:      [][]Value{{1, 2}, {11, 12}},
424					},
425				},
426			},
427			want: [][]Value{{1, 2}, {11, 12}},
428		},
429		{
430			fetchResponses: map[string]fetchResponse{
431				"": {
432					result: &fetchPageResult{
433						pageToken: "",
434						rows:      [][]Value{},
435					},
436				},
437			},
438			want: [][]Value{},
439		},
440	}
441
442	for _, tc := range testCases {
443		pf := &pageFetcherStub{
444			fetchResponses: tc.fetchResponses,
445		}
446		it := newRowIterator(context.Background(), nil, pf.fetchPage)
447
448		values, _, _, err := consumeRowIterator(it)
449		if err != nil {
450			t.Fatal(err)
451		}
452		if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) {
453			t.Errorf("values: got:\n%v\nwant:\n%v", values, tc.want)
454		}
455		// Try calling Get again.
456		var vals []Value
457		if err := it.Next(&vals); err != iterator.Done {
458			t.Errorf("Expected Done calling Next when there are no more values")
459		}
460	}
461}
462
463func TestIteratorNextTypes(t *testing.T) {
464	it := newRowIterator(context.Background(), nil, nil)
465	for _, v := range []interface{}{3, "s", []int{}, &[]int{},
466		map[string]Value{}, &map[string]interface{}{},
467		struct{}{},
468	} {
469		if err := it.Next(v); err == nil {
470			t.Errorf("%v: want error, got nil", v)
471		}
472	}
473}
474