1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Package spannertest contains test helpers for working with Cloud Spanner.
19
20This package is EXPERIMENTAL, and is lacking many features. See the README.md
21file in this directory for more details.
22
23In-memory fake
24
25This package has an in-memory fake implementation of spanner. To use it,
26create a Server, and then connect to it with no security:
27	srv, err := spannertest.NewServer("localhost:0")
28	...
29	conn, err := grpc.DialContext(ctx, srv.Addr, grpc.WithInsecure())
30	...
31	client, err := spanner.NewClient(ctx, db, option.WithGRPCConn(conn))
32	...
33
34Alternatively, create a Server, then set the SPANNER_EMULATOR_HOST environment
35variable and use the regular spanner.NewClient:
36	srv, err := spannertest.NewServer("localhost:0")
37	...
38	os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr)
39	client, err := spanner.NewClient(ctx, db)
40	...
41
42The same server also supports database admin operations for use with
43the cloud.google.com/go/spanner/admin/database/apiv1 package.
44*/
45package spannertest
46
47import (
48	"context"
49	"encoding/base64"
50	"fmt"
51	"io"
52	"log"
53	"math/rand"
54	"net"
55	"strconv"
56	"sync"
57	"time"
58
59	"github.com/golang/protobuf/proto"
60	"github.com/golang/protobuf/ptypes"
61	"google.golang.org/grpc"
62	"google.golang.org/grpc/codes"
63	"google.golang.org/grpc/status"
64
65	anypb "github.com/golang/protobuf/ptypes/any"
66	emptypb "github.com/golang/protobuf/ptypes/empty"
67	structpb "github.com/golang/protobuf/ptypes/struct"
68	timestamppb "github.com/golang/protobuf/ptypes/timestamp"
69	lropb "google.golang.org/genproto/googleapis/longrunning"
70	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
71	spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
72
73	"cloud.google.com/go/spanner/spansql"
74)
75
76// Server is an in-memory Cloud Spanner fake.
77// It is unauthenticated, non-performant, and only a rough approximation.
78type Server struct {
79	Addr string
80
81	l   net.Listener
82	srv *grpc.Server
83	s   *server
84}
85
86// server is the real implementation of the fake.
87// It is a separate and unexported type so the API won't be cluttered with
88// methods that are only relevant to the fake's implementation.
89type server struct {
90	logf Logger
91
92	db database
93
94	mu       sync.Mutex
95	sessions map[string]*session
96	lros     map[string]*lro
97
98	// Any unimplemented methods will cause a panic.
99	// TODO: Switch to Unimplemented at some point? spannerpb would need regenerating.
100	adminpb.DatabaseAdminServer
101	spannerpb.SpannerServer
102	lropb.OperationsServer
103}
104
105type session struct {
106	name     string
107	creation time.Time
108
109	// This context tracks the lifetime of this session.
110	// It is canceled in DeleteSession.
111	ctx    context.Context
112	cancel func()
113
114	mu           sync.Mutex
115	lastUse      time.Time
116	transactions map[string]*transaction
117}
118
119func (s *session) Proto() *spannerpb.Session {
120	s.mu.Lock()
121	defer s.mu.Unlock()
122	m := &spannerpb.Session{
123		Name:                   s.name,
124		CreateTime:             timestampProto(s.creation),
125		ApproximateLastUseTime: timestampProto(s.lastUse),
126	}
127	return m
128}
129
130// timestampProto returns a valid timestamp.Timestamp,
131// or nil if the given time is zero or isn't representable.
132func timestampProto(t time.Time) *timestamppb.Timestamp {
133	if t.IsZero() {
134		return nil
135	}
136	ts, err := ptypes.TimestampProto(t)
137	if err != nil {
138		return nil
139	}
140	return ts
141}
142
143// lro represents a Long-Running Operation, generally a schema change.
144type lro struct {
145	mu    sync.Mutex
146	state *lropb.Operation
147}
148
149func (l *lro) State() *lropb.Operation {
150	l.mu.Lock()
151	defer l.mu.Unlock()
152	return proto.Clone(l.state).(*lropb.Operation)
153}
154
155// Logger is something that can be used for logging.
156// It is matched by log.Printf and testing.T.Logf.
157type Logger func(format string, args ...interface{})
158
159// NewServer creates a new Server.
160// The Server will be listening for gRPC connections, without TLS, on the provided TCP address.
161// The resolved address is available in the Addr field.
162func NewServer(laddr string) (*Server, error) {
163	l, err := net.Listen("tcp", laddr)
164	if err != nil {
165		return nil, err
166	}
167
168	s := &Server{
169		Addr: l.Addr().String(),
170		l:    l,
171		srv:  grpc.NewServer(),
172		s: &server{
173			logf: func(format string, args ...interface{}) {
174				log.Printf("spannertest.inmem: "+format, args...)
175			},
176			sessions: make(map[string]*session),
177			lros:     make(map[string]*lro),
178		},
179	}
180	adminpb.RegisterDatabaseAdminServer(s.srv, s.s)
181	spannerpb.RegisterSpannerServer(s.srv, s.s)
182	lropb.RegisterOperationsServer(s.srv, s.s)
183
184	go s.srv.Serve(s.l)
185
186	return s, nil
187}
188
189// SetLogger sets a logger for the server.
190// You can use a *testing.T as this argument to collate extra information
191// from the execution of the server.
192func (s *Server) SetLogger(l Logger) { s.s.logf = l }
193
194// Close shuts down the server.
195func (s *Server) Close() {
196	s.srv.Stop()
197	s.l.Close()
198}
199
200func genRandomSession() string {
201	var b [4]byte
202	rand.Read(b[:])
203	return fmt.Sprintf("%x", b)
204}
205
206func genRandomTransaction() string {
207	var b [6]byte
208	rand.Read(b[:])
209	return fmt.Sprintf("tx-%x", b)
210}
211
212func genRandomOperation() string {
213	var b [3]byte
214	rand.Read(b[:])
215	return fmt.Sprintf("op-%x", b)
216}
217
218func (s *server) GetOperation(ctx context.Context, req *lropb.GetOperationRequest) (*lropb.Operation, error) {
219	s.mu.Lock()
220	lro, ok := s.lros[req.Name]
221	s.mu.Unlock()
222	if !ok {
223		return nil, status.Errorf(codes.NotFound, "unknown LRO %q", req.Name)
224	}
225	return lro.State(), nil
226}
227
228// UpdateDDL applies the given DDL to the server.
229//
230// This is a convenience method for tests that may assume an existing schema.
231// The more general approach is to dial this server using an admin client, and
232// use the UpdateDatabaseDdl RPC method.
233func (s *Server) UpdateDDL(ddl *spansql.DDL) error {
234	ctx := context.Background()
235	for _, stmt := range ddl.List {
236		if st := s.s.runOneDDL(ctx, stmt); st.Code() != codes.OK {
237			return st.Err()
238		}
239	}
240	return nil
241}
242
243func (s *server) UpdateDatabaseDdl(ctx context.Context, req *adminpb.UpdateDatabaseDdlRequest) (*lropb.Operation, error) {
244	// Parse all the DDL statements first.
245	var stmts []spansql.DDLStmt
246	for _, s := range req.Statements {
247		stmt, err := spansql.ParseDDLStmt(s)
248		if err != nil {
249			// TODO: check what code the real Spanner returns here.
250			return nil, status.Errorf(codes.InvalidArgument, "bad DDL statement %q: %v", s, err)
251		}
252		stmts = append(stmts, stmt)
253	}
254
255	// Nothing should be depending on the exact structure of this,
256	// but it is specified in google/spanner/admin/database/v1/spanner_database_admin.proto.
257	id := "projects/fake-proj/instances/fake-instance/databases/fake-db/operations/" + genRandomOperation()
258	lro := &lro{
259		state: &lropb.Operation{
260			Name: id,
261		},
262	}
263	s.mu.Lock()
264	s.lros[id] = lro
265	s.mu.Unlock()
266
267	go lro.Run(s, stmts)
268	return lro.State(), nil
269}
270
271func (l *lro) Run(s *server, stmts []spansql.DDLStmt) {
272	ctx := context.Background()
273
274	for _, stmt := range stmts {
275		time.Sleep(100 * time.Millisecond)
276		if st := s.runOneDDL(ctx, stmt); st.Code() != codes.OK {
277			l.mu.Lock()
278			l.state.Done = true
279			l.state.Result = &lropb.Operation_Error{st.Proto()}
280			l.mu.Unlock()
281			return
282		}
283	}
284
285	l.mu.Lock()
286	l.state.Done = true
287	l.state.Result = &lropb.Operation_Response{&anypb.Any{}}
288	l.mu.Unlock()
289}
290
291func (s *server) runOneDDL(ctx context.Context, stmt spansql.DDLStmt) *status.Status {
292	return s.db.ApplyDDL(stmt)
293}
294
295func (s *server) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest) (*spannerpb.Session, error) {
296	//s.logf("CreateSession(%q)", req.Database)
297	return s.newSession(), nil
298}
299
300func (s *server) newSession() *spannerpb.Session {
301	id := genRandomSession()
302	now := time.Now()
303	sess := &session{
304		name:         id,
305		creation:     now,
306		lastUse:      now,
307		transactions: make(map[string]*transaction),
308	}
309	sess.ctx, sess.cancel = context.WithCancel(context.Background())
310
311	s.mu.Lock()
312	s.sessions[id] = sess
313	s.mu.Unlock()
314
315	return sess.Proto()
316}
317
318func (s *server) BatchCreateSessions(ctx context.Context, req *spannerpb.BatchCreateSessionsRequest) (*spannerpb.BatchCreateSessionsResponse, error) {
319	//s.logf("BatchCreateSessions(%q)", req.Database)
320
321	var sessions []*spannerpb.Session
322	for i := int32(0); i < req.GetSessionCount(); i++ {
323		sessions = append(sessions, s.newSession())
324	}
325
326	return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil
327}
328
329func (s *server) GetSession(ctx context.Context, req *spannerpb.GetSessionRequest) (*spannerpb.Session, error) {
330	s.mu.Lock()
331	sess, ok := s.sessions[req.Name]
332	s.mu.Unlock()
333
334	if !ok {
335		// TODO: what error does the real Spanner return?
336		return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name)
337	}
338
339	return sess.Proto(), nil
340}
341
342// TODO: ListSessions
343
344func (s *server) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest) (*emptypb.Empty, error) {
345	//s.logf("DeleteSession(%q)", req.Name)
346
347	s.mu.Lock()
348	sess, ok := s.sessions[req.Name]
349	delete(s.sessions, req.Name)
350	s.mu.Unlock()
351
352	if !ok {
353		// TODO: what error does the real Spanner return?
354		return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name)
355	}
356
357	// Terminate any operations in this session.
358	sess.cancel()
359
360	return &emptypb.Empty{}, nil
361}
362
363// popTx returns an existing transaction, removing it from the session.
364// This is called when a transaction is finishing (Commit, Rollback).
365func (s *server) popTx(sessionID, tid string) (tx *transaction, err error) {
366	s.mu.Lock()
367	sess, ok := s.sessions[sessionID]
368	s.mu.Unlock()
369	if !ok {
370		// TODO: what error does the real Spanner return?
371		return nil, status.Errorf(codes.NotFound, "unknown session %q", sessionID)
372	}
373
374	sess.mu.Lock()
375	sess.lastUse = time.Now()
376	tx, ok = sess.transactions[tid]
377	if ok {
378		delete(sess.transactions, tid)
379	}
380	sess.mu.Unlock()
381	if !ok {
382		// TODO: what error does the real Spanner return?
383		return nil, status.Errorf(codes.NotFound, "unknown transaction ID %q", tid)
384	}
385	return tx, nil
386}
387
388// readTx returns a transaction for the given session and transaction selector.
389// It is used by read/query operations (ExecuteStreamingSql, StreamingRead).
390func (s *server) readTx(ctx context.Context, session string, tsel *spannerpb.TransactionSelector) (tx *transaction, cleanup func(), err error) {
391	s.mu.Lock()
392	sess, ok := s.sessions[session]
393	s.mu.Unlock()
394	if !ok {
395		// TODO: what error does the real Spanner return?
396		return nil, nil, status.Errorf(codes.NotFound, "unknown session %q", session)
397	}
398
399	sess.mu.Lock()
400	sess.lastUse = time.Now()
401	sess.mu.Unlock()
402
403	// Only give a read-only transaction regardless of whether the selector
404	// is requesting a read-write or read-only one, since this is in readTx
405	// and so shouldn't be mutating anyway.
406	singleUse := func() (*transaction, func(), error) {
407		tx := s.db.NewReadOnlyTransaction()
408		return tx, tx.Rollback, nil
409	}
410
411	if tsel.GetSelector() == nil {
412		return singleUse()
413	}
414
415	switch sel := tsel.Selector.(type) {
416	default:
417		return nil, nil, fmt.Errorf("TransactionSelector type %T not supported", sel)
418	case *spannerpb.TransactionSelector_SingleUse:
419		// Ignore options (e.g. timestamps).
420		switch mode := sel.SingleUse.Mode.(type) {
421		case *spannerpb.TransactionOptions_ReadOnly_:
422			return singleUse()
423		case *spannerpb.TransactionOptions_ReadWrite_:
424			return singleUse()
425		default:
426			return nil, nil, fmt.Errorf("single use transaction in mode %T not supported", mode)
427		}
428	case *spannerpb.TransactionSelector_Id:
429		sess.mu.Lock()
430		tx, ok := sess.transactions[string(sel.Id)]
431		sess.mu.Unlock()
432		if !ok {
433			return nil, nil, fmt.Errorf("no transaction with id %q", sel.Id)
434		}
435		return tx, func() {}, nil
436	}
437}
438
439func (s *server) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) {
440	// Assume this is probably a DML statement. Queries tend to use ExecuteStreamingSql.
441	// TODO: Expand this to support more things.
442
443	obj, ok := req.Transaction.Selector.(*spannerpb.TransactionSelector_Id)
444	if !ok {
445		return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction.Selector)
446	}
447	tid := string(obj.Id)
448	_ = tid // TODO: lookup an existing transaction by ID.
449
450	stmt, err := spansql.ParseDMLStmt(req.Sql)
451	if err != nil {
452		return nil, status.Errorf(codes.InvalidArgument, "bad DML: %v", err)
453	}
454	params, err := parseQueryParams(req.GetParams())
455	if err != nil {
456		return nil, err
457	}
458
459	s.logf("Executing: %s", stmt.SQL())
460	if len(params) > 0 {
461		s.logf("        ▹ %v", params)
462	}
463
464	n, err := s.db.Execute(stmt, params)
465	if err != nil {
466		return nil, err
467	}
468	return &spannerpb.ResultSet{
469		Stats: &spannerpb.ResultSetStats{
470			RowCount: &spannerpb.ResultSetStats_RowCountExact{int64(n)},
471		},
472	}, nil
473}
474
475func (s *server) ExecuteStreamingSql(req *spannerpb.ExecuteSqlRequest, stream spannerpb.Spanner_ExecuteStreamingSqlServer) error {
476	tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction)
477	if err != nil {
478		return err
479	}
480	defer cleanup()
481
482	q, err := spansql.ParseQuery(req.Sql)
483	if err != nil {
484		// TODO: check what code the real Spanner returns here.
485		return status.Errorf(codes.InvalidArgument, "bad query: %v", err)
486	}
487
488	params, err := parseQueryParams(req.GetParams())
489	if err != nil {
490		return err
491	}
492
493	s.logf("Querying: %s", q.SQL())
494	if len(params) > 0 {
495		s.logf("        ▹ %v", params)
496	}
497
498	ri, err := s.db.Query(q, params)
499	if err != nil {
500		return err
501	}
502	return s.readStream(stream.Context(), tx, stream.Send, ri)
503}
504
505// TODO: Read
506
507func (s *server) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
508	tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction)
509	if err != nil {
510		return err
511	}
512	defer cleanup()
513
514	// Bail out if various advanced features are being used.
515	if req.Index != "" {
516		// This is okay; we can still return results.
517		s.logf("Warning: index reads (%q) not supported", req.Index)
518	}
519	if len(req.ResumeToken) > 0 {
520		// This should only happen if we send resume_token ourselves.
521		return fmt.Errorf("read resumption not supported")
522	}
523	if len(req.PartitionToken) > 0 {
524		return fmt.Errorf("partition restrictions not supported")
525	}
526
527	var ri rowIter
528	if req.KeySet.All {
529		s.logf("Reading all from %s (cols: %v)", req.Table, req.Columns)
530		ri, err = s.db.ReadAll(req.Table, req.Columns, req.Limit)
531	} else {
532		s.logf("Reading rows from %d keys and %d ranges from %s (cols: %v)", len(req.KeySet.Keys), len(req.KeySet.Ranges), req.Table, req.Columns)
533		ri, err = s.db.Read(req.Table, req.Columns, req.KeySet.Keys, makeKeyRangeList(req.KeySet.Ranges), req.Limit)
534	}
535	if err != nil {
536		return err
537	}
538
539	// TODO: Figure out the right contexts to use here. There's the session one (sess.ctx),
540	// but also this specific RPC one (stream.Context()). Which takes precedence?
541	// They appear to be independent.
542
543	return s.readStream(stream.Context(), tx, stream.Send, ri)
544}
545
546func (s *server) readStream(ctx context.Context, tx *transaction, send func(*spannerpb.PartialResultSet) error, ri rowIter) error {
547	// Build the result set metadata.
548	rsm := &spannerpb.ResultSetMetadata{
549		RowType: &spannerpb.StructType{},
550		// TODO: transaction info?
551	}
552	for _, ci := range ri.Cols() {
553		st, err := spannerTypeFromType(ci.Type)
554		if err != nil {
555			return err
556		}
557		rsm.RowType.Fields = append(rsm.RowType.Fields, &spannerpb.StructType_Field{
558			Name: ci.Name,
559			Type: st,
560		})
561	}
562
563	for {
564		row, err := ri.Next()
565		if err == io.EOF {
566			break
567		} else if err != nil {
568			return err
569		}
570
571		values := make([]*structpb.Value, len(row))
572		for i, x := range row {
573			v, err := spannerValueFromValue(x)
574			if err != nil {
575				return err
576			}
577			values[i] = v
578		}
579
580		prs := &spannerpb.PartialResultSet{
581			Metadata: rsm,
582			Values:   values,
583		}
584		if err := send(prs); err != nil {
585			return err
586		}
587
588		// ResultSetMetadata is only set for the first PartialResultSet.
589		rsm = nil
590	}
591
592	return nil
593}
594
595func (s *server) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest) (*spannerpb.Transaction, error) {
596	//s.logf("BeginTransaction(%v)", req)
597
598	s.mu.Lock()
599	sess, ok := s.sessions[req.Session]
600	s.mu.Unlock()
601	if !ok {
602		// TODO: what error does the real Spanner return?
603		return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Session)
604	}
605
606	id := genRandomTransaction()
607	tx := s.db.NewTransaction()
608
609	sess.mu.Lock()
610	sess.lastUse = time.Now()
611	sess.transactions[id] = tx
612	sess.mu.Unlock()
613
614	tr := &spannerpb.Transaction{Id: []byte(id)}
615
616	if req.GetOptions().GetReadOnly().GetReturnReadTimestamp() {
617		// Return the last commit timestamp.
618		// This isn't wholly accurate, but may be good enough for simple use cases.
619		tr.ReadTimestamp = timestampProto(s.db.LastCommitTimestamp())
620	}
621
622	return tr, nil
623}
624
625func (s *server) Commit(ctx context.Context, req *spannerpb.CommitRequest) (resp *spannerpb.CommitResponse, err error) {
626	//s.logf("Commit(%q, %q)", req.Session, req.Transaction)
627
628	obj, ok := req.Transaction.(*spannerpb.CommitRequest_TransactionId)
629	if !ok {
630		return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction)
631	}
632	tid := string(obj.TransactionId)
633
634	tx, err := s.popTx(req.Session, tid)
635	if err != nil {
636		return nil, err
637	}
638	defer func() {
639		if err != nil {
640			tx.Rollback()
641		}
642	}()
643	tx.Start()
644
645	for _, m := range req.Mutations {
646		switch op := m.Operation.(type) {
647		default:
648			return nil, fmt.Errorf("unsupported mutation operation type %T", op)
649		case *spannerpb.Mutation_Insert:
650			ins := op.Insert
651			err := s.db.Insert(tx, ins.Table, ins.Columns, ins.Values)
652			if err != nil {
653				return nil, err
654			}
655		case *spannerpb.Mutation_Update:
656			up := op.Update
657			err := s.db.Update(tx, up.Table, up.Columns, up.Values)
658			if err != nil {
659				return nil, err
660			}
661		case *spannerpb.Mutation_InsertOrUpdate:
662			iou := op.InsertOrUpdate
663			err := s.db.InsertOrUpdate(tx, iou.Table, iou.Columns, iou.Values)
664			if err != nil {
665				return nil, err
666			}
667		case *spannerpb.Mutation_Delete_:
668			del := op.Delete
669			ks := del.KeySet
670
671			err := s.db.Delete(tx, del.Table, ks.Keys, makeKeyRangeList(ks.Ranges), ks.All)
672			if err != nil {
673				return nil, err
674			}
675		}
676
677	}
678
679	ts, err := tx.Commit()
680	if err != nil {
681		return nil, err
682	}
683
684	return &spannerpb.CommitResponse{
685		CommitTimestamp: timestampProto(ts),
686	}, nil
687}
688
689func (s *server) Rollback(ctx context.Context, req *spannerpb.RollbackRequest) (*emptypb.Empty, error) {
690	s.logf("Rollback(%v)", req)
691
692	tx, err := s.popTx(req.Session, string(req.TransactionId))
693	if err != nil {
694		return nil, err
695	}
696
697	tx.Rollback()
698
699	return &emptypb.Empty{}, nil
700}
701
702// TODO: PartitionQuery, PartitionRead
703
704func parseQueryParams(p *structpb.Struct) (queryParams, error) {
705	params := make(queryParams)
706	for k, v := range p.GetFields() {
707		switch v := v.Kind.(type) {
708		default:
709			return nil, fmt.Errorf("unsupported well-known type value kind %T", v)
710		case *structpb.Value_NullValue:
711			params[k] = nil
712		case *structpb.Value_NumberValue:
713			params[k] = v.NumberValue
714		case *structpb.Value_StringValue:
715			params[k] = v.StringValue
716		}
717	}
718	return params, nil
719}
720
721func spannerTypeFromType(typ spansql.Type) (*spannerpb.Type, error) {
722	var code spannerpb.TypeCode
723	switch typ.Base {
724	default:
725		return nil, fmt.Errorf("unhandled base type %d", typ.Base)
726	case spansql.Bool:
727		code = spannerpb.TypeCode_BOOL
728	case spansql.Int64:
729		code = spannerpb.TypeCode_INT64
730	case spansql.Float64:
731		code = spannerpb.TypeCode_FLOAT64
732	case spansql.String:
733		code = spannerpb.TypeCode_STRING
734	case spansql.Bytes:
735		code = spannerpb.TypeCode_BYTES
736	case spansql.Date:
737		code = spannerpb.TypeCode_DATE
738	case spansql.Timestamp:
739		code = spannerpb.TypeCode_TIMESTAMP
740	}
741	st := &spannerpb.Type{Code: code}
742	if typ.Array {
743		st = &spannerpb.Type{
744			Code:             spannerpb.TypeCode_ARRAY,
745			ArrayElementType: st,
746		}
747	}
748	return st, nil
749}
750
751func spannerValueFromValue(x interface{}) (*structpb.Value, error) {
752	switch x := x.(type) {
753	default:
754		return nil, fmt.Errorf("unhandled database value type %T", x)
755	case bool:
756		return &structpb.Value{Kind: &structpb.Value_BoolValue{x}}, nil
757	case int64:
758		// The Spanner int64 is actually a decimal string.
759		s := strconv.FormatInt(x, 10)
760		return &structpb.Value{Kind: &structpb.Value_StringValue{s}}, nil
761	case float64:
762		return &structpb.Value{Kind: &structpb.Value_NumberValue{x}}, nil
763	case string:
764		return &structpb.Value{Kind: &structpb.Value_StringValue{x}}, nil
765	case []byte:
766		return &structpb.Value{Kind: &structpb.Value_StringValue{base64.StdEncoding.EncodeToString(x)}}, nil
767	case nil:
768		return &structpb.Value{Kind: &structpb.Value_NullValue{}}, nil
769	case []interface{}:
770		var vs []*structpb.Value
771		for _, elem := range x {
772			v, err := spannerValueFromValue(elem)
773			if err != nil {
774				return nil, err
775			}
776			vs = append(vs, v)
777		}
778		return &structpb.Value{Kind: &structpb.Value_ListValue{
779			&structpb.ListValue{Values: vs},
780		}}, nil
781	}
782}
783
784func makeKeyRangeList(ranges []*spannerpb.KeyRange) keyRangeList {
785	var krl keyRangeList
786	for _, r := range ranges {
787		krl = append(krl, makeKeyRange(r))
788	}
789	return krl
790}
791
792func makeKeyRange(r *spannerpb.KeyRange) *keyRange {
793	var kr keyRange
794	switch s := r.StartKeyType.(type) {
795	case *spannerpb.KeyRange_StartClosed:
796		kr.start = s.StartClosed
797		kr.startClosed = true
798	case *spannerpb.KeyRange_StartOpen:
799		kr.start = s.StartOpen
800	}
801	switch e := r.EndKeyType.(type) {
802	case *spannerpb.KeyRange_EndClosed:
803		kr.end = e.EndClosed
804		kr.endClosed = true
805	case *spannerpb.KeyRange_EndOpen:
806		kr.end = e.EndOpen
807	}
808	return &kr
809}
810