/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testutil

import (
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"sync"
	"testing"
	"time"

	"github.com/golang/protobuf/ptypes/empty"
	proto3 "github.com/golang/protobuf/ptypes/struct"
	pbt "github.com/golang/protobuf/ptypes/timestamp"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var (
	// KvMeta is the result-set metadata for the mocked KV table: a row type
	// with two STRING columns named "Key" and "Value".
	KvMeta = sppb.ResultSetMetadata{
		RowType: &sppb.StructType{
			Fields: []*sppb.StructType_Field{
				{
					Name: "Key",
					Type: &sppb.Type{Code: sppb.TypeCode_STRING},
				},
				{
					Name: "Value",
					Type: &sppb.Type{Code: sppb.TypeCode_STRING},
				},
			},
		},
	}
)

// MockCtlMsg is a control message queued on the mock server. It describes
// what the server should send to the client next: either one mocked row
// (a PartialResultSet) or an error.
type MockCtlMsg struct {
	// If ResumeToken == true, the mock server attaches a resume token to
	// the row it generates for this message.
	ResumeToken bool
	// If Err != nil, the mock server returns Err as the RPC result instead
	// of generating a row.
	Err error
}

// MockCloudSpanner is a mock implementation of the SpannerServer interface.
// TODO: make MockCloudSpanner a full-fledged Cloud Spanner implementation.
68type MockCloudSpanner struct { 69 sppb.SpannerServer 70 71 s *grpc.Server 72 t *testing.T 73 addr string 74 msgs chan MockCtlMsg 75 readTs time.Time 76 77 mu sync.Mutex 78 next int 79 nextSession int 80 sessions map[string]*sppb.Session 81} 82 83// Addr returns the listening address of mock server. 84func (m *MockCloudSpanner) Addr() string { 85 return m.addr 86} 87 88// AddMsg generates a new mocked row which can be received by client. 89func (m *MockCloudSpanner) AddMsg(err error, resumeToken bool) { 90 msg := MockCtlMsg{ 91 ResumeToken: resumeToken, 92 Err: err, 93 } 94 if err == io.EOF { 95 close(m.msgs) 96 } else { 97 m.msgs <- msg 98 } 99} 100 101// Done signals an end to a mocked stream. 102func (m *MockCloudSpanner) Done() { 103 close(m.msgs) 104} 105 106// CreateSession is a placeholder for SpannerServer.CreateSession. 107func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) { 108 m.mu.Lock() 109 defer m.mu.Unlock() 110 name := fmt.Sprintf("session-%d", m.nextSession) 111 m.nextSession++ 112 s := &sppb.Session{Name: name} 113 m.sessions[name] = s 114 return s, nil 115} 116 117// GetSession is a placeholder for SpannerServer.GetSession. 118func (m *MockCloudSpanner) GetSession(c context.Context, r *sppb.GetSessionRequest) (*sppb.Session, error) { 119 m.mu.Lock() 120 defer m.mu.Unlock() 121 if s, ok := m.sessions[r.Name]; ok { 122 return s, nil 123 } 124 return nil, status.Errorf(codes.NotFound, "not found") 125} 126 127// DeleteSession is a placeholder for SpannerServer.DeleteSession. 128func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) { 129 m.mu.Lock() 130 defer m.mu.Unlock() 131 delete(m.sessions, r.Name) 132 return &empty.Empty{}, nil 133} 134 135// EncodeResumeToken return mock resume token encoding for an uint64 integer. 
136func EncodeResumeToken(t uint64) []byte { 137 rt := make([]byte, 16) 138 binary.PutUvarint(rt, t) 139 return rt 140} 141 142// DecodeResumeToken decodes a mock resume token into an uint64 integer. 143func DecodeResumeToken(t []byte) (uint64, error) { 144 s, n := binary.Uvarint(t) 145 if n <= 0 { 146 return 0, fmt.Errorf("invalid resume token: %v", t) 147 } 148 return s, nil 149} 150 151// ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql. 152func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error { 153 switch r.Sql { 154 case "SELECT * from t_unavailable": 155 return status.Errorf(codes.Unavailable, "mock table unavailable") 156 157 case "UPDATE t SET x = 2 WHERE x = 1": 158 err := s.Send(&sppb.PartialResultSet{ 159 Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountLowerBound{3}}, 160 }) 161 if err != nil { 162 panic(err) 163 } 164 return nil 165 166 case "SELECT t.key key, t.value value FROM t_mock t": 167 if r.ResumeToken != nil { 168 s, err := DecodeResumeToken(r.ResumeToken) 169 if err != nil { 170 return err 171 } 172 m.mu.Lock() 173 m.next = int(s) + 1 174 m.mu.Unlock() 175 } 176 for { 177 msg, more := <-m.msgs 178 if !more { 179 break 180 } 181 if msg.Err == nil { 182 var rt []byte 183 if msg.ResumeToken { 184 m.mu.Lock() 185 rt = EncodeResumeToken(uint64(m.next)) 186 m.mu.Unlock() 187 } 188 meta := KvMeta 189 meta.Transaction = &sppb.Transaction{ 190 ReadTimestamp: &pbt.Timestamp{ 191 Seconds: m.readTs.Unix(), 192 Nanos: int32(m.readTs.Nanosecond()), 193 }, 194 } 195 m.mu.Lock() 196 next := m.next 197 m.next++ 198 m.mu.Unlock() 199 err := s.Send(&sppb.PartialResultSet{ 200 Metadata: &meta, 201 Values: []*proto3.Value{ 202 {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("foo-%02d", next)}}, 203 {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("bar-%02d", next)}}, 204 }, 205 ResumeToken: rt, 206 }) 207 if err != nil { 208 
return err 209 } 210 continue 211 } 212 return msg.Err 213 } 214 return nil 215 default: 216 return fmt.Errorf("unsupported SQL: %v", r.Sql) 217 } 218} 219 220// StreamingRead is a placeholder for SpannerServer.StreamingRead. 221func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error { 222 return s.Send(&sppb.PartialResultSet{}) 223} 224 225// Serve runs a MockCloudSpanner listening on a random localhost address. 226func (m *MockCloudSpanner) Serve() { 227 m.s = grpc.NewServer() 228 if m.addr == "" { 229 m.addr = "localhost:0" 230 } 231 lis, err := net.Listen("tcp", m.addr) 232 if err != nil { 233 m.t.Fatalf("Failed to listen: %v", err) 234 } 235 _, port, err := net.SplitHostPort(lis.Addr().String()) 236 if err != nil { 237 m.t.Fatalf("Failed to parse listener address: %v", err) 238 } 239 sppb.RegisterSpannerServer(m.s, m) 240 m.addr = "localhost:" + port 241 go m.s.Serve(lis) 242} 243 244// BeginTransaction is a placeholder for SpannerServer.BeginTransaction. 245func (m *MockCloudSpanner) BeginTransaction(_ context.Context, r *sppb.BeginTransactionRequest) (*sppb.Transaction, error) { 246 m.mu.Lock() 247 defer m.mu.Unlock() 248 return &sppb.Transaction{}, nil 249} 250 251// Stop terminates MockCloudSpanner and closes the serving port. 252func (m *MockCloudSpanner) Stop() { 253 m.s.Stop() 254} 255 256// NewMockCloudSpanner creates a new MockCloudSpanner instance. 257func NewMockCloudSpanner(t *testing.T, ts time.Time) *MockCloudSpanner { 258 mcs := &MockCloudSpanner{ 259 t: t, 260 msgs: make(chan MockCtlMsg, 1000), 261 readTs: ts, 262 sessions: map[string]*sppb.Session{}, 263 } 264 return mcs 265} 266