1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17/* 18Package spannertest contains test helpers for working with Cloud Spanner. 19 20This package is EXPERIMENTAL, and is lacking many features. See the README.md 21file in this directory for more details. 22 23In-memory fake 24 25This package has an in-memory fake implementation of spanner. To use it, 26create a Server, and then connect to it with no security: 27 srv, err := spannertest.NewServer("localhost:0") 28 ... 29 conn, err := grpc.DialContext(ctx, srv.Addr, grpc.WithInsecure()) 30 ... 31 client, err := spanner.NewClient(ctx, db, option.WithGRPCConn(conn)) 32 ... 33 34Alternatively, create a Server, then set the SPANNER_EMULATOR_HOST environment 35variable and use the regular spanner.NewClient: 36 srv, err := spannertest.NewServer("localhost:0") 37 ... 38 os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr) 39 client, err := spanner.NewClient(ctx, db) 40 ... 41 42The same server also supports database admin operations for use with 43the cloud.google.com/go/spanner/admin/database/apiv1 package. 44*/ 45package spannertest 46 47import ( 48 "context" 49 "encoding/base64" 50 "fmt" 51 "log" 52 "math/rand" 53 "net" 54 "strconv" 55 "sync" 56 "time" 57 58 "github.com/golang/protobuf/proto" 59 "github.com/golang/protobuf/ptypes" 60 "google.golang.org/grpc" 61 "google.golang.org/grpc/codes" 62 "google.golang.org/grpc/status" 63 64 anypb "github.com/golang/protobuf/ptypes/any" 65 emptypb "github.com/golang/protobuf/ptypes/empty" 66 structpb "github.com/golang/protobuf/ptypes/struct" 67 timestamppb "github.com/golang/protobuf/ptypes/timestamp" 68 lropb "google.golang.org/genproto/googleapis/longrunning" 69 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 70 spannerpb "google.golang.org/genproto/googleapis/spanner/v1" 71 72 "cloud.google.com/go/spanner/spansql" 73) 74 75// Server is an in-memory Cloud Spanner fake. 76// It is unauthenticated, non-performant, and only a rough approximation. 77type Server struct { 78 Addr string 79 80 l net.Listener 81 srv *grpc.Server 82 s *server 83} 84 85// server is the real implementation of the fake. 86// It is a separate and unexported type so the API won't be cluttered with 87// methods that are only relevant to the fake's implementation. 88type server struct { 89 logf Logger 90 91 db database 92 93 mu sync.Mutex 94 sessions map[string]*session 95 lros map[string]*lro 96 97 // Any unimplemented methods will cause a panic. 98 // TODO: Switch to Unimplemented at some point? spannerpb would need regenerating. 99 adminpb.DatabaseAdminServer 100 spannerpb.SpannerServer 101 lropb.OperationsServer 102} 103 104type session struct { 105 name string 106 creation time.Time 107 108 // This context tracks the lifetime of this session. 109 // It is canceled in DeleteSession. 110 ctx context.Context 111 cancel func() 112 113 mu sync.Mutex 114 lastUse time.Time 115 transactions map[string]*transaction 116} 117 118func (s *session) Proto() *spannerpb.Session { 119 s.mu.Lock() 120 defer s.mu.Unlock() 121 m := &spannerpb.Session{ 122 Name: s.name, 123 CreateTime: timestampProto(s.creation), 124 ApproximateLastUseTime: timestampProto(s.lastUse), 125 } 126 return m 127} 128 129// timestampProto returns a valid timestamp.Timestamp, 130// or nil if the given time is zero or isn't representable. 131func timestampProto(t time.Time) *timestamppb.Timestamp { 132 if t.IsZero() { 133 return nil 134 } 135 ts, err := ptypes.TimestampProto(t) 136 if err != nil { 137 return nil 138 } 139 return ts 140} 141 142// lro represents a Long-Running Operation, generally a schema change. 143type lro struct { 144 mu sync.Mutex 145 state *lropb.Operation 146} 147 148func (l *lro) State() *lropb.Operation { 149 l.mu.Lock() 150 defer l.mu.Unlock() 151 return proto.Clone(l.state).(*lropb.Operation) 152} 153 154// Logger is something that can be used for logging. 155// It is matched by log.Printf and testing.T.Logf. 156type Logger func(format string, args ...interface{}) 157 158// NewServer creates a new Server. 159// The Server will be listening for gRPC connections, without TLS, on the provided TCP address. 160// The resolved address is available in the Addr field. 161func NewServer(laddr string) (*Server, error) { 162 l, err := net.Listen("tcp", laddr) 163 if err != nil { 164 return nil, err 165 } 166 167 s := &Server{ 168 Addr: l.Addr().String(), 169 l: l, 170 srv: grpc.NewServer(), 171 s: &server{ 172 logf: func(format string, args ...interface{}) { 173 log.Printf("spannertest.inmem: "+format, args...) 174 }, 175 sessions: make(map[string]*session), 176 lros: make(map[string]*lro), 177 }, 178 } 179 adminpb.RegisterDatabaseAdminServer(s.srv, s.s) 180 spannerpb.RegisterSpannerServer(s.srv, s.s) 181 lropb.RegisterOperationsServer(s.srv, s.s) 182 183 go s.srv.Serve(s.l) 184 185 return s, nil 186} 187 188// SetLogger sets a logger for the server. 189// You can use a *testing.T as this argument to collate extra information 190// from the execution of the server. 191func (s *Server) SetLogger(l Logger) { s.s.logf = l } 192 193// Close shuts down the server. 194func (s *Server) Close() { 195 s.srv.Stop() 196 s.l.Close() 197} 198 199func genRandomSession() string { 200 var b [4]byte 201 rand.Read(b[:]) 202 return fmt.Sprintf("%x", b) 203} 204 205func genRandomTransaction() string { 206 var b [6]byte 207 rand.Read(b[:]) 208 return fmt.Sprintf("tx-%x", b) 209} 210 211func genRandomOperation() string { 212 var b [3]byte 213 rand.Read(b[:]) 214 return fmt.Sprintf("op-%x", b) 215} 216 217func (s *server) GetOperation(ctx context.Context, req *lropb.GetOperationRequest) (*lropb.Operation, error) { 218 s.mu.Lock() 219 lro, ok := s.lros[req.Name] 220 s.mu.Unlock() 221 if !ok { 222 return nil, status.Errorf(codes.NotFound, "unknown LRO %q", req.Name) 223 } 224 return lro.State(), nil 225} 226 227// UpdateDDL applies the given DDL to the server. 228// 229// This is a convenience method for tests that may assume an existing schema. 230// The more general approach is to dial this server using an admin client, and 231// use the UpdateDatabaseDdl RPC method. 232func (s *Server) UpdateDDL(ddl *spansql.DDL) error { 233 ctx := context.Background() 234 for _, stmt := range ddl.List { 235 if st := s.s.runOneDDL(ctx, stmt); st.Code() != codes.OK { 236 return st.Err() 237 } 238 } 239 return nil 240} 241 242func (s *server) UpdateDatabaseDdl(ctx context.Context, req *adminpb.UpdateDatabaseDdlRequest) (*lropb.Operation, error) { 243 // Parse all the DDL statements first. 244 var stmts []spansql.DDLStmt 245 for _, s := range req.Statements { 246 stmt, err := spansql.ParseDDLStmt(s) 247 if err != nil { 248 // TODO: check what code the real Spanner returns here. 249 return nil, status.Errorf(codes.InvalidArgument, "bad DDL statement %q: %v", s, err) 250 } 251 stmts = append(stmts, stmt) 252 } 253 254 // Nothing should be depending on the exact structure of this, 255 // but it is specified in google/spanner/admin/database/v1/spanner_database_admin.proto. 256 id := "projects/fake-proj/instances/fake-instance/databases/fake-db/operations/" + genRandomOperation() 257 lro := &lro{ 258 state: &lropb.Operation{ 259 Name: id, 260 }, 261 } 262 s.mu.Lock() 263 s.lros[id] = lro 264 s.mu.Unlock() 265 266 go lro.Run(s, stmts) 267 return lro.State(), nil 268} 269 270func (l *lro) Run(s *server, stmts []spansql.DDLStmt) { 271 ctx := context.Background() 272 273 for _, stmt := range stmts { 274 time.Sleep(100 * time.Millisecond) 275 if st := s.runOneDDL(ctx, stmt); st.Code() != codes.OK { 276 l.mu.Lock() 277 l.state.Done = true 278 l.state.Result = &lropb.Operation_Error{st.Proto()} 279 l.mu.Unlock() 280 return 281 } 282 } 283 284 l.mu.Lock() 285 l.state.Done = true 286 l.state.Result = &lropb.Operation_Response{&anypb.Any{}} 287 l.mu.Unlock() 288} 289 290func (s *server) runOneDDL(ctx context.Context, stmt spansql.DDLStmt) *status.Status { 291 return s.db.ApplyDDL(stmt) 292} 293 294func (s *server) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest) (*spannerpb.Session, error) { 295 //s.logf("CreateSession(%q)", req.Database) 296 return s.newSession(), nil 297} 298 299func (s *server) newSession() *spannerpb.Session { 300 id := genRandomSession() 301 now := time.Now() 302 sess := &session{ 303 name: id, 304 creation: now, 305 lastUse: now, 306 transactions: make(map[string]*transaction), 307 } 308 sess.ctx, sess.cancel = context.WithCancel(context.Background()) 309 310 s.mu.Lock() 311 s.sessions[id] = sess 312 s.mu.Unlock() 313 314 return sess.Proto() 315} 316 317func (s *server) BatchCreateSessions(ctx context.Context, req *spannerpb.BatchCreateSessionsRequest) (*spannerpb.BatchCreateSessionsResponse, error) { 318 //s.logf("BatchCreateSessions(%q)", req.Database) 319 320 var sessions []*spannerpb.Session 321 for i := int32(0); i < req.GetSessionCount(); i++ { 322 sessions = append(sessions, s.newSession()) 323 } 324 325 return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil 326} 327 328func (s *server) GetSession(ctx context.Context, req *spannerpb.GetSessionRequest) (*spannerpb.Session, error) { 329 s.mu.Lock() 330 sess, ok := s.sessions[req.Name] 331 s.mu.Unlock() 332 333 if !ok { 334 // TODO: what error does the real Spanner return? 335 return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name) 336 } 337 338 return sess.Proto(), nil 339} 340 341// TODO: ListSessions 342 343func (s *server) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest) (*emptypb.Empty, error) { 344 //s.logf("DeleteSession(%q)", req.Name) 345 346 s.mu.Lock() 347 sess, ok := s.sessions[req.Name] 348 delete(s.sessions, req.Name) 349 s.mu.Unlock() 350 351 if !ok { 352 // TODO: what error does the real Spanner return? 353 return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name) 354 } 355 356 // Terminate any operations in this session. 357 sess.cancel() 358 359 return &emptypb.Empty{}, nil 360} 361 362// popTx returns an existing transaction, removing it from the session. 363// This is called when a transaction is finishing (Commit, Rollback). 364func (s *server) popTx(sessionID, tid string) (tx *transaction, err error) { 365 s.mu.Lock() 366 sess, ok := s.sessions[sessionID] 367 s.mu.Unlock() 368 if !ok { 369 // TODO: what error does the real Spanner return? 370 return nil, status.Errorf(codes.NotFound, "unknown session %q", sessionID) 371 } 372 373 sess.mu.Lock() 374 sess.lastUse = time.Now() 375 tx, ok = sess.transactions[tid] 376 if ok { 377 delete(sess.transactions, tid) 378 } 379 sess.mu.Unlock() 380 if !ok { 381 // TODO: what error does the real Spanner return? 382 return nil, status.Errorf(codes.NotFound, "unknown transaction ID %q", tid) 383 } 384 return tx, nil 385} 386 387// readTx returns a transaction for the given session and transaction selector. 388// It is used by read/query operations (ExecuteStreamingSql, StreamingRead). 389func (s *server) readTx(ctx context.Context, session string, tsel *spannerpb.TransactionSelector) (tx *transaction, cleanup func(), err error) { 390 s.mu.Lock() 391 sess, ok := s.sessions[session] 392 s.mu.Unlock() 393 if !ok { 394 // TODO: what error does the real Spanner return? 395 return nil, nil, status.Errorf(codes.NotFound, "unknown session %q", session) 396 } 397 398 sess.mu.Lock() 399 sess.lastUse = time.Now() 400 sess.mu.Unlock() 401 402 // Only give a read-only transaction regardless of whether the selector 403 // is requesting a read-write or read-only one, since this is in readTx 404 // and so shouldn't be mutating anyway. 405 singleUse := func() (*transaction, func(), error) { 406 tx := s.db.NewReadOnlyTransaction() 407 return tx, tx.Rollback, nil 408 } 409 410 if tsel.GetSelector() == nil { 411 return singleUse() 412 } 413 414 switch sel := tsel.Selector.(type) { 415 default: 416 return nil, nil, fmt.Errorf("TransactionSelector type %T not supported", sel) 417 case *spannerpb.TransactionSelector_SingleUse: 418 // Ignore options (e.g. timestamps). 419 switch mode := sel.SingleUse.Mode.(type) { 420 case *spannerpb.TransactionOptions_ReadOnly_: 421 return singleUse() 422 case *spannerpb.TransactionOptions_ReadWrite_: 423 return singleUse() 424 default: 425 return nil, nil, fmt.Errorf("single use transaction in mode %T not supported", mode) 426 } 427 case *spannerpb.TransactionSelector_Id: 428 sess.mu.Lock() 429 tx, ok := sess.transactions[string(sel.Id)] 430 sess.mu.Unlock() 431 if !ok { 432 return nil, nil, fmt.Errorf("no transaction with id %q", sel.Id) 433 } 434 return tx, func() {}, nil 435 } 436} 437 438func (s *server) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) { 439 // Assume this is probably a DML statement. Queries tend to use ExecuteStreamingSql. 440 // TODO: Expand this to support more things. 441 442 obj, ok := req.Transaction.Selector.(*spannerpb.TransactionSelector_Id) 443 if !ok { 444 return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction.Selector) 445 } 446 tid := string(obj.Id) 447 _ = tid // TODO: lookup an existing transaction by ID. 448 449 stmt, err := spansql.ParseDMLStmt(req.Sql) 450 if err != nil { 451 return nil, status.Errorf(codes.InvalidArgument, "bad DML: %v", err) 452 } 453 params, err := parseQueryParams(req.GetParams()) 454 if err != nil { 455 return nil, err 456 } 457 458 s.logf("Executing: %s", stmt.SQL()) 459 if len(params) > 0 { 460 s.logf(" ▹ %v", params) 461 } 462 463 n, err := s.db.Execute(stmt, params) 464 if err != nil { 465 return nil, err 466 } 467 return &spannerpb.ResultSet{ 468 Stats: &spannerpb.ResultSetStats{ 469 RowCount: &spannerpb.ResultSetStats_RowCountExact{int64(n)}, 470 }, 471 }, nil 472} 473 474func (s *server) ExecuteStreamingSql(req *spannerpb.ExecuteSqlRequest, stream spannerpb.Spanner_ExecuteStreamingSqlServer) error { 475 tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction) 476 if err != nil { 477 return err 478 } 479 defer cleanup() 480 481 q, err := spansql.ParseQuery(req.Sql) 482 if err != nil { 483 // TODO: check what code the real Spanner returns here. 484 return status.Errorf(codes.InvalidArgument, "bad query: %v", err) 485 } 486 487 params, err := parseQueryParams(req.GetParams()) 488 if err != nil { 489 return err 490 } 491 492 s.logf("Querying: %s", q.SQL()) 493 if len(params) > 0 { 494 s.logf(" ▹ %v", params) 495 } 496 497 ri, err := s.db.Query(q, params) 498 if err != nil { 499 return err 500 } 501 return s.readStream(stream.Context(), tx, stream.Send, ri) 502} 503 504// TODO: Read 505 506func (s *server) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error { 507 tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction) 508 if err != nil { 509 return err 510 } 511 defer cleanup() 512 513 // Bail out if various advanced features are being used. 514 if req.Index != "" { 515 return fmt.Errorf("index reads (%q) not supported", req.Index) 516 } 517 if len(req.ResumeToken) > 0 { 518 // This should only happen if we send resume_token ourselves. 519 return fmt.Errorf("read resumption not supported") 520 } 521 if len(req.PartitionToken) > 0 { 522 return fmt.Errorf("partition restrictions not supported") 523 } 524 525 var ri *resultIter 526 if req.KeySet.All { 527 s.logf("Reading all from %s (cols: %v)", req.Table, req.Columns) 528 ri, err = s.db.ReadAll(req.Table, req.Columns, req.Limit) 529 } else { 530 s.logf("Reading rows from %d keys and %d ranges from %s (cols: %v)", len(req.KeySet.Keys), len(req.KeySet.Ranges), req.Table, req.Columns) 531 ri, err = s.db.Read(req.Table, req.Columns, req.KeySet.Keys, makeKeyRangeList(req.KeySet.Ranges), req.Limit) 532 } 533 if err != nil { 534 return err 535 } 536 537 // TODO: Figure out the right contexts to use here. There's the session one (sess.ctx), 538 // but also this specific RPC one (stream.Context()). Which takes precedence? 539 // They appear to be independent. 540 541 return s.readStream(stream.Context(), tx, stream.Send, ri) 542} 543 544func (s *server) readStream(ctx context.Context, tx *transaction, send func(*spannerpb.PartialResultSet) error, ri *resultIter) error { 545 // Build the result set metadata. 546 rsm := &spannerpb.ResultSetMetadata{ 547 RowType: &spannerpb.StructType{}, 548 // TODO: transaction info? 549 } 550 for _, ci := range ri.Cols { 551 st, err := spannerTypeFromType(ci.Type) 552 if err != nil { 553 return err 554 } 555 rsm.RowType.Fields = append(rsm.RowType.Fields, &spannerpb.StructType_Field{ 556 Name: ci.Name, 557 Type: st, 558 }) 559 } 560 561 for { 562 row, ok := ri.Next() 563 if !ok { 564 break 565 } 566 567 values := make([]*structpb.Value, len(row)) 568 for i, x := range row { 569 v, err := spannerValueFromValue(x) 570 if err != nil { 571 return err 572 } 573 values[i] = v 574 } 575 576 prs := &spannerpb.PartialResultSet{ 577 Metadata: rsm, 578 Values: values, 579 } 580 if err := send(prs); err != nil { 581 return err 582 } 583 584 // ResultSetMetadata is only set for the first PartialResultSet. 585 rsm = nil 586 } 587 588 return nil 589} 590 591func (s *server) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest) (*spannerpb.Transaction, error) { 592 //s.logf("BeginTransaction(%v)", req) 593 594 s.mu.Lock() 595 sess, ok := s.sessions[req.Session] 596 s.mu.Unlock() 597 if !ok { 598 // TODO: what error does the real Spanner return? 599 return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Session) 600 } 601 602 id := genRandomTransaction() 603 tx := s.db.NewTransaction() 604 605 sess.mu.Lock() 606 sess.lastUse = time.Now() 607 sess.transactions[id] = tx 608 sess.mu.Unlock() 609 610 return &spannerpb.Transaction{Id: []byte(id)}, nil 611} 612 613func (s *server) Commit(ctx context.Context, req *spannerpb.CommitRequest) (resp *spannerpb.CommitResponse, err error) { 614 //s.logf("Commit(%q, %q)", req.Session, req.Transaction) 615 616 obj, ok := req.Transaction.(*spannerpb.CommitRequest_TransactionId) 617 if !ok { 618 return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction) 619 } 620 tid := string(obj.TransactionId) 621 622 tx, err := s.popTx(req.Session, tid) 623 if err != nil { 624 return nil, err 625 } 626 defer func() { 627 if err != nil { 628 tx.Rollback() 629 } 630 }() 631 tx.Start() 632 633 for _, m := range req.Mutations { 634 switch op := m.Operation.(type) { 635 default: 636 return nil, fmt.Errorf("unsupported mutation operation type %T", op) 637 case *spannerpb.Mutation_Insert: 638 ins := op.Insert 639 err := s.db.Insert(tx, ins.Table, ins.Columns, ins.Values) 640 if err != nil { 641 return nil, err 642 } 643 case *spannerpb.Mutation_Update: 644 up := op.Update 645 err := s.db.Update(tx, up.Table, up.Columns, up.Values) 646 if err != nil { 647 return nil, err 648 } 649 case *spannerpb.Mutation_InsertOrUpdate: 650 iou := op.InsertOrUpdate 651 err := s.db.InsertOrUpdate(tx, iou.Table, iou.Columns, iou.Values) 652 if err != nil { 653 return nil, err 654 } 655 case *spannerpb.Mutation_Delete_: 656 del := op.Delete 657 ks := del.KeySet 658 659 err := s.db.Delete(tx, del.Table, ks.Keys, makeKeyRangeList(ks.Ranges), ks.All) 660 if err != nil { 661 return nil, err 662 } 663 } 664 665 } 666 667 ts, err := tx.Commit() 668 if err != nil { 669 return nil, err 670 } 671 672 return &spannerpb.CommitResponse{ 673 CommitTimestamp: timestampProto(ts), 674 }, nil 675} 676 677func (s *server) Rollback(ctx context.Context, req *spannerpb.RollbackRequest) (*emptypb.Empty, error) { 678 s.logf("Rollback(%v)", req) 679 680 tx, err := s.popTx(req.Session, string(req.TransactionId)) 681 if err != nil { 682 return nil, err 683 } 684 685 tx.Rollback() 686 687 return &emptypb.Empty{}, nil 688} 689 690// TODO: PartitionQuery, PartitionRead 691 692func parseQueryParams(p *structpb.Struct) (queryParams, error) { 693 params := make(queryParams) 694 for k, v := range p.GetFields() { 695 switch v := v.Kind.(type) { 696 default: 697 return nil, fmt.Errorf("unsupported well-known type value kind %T", v) 698 case *structpb.Value_NullValue: 699 params[k] = nil 700 case *structpb.Value_NumberValue: 701 params[k] = v.NumberValue 702 case *structpb.Value_StringValue: 703 params[k] = v.StringValue 704 } 705 } 706 return params, nil 707} 708 709func spannerTypeFromType(typ spansql.Type) (*spannerpb.Type, error) { 710 var code spannerpb.TypeCode 711 switch typ.Base { 712 default: 713 return nil, fmt.Errorf("unhandled base type %d", typ.Base) 714 case spansql.Bool: 715 code = spannerpb.TypeCode_BOOL 716 case spansql.Int64: 717 code = spannerpb.TypeCode_INT64 718 case spansql.Float64: 719 code = spannerpb.TypeCode_FLOAT64 720 case spansql.String: 721 code = spannerpb.TypeCode_STRING 722 case spansql.Bytes: 723 code = spannerpb.TypeCode_BYTES 724 case spansql.Date: 725 code = spannerpb.TypeCode_DATE 726 case spansql.Timestamp: 727 code = spannerpb.TypeCode_TIMESTAMP 728 } 729 st := &spannerpb.Type{Code: code} 730 if typ.Array { 731 st = &spannerpb.Type{ 732 Code: spannerpb.TypeCode_ARRAY, 733 ArrayElementType: st, 734 } 735 } 736 return st, nil 737} 738 739func spannerValueFromValue(x interface{}) (*structpb.Value, error) { 740 switch x := x.(type) { 741 default: 742 return nil, fmt.Errorf("unhandled database value type %T", x) 743 case bool: 744 return &structpb.Value{Kind: &structpb.Value_BoolValue{x}}, nil 745 case int64: 746 // The Spanner int64 is actually a decimal string. 747 s := strconv.FormatInt(x, 10) 748 return &structpb.Value{Kind: &structpb.Value_StringValue{s}}, nil 749 case float64: 750 return &structpb.Value{Kind: &structpb.Value_NumberValue{x}}, nil 751 case string: 752 return &structpb.Value{Kind: &structpb.Value_StringValue{x}}, nil 753 case []byte: 754 return &structpb.Value{Kind: &structpb.Value_StringValue{base64.StdEncoding.EncodeToString(x)}}, nil 755 case nil: 756 return &structpb.Value{Kind: &structpb.Value_NullValue{}}, nil 757 case []interface{}: 758 var vs []*structpb.Value 759 for _, elem := range x { 760 v, err := spannerValueFromValue(elem) 761 if err != nil { 762 return nil, err 763 } 764 vs = append(vs, v) 765 } 766 return &structpb.Value{Kind: &structpb.Value_ListValue{ 767 &structpb.ListValue{Values: vs}, 768 }}, nil 769 } 770} 771 772func makeKeyRangeList(ranges []*spannerpb.KeyRange) keyRangeList { 773 var krl keyRangeList 774 for _, r := range ranges { 775 krl = append(krl, makeKeyRange(r)) 776 } 777 return krl 778} 779 780func makeKeyRange(r *spannerpb.KeyRange) *keyRange { 781 var kr keyRange 782 switch s := r.StartKeyType.(type) { 783 case *spannerpb.KeyRange_StartClosed: 784 kr.start = s.StartClosed 785 kr.startClosed = true 786 case *spannerpb.KeyRange_StartOpen: 787 kr.start = s.StartOpen 788 } 789 switch e := r.EndKeyType.(type) { 790 case *spannerpb.KeyRange_EndClosed: 791 kr.end = e.EndClosed 792 kr.endClosed = true 793 case *spannerpb.KeyRange_EndOpen: 794 kr.end = e.EndOpen 795 } 796 return &kr 797} 798