1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"errors"
20	"testing"
21
22	"cloud.google.com/go/internal/testutil"
23	"github.com/google/go-cmp/cmp"
24	bq "google.golang.org/api/bigquery/v2"
25	"google.golang.org/api/iterator"
26)
27
28type pageFetcherArgs struct {
29	table      *Table
30	schema     Schema
31	startIndex uint64
32	pageSize   int64
33	pageToken  string
34}
35
36// pageFetcherReadStub services read requests by returning data from an in-memory list of values.
37type pageFetcherReadStub struct {
38	// values and pageTokens are used as sources of data to return in response to calls to readTabledata or readQuery.
39	values     [][][]Value       // contains pages / rows / columns.
40	pageTokens map[string]string // maps incoming page token to returned page token.
41
42	// arguments are recorded for later inspection.
43	calls []pageFetcherArgs
44}
45
46func (s *pageFetcherReadStub) fetchPage(ctx context.Context, t *Table, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
47	s.calls = append(s.calls,
48		pageFetcherArgs{t, schema, startIndex, pageSize, pageToken})
49	result := &fetchPageResult{
50		pageToken: s.pageTokens[pageToken],
51		rows:      s.values[0],
52	}
53	s.values = s.values[1:]
54	return result, nil
55}
56
57func waitForQueryStub(context.Context, string) (Schema, uint64, error) {
58	return nil, 1, nil
59}
60
61func TestRead(t *testing.T) {
62	// The data for the service stub to return is populated for each test case in the testCases for loop.
63	ctx := context.Background()
64	c := &Client{projectID: "project-id"}
65	pf := &pageFetcherReadStub{}
66	queryJob := &Job{
67		projectID: "project-id",
68		jobID:     "job-id",
69		c:         c,
70		config: &bq.JobConfiguration{
71			Query: &bq.JobConfigurationQuery{
72				DestinationTable: &bq.TableReference{
73					ProjectId: "project-id",
74					DatasetId: "dataset-id",
75					TableId:   "table-id",
76				},
77			},
78		},
79	}
80
81	for _, readFunc := range []func() *RowIterator{
82		func() *RowIterator {
83			return c.Dataset("dataset-id").Table("table-id").read(ctx, pf.fetchPage)
84		},
85		func() *RowIterator {
86			it, err := queryJob.read(ctx, waitForQueryStub, pf.fetchPage)
87			if err != nil {
88				t.Fatal(err)
89			}
90			return it
91		},
92	} {
93		testCases := []struct {
94			data       [][][]Value
95			pageTokens map[string]string
96			want       [][]Value
97		}{
98			{
99				data:       [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}},
100				pageTokens: map[string]string{"": "a", "a": ""},
101				want:       [][]Value{{1, 2}, {11, 12}, {30, 40}, {31, 41}},
102			},
103			{
104				data:       [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}},
105				pageTokens: map[string]string{"": ""}, // no more pages after first one.
106				want:       [][]Value{{1, 2}, {11, 12}},
107			},
108		}
109		for _, tc := range testCases {
110			pf.values = tc.data
111			pf.pageTokens = tc.pageTokens
112			if got, ok := collectValues(t, readFunc()); ok {
113				if !testutil.Equal(got, tc.want) {
114					t.Errorf("reading: got:\n%v\nwant:\n%v", got, tc.want)
115				}
116			}
117		}
118	}
119}
120
121func collectValues(t *testing.T, it *RowIterator) ([][]Value, bool) {
122	var got [][]Value
123	for {
124		var vals []Value
125		err := it.Next(&vals)
126		if err == iterator.Done {
127			break
128		}
129		if err != nil {
130			t.Errorf("err calling Next: %v", err)
131			return nil, false
132		}
133		got = append(got, vals)
134	}
135	return got, true
136}
137
138func TestNoMoreValues(t *testing.T) {
139	c := &Client{projectID: "project-id"}
140	pf := &pageFetcherReadStub{
141		values: [][][]Value{{{1, 2}, {11, 12}}},
142	}
143	it := c.Dataset("dataset-id").Table("table-id").read(context.Background(), pf.fetchPage)
144	var vals []Value
145	// We expect to retrieve two values and then fail on the next attempt.
146	if err := it.Next(&vals); err != nil {
147		t.Fatalf("Next: got: %v: want: nil", err)
148	}
149	if err := it.Next(&vals); err != nil {
150		t.Fatalf("Next: got: %v: want: nil", err)
151	}
152	if err := it.Next(&vals); err != iterator.Done {
153		t.Fatalf("Next: got: %v: want: iterator.Done", err)
154	}
155}
156
157var errBang = errors.New("bang")
158
159func errorFetchPage(context.Context, *Table, Schema, uint64, int64, string) (*fetchPageResult, error) {
160	return nil, errBang
161}
162
163func TestReadError(t *testing.T) {
164	// test that service read errors are propagated back to the caller.
165	c := &Client{projectID: "project-id"}
166	it := c.Dataset("dataset-id").Table("table-id").read(context.Background(), errorFetchPage)
167	var vals []Value
168	if err := it.Next(&vals); err != errBang {
169		t.Fatalf("Get: got: %v: want: %v", err, errBang)
170	}
171}
172
173func TestReadTabledataOptions(t *testing.T) {
174	// test that read options are propagated.
175	s := &pageFetcherReadStub{
176		values: [][][]Value{{{1, 2}}},
177	}
178	c := &Client{projectID: "project-id"}
179	tr := c.Dataset("dataset-id").Table("table-id")
180	it := tr.read(context.Background(), s.fetchPage)
181	it.PageInfo().MaxSize = 5
182	var vals []Value
183	if err := it.Next(&vals); err != nil {
184		t.Fatal(err)
185	}
186	want := []pageFetcherArgs{{
187		table:     tr,
188		pageSize:  5,
189		pageToken: "",
190	}}
191	if diff := testutil.Diff(s.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, pageFetcherReadStub{}, Table{}, Client{})); diff != "" {
192		t.Errorf("reading (got=-, want=+):\n%s", diff)
193	}
194}
195
196func TestReadQueryOptions(t *testing.T) {
197	// test that read options are propagated.
198	c := &Client{projectID: "project-id"}
199	pf := &pageFetcherReadStub{
200		values: [][][]Value{{{1, 2}}},
201	}
202	tr := &bq.TableReference{
203		ProjectId: "project-id",
204		DatasetId: "dataset-id",
205		TableId:   "table-id",
206	}
207	queryJob := &Job{
208		projectID: "project-id",
209		jobID:     "job-id",
210		c:         c,
211		config: &bq.JobConfiguration{
212			Query: &bq.JobConfigurationQuery{DestinationTable: tr},
213		},
214	}
215	it, err := queryJob.read(context.Background(), waitForQueryStub, pf.fetchPage)
216	if err != nil {
217		t.Fatalf("err calling Read: %v", err)
218	}
219	it.PageInfo().MaxSize = 5
220	var vals []Value
221	if err := it.Next(&vals); err != nil {
222		t.Fatalf("Next: got: %v: want: nil", err)
223	}
224
225	want := []pageFetcherArgs{{
226		table:     bqToTable(tr, c),
227		pageSize:  5,
228		pageToken: "",
229	}}
230	if !testutil.Equal(pf.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, Table{}, Client{})) {
231		t.Errorf("reading: got:\n%v\nwant:\n%v", pf.calls, want)
232	}
233}
234