1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spannertest
18
19// TODO: More of this test should be moved into integration_test.go.
20
21import (
22	"fmt"
23	"io"
24	"reflect"
25	"sync"
26	"testing"
27
28	"google.golang.org/grpc/codes"
29
30	structpb "github.com/golang/protobuf/ptypes/struct"
31
32	"cloud.google.com/go/civil"
33	"cloud.google.com/go/spanner/spansql"
34)
35
36func TestTableCreation(t *testing.T) {
37	stdTestTable := &spansql.CreateTable{
38		Name: "Staff",
39		Columns: []spansql.ColumnDef{
40			{Name: "Tenure", Type: spansql.Type{Base: spansql.Int64}},
41			{Name: "ID", Type: spansql.Type{Base: spansql.Int64}},
42			{Name: "Name", Type: spansql.Type{Base: spansql.String}},
43			{Name: "Cool", Type: spansql.Type{Base: spansql.Bool}},
44			{Name: "Height", Type: spansql.Type{Base: spansql.Float64}},
45		},
46		PrimaryKey: []spansql.KeyPart{{Column: "Name"}, {Column: "ID"}},
47	}
48
49	var db database
50	st := db.ApplyDDL(stdTestTable)
51	if st.Code() != codes.OK {
52		t.Fatalf("Creating table: %v", st.Err())
53	}
54
55	// Snoop inside to check that it was constructed correctly.
56	got, ok := db.tables["Staff"]
57	if !ok {
58		t.Fatal("Table didn't get registered")
59	}
60	want := table{
61		cols: []colInfo{
62			{Name: "Name", Type: spansql.Type{Base: spansql.String}},
63			{Name: "ID", Type: spansql.Type{Base: spansql.Int64}},
64			{Name: "Tenure", Type: spansql.Type{Base: spansql.Int64}},
65			{Name: "Cool", Type: spansql.Type{Base: spansql.Bool}},
66			{Name: "Height", Type: spansql.Type{Base: spansql.Float64}},
67		},
68		colIndex: map[spansql.ID]int{
69			"Tenure": 2, "ID": 1, "Cool": 3, "Name": 0, "Height": 4,
70		},
71		pkCols: 2,
72	}
73	if !reflect.DeepEqual(got.cols, want.cols) {
74		t.Errorf("table.cols incorrect.\n got %v\nwant %v", got.cols, want.cols)
75	}
76	if !reflect.DeepEqual(got.colIndex, want.colIndex) {
77		t.Errorf("table.colIndex incorrect.\n got %v\nwant %v", got.colIndex, want.colIndex)
78	}
79	if got.pkCols != want.pkCols {
80		t.Errorf("table.pkCols incorrect.\n got %d\nwant %d", got.pkCols, want.pkCols)
81	}
82}
83
84func TestTableDescendingKey(t *testing.T) {
85	var descTestTable = &spansql.CreateTable{
86		Name: "Timeseries",
87		Columns: []spansql.ColumnDef{
88			{Name: "Name", Type: spansql.Type{Base: spansql.String}},
89			{Name: "Observed", Type: spansql.Type{Base: spansql.Int64}},
90			{Name: "Value", Type: spansql.Type{Base: spansql.Float64}},
91		},
92		PrimaryKey: []spansql.KeyPart{{Column: "Name"}, {Column: "Observed", Desc: true}},
93	}
94
95	var db database
96	if st := db.ApplyDDL(descTestTable); st.Code() != codes.OK {
97		t.Fatalf("Creating table: %v", st.Err())
98	}
99
100	tx := db.NewTransaction()
101	tx.Start()
102	err := db.Insert(tx, "Timeseries", []spansql.ID{"Name", "Observed", "Value"}, []*structpb.ListValue{
103		listV(stringV("box"), stringV("1"), floatV(1.1)),
104		listV(stringV("cupcake"), stringV("1"), floatV(6)),
105		listV(stringV("box"), stringV("2"), floatV(1.2)),
106		listV(stringV("cupcake"), stringV("2"), floatV(7)),
107		listV(stringV("box"), stringV("3"), floatV(1.3)),
108		listV(stringV("cupcake"), stringV("3"), floatV(8)),
109	})
110	if err != nil {
111		t.Fatalf("Inserting data: %v", err)
112	}
113	if _, err := tx.Commit(); err != nil {
114		t.Fatalf("Committing changes: %v", err)
115	}
116
117	// Querying the entire table should return values in key order,
118	// noting that the second key part here is in descending order.
119	q, err := spansql.ParseQuery(`SELECT * FROM Timeseries`)
120	if err != nil {
121		t.Fatalf("ParseQuery: %v", err)
122	}
123	ri, err := db.Query(q, nil)
124	if err != nil {
125		t.Fatalf("Query: %v", err)
126	}
127	got := slurp(t, ri)
128	want := [][]interface{}{
129		{"box", int64(3), 1.3},
130		{"box", int64(2), 1.2},
131		{"box", int64(1), 1.1},
132		{"cupcake", int64(3), 8.0},
133		{"cupcake", int64(2), 7.0},
134		{"cupcake", int64(1), 6.0},
135	}
136	if !reflect.DeepEqual(got, want) {
137		t.Errorf("Results from Query are wrong.\n got %v\nwant %v", got, want)
138	}
139
140	// TestKeyRange exercises the edge cases for key range reading.
141}
142
143func TestTableSchemaConvertNull(t *testing.T) {
144	var db database
145	st := db.ApplyDDL(&spansql.CreateTable{
146		Name: "Songwriters",
147		Columns: []spansql.ColumnDef{
148			{Name: "ID", Type: spansql.Type{Base: spansql.Int64}, NotNull: true},
149			{Name: "Nickname", Type: spansql.Type{Base: spansql.String}},
150		},
151		PrimaryKey: []spansql.KeyPart{{Column: "ID"}},
152	})
153	if err := st.Err(); err != nil {
154		t.Fatal(err)
155	}
156
157	// Populate with data including a NULL for the STRING field.
158	tx := db.NewTransaction()
159	tx.Start()
160	err := db.Insert(tx, "Songwriters", []spansql.ID{"ID", "Nickname"}, []*structpb.ListValue{
161		listV(stringV("6"), stringV("Tiger")),
162		listV(stringV("7"), nullV()),
163	})
164	if err != nil {
165		t.Fatalf("Inserting data: %v", err)
166	}
167	if _, err := tx.Commit(); err != nil {
168		t.Fatalf("Committing changes: %v", err)
169	}
170
171	// Convert the STRING field to a BYTES and back.
172	st = db.ApplyDDL(&spansql.AlterTable{
173		Name: "Songwriters",
174		Alteration: spansql.AlterColumn{
175			Name:       "Nickname",
176			Alteration: spansql.SetColumnType{Type: spansql.Type{Base: spansql.Bytes}},
177		},
178	})
179	if err := st.Err(); err != nil {
180		t.Fatalf("Converting STRING -> BYTES: %v", err)
181	}
182	st = db.ApplyDDL(&spansql.AlterTable{
183		Name: "Songwriters",
184		Alteration: spansql.AlterColumn{
185			Name:       "Nickname",
186			Alteration: spansql.SetColumnType{Type: spansql.Type{Base: spansql.String}},
187		},
188	})
189	if err := st.Err(); err != nil {
190		t.Fatalf("Converting BYTES -> STRING: %v", err)
191	}
192
193	// Check that the data is maintained.
194	q, err := spansql.ParseQuery(`SELECT * FROM Songwriters`)
195	if err != nil {
196		t.Fatalf("ParseQuery: %v", err)
197	}
198	ri, err := db.Query(q, nil)
199	if err != nil {
200		t.Fatalf("Query: %v", err)
201	}
202	got := slurp(t, ri)
203	want := [][]interface{}{
204		{int64(6), "Tiger"},
205		{int64(7), nil},
206	}
207	if !reflect.DeepEqual(got, want) {
208		t.Errorf("Results from Query are wrong.\n got %v\nwant %v", got, want)
209	}
210}
211
212func TestTableSchemaUpdates(t *testing.T) {
213	tests := []struct {
214		desc     string
215		ddl      string
216		wantCode codes.Code
217	}{
218		// TODO: add more cases, including interactions with the primary key and dropping columns.
219
220		{
221			"Add new column",
222			`CREATE TABLE Songwriters (
223				Id INT64 NOT NULL,
224			) PRIMARY KEY (Id);
225			ALTER TABLE Songwriters ADD COLUMN Nickname STRING(MAX);`,
226			codes.OK,
227		},
228		{
229			"Add new column with NOT NULL",
230			`CREATE TABLE Songwriters (
231				Id INT64 NOT NULL,
232			) PRIMARY KEY (Id);
233			ALTER TABLE Songwriters ADD COLUMN Nickname STRING(MAX) NOT NULL;`,
234			codes.InvalidArgument,
235		},
236
237		// Examples from https://cloud.google.com/spanner/docs/schema-updates:
238
239		{
240			"Add NOT NULL to a non-key column",
241			`CREATE TABLE Songwriters (
242				Id INT64 NOT NULL,
243				Nickname STRING(MAX),
244			) PRIMARY KEY (Id);
245			ALTER TABLE Songwriters ALTER COLUMN Nickname STRING(MAX) NOT NULL;`,
246			codes.OK,
247		},
248		{
249			"Remove NOT NULL from a non-key column",
250			`CREATE TABLE Songwriters (
251				Id INT64 NOT NULL,
252				Nickname STRING(MAX) NOT NULL,
253			) PRIMARY KEY (Id);
254			ALTER TABLE Songwriters ALTER COLUMN Nickname STRING(MAX);`,
255			codes.OK,
256		},
257		{
258			"Change a STRING column to a BYTES column",
259			`CREATE TABLE Songwriters (
260				Id INT64 NOT NULL,
261				Nickname STRING(MAX),
262			) PRIMARY KEY (Id);
263			ALTER TABLE Songwriters ALTER COLUMN Nickname BYTES(MAX);`,
264			codes.OK,
265		},
266		// TODO: Increase or decrease the length limit for a STRING or BYTES type (including to MAX)
267		// TODO: Enable or disable commit timestamps in value and primary key columns
268	}
269testLoop:
270	for _, test := range tests {
271		var db database
272
273		ddl, err := spansql.ParseDDL("filename", test.ddl)
274		if err != nil {
275			t.Fatalf("%s: Bad DDL: %v", test.desc, err)
276		}
277		for _, stmt := range ddl.List {
278			if st := db.ApplyDDL(stmt); st.Code() != codes.OK {
279				if st.Code() != test.wantCode {
280					t.Errorf("%s: Applying statement %q: %v", test.desc, stmt.SQL(), st.Err())
281				}
282				continue testLoop
283			}
284		}
285		if test.wantCode != codes.OK {
286			t.Errorf("%s: Finished with OK, want %v", test.desc, test.wantCode)
287		}
288	}
289}
290
291func TestConcurrentReadInsert(t *testing.T) {
292	// Check that data is safely copied during a query.
293	tbl := &spansql.CreateTable{
294		Name: "Tablino",
295		Columns: []spansql.ColumnDef{
296			{Name: "A", Type: spansql.Type{Base: spansql.Int64}},
297		},
298		PrimaryKey: []spansql.KeyPart{{Column: "A"}},
299	}
300
301	var db database
302	if st := db.ApplyDDL(tbl); st.Code() != codes.OK {
303		t.Fatalf("Creating table: %v", st.Err())
304	}
305
306	// Insert some initial data.
307	tx := db.NewTransaction()
308	tx.Start()
309	err := db.Insert(tx, "Tablino", []spansql.ID{"A"}, []*structpb.ListValue{
310		listV(stringV("1")),
311		listV(stringV("2")),
312		listV(stringV("4")),
313	})
314	if err != nil {
315		t.Fatalf("Inserting data: %v", err)
316	}
317	if _, err := tx.Commit(); err != nil {
318		t.Fatalf("Committing changes: %v", err)
319	}
320
321	// Now insert "3", and query concurrently.
322	q, err := spansql.ParseQuery(`SELECT * FROM Tablino WHERE A > 2`)
323	if err != nil {
324		t.Fatalf("ParseQuery: %v", err)
325	}
326	var out [][]interface{}
327
328	var wg sync.WaitGroup
329	wg.Add(2)
330	go func() {
331		defer wg.Done()
332
333		ri, err := db.Query(q, nil)
334		if err != nil {
335			t.Errorf("Query: %v", err)
336			return
337		}
338		out = slurp(t, ri)
339	}()
340	go func() {
341		defer wg.Done()
342
343		tx := db.NewTransaction()
344		tx.Start()
345		err := db.Insert(tx, "Tablino", []spansql.ID{"A"}, []*structpb.ListValue{
346			listV(stringV("3")),
347		})
348		if err != nil {
349			t.Errorf("Inserting data: %v", err)
350			return
351		}
352		if _, err := tx.Commit(); err != nil {
353			t.Errorf("Committing changes: %v", err)
354		}
355	}()
356	wg.Wait()
357
358	// We should get either 1 or 2 rows (value 4 should be included, and value 3 might).
359	if n := len(out); n != 1 && n != 2 {
360		t.Fatalf("Concurrent read returned %d rows, want 1 or 2", n)
361	}
362}
363
364func slurp(t *testing.T, ri rowIter) (all [][]interface{}) {
365	t.Helper()
366	for {
367		row, err := ri.Next()
368		if err == io.EOF {
369			return
370		} else if err != nil {
371			t.Fatalf("Reading rows: %v", err)
372		}
373		all = append(all, row)
374	}
375}
376
377func listV(vs ...*structpb.Value) *structpb.ListValue { return &structpb.ListValue{Values: vs} }
378func stringV(s string) *structpb.Value                { return &structpb.Value{Kind: &structpb.Value_StringValue{s}} }
379func floatV(f float64) *structpb.Value                { return &structpb.Value{Kind: &structpb.Value_NumberValue{f}} }
380func boolV(b bool) *structpb.Value                    { return &structpb.Value{Kind: &structpb.Value_BoolValue{b}} }
381func nullV() *structpb.Value                          { return &structpb.Value{Kind: &structpb.Value_NullValue{}} }
382
383func boolParam(b bool) queryParam     { return queryParam{Value: b, Type: boolType} }
384func stringParam(s string) queryParam { return queryParam{Value: s, Type: stringType} }
385func intParam(i int64) queryParam     { return queryParam{Value: i, Type: int64Type} }
386func floatParam(f float64) queryParam { return queryParam{Value: f, Type: float64Type} }
387func nullParam() queryParam           { return queryParam{Value: nil} }
388
389func dateParam(s string) queryParam {
390	d, err := civil.ParseDate(s)
391	if err != nil {
392		panic(fmt.Sprintf("bad test date %q: %v", s, err))
393	}
394	return queryParam{Value: d, Type: spansql.Type{Base: spansql.Date}}
395}
396
397func TestRowCmp(t *testing.T) {
398	r := func(x ...interface{}) []interface{} { return x }
399	tests := []struct {
400		a, b []interface{}
401		desc []bool
402		want int
403	}{
404		{r(int64(1), "foo", 1.6), r(int64(1), "foo", 1.6), []bool{false, false, false}, 0},
405		{r(int64(1), "foo"), r(int64(1), "foo", 1.6), []bool{false, false, false}, 0}, // first is shorter
406
407		{r(int64(1), "bar", 1.8), r(int64(1), "foo", 1.6), []bool{false, false, false}, -1},
408		{r(int64(1), "bar", 1.8), r(int64(1), "foo", 1.6), []bool{false, false, true}, -1},
409		{r(int64(1), "bar", 1.8), r(int64(1), "foo", 1.6), []bool{false, true, false}, 1},
410
411		{r(int64(1), "foo", 1.6), r(int64(1), "bar", 1.8), []bool{false, false, false}, 1},
412		{r(int64(1), "foo", 1.6), r(int64(1), "bar", 1.8), []bool{false, false, true}, 1},
413		{r(int64(1), "foo", 1.6), r(int64(1), "bar", 1.8), []bool{false, true, false}, -1},
414		{r(int64(1), "foo", 1.6), r(int64(1), "bar", 1.8), []bool{false, true, true}, -1},
415	}
416	for _, test := range tests {
417		if got := rowCmp(test.a, test.b, test.desc); got != test.want {
418			t.Errorf("rowCmp(%v, %v, %v) = %d, want %d", test.a, test.b, test.desc, got, test.want)
419		}
420	}
421}
422
423func TestKeyRange(t *testing.T) {
424	r := func(x ...interface{}) []interface{} { return x }
425	closedClosed := func(start, end []interface{}) *keyRange {
426		return &keyRange{
427			startKey:    start,
428			endKey:      end,
429			startClosed: true,
430			endClosed:   true,
431		}
432	}
433	halfOpen := func(start, end []interface{}) *keyRange {
434		return &keyRange{
435			startKey:    start,
436			endKey:      end,
437			startClosed: true,
438		}
439	}
440	openOpen := func(start, end []interface{}) *keyRange {
441		return &keyRange{
442			startKey: start,
443			endKey:   end,
444		}
445	}
446	tests := []struct {
447		kr      *keyRange
448		desc    []bool
449		include [][]interface{}
450		exclude [][]interface{}
451	}{
452		// Examples from google/spanner/v1/keys.proto.
453		{
454			kr: closedClosed(r("Bob", "2015-01-01"), r("Bob", "2015-12-31")),
455			include: [][]interface{}{
456				r("Bob", "2015-01-01"),
457				r("Bob", "2015-07-07"),
458				r("Bob", "2015-12-31"),
459			},
460			exclude: [][]interface{}{
461				r("Alice", "2015-07-07"),
462				r("Bob", "2014-12-31"),
463				r("Bob", "2016-01-01"),
464			},
465		},
466		{
467			kr: closedClosed(r("Bob", "2000-01-01"), r("Bob")),
468			include: [][]interface{}{
469				r("Bob", "2000-01-01"),
470				r("Bob", "2022-07-07"),
471			},
472			exclude: [][]interface{}{
473				r("Alice", "2015-07-07"),
474				r("Bob", "1999-11-07"),
475			},
476		},
477		{
478			kr: closedClosed(r("Bob"), r("Bob")),
479			include: [][]interface{}{
480				r("Bob", "2000-01-01"),
481			},
482			exclude: [][]interface{}{
483				r("Alice", "2015-07-07"),
484				r("Charlie", "1999-11-07"),
485			},
486		},
487		{
488			kr: halfOpen(r("Bob"), r("Bob", "2000-01-01")),
489			include: [][]interface{}{
490				r("Bob", "1999-11-07"),
491			},
492			exclude: [][]interface{}{
493				r("Alice", "1999-11-07"),
494				r("Bob", "2000-01-01"),
495				r("Bob", "2004-07-07"),
496				r("Charlie", "1999-11-07"),
497			},
498		},
499		{
500			kr: openOpen(r("Bob", "1999-11-06"), r("Bob", "2000-01-01")),
501			include: [][]interface{}{
502				r("Bob", "1999-11-07"),
503			},
504			exclude: [][]interface{}{
505				r("Alice", "1999-11-07"),
506				r("Bob", "1999-11-06"),
507				r("Bob", "2000-01-01"),
508				r("Bob", "2004-07-07"),
509				r("Charlie", "1999-11-07"),
510			},
511		},
512		{
513			kr: closedClosed(r(), r()),
514			include: [][]interface{}{
515				r("Alice", "1999-11-07"),
516				r("Bob", "1999-11-07"),
517				r("Charlie", "1999-11-07"),
518			},
519		},
520		{
521			kr: halfOpen(r("A"), r("D")),
522			include: [][]interface{}{
523				r("Alice", "1999-11-07"),
524				r("Bob", "1999-11-07"),
525				r("Charlie", "1999-11-07"),
526			},
527			exclude: [][]interface{}{
528				r("0day", "1999-11-07"),
529				r("Doris", "1999-11-07"),
530			},
531		},
532		// Exercise descending primary key ordering.
533		{
534			kr:   halfOpen(r("Alpha"), r("Charlie")),
535			desc: []bool{true, false},
536			// Key range is backwards, so nothing should be returned.
537			exclude: [][]interface{}{
538				r("Alice", "1999-11-07"),
539				r("Bob", "1999-11-07"),
540				r("Doris", "1999-11-07"),
541			},
542		},
543		{
544			kr:   halfOpen(r("Alice", "1999-11-07"), r("Charlie")),
545			desc: []bool{false, true},
546			// The second primary key column is descending.
547			include: [][]interface{}{
548				r("Alice", "1999-09-09"),
549				r("Alice", "1999-11-07"),
550				r("Bob", "2000-01-01"),
551			},
552			exclude: [][]interface{}{
553				r("Alice", "2000-01-01"),
554				r("Doris", "1999-11-07"),
555			},
556		},
557	}
558	for _, test := range tests {
559		desc := test.desc
560		if desc == nil {
561			desc = []bool{false, false} // default
562		}
563		tbl := &table{
564			pkCols: 2,
565			pkDesc: desc,
566		}
567		for _, pk := range append(test.include, test.exclude...) {
568			rowNum, _ := tbl.rowForPK(pk)
569			tbl.insertRow(rowNum, pk)
570		}
571		start, end := tbl.findRange(test.kr)
572		has := func(pk []interface{}) bool {
573			n, _ := tbl.rowForPK(pk)
574			return start <= n && n < end
575		}
576		for _, pk := range test.include {
577			if !has(pk) {
578				t.Errorf("keyRange %v does not include %v", test.kr, pk)
579			}
580		}
581		for _, pk := range test.exclude {
582			if has(pk) {
583				t.Errorf("keyRange %v includes %v", test.kr, pk)
584			}
585		}
586	}
587}
588