1// Copyright 2015 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"errors"
19	"reflect"
20	"testing"
21
22	"golang.org/x/net/context"
23)
24
// readTabledataArgs records the arguments of one readTabledata call made on a
// readServiceStub, so that tests can inspect them afterwards.
type readTabledataArgs struct {
	conf *readTableConf // configuration passed to the call.
	tok  string         // page token passed to the call.
}
29
// readQueryArgs records the arguments of one readQuery call made on a
// readServiceStub, so that tests can inspect them afterwards.
type readQueryArgs struct {
	conf *readQueryConf // configuration passed to the call.
	tok  string         // page token passed to the call.
}
34
// readServiceStub services read requests by returning data from an in-memory list of values.
// Each read consumes the first remaining page of values.
type readServiceStub struct {
	// values and pageTokens are used as sources of data to return in response to calls to readTabledata or readQuery.
	values     [][][]Value       // contains pages / rows / columns.
	pageTokens map[string]string // maps incoming page token to returned page token.

	// arguments are recorded for later inspection.
	readTabledataCalls []readTabledataArgs
	readQueryCalls     []readQueryArgs

	// NOTE(review): presumably embedded so that the stub satisfies the whole
	// service interface while only overriding the read methods; the tests in
	// this file leave it unset — confirm against the service definition.
	service
}
47
48func (s *readServiceStub) readValues(tok string) *readDataResult {
49	result := &readDataResult{
50		pageToken: s.pageTokens[tok],
51		rows:      s.values[0],
52	}
53	s.values = s.values[1:]
54
55	return result
56}
57func (s *readServiceStub) readTabledata(ctx context.Context, conf *readTableConf, token string) (*readDataResult, error) {
58	s.readTabledataCalls = append(s.readTabledataCalls, readTabledataArgs{conf, token})
59	return s.readValues(token), nil
60}
61
62func (s *readServiceStub) readQuery(ctx context.Context, conf *readQueryConf, token string) (*readDataResult, error) {
63	s.readQueryCalls = append(s.readQueryCalls, readQueryArgs{conf, token})
64	return s.readValues(token), nil
65}
66
67func TestRead(t *testing.T) {
68	// The data for the service stub to return is populated for each test case in the testCases for loop.
69	service := &readServiceStub{}
70	c := &Client{
71		service: service,
72	}
73
74	queryJob := &Job{
75		projectID: "project-id",
76		jobID:     "job-id",
77		service:   service,
78		isQuery:   true,
79	}
80
81	for _, src := range []ReadSource{defaultTable, queryJob} {
82		testCases := []struct {
83			data       [][][]Value
84			pageTokens map[string]string
85			want       []ValueList
86		}{
87			{
88				data:       [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}},
89				pageTokens: map[string]string{"": "a", "a": ""},
90				want:       []ValueList{{1, 2}, {11, 12}, {30, 40}, {31, 41}},
91			},
92			{
93				data:       [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}},
94				pageTokens: map[string]string{"": ""}, // no more pages after first one.
95				want:       []ValueList{{1, 2}, {11, 12}},
96			},
97		}
98
99		for _, tc := range testCases {
100			service.values = tc.data
101			service.pageTokens = tc.pageTokens
102			if got, ok := doRead(t, c, src); ok {
103				if !reflect.DeepEqual(got, tc.want) {
104					t.Errorf("reading: got:\n%v\nwant:\n%v", got, tc.want)
105				}
106			}
107		}
108	}
109}
110
111// doRead calls Read with a ReadSource. Get is repeatedly called on the Iterator returned by Read and the results are returned.
112func doRead(t *testing.T, c *Client, src ReadSource) ([]ValueList, bool) {
113	it, err := c.Read(context.Background(), src)
114	if err != nil {
115		t.Errorf("err calling Read: %v", err)
116		return nil, false
117	}
118	var got []ValueList
119	for it.Next(context.Background()) {
120		var vals ValueList
121		if err := it.Get(&vals); err != nil {
122			t.Errorf("err calling Get: %v", err)
123			return nil, false
124		} else {
125			got = append(got, vals)
126		}
127	}
128
129	return got, true
130}
131
132func TestNoMoreValues(t *testing.T) {
133	c := &Client{
134		service: &readServiceStub{
135			values: [][][]Value{{{1, 2}, {11, 12}}},
136		},
137	}
138	it, err := c.Read(context.Background(), defaultTable)
139	if err != nil {
140		t.Fatalf("err calling Read: %v", err)
141	}
142	var vals ValueList
143	// We expect to retrieve two values and then fail on the next attempt.
144	if !it.Next(context.Background()) {
145		t.Fatalf("Next: got: false: want: true")
146	}
147	if !it.Next(context.Background()) {
148		t.Fatalf("Next: got: false: want: true")
149	}
150	if err := it.Get(&vals); err != nil {
151		t.Fatalf("Get: got: %v: want: nil", err)
152	}
153	if it.Next(context.Background()) {
154		t.Fatalf("Next: got: true: want: false")
155	}
156	if err := it.Get(&vals); err == nil {
157		t.Fatalf("Get: got: %v: want: non-nil", err)
158	}
159}
160
// delayedReadStub simulates reading results from a query that has not yet
// completed. Its readQuery method initially reports that the query job is not
// yet complete. Subsequently, it proxies the request through to the embedded
// readServiceStub.
type delayedReadStub struct {
	// numDelays is the number of readQuery calls that will still fail with
	// errIncompleteJob; each such call decrements it.
	numDelays int

	readServiceStub
}
170
171func (s *delayedReadStub) readQuery(ctx context.Context, conf *readQueryConf, token string) (*readDataResult, error) {
172	if s.numDelays > 0 {
173		s.numDelays--
174		return nil, errIncompleteJob
175	}
176	return s.readServiceStub.readQuery(ctx, conf, token)
177}
178
179// TestIncompleteJob tests that an Iterator which reads from a query job will block until the job is complete.
180func TestIncompleteJob(t *testing.T) {
181	service := &delayedReadStub{
182		numDelays: 2,
183		readServiceStub: readServiceStub{
184			values: [][][]Value{{{1, 2}}},
185		},
186	}
187	c := &Client{service: service}
188	queryJob := &Job{
189		projectID: "project-id",
190		jobID:     "job-id",
191		service:   service,
192		isQuery:   true,
193	}
194	it, err := c.Read(context.Background(), queryJob)
195	if err != nil {
196		t.Fatalf("err calling Read: %v", err)
197	}
198	var got ValueList
199	want := ValueList{1, 2}
200	if !it.Next(context.Background()) {
201		t.Fatalf("Next: got: false: want: true")
202	}
203	if err := it.Get(&got); err != nil {
204		t.Fatalf("Error calling Get: %v", err)
205	}
206	if service.numDelays != 0 {
207		t.Errorf("remaining numDelays : got: %v want:0", service.numDelays)
208	}
209	if !reflect.DeepEqual(got, want) {
210		t.Errorf("reading: got:\n%v\nwant:\n%v", got, want)
211	}
212}
213
// errorReadService is a service stub whose readTabledata method always fails.
type errorReadService struct {
	service
}
217
218func (s *errorReadService) readTabledata(ctx context.Context, conf *readTableConf, token string) (*readDataResult, error) {
219	return nil, errors.New("bang!")
220}
221
222func TestReadError(t *testing.T) {
223	// test that service read errors are propagated back to the caller.
224	c := &Client{service: &errorReadService{}}
225	it, err := c.Read(context.Background(), defaultTable)
226	if err != nil {
227		// Read should not return an error; only Err should.
228		t.Fatalf("err calling Read: %v", err)
229	}
230	if it.Next(context.Background()) {
231		t.Fatalf("Next: got: true: want: false")
232	}
233	if err := it.Err(); err.Error() != "bang!" {
234		t.Fatalf("Get: got: %v: want: bang!", err)
235	}
236}
237
238func TestReadTabledataOptions(t *testing.T) {
239	// test that read options are propagated.
240	s := &readServiceStub{
241		values: [][][]Value{{{1, 2}}},
242	}
243	c := &Client{service: s}
244	it, err := c.Read(context.Background(), defaultTable, RecordsPerRequest(5))
245
246	if err != nil {
247		t.Fatalf("err calling Read: %v", err)
248	}
249	if !it.Next(context.Background()) {
250		t.Fatalf("Next: got: false: want: true")
251	}
252
253	want := []readTabledataArgs{{
254		conf: &readTableConf{
255			projectID: "project-id",
256			datasetID: "dataset-id",
257			tableID:   "table-id",
258			paging: pagingConf{
259				recordsPerRequest:    5,
260				setRecordsPerRequest: true,
261			},
262		},
263		tok: "",
264	}}
265
266	if !reflect.DeepEqual(s.readTabledataCalls, want) {
267		t.Errorf("reading: got:\n%v\nwant:\n%v", s.readTabledataCalls, want)
268	}
269}
270
271func TestReadQueryOptions(t *testing.T) {
272	// test that read options are propagated.
273	s := &readServiceStub{
274		values: [][][]Value{{{1, 2}}},
275	}
276	c := &Client{service: s}
277
278	queryJob := &Job{
279		projectID: "project-id",
280		jobID:     "job-id",
281		service:   s,
282		isQuery:   true,
283	}
284	it, err := c.Read(context.Background(), queryJob, RecordsPerRequest(5))
285
286	if err != nil {
287		t.Fatalf("err calling Read: %v", err)
288	}
289	if !it.Next(context.Background()) {
290		t.Fatalf("Next: got: false: want: true")
291	}
292
293	want := []readQueryArgs{{
294		conf: &readQueryConf{
295			projectID: "project-id",
296			jobID:     "job-id",
297			paging: pagingConf{
298				recordsPerRequest:    5,
299				setRecordsPerRequest: true,
300			},
301		},
302		tok: "",
303	}}
304
305	if !reflect.DeepEqual(s.readQueryCalls, want) {
306		t.Errorf("reading: got:\n%v\nwant:\n%v", s.readQueryCalls, want)
307	}
308}
309