1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17/* 18Package spannertest contains test helpers for working with Cloud Spanner. 19 20This package is EXPERIMENTAL, and is lacking many features. See the README.md 21file in this directory for more details. 22 23In-memory fake 24 25This package has an in-memory fake implementation of spanner. To use it, 26create a Server, and then connect to it with no security: 27 srv, err := spannertest.NewServer("localhost:0") 28 ... 29 conn, err := grpc.DialContext(ctx, srv.Addr, grpc.WithInsecure()) 30 ... 31 client, err := spanner.NewClient(ctx, db, option.WithGRPCConn(conn)) 32 ... 33 34Alternatively, create a Server, then set the SPANNER_EMULATOR_HOST environment 35variable and use the regular spanner.NewClient: 36 srv, err := spannertest.NewServer("localhost:0") 37 ... 38 os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr) 39 client, err := spanner.NewClient(ctx, db) 40 ... 41 42The same server also supports database admin operations for use with 43the cloud.google.com/go/spanner/admin/database/apiv1 package. 44*/ 45package spannertest 46 47import ( 48 "context" 49 "encoding/base64" 50 "fmt" 51 "io" 52 "log" 53 "math/rand" 54 "net" 55 "strconv" 56 "sync" 57 "time" 58 59 "github.com/golang/protobuf/proto" 60 "github.com/golang/protobuf/ptypes" 61 "google.golang.org/grpc" 62 "google.golang.org/grpc/codes" 63 "google.golang.org/grpc/status" 64 65 anypb "github.com/golang/protobuf/ptypes/any" 66 emptypb "github.com/golang/protobuf/ptypes/empty" 67 structpb "github.com/golang/protobuf/ptypes/struct" 68 timestamppb "github.com/golang/protobuf/ptypes/timestamp" 69 lropb "google.golang.org/genproto/googleapis/longrunning" 70 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 71 spannerpb "google.golang.org/genproto/googleapis/spanner/v1" 72 73 "cloud.google.com/go/spanner/spansql" 74) 75 76// Server is an in-memory Cloud Spanner fake. 77// It is unauthenticated, non-performant, and only a rough approximation. 78type Server struct { 79 Addr string 80 81 l net.Listener 82 srv *grpc.Server 83 s *server 84} 85 86// server is the real implementation of the fake. 87// It is a separate and unexported type so the API won't be cluttered with 88// methods that are only relevant to the fake's implementation. 89type server struct { 90 logf Logger 91 92 db database 93 94 mu sync.Mutex 95 sessions map[string]*session 96 lros map[string]*lro 97 98 // Any unimplemented methods will cause a panic. 99 // TODO: Switch to Unimplemented at some point? spannerpb would need regenerating. 100 adminpb.DatabaseAdminServer 101 spannerpb.SpannerServer 102 lropb.OperationsServer 103} 104 105type session struct { 106 name string 107 creation time.Time 108 109 // This context tracks the lifetime of this session. 110 // It is canceled in DeleteSession. 111 ctx context.Context 112 cancel func() 113 114 mu sync.Mutex 115 lastUse time.Time 116 transactions map[string]*transaction 117} 118 119func (s *session) Proto() *spannerpb.Session { 120 s.mu.Lock() 121 defer s.mu.Unlock() 122 m := &spannerpb.Session{ 123 Name: s.name, 124 CreateTime: timestampProto(s.creation), 125 ApproximateLastUseTime: timestampProto(s.lastUse), 126 } 127 return m 128} 129 130// timestampProto returns a valid timestamp.Timestamp, 131// or nil if the given time is zero or isn't representable. 132func timestampProto(t time.Time) *timestamppb.Timestamp { 133 if t.IsZero() { 134 return nil 135 } 136 ts, err := ptypes.TimestampProto(t) 137 if err != nil { 138 return nil 139 } 140 return ts 141} 142 143// lro represents a Long-Running Operation, generally a schema change. 144type lro struct { 145 mu sync.Mutex 146 state *lropb.Operation 147} 148 149func (l *lro) State() *lropb.Operation { 150 l.mu.Lock() 151 defer l.mu.Unlock() 152 return proto.Clone(l.state).(*lropb.Operation) 153} 154 155// Logger is something that can be used for logging. 156// It is matched by log.Printf and testing.T.Logf. 157type Logger func(format string, args ...interface{}) 158 159// NewServer creates a new Server. 160// The Server will be listening for gRPC connections, without TLS, on the provided TCP address. 161// The resolved address is available in the Addr field. 162func NewServer(laddr string) (*Server, error) { 163 l, err := net.Listen("tcp", laddr) 164 if err != nil { 165 return nil, err 166 } 167 168 s := &Server{ 169 Addr: l.Addr().String(), 170 l: l, 171 srv: grpc.NewServer(), 172 s: &server{ 173 logf: func(format string, args ...interface{}) { 174 log.Printf("spannertest.inmem: "+format, args...) 175 }, 176 sessions: make(map[string]*session), 177 lros: make(map[string]*lro), 178 }, 179 } 180 adminpb.RegisterDatabaseAdminServer(s.srv, s.s) 181 spannerpb.RegisterSpannerServer(s.srv, s.s) 182 lropb.RegisterOperationsServer(s.srv, s.s) 183 184 go s.srv.Serve(s.l) 185 186 return s, nil 187} 188 189// SetLogger sets a logger for the server. 190// You can use a *testing.T as this argument to collate extra information 191// from the execution of the server. 192func (s *Server) SetLogger(l Logger) { s.s.logf = l } 193 194// Close shuts down the server. 195func (s *Server) Close() { 196 s.srv.Stop() 197 s.l.Close() 198} 199 200func genRandomSession() string { 201 var b [4]byte 202 rand.Read(b[:]) 203 return fmt.Sprintf("%x", b) 204} 205 206func genRandomTransaction() string { 207 var b [6]byte 208 rand.Read(b[:]) 209 return fmt.Sprintf("tx-%x", b) 210} 211 212func genRandomOperation() string { 213 var b [3]byte 214 rand.Read(b[:]) 215 return fmt.Sprintf("op-%x", b) 216} 217 218func (s *server) GetOperation(ctx context.Context, req *lropb.GetOperationRequest) (*lropb.Operation, error) { 219 s.mu.Lock() 220 lro, ok := s.lros[req.Name] 221 s.mu.Unlock() 222 if !ok { 223 return nil, status.Errorf(codes.NotFound, "unknown LRO %q", req.Name) 224 } 225 return lro.State(), nil 226} 227 228// UpdateDDL applies the given DDL to the server. 229// 230// This is a convenience method for tests that may assume an existing schema. 231// The more general approach is to dial this server using an admin client, and 232// use the UpdateDatabaseDdl RPC method. 233func (s *Server) UpdateDDL(ddl *spansql.DDL) error { 234 ctx := context.Background() 235 for _, stmt := range ddl.List { 236 if st := s.s.runOneDDL(ctx, stmt); st.Code() != codes.OK { 237 return st.Err() 238 } 239 } 240 return nil 241} 242 243func (s *server) UpdateDatabaseDdl(ctx context.Context, req *adminpb.UpdateDatabaseDdlRequest) (*lropb.Operation, error) { 244 // Parse all the DDL statements first. 245 var stmts []spansql.DDLStmt 246 for _, s := range req.Statements { 247 stmt, err := spansql.ParseDDLStmt(s) 248 if err != nil { 249 // TODO: check what code the real Spanner returns here. 250 return nil, status.Errorf(codes.InvalidArgument, "bad DDL statement %q: %v", s, err) 251 } 252 stmts = append(stmts, stmt) 253 } 254 255 // Nothing should be depending on the exact structure of this, 256 // but it is specified in google/spanner/admin/database/v1/spanner_database_admin.proto. 257 id := "projects/fake-proj/instances/fake-instance/databases/fake-db/operations/" + genRandomOperation() 258 lro := &lro{ 259 state: &lropb.Operation{ 260 Name: id, 261 }, 262 } 263 s.mu.Lock() 264 s.lros[id] = lro 265 s.mu.Unlock() 266 267 go lro.Run(s, stmts) 268 return lro.State(), nil 269} 270 271func (l *lro) Run(s *server, stmts []spansql.DDLStmt) { 272 ctx := context.Background() 273 274 for _, stmt := range stmts { 275 time.Sleep(100 * time.Millisecond) 276 if st := s.runOneDDL(ctx, stmt); st.Code() != codes.OK { 277 l.mu.Lock() 278 l.state.Done = true 279 l.state.Result = &lropb.Operation_Error{st.Proto()} 280 l.mu.Unlock() 281 return 282 } 283 } 284 285 l.mu.Lock() 286 l.state.Done = true 287 l.state.Result = &lropb.Operation_Response{&anypb.Any{}} 288 l.mu.Unlock() 289} 290 291func (s *server) runOneDDL(ctx context.Context, stmt spansql.DDLStmt) *status.Status { 292 return s.db.ApplyDDL(stmt) 293} 294 295func (s *server) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest) (*spannerpb.Session, error) { 296 //s.logf("CreateSession(%q)", req.Database) 297 return s.newSession(), nil 298} 299 300func (s *server) newSession() *spannerpb.Session { 301 id := genRandomSession() 302 now := time.Now() 303 sess := &session{ 304 name: id, 305 creation: now, 306 lastUse: now, 307 transactions: make(map[string]*transaction), 308 } 309 sess.ctx, sess.cancel = context.WithCancel(context.Background()) 310 311 s.mu.Lock() 312 s.sessions[id] = sess 313 s.mu.Unlock() 314 315 return sess.Proto() 316} 317 318func (s *server) BatchCreateSessions(ctx context.Context, req *spannerpb.BatchCreateSessionsRequest) (*spannerpb.BatchCreateSessionsResponse, error) { 319 //s.logf("BatchCreateSessions(%q)", req.Database) 320 321 var sessions []*spannerpb.Session 322 for i := int32(0); i < req.GetSessionCount(); i++ { 323 sessions = append(sessions, s.newSession()) 324 } 325 326 return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil 327} 328 329func (s *server) GetSession(ctx context.Context, req *spannerpb.GetSessionRequest) (*spannerpb.Session, error) { 330 s.mu.Lock() 331 sess, ok := s.sessions[req.Name] 332 s.mu.Unlock() 333 334 if !ok { 335 // TODO: what error does the real Spanner return? 336 return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name) 337 } 338 339 return sess.Proto(), nil 340} 341 342// TODO: ListSessions 343 344func (s *server) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest) (*emptypb.Empty, error) { 345 //s.logf("DeleteSession(%q)", req.Name) 346 347 s.mu.Lock() 348 sess, ok := s.sessions[req.Name] 349 delete(s.sessions, req.Name) 350 s.mu.Unlock() 351 352 if !ok { 353 // TODO: what error does the real Spanner return? 354 return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Name) 355 } 356 357 // Terminate any operations in this session. 358 sess.cancel() 359 360 return &emptypb.Empty{}, nil 361} 362 363// popTx returns an existing transaction, removing it from the session. 364// This is called when a transaction is finishing (Commit, Rollback). 365func (s *server) popTx(sessionID, tid string) (tx *transaction, err error) { 366 s.mu.Lock() 367 sess, ok := s.sessions[sessionID] 368 s.mu.Unlock() 369 if !ok { 370 // TODO: what error does the real Spanner return? 371 return nil, status.Errorf(codes.NotFound, "unknown session %q", sessionID) 372 } 373 374 sess.mu.Lock() 375 sess.lastUse = time.Now() 376 tx, ok = sess.transactions[tid] 377 if ok { 378 delete(sess.transactions, tid) 379 } 380 sess.mu.Unlock() 381 if !ok { 382 // TODO: what error does the real Spanner return? 383 return nil, status.Errorf(codes.NotFound, "unknown transaction ID %q", tid) 384 } 385 return tx, nil 386} 387 388// readTx returns a transaction for the given session and transaction selector. 389// It is used by read/query operations (ExecuteStreamingSql, StreamingRead). 390func (s *server) readTx(ctx context.Context, session string, tsel *spannerpb.TransactionSelector) (tx *transaction, cleanup func(), err error) { 391 s.mu.Lock() 392 sess, ok := s.sessions[session] 393 s.mu.Unlock() 394 if !ok { 395 // TODO: what error does the real Spanner return? 396 return nil, nil, status.Errorf(codes.NotFound, "unknown session %q", session) 397 } 398 399 sess.mu.Lock() 400 sess.lastUse = time.Now() 401 sess.mu.Unlock() 402 403 // Only give a read-only transaction regardless of whether the selector 404 // is requesting a read-write or read-only one, since this is in readTx 405 // and so shouldn't be mutating anyway. 406 singleUse := func() (*transaction, func(), error) { 407 tx := s.db.NewReadOnlyTransaction() 408 return tx, tx.Rollback, nil 409 } 410 411 if tsel.GetSelector() == nil { 412 return singleUse() 413 } 414 415 switch sel := tsel.Selector.(type) { 416 default: 417 return nil, nil, fmt.Errorf("TransactionSelector type %T not supported", sel) 418 case *spannerpb.TransactionSelector_SingleUse: 419 // Ignore options (e.g. timestamps). 420 switch mode := sel.SingleUse.Mode.(type) { 421 case *spannerpb.TransactionOptions_ReadOnly_: 422 return singleUse() 423 case *spannerpb.TransactionOptions_ReadWrite_: 424 return singleUse() 425 default: 426 return nil, nil, fmt.Errorf("single use transaction in mode %T not supported", mode) 427 } 428 case *spannerpb.TransactionSelector_Id: 429 sess.mu.Lock() 430 tx, ok := sess.transactions[string(sel.Id)] 431 sess.mu.Unlock() 432 if !ok { 433 return nil, nil, fmt.Errorf("no transaction with id %q", sel.Id) 434 } 435 return tx, func() {}, nil 436 } 437} 438 439func (s *server) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) { 440 // Assume this is probably a DML statement. Queries tend to use ExecuteStreamingSql. 441 // TODO: Expand this to support more things. 442 443 obj, ok := req.Transaction.Selector.(*spannerpb.TransactionSelector_Id) 444 if !ok { 445 return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction.Selector) 446 } 447 tid := string(obj.Id) 448 _ = tid // TODO: lookup an existing transaction by ID. 449 450 stmt, err := spansql.ParseDMLStmt(req.Sql) 451 if err != nil { 452 return nil, status.Errorf(codes.InvalidArgument, "bad DML: %v", err) 453 } 454 params, err := parseQueryParams(req.GetParams()) 455 if err != nil { 456 return nil, err 457 } 458 459 s.logf("Executing: %s", stmt.SQL()) 460 if len(params) > 0 { 461 s.logf(" ▹ %v", params) 462 } 463 464 n, err := s.db.Execute(stmt, params) 465 if err != nil { 466 return nil, err 467 } 468 return &spannerpb.ResultSet{ 469 Stats: &spannerpb.ResultSetStats{ 470 RowCount: &spannerpb.ResultSetStats_RowCountExact{int64(n)}, 471 }, 472 }, nil 473} 474 475func (s *server) ExecuteStreamingSql(req *spannerpb.ExecuteSqlRequest, stream spannerpb.Spanner_ExecuteStreamingSqlServer) error { 476 tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction) 477 if err != nil { 478 return err 479 } 480 defer cleanup() 481 482 q, err := spansql.ParseQuery(req.Sql) 483 if err != nil { 484 // TODO: check what code the real Spanner returns here. 485 return status.Errorf(codes.InvalidArgument, "bad query: %v", err) 486 } 487 488 params, err := parseQueryParams(req.GetParams()) 489 if err != nil { 490 return err 491 } 492 493 s.logf("Querying: %s", q.SQL()) 494 if len(params) > 0 { 495 s.logf(" ▹ %v", params) 496 } 497 498 ri, err := s.db.Query(q, params) 499 if err != nil { 500 return err 501 } 502 return s.readStream(stream.Context(), tx, stream.Send, ri) 503} 504 505// TODO: Read 506 507func (s *server) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error { 508 tx, cleanup, err := s.readTx(stream.Context(), req.Session, req.Transaction) 509 if err != nil { 510 return err 511 } 512 defer cleanup() 513 514 // Bail out if various advanced features are being used. 515 if req.Index != "" { 516 // This is okay; we can still return results. 517 s.logf("Warning: index reads (%q) not supported", req.Index) 518 } 519 if len(req.ResumeToken) > 0 { 520 // This should only happen if we send resume_token ourselves. 521 return fmt.Errorf("read resumption not supported") 522 } 523 if len(req.PartitionToken) > 0 { 524 return fmt.Errorf("partition restrictions not supported") 525 } 526 527 var ri rowIter 528 if req.KeySet.All { 529 s.logf("Reading all from %s (cols: %v)", req.Table, req.Columns) 530 ri, err = s.db.ReadAll(req.Table, req.Columns, req.Limit) 531 } else { 532 s.logf("Reading rows from %d keys and %d ranges from %s (cols: %v)", len(req.KeySet.Keys), len(req.KeySet.Ranges), req.Table, req.Columns) 533 ri, err = s.db.Read(req.Table, req.Columns, req.KeySet.Keys, makeKeyRangeList(req.KeySet.Ranges), req.Limit) 534 } 535 if err != nil { 536 return err 537 } 538 539 // TODO: Figure out the right contexts to use here. There's the session one (sess.ctx), 540 // but also this specific RPC one (stream.Context()). Which takes precedence? 541 // They appear to be independent. 542 543 return s.readStream(stream.Context(), tx, stream.Send, ri) 544} 545 546func (s *server) readStream(ctx context.Context, tx *transaction, send func(*spannerpb.PartialResultSet) error, ri rowIter) error { 547 // Build the result set metadata. 548 rsm := &spannerpb.ResultSetMetadata{ 549 RowType: &spannerpb.StructType{}, 550 // TODO: transaction info? 551 } 552 for _, ci := range ri.Cols() { 553 st, err := spannerTypeFromType(ci.Type) 554 if err != nil { 555 return err 556 } 557 rsm.RowType.Fields = append(rsm.RowType.Fields, &spannerpb.StructType_Field{ 558 Name: ci.Name, 559 Type: st, 560 }) 561 } 562 563 for { 564 row, err := ri.Next() 565 if err == io.EOF { 566 break 567 } else if err != nil { 568 return err 569 } 570 571 values := make([]*structpb.Value, len(row)) 572 for i, x := range row { 573 v, err := spannerValueFromValue(x) 574 if err != nil { 575 return err 576 } 577 values[i] = v 578 } 579 580 prs := &spannerpb.PartialResultSet{ 581 Metadata: rsm, 582 Values: values, 583 } 584 if err := send(prs); err != nil { 585 return err 586 } 587 588 // ResultSetMetadata is only set for the first PartialResultSet. 589 rsm = nil 590 } 591 592 return nil 593} 594 595func (s *server) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest) (*spannerpb.Transaction, error) { 596 //s.logf("BeginTransaction(%v)", req) 597 598 s.mu.Lock() 599 sess, ok := s.sessions[req.Session] 600 s.mu.Unlock() 601 if !ok { 602 // TODO: what error does the real Spanner return? 603 return nil, status.Errorf(codes.NotFound, "unknown session %q", req.Session) 604 } 605 606 id := genRandomTransaction() 607 tx := s.db.NewTransaction() 608 609 sess.mu.Lock() 610 sess.lastUse = time.Now() 611 sess.transactions[id] = tx 612 sess.mu.Unlock() 613 614 tr := &spannerpb.Transaction{Id: []byte(id)} 615 616 if req.GetOptions().GetReadOnly().GetReturnReadTimestamp() { 617 // Return the last commit timestamp. 618 // This isn't wholly accurate, but may be good enough for simple use cases. 619 tr.ReadTimestamp = timestampProto(s.db.LastCommitTimestamp()) 620 } 621 622 return tr, nil 623} 624 625func (s *server) Commit(ctx context.Context, req *spannerpb.CommitRequest) (resp *spannerpb.CommitResponse, err error) { 626 //s.logf("Commit(%q, %q)", req.Session, req.Transaction) 627 628 obj, ok := req.Transaction.(*spannerpb.CommitRequest_TransactionId) 629 if !ok { 630 return nil, fmt.Errorf("unsupported transaction type %T", req.Transaction) 631 } 632 tid := string(obj.TransactionId) 633 634 tx, err := s.popTx(req.Session, tid) 635 if err != nil { 636 return nil, err 637 } 638 defer func() { 639 if err != nil { 640 tx.Rollback() 641 } 642 }() 643 tx.Start() 644 645 for _, m := range req.Mutations { 646 switch op := m.Operation.(type) { 647 default: 648 return nil, fmt.Errorf("unsupported mutation operation type %T", op) 649 case *spannerpb.Mutation_Insert: 650 ins := op.Insert 651 err := s.db.Insert(tx, ins.Table, ins.Columns, ins.Values) 652 if err != nil { 653 return nil, err 654 } 655 case *spannerpb.Mutation_Update: 656 up := op.Update 657 err := s.db.Update(tx, up.Table, up.Columns, up.Values) 658 if err != nil { 659 return nil, err 660 } 661 case *spannerpb.Mutation_InsertOrUpdate: 662 iou := op.InsertOrUpdate 663 err := s.db.InsertOrUpdate(tx, iou.Table, iou.Columns, iou.Values) 664 if err != nil { 665 return nil, err 666 } 667 case *spannerpb.Mutation_Delete_: 668 del := op.Delete 669 ks := del.KeySet 670 671 err := s.db.Delete(tx, del.Table, ks.Keys, makeKeyRangeList(ks.Ranges), ks.All) 672 if err != nil { 673 return nil, err 674 } 675 } 676 677 } 678 679 ts, err := tx.Commit() 680 if err != nil { 681 return nil, err 682 } 683 684 return &spannerpb.CommitResponse{ 685 CommitTimestamp: timestampProto(ts), 686 }, nil 687} 688 689func (s *server) Rollback(ctx context.Context, req *spannerpb.RollbackRequest) (*emptypb.Empty, error) { 690 s.logf("Rollback(%v)", req) 691 692 tx, err := s.popTx(req.Session, string(req.TransactionId)) 693 if err != nil { 694 return nil, err 695 } 696 697 tx.Rollback() 698 699 return &emptypb.Empty{}, nil 700} 701 702// TODO: PartitionQuery, PartitionRead 703 704func parseQueryParams(p *structpb.Struct) (queryParams, error) { 705 params := make(queryParams) 706 for k, v := range p.GetFields() { 707 switch v := v.Kind.(type) { 708 default: 709 return nil, fmt.Errorf("unsupported well-known type value kind %T", v) 710 case *structpb.Value_NullValue: 711 params[k] = nil 712 case *structpb.Value_NumberValue: 713 params[k] = v.NumberValue 714 case *structpb.Value_StringValue: 715 params[k] = v.StringValue 716 } 717 } 718 return params, nil 719} 720 721func spannerTypeFromType(typ spansql.Type) (*spannerpb.Type, error) { 722 var code spannerpb.TypeCode 723 switch typ.Base { 724 default: 725 return nil, fmt.Errorf("unhandled base type %d", typ.Base) 726 case spansql.Bool: 727 code = spannerpb.TypeCode_BOOL 728 case spansql.Int64: 729 code = spannerpb.TypeCode_INT64 730 case spansql.Float64: 731 code = spannerpb.TypeCode_FLOAT64 732 case spansql.String: 733 code = spannerpb.TypeCode_STRING 734 case spansql.Bytes: 735 code = spannerpb.TypeCode_BYTES 736 case spansql.Date: 737 code = spannerpb.TypeCode_DATE 738 case spansql.Timestamp: 739 code = spannerpb.TypeCode_TIMESTAMP 740 } 741 st := &spannerpb.Type{Code: code} 742 if typ.Array { 743 st = &spannerpb.Type{ 744 Code: spannerpb.TypeCode_ARRAY, 745 ArrayElementType: st, 746 } 747 } 748 return st, nil 749} 750 751func spannerValueFromValue(x interface{}) (*structpb.Value, error) { 752 switch x := x.(type) { 753 default: 754 return nil, fmt.Errorf("unhandled database value type %T", x) 755 case bool: 756 return &structpb.Value{Kind: &structpb.Value_BoolValue{x}}, nil 757 case int64: 758 // The Spanner int64 is actually a decimal string. 759 s := strconv.FormatInt(x, 10) 760 return &structpb.Value{Kind: &structpb.Value_StringValue{s}}, nil 761 case float64: 762 return &structpb.Value{Kind: &structpb.Value_NumberValue{x}}, nil 763 case string: 764 return &structpb.Value{Kind: &structpb.Value_StringValue{x}}, nil 765 case []byte: 766 return &structpb.Value{Kind: &structpb.Value_StringValue{base64.StdEncoding.EncodeToString(x)}}, nil 767 case nil: 768 return &structpb.Value{Kind: &structpb.Value_NullValue{}}, nil 769 case []interface{}: 770 var vs []*structpb.Value 771 for _, elem := range x { 772 v, err := spannerValueFromValue(elem) 773 if err != nil { 774 return nil, err 775 } 776 vs = append(vs, v) 777 } 778 return &structpb.Value{Kind: &structpb.Value_ListValue{ 779 &structpb.ListValue{Values: vs}, 780 }}, nil 781 } 782} 783 784func makeKeyRangeList(ranges []*spannerpb.KeyRange) keyRangeList { 785 var krl keyRangeList 786 for _, r := range ranges { 787 krl = append(krl, makeKeyRange(r)) 788 } 789 return krl 790} 791 792func makeKeyRange(r *spannerpb.KeyRange) *keyRange { 793 var kr keyRange 794 switch s := r.StartKeyType.(type) { 795 case *spannerpb.KeyRange_StartClosed: 796 kr.start = s.StartClosed 797 kr.startClosed = true 798 case *spannerpb.KeyRange_StartOpen: 799 kr.start = s.StartOpen 800 } 801 switch e := r.EndKeyType.(type) { 802 case *spannerpb.KeyRange_EndClosed: 803 kr.end = e.EndClosed 804 kr.endClosed = true 805 case *spannerpb.KeyRange_EndOpen: 806 kr.end = e.EndOpen 807 } 808 return &kr 809} 810