1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "errors" 20 "testing" 21 22 "cloud.google.com/go/internal/testutil" 23 "github.com/google/go-cmp/cmp" 24 bq "google.golang.org/api/bigquery/v2" 25 "google.golang.org/api/iterator" 26) 27 28type pageFetcherArgs struct { 29 table *Table 30 schema Schema 31 startIndex uint64 32 pageSize int64 33 pageToken string 34} 35 36// pageFetcherReadStub services read requests by returning data from an in-memory list of values. 37type pageFetcherReadStub struct { 38 // values and pageTokens are used as sources of data to return in response to calls to readTabledata or readQuery. 39 values [][][]Value // contains pages / rows / columns. 40 pageTokens map[string]string // maps incoming page token to returned page token. 41 42 // arguments are recorded for later inspection. 43 calls []pageFetcherArgs 44} 45 46func (s *pageFetcherReadStub) fetchPage(ctx context.Context, t *Table, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) { 47 s.calls = append(s.calls, 48 pageFetcherArgs{t, schema, startIndex, pageSize, pageToken}) 49 result := &fetchPageResult{ 50 pageToken: s.pageTokens[pageToken], 51 rows: s.values[0], 52 } 53 s.values = s.values[1:] 54 return result, nil 55} 56 57func waitForQueryStub(context.Context, string) (Schema, uint64, error) { 58 return nil, 1, nil 59} 60 61func TestRead(t *testing.T) { 62 // The data for the service stub to return is populated for each test case in the testCases for loop. 63 ctx := context.Background() 64 c := &Client{projectID: "project-id"} 65 pf := &pageFetcherReadStub{} 66 queryJob := &Job{ 67 projectID: "project-id", 68 jobID: "job-id", 69 c: c, 70 config: &bq.JobConfiguration{ 71 Query: &bq.JobConfigurationQuery{ 72 DestinationTable: &bq.TableReference{ 73 ProjectId: "project-id", 74 DatasetId: "dataset-id", 75 TableId: "table-id", 76 }, 77 }, 78 }, 79 } 80 81 for _, readFunc := range []func() *RowIterator{ 82 func() *RowIterator { 83 return c.Dataset("dataset-id").Table("table-id").read(ctx, pf.fetchPage) 84 }, 85 func() *RowIterator { 86 it, err := queryJob.read(ctx, waitForQueryStub, pf.fetchPage) 87 if err != nil { 88 t.Fatal(err) 89 } 90 return it 91 }, 92 } { 93 testCases := []struct { 94 data [][][]Value 95 pageTokens map[string]string 96 want [][]Value 97 }{ 98 { 99 data: [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}}, 100 pageTokens: map[string]string{"": "a", "a": ""}, 101 want: [][]Value{{1, 2}, {11, 12}, {30, 40}, {31, 41}}, 102 }, 103 { 104 data: [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}}, 105 pageTokens: map[string]string{"": ""}, // no more pages after first one. 106 want: [][]Value{{1, 2}, {11, 12}}, 107 }, 108 } 109 for _, tc := range testCases { 110 pf.values = tc.data 111 pf.pageTokens = tc.pageTokens 112 if got, ok := collectValues(t, readFunc()); ok { 113 if !testutil.Equal(got, tc.want) { 114 t.Errorf("reading: got:\n%v\nwant:\n%v", got, tc.want) 115 } 116 } 117 } 118 } 119} 120 121func collectValues(t *testing.T, it *RowIterator) ([][]Value, bool) { 122 var got [][]Value 123 for { 124 var vals []Value 125 err := it.Next(&vals) 126 if err == iterator.Done { 127 break 128 } 129 if err != nil { 130 t.Errorf("err calling Next: %v", err) 131 return nil, false 132 } 133 got = append(got, vals) 134 } 135 return got, true 136} 137 138func TestNoMoreValues(t *testing.T) { 139 c := &Client{projectID: "project-id"} 140 pf := &pageFetcherReadStub{ 141 values: [][][]Value{{{1, 2}, {11, 12}}}, 142 } 143 it := c.Dataset("dataset-id").Table("table-id").read(context.Background(), pf.fetchPage) 144 var vals []Value 145 // We expect to retrieve two values and then fail on the next attempt. 146 if err := it.Next(&vals); err != nil { 147 t.Fatalf("Next: got: %v: want: nil", err) 148 } 149 if err := it.Next(&vals); err != nil { 150 t.Fatalf("Next: got: %v: want: nil", err) 151 } 152 if err := it.Next(&vals); err != iterator.Done { 153 t.Fatalf("Next: got: %v: want: iterator.Done", err) 154 } 155} 156 157var errBang = errors.New("bang") 158 159func errorFetchPage(context.Context, *Table, Schema, uint64, int64, string) (*fetchPageResult, error) { 160 return nil, errBang 161} 162 163func TestReadError(t *testing.T) { 164 // test that service read errors are propagated back to the caller. 165 c := &Client{projectID: "project-id"} 166 it := c.Dataset("dataset-id").Table("table-id").read(context.Background(), errorFetchPage) 167 var vals []Value 168 if err := it.Next(&vals); err != errBang { 169 t.Fatalf("Get: got: %v: want: %v", err, errBang) 170 } 171} 172 173func TestReadTabledataOptions(t *testing.T) { 174 // test that read options are propagated. 175 s := &pageFetcherReadStub{ 176 values: [][][]Value{{{1, 2}}}, 177 } 178 c := &Client{projectID: "project-id"} 179 tr := c.Dataset("dataset-id").Table("table-id") 180 it := tr.read(context.Background(), s.fetchPage) 181 it.PageInfo().MaxSize = 5 182 var vals []Value 183 if err := it.Next(&vals); err != nil { 184 t.Fatal(err) 185 } 186 want := []pageFetcherArgs{{ 187 table: tr, 188 pageSize: 5, 189 pageToken: "", 190 }} 191 if diff := testutil.Diff(s.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, pageFetcherReadStub{}, Table{}, Client{})); diff != "" { 192 t.Errorf("reading (got=-, want=+):\n%s", diff) 193 } 194} 195 196func TestReadQueryOptions(t *testing.T) { 197 // test that read options are propagated. 198 c := &Client{projectID: "project-id"} 199 pf := &pageFetcherReadStub{ 200 values: [][][]Value{{{1, 2}}}, 201 } 202 tr := &bq.TableReference{ 203 ProjectId: "project-id", 204 DatasetId: "dataset-id", 205 TableId: "table-id", 206 } 207 queryJob := &Job{ 208 projectID: "project-id", 209 jobID: "job-id", 210 c: c, 211 config: &bq.JobConfiguration{ 212 Query: &bq.JobConfigurationQuery{DestinationTable: tr}, 213 }, 214 } 215 it, err := queryJob.read(context.Background(), waitForQueryStub, pf.fetchPage) 216 if err != nil { 217 t.Fatalf("err calling Read: %v", err) 218 } 219 it.PageInfo().MaxSize = 5 220 var vals []Value 221 if err := it.Next(&vals); err != nil { 222 t.Fatalf("Next: got: %v: want: nil", err) 223 } 224 225 want := []pageFetcherArgs{{ 226 table: bqToTable(tr, c), 227 pageSize: 5, 228 pageToken: "", 229 }} 230 if !testutil.Equal(pf.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, Table{}, Client{})) { 231 t.Errorf("reading: got:\n%v\nwant:\n%v", pf.calls, want) 232 } 233} 234