1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Package spannertest contains test helpers for working with Cloud Spanner.
19
20This package is EXPERIMENTAL, and is lacking many features. See the README.md
21file in this directory for more details.
22
23In-memory fake
24
25This package has an in-memory fake implementation of spanner. To use it,
26create a Server, and then connect to it with no security:
27	srv, err := spannertest.NewServer("localhost:0")
28	...
29	conn, err := grpc.DialContext(ctx, srv.Addr, grpc.WithInsecure())
30	...
31	client, err := spanner.NewClient(ctx, db, option.WithGRPCConn(conn))
32	...
33
34Alternatively, create a Server, then set the SPANNER_EMULATOR_HOST environment
35variable and use the regular spanner.NewClient:
36	srv, err := spannertest.NewServer("localhost:0")
37	...
38	os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr)
39	client, err := spanner.NewClient(ctx, db)
40	...
41
42The same server also supports database admin operations for use with
43the cloud.google.com/go/spanner/admin/database/apiv1 package.
44*/
45package spannertest
46
47import (
48	"context"
49	"encoding/base64"
50	"fmt"
51	"log"
52	"math/rand"
53	"net"
54	"strconv"
55	"sync"
56	"time"
57
58	"github.com/golang/protobuf/proto"
59	"github.com/golang/protobuf/ptypes"
60	"google.golang.org/grpc"
61	"google.golang.org/grpc/codes"
62	"google.golang.org/grpc/status"
63
64	anypb "github.com/golang/protobuf/ptypes/any"
65	emptypb "github.com/golang/protobuf/ptypes/empty"
66	structpb "github.com/golang/protobuf/ptypes/struct"
67	timestamppb "github.com/golang/protobuf/ptypes/timestamp"
68	lropb "google.golang.org/genproto/googleapis/longrunning"
69	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
70	spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
71
72	"cloud.google.com/go/spanner/spansql"
73)
74
75// Server is an in-memory Cloud Spanner fake.
76// It is unauthenticated, non-performant, and only a rough approximation.
77type Server struct {
78	Addr string
79
80	l   net.Listener
81	srv *grpc.Server
82	s   *server
83}
84
85// server is the real implementation of the fake.
86// It is a separate and unexported type so the API won't be cluttered with
87// methods that are only relevant to the fake's implementation.
88type server struct {
89	logf Logger
90
91	db database
92
93	mu       sync.Mutex
94	sessions map[string]*session
95	lros     map[string]*lro
96
97	// Any unimplemented methods will cause a panic.
98	// TODO: Switch to Unimplemented at some point? spannerpb would need regenerating.
99	adminpb.DatabaseAdminServer
100	spannerpb.SpannerServer
101	lropb.OperationsServer
102}
103
104type session struct {
105	name     string
106	creation time.Time
107
108	// This context tracks the lifetime of this session.
109	// It is canceled in DeleteSession.
110	ctx    context.Context
111	cancel func()
112
113	mu           sync.Mutex
114	lastUse      time.Time
115	transactions map[string]*transaction
116}
117
118func (s *session) Proto() *spannerpb.Session {
119	s.mu.Lock()
120	defer s.mu.Unlock()
121	m := &spannerpb.Session{
122		Name:                   s.name,
123		CreateTime:             timestampProto(s.creation),
124		ApproximateLastUseTime: timestampProto(s.lastUse),
125	}
126	return m
127}
128
129// timestampProto returns a valid timestamp.Timestamp,
130// or nil if the given time is zero or isn't representable.
131func timestampProto(t time.Time) *timestamppb.Timestamp {
132	if t.IsZero() {
133		return nil
134	}
135	ts, err := ptypes.TimestampProto(t)
136	if err != nil {
137		return nil
138	}
139	return ts
140}
141
142// lro represents a Long-Running Operation, generally a schema change.
143type lro struct {
144	mu    sync.Mutex
145	state *lropb.Operation
146}
147
148func (l *lro) State() *lropb.Operation {
149	l.mu.Lock()
150	defer l.mu.Unlock()
151	return proto.Clone(l.state).(*lropb.Operation)
152}
153
154// Logger is something that can be used for logging.
155// It is matched by log.Printf and testing.T.Logf.
156type Logger func(format string, args ...interface{})
157
158// NewServer creates a new Server.
159// The Server will be listening for gRPC connections, without TLS, on the provided TCP address.
160// The resolved address is available in the Addr field.
161func NewServer(laddr string) (*Server, error) {
162	l, err := net.Listen("tcp", laddr)
163	if err != nil {
164		return nil, err
165	}
166
167	s := &Server{
168		Addr: l.Addr().String(),
169		l:    l,
170		srv:  grpc.NewServer(),
171		s: &server{
172			logf: func(format string, args ...interface{}) {
173				log.Printf("spannertest.inmem: "+format, args...)
174			},
175			sessions: make(map[string]*session),
176			lros:     make(map[string]*lro),
177		},
178	}
179	adminpb.RegisterDatabaseAdminServer(s.srv, s.s)
180	spannerpb.RegisterSpannerServer(s.srv, s.s)
181	lropb.RegisterOperationsServer(s.srv, s.s)
182
183	go s.srv.Serve(s.l)
184
185	return s, nil
186}
187
188// SetLogger sets a logger for the server.
189// You can use a *testing.T as this argument to collate extra information
190// from the execution of the server.
191func (s *Server) SetLogger(l Logger) { s.s.logf = l }
192
193// Close shuts down the server.
194func (s *Server) Close() {
195	s.srv.Stop()
196	s.l.Close()
197}
198
199func genRandomSession() string {
200	var b [4]byte
201	rand.Read(b[:])
202	return fmt.Sprintf("%x", b)
203}
204
205func genRandomTransaction() string {
206	var b [6]byte
207	rand.Read(b[:])
208	return fmt.Sprintf("tx-%x", b)
209}
210
211func genRandomOperation() string {
212	var b [3]byte
213	rand.Read(b[:])
214	return fmt.Sprintf("op-%x", b)
215}
216
217func (s *server) GetOperation(ctx context.Context, req *lropb.GetOperationRequest) (*lropb.Operation, error) {
218	s.mu.Lock()
219	lro, ok := s.lros[req.Name]
220	s.mu.Unlock()
221	if !ok {
222		return nil, status.Errorf(codes.NotFound, "unknown LRO %q", req.Name)
223	}
224	return lro.State(), nil
225}
226
227// UpdateDDL applies the given DDL to the server.
228//
229// This is a convenience method for tests that may assume an existing schema.
230// The more general approach is to dial this server using an admin client, and
231// use the UpdateDatabaseDdl RPC method.
232func (s *Server) UpdateDDL(ddl *spansql.DDL) error {
233	ctx := context.Background()
234	for _, stmt := range ddl.List {
235		if st := s.s.runOneDDL(ctx, stmt); st.Code() != codes.OK {
236			return st.Err()
237		}
238	}
239	return nil
240}
241
242func (s *server) UpdateDatabaseDdl(ctx context.Context, req *adminpb.UpdateDatabaseDdlRequest) (*lropb.Operation, error) {
243	// Parse all the DDL statements first.
244	var stmts []spansql.DDLStmt
245	for _, s := range req.Statements {
246		stmt, err := spansql.ParseDDLStmt(s)
247		if err != nil {
248			// TODO: check what code the real Spanner returns here.
249			return nil, status.Errorf(codes.InvalidArgument, "bad DDL statement %q: %v", s, err)
250		}
251		stmts = append(stmts, stmt)
252	}
253
254	// Nothing should be depending on the exact structure of this,
255	// but it is specified in google/spanner/admin/database/v1/spanner_database_admin.proto.
256	id := "projects/fake-proj/instances/fake-instance/databases/fake-db/operations/" + genRandomOperation()
257	lro := &lro{
258		state: &lropb.Operation{
259			Name: id,
260		},
261	}
262	s.mu.Lock()
263	s.lros[id] = lro
264	s.mu.Unlock()
265
266	go lro.Run(s, stmts)
267	return lro.State(), nil
268}
269
270func (l *lro) Run(s *server, stmts []spansql.DDLStmt) {
271	ctx := context.Background()
272
273	for _, stmt := range stmts {
274		time.Sleep(100 * time.Millisecond)
275		if st := s.runOneDDL(ctx, stmt); st.Code() != codes.OK {
276			l.mu.Lock()
277			l.state.Done = true
278			l.state.Result = &lropb.Operation_Error{st.Proto()}
279			l.mu.Unlock()
280			return
281		}
282	}
283
284	l.mu.Lock()
285	l.state.Done = true
286	l.state.Result = &lropb.Operation_Response{&anypb.Any{}}
287	l.mu.Unlock()
288}
289
290func (s *server) runOneDDL(ctx context.Context, stmt spansql.DDLStmt) *status.Status {
291	return s.db.ApplyDDL(stmt)
292}
293
294func (s *server) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest) (*spannerpb.Session, error) {
295	//s.logf("CreateSession(%q)", req.Database)
296	return s.newSession(), nil
297}
298
299func (s *server) newSession() *spannerpb.Session {
300	id := genRandomSession()
301	now := time.Now()
302	sess := &session{
303		name:         id,
304		creation:     now,
305		lastUse:      now,
306		transactions: make(map[string]*transaction),
307	}
308	sess.ctx, sess.cancel = context.WithCancel(context.Background())
309
310	s.mu.Lock()
311	s.sessions[id] = sess
312	s.mu.Unlock()
313
314	return sess.Proto()
315}
316
317func (s *server) BatchCreateSessions(ctx context.Context, req *spannerpb.BatchCreateSessionsRequest) (*spannerpb.BatchCreateSessionsResponse, error) {
318	//s.logf("BatchCreateSessions(%q)", req.Database)
319
320	var sessions []*spannerpb.Session
321	for i := int32(0); i < req.GetSessionCount(); i++ {
322		sessions = append(sessions, s.newSession())
323	}
324
325	return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil
326}
327
328func (s *server) GetSession(ctx context.Context, req *spannerpb.GetSessionRequest) (*spannerpb.Session, error) {
329	s.mu.Lock()
330	sess, ok := s.sessions[req.Name]
331	s.mu.Unlock()
332
333	if !ok {
334		// TODO: what error does the real Spanner return?
335		return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name)
336	}
337
338	return sess.Proto(), nil
339}
340
341// TODO: ListSessions
342
343func (s *server) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest) (*emptypb.Empty, error) {
344	//s.logf("DeleteSession(%q)", req.Name)
345
346	s.mu.Lock()
347	sess, ok := s.sessions[req.Name]
348	delete(s.sessions, req.Name)
349	s.mu.Unlock()
350
351	if !ok {
352		// TODO: what error does the real Spanner return?
353		return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name)
354	}
355
356	// Terminate any operations in this session.
357	sess.cancel()
358
359	return &emptypb.Empty{}, nil
360}
361
362// popTx returns an existing transaction, removing it from the session.
363// This is called when a transaction is finishing (Commit, Rollback).
364func (s *server) popTx(sessionID, tid string) (tx *transaction, err error) {
365	s.mu.Lock()
366	sess, ok := s.sessions[sessionID]
367	s.mu.Unlock()
368	if !ok {
369		// TODO: what error does the real Spanner return?
370		return nil, status.Errorf(codes.NotFound, "unknown session %q", sessionID)
371	}
372
373	sess.mu.Lock()
374	sess.lastUse = time.Now()
375	tx, ok = sess.transactions[tid]
376	if ok {
377		delete(sess.transactions, tid)
378	}
379	sess.mu.Unlock()
380	if !ok {
381		// TODO: what error does the real Spanner return?
382		return nil, status.Errorf(codes.NotFound, "unknown transaction ID %q", tid)
383	}
384	return tx, nil
385}
386
387// readTx returns a transaction for the given session and transaction selector.
388// It is used by read/query operations (ExecuteStreamingSql, StreamingRead).
389func (s *server) readTx(ctx context.Context, session string, tsel *spannerpb.TransactionSelector) (tx *transaction, cleanup func(), err error) {
390	s.mu.Lock()
391	sess, ok := s.sessions[session]
392	s.mu.Unlock()
393	if !ok {
394		// TODO: what error does the real Spanner return?
395		return nil, nil, status.Errorf(codes.NotFound, "unknown session %q", session)
396	}
397
398	sess.mu.Lock()
399	sess.lastUse = time.Now()
400	sess.mu.Unlock()
401
402	// Only give a read-only transaction regardless of whether the selector
403	// is requesting a read-write or read-only one, since this is in readTx
404	// and so shouldn't be mutating anyway.
405	singleUse := func() (*transaction, func(), error) {
406		tx := s.db.NewReadOnlyTransaction()
407		return tx, tx.Rollback, nil
408	}
409
410	if tsel.GetSelector() == nil {
411		return singleUse()
412	}
413
414	switch sel := tsel.Selector.(type) {
415	default:
416		return nil, nil, fmt.Errorf("TransactionSelector type %T not supported", sel)
417	case *spannerpb.TransactionSelector_SingleUse:
418		// Ignore options (e.g. timestamps).
419		switch mode := sel.SingleUse.Mode.(type) {
420		case *spannerpb.TransactionOptions_ReadOnly_:
421			return singleUse()
422		case *spannerpb.TransactionOptions_ReadWrite_:
423			return singleUse()
424		default:
425			return nil, nil, fmt.Errorf("single use transaction in mode %T not supported", mode)
426		}
427	case *spannerpb.TransactionSelector_Id:
428		sess.mu.Lock()
429		tx, ok := sess.transactions[string(sel.Id)]
430		sess.mu.Unlock()
431		if !ok {
432			return nil, nil, fmt.Errorf("no transaction with id %q", sel.Id)
433		}
434		return tx, func() {}, nil
435	}
436}
437
438func (s *server) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) {
439	// Assume this is probably a DML statement. Queries tend to use ExecuteStreamingSql.
440	// TODO: Expand this to support more things.
441
442	obj, ok := req.Transaction.Selector.(*spannerpb.TransactionSelector_Id)
443	if !ok {
444		return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction.Selector)
445	}
446	tid := string(obj.Id)
447	_ = tid // TODO: lookup an existing transaction by ID.
448
449	stmt, err := spansql.ParseDMLStmt(req.Sql)
450	if err != nil {
451		return nil, status.Errorf(codes.InvalidArgument, "bad DML: %v", err)
452	}
453	params, err := parseQueryParams(req.GetParams())
454	if err != nil {
455		return nil, err
456	}
457
458	s.logf("Executing: %s", stmt.SQL())
459	if len(params) > 0 {
460		s.logf("        ▹ %v", params)
461	}
462
463	n, err := s.db.Execute(stmt, params)
464	if err != nil {
465		return nil, err
466	}
467	return &spannerpb.ResultSet{
468		Stats: &spannerpb.ResultSetStats{
469			RowCount: &spannerpb.ResultSetStats_RowCountExact{int64(n)},
470		},
471	}, nil
472}
473
474func (s *server) ExecuteStreamingSql(req *spannerpb.ExecuteSqlRequest, stream spannerpb.Spanner_ExecuteStreamingSqlServer) error {
475	tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction)
476	if err != nil {
477		return err
478	}
479	defer cleanup()
480
481	q, err := spansql.ParseQuery(req.Sql)
482	if err != nil {
483		// TODO: check what code the real Spanner returns here.
484		return status.Errorf(codes.InvalidArgument, "bad query: %v", err)
485	}
486
487	params, err := parseQueryParams(req.GetParams())
488	if err != nil {
489		return err
490	}
491
492	s.logf("Querying: %s", q.SQL())
493	if len(params) > 0 {
494		s.logf("        ▹ %v", params)
495	}
496
497	ri, err := s.db.Query(q, params)
498	if err != nil {
499		return err
500	}
501	return s.readStream(stream.Context(), tx, stream.Send, ri)
502}
503
504// TODO: Read
505
506func (s *server) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
507	tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction)
508	if err != nil {
509		return err
510	}
511	defer cleanup()
512
513	// Bail out if various advanced features are being used.
514	if req.Index != "" {
515		return fmt.Errorf("index reads (%q) not supported", req.Index)
516	}
517	if len(req.ResumeToken) > 0 {
518		// This should only happen if we send resume_token ourselves.
519		return fmt.Errorf("read resumption not supported")
520	}
521	if len(req.PartitionToken) > 0 {
522		return fmt.Errorf("partition restrictions not supported")
523	}
524
525	var ri *resultIter
526	if req.KeySet.All {
527		s.logf("Reading all from %s (cols: %v)", req.Table, req.Columns)
528		ri, err = s.db.ReadAll(req.Table, req.Columns, req.Limit)
529	} else {
530		s.logf("Reading rows from %d keys and %d ranges from %s (cols: %v)", len(req.KeySet.Keys), len(req.KeySet.Ranges), req.Table, req.Columns)
531		ri, err = s.db.Read(req.Table, req.Columns, req.KeySet.Keys, makeKeyRangeList(req.KeySet.Ranges), req.Limit)
532	}
533	if err != nil {
534		return err
535	}
536
537	// TODO: Figure out the right contexts to use here. There's the session one (sess.ctx),
538	// but also this specific RPC one (stream.Context()). Which takes precedence?
539	// They appear to be independent.
540
541	return s.readStream(stream.Context(), tx, stream.Send, ri)
542}
543
544func (s *server) readStream(ctx context.Context, tx *transaction, send func(*spannerpb.PartialResultSet) error, ri *resultIter) error {
545	// Build the result set metadata.
546	rsm := &spannerpb.ResultSetMetadata{
547		RowType: &spannerpb.StructType{},
548		// TODO: transaction info?
549	}
550	for _, ci := range ri.Cols {
551		st, err := spannerTypeFromType(ci.Type)
552		if err != nil {
553			return err
554		}
555		rsm.RowType.Fields = append(rsm.RowType.Fields, &spannerpb.StructType_Field{
556			Name: ci.Name,
557			Type: st,
558		})
559	}
560
561	for {
562		row, ok := ri.Next()
563		if !ok {
564			break
565		}
566
567		values := make([]*structpb.Value, len(row))
568		for i, x := range row {
569			v, err := spannerValueFromValue(x)
570			if err != nil {
571				return err
572			}
573			values[i] = v
574		}
575
576		prs := &spannerpb.PartialResultSet{
577			Metadata: rsm,
578			Values:   values,
579		}
580		if err := send(prs); err != nil {
581			return err
582		}
583
584		// ResultSetMetadata is only set for the first PartialResultSet.
585		rsm = nil
586	}
587
588	return nil
589}
590
591func (s *server) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest) (*spannerpb.Transaction, error) {
592	//s.logf("BeginTransaction(%v)", req)
593
594	s.mu.Lock()
595	sess, ok := s.sessions[req.Session]
596	s.mu.Unlock()
597	if !ok {
598		// TODO: what error does the real Spanner return?
599		return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Session)
600	}
601
602	id := genRandomTransaction()
603	tx := s.db.NewTransaction()
604
605	sess.mu.Lock()
606	sess.lastUse = time.Now()
607	sess.transactions[id] = tx
608	sess.mu.Unlock()
609
610	return &spannerpb.Transaction{Id: []byte(id)}, nil
611}
612
613func (s *server) Commit(ctx context.Context, req *spannerpb.CommitRequest) (resp *spannerpb.CommitResponse, err error) {
614	//s.logf("Commit(%q, %q)", req.Session, req.Transaction)
615
616	obj, ok := req.Transaction.(*spannerpb.CommitRequest_TransactionId)
617	if !ok {
618		return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction)
619	}
620	tid := string(obj.TransactionId)
621
622	tx, err := s.popTx(req.Session, tid)
623	if err != nil {
624		return nil, err
625	}
626	defer func() {
627		if err != nil {
628			tx.Rollback()
629		}
630	}()
631	tx.Start()
632
633	for _, m := range req.Mutations {
634		switch op := m.Operation.(type) {
635		default:
636			return nil, fmt.Errorf("unsupported mutation operation type %T", op)
637		case *spannerpb.Mutation_Insert:
638			ins := op.Insert
639			err := s.db.Insert(tx, ins.Table, ins.Columns, ins.Values)
640			if err != nil {
641				return nil, err
642			}
643		case *spannerpb.Mutation_Update:
644			up := op.Update
645			err := s.db.Update(tx, up.Table, up.Columns, up.Values)
646			if err != nil {
647				return nil, err
648			}
649		case *spannerpb.Mutation_InsertOrUpdate:
650			iou := op.InsertOrUpdate
651			err := s.db.InsertOrUpdate(tx, iou.Table, iou.Columns, iou.Values)
652			if err != nil {
653				return nil, err
654			}
655		case *spannerpb.Mutation_Delete_:
656			del := op.Delete
657			ks := del.KeySet
658
659			err := s.db.Delete(tx, del.Table, ks.Keys, makeKeyRangeList(ks.Ranges), ks.All)
660			if err != nil {
661				return nil, err
662			}
663		}
664
665	}
666
667	ts, err := tx.Commit()
668	if err != nil {
669		return nil, err
670	}
671
672	return &spannerpb.CommitResponse{
673		CommitTimestamp: timestampProto(ts),
674	}, nil
675}
676
677func (s *server) Rollback(ctx context.Context, req *spannerpb.RollbackRequest) (*emptypb.Empty, error) {
678	s.logf("Rollback(%v)", req)
679
680	tx, err := s.popTx(req.Session, string(req.TransactionId))
681	if err != nil {
682		return nil, err
683	}
684
685	tx.Rollback()
686
687	return &emptypb.Empty{}, nil
688}
689
690// TODO: PartitionQuery, PartitionRead
691
692func parseQueryParams(p *structpb.Struct) (queryParams, error) {
693	params := make(queryParams)
694	for k, v := range p.GetFields() {
695		switch v := v.Kind.(type) {
696		default:
697			return nil, fmt.Errorf("unsupported well-known type value kind %T", v)
698		case *structpb.Value_NullValue:
699			params[k] = nil
700		case *structpb.Value_NumberValue:
701			params[k] = v.NumberValue
702		case *structpb.Value_StringValue:
703			params[k] = v.StringValue
704		}
705	}
706	return params, nil
707}
708
709func spannerTypeFromType(typ spansql.Type) (*spannerpb.Type, error) {
710	var code spannerpb.TypeCode
711	switch typ.Base {
712	default:
713		return nil, fmt.Errorf("unhandled base type %d", typ.Base)
714	case spansql.Bool:
715		code = spannerpb.TypeCode_BOOL
716	case spansql.Int64:
717		code = spannerpb.TypeCode_INT64
718	case spansql.Float64:
719		code = spannerpb.TypeCode_FLOAT64
720	case spansql.String:
721		code = spannerpb.TypeCode_STRING
722	case spansql.Bytes:
723		code = spannerpb.TypeCode_BYTES
724	case spansql.Date:
725		code = spannerpb.TypeCode_DATE
726	case spansql.Timestamp:
727		code = spannerpb.TypeCode_TIMESTAMP
728	}
729	st := &spannerpb.Type{Code: code}
730	if typ.Array {
731		st = &spannerpb.Type{
732			Code:             spannerpb.TypeCode_ARRAY,
733			ArrayElementType: st,
734		}
735	}
736	return st, nil
737}
738
739func spannerValueFromValue(x interface{}) (*structpb.Value, error) {
740	switch x := x.(type) {
741	default:
742		return nil, fmt.Errorf("unhandled database value type %T", x)
743	case bool:
744		return &structpb.Value{Kind: &structpb.Value_BoolValue{x}}, nil
745	case int64:
746		// The Spanner int64 is actually a decimal string.
747		s := strconv.FormatInt(x, 10)
748		return &structpb.Value{Kind: &structpb.Value_StringValue{s}}, nil
749	case float64:
750		return &structpb.Value{Kind: &structpb.Value_NumberValue{x}}, nil
751	case string:
752		return &structpb.Value{Kind: &structpb.Value_StringValue{x}}, nil
753	case []byte:
754		return &structpb.Value{Kind: &structpb.Value_StringValue{base64.StdEncoding.EncodeToString(x)}}, nil
755	case nil:
756		return &structpb.Value{Kind: &structpb.Value_NullValue{}}, nil
757	case []interface{}:
758		var vs []*structpb.Value
759		for _, elem := range x {
760			v, err := spannerValueFromValue(elem)
761			if err != nil {
762				return nil, err
763			}
764			vs = append(vs, v)
765		}
766		return &structpb.Value{Kind: &structpb.Value_ListValue{
767			&structpb.ListValue{Values: vs},
768		}}, nil
769	}
770}
771
772func makeKeyRangeList(ranges []*spannerpb.KeyRange) keyRangeList {
773	var krl keyRangeList
774	for _, r := range ranges {
775		krl = append(krl, makeKeyRange(r))
776	}
777	return krl
778}
779
780func makeKeyRange(r *spannerpb.KeyRange) *keyRange {
781	var kr keyRange
782	switch s := r.StartKeyType.(type) {
783	case *spannerpb.KeyRange_StartClosed:
784		kr.start = s.StartClosed
785		kr.startClosed = true
786	case *spannerpb.KeyRange_StartOpen:
787		kr.start = s.StartOpen
788	}
789	switch e := r.EndKeyType.(type) {
790	case *spannerpb.KeyRange_EndClosed:
791		kr.end = e.EndClosed
792		kr.endClosed = true
793	case *spannerpb.KeyRange_EndOpen:
794		kr.end = e.EndOpen
795	}
796	return &kr
797}
798