1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package testutil
18
19import (
20	"context"
21	"encoding/binary"
22	"fmt"
23	"io"
24	"net"
25	"sync"
26	"testing"
27	"time"
28
29	"github.com/golang/protobuf/ptypes/empty"
30	proto3 "github.com/golang/protobuf/ptypes/struct"
31	pbt "github.com/golang/protobuf/ptypes/timestamp"
32	sppb "google.golang.org/genproto/googleapis/spanner/v1"
33	"google.golang.org/grpc"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/status"
36)
37
38var (
39	// KvMeta is the Metadata for mocked KV table.
40	KvMeta = sppb.ResultSetMetadata{
41		RowType: &sppb.StructType{
42			Fields: []*sppb.StructType_Field{
43				{
44					Name: "Key",
45					Type: &sppb.Type{Code: sppb.TypeCode_STRING},
46				},
47				{
48					Name: "Value",
49					Type: &sppb.Type{Code: sppb.TypeCode_STRING},
50				},
51			},
52		},
53	}
54)
55
// MockCtlMsg is a control message that tells the mock server what to send
// next on a mocked stream: either a generated row or an error.
type MockCtlMsg struct {
	// ResumeToken reports whether the generated row should carry a resume
	// token.
	ResumeToken bool
	// Err, when non-nil, is returned by the mock server as the RPC error
	// instead of a row.
	Err error
}
65
66// MockCloudSpanner is a mock implementation of SpannerServer interface.
67// TODO: make MockCloudSpanner a full-fleged Cloud Spanner implementation.
68type MockCloudSpanner struct {
69	sppb.SpannerServer
70
71	s      *grpc.Server
72	t      *testing.T
73	addr   string
74	msgs   chan MockCtlMsg
75	readTs time.Time
76
77	mu          sync.Mutex
78	next        int
79	nextSession int
80	sessions    map[string]*sppb.Session
81}
82
83// Addr returns the listening address of mock server.
84func (m *MockCloudSpanner) Addr() string {
85	return m.addr
86}
87
88// AddMsg generates a new mocked row which can be received by client.
89func (m *MockCloudSpanner) AddMsg(err error, resumeToken bool) {
90	msg := MockCtlMsg{
91		ResumeToken: resumeToken,
92		Err:         err,
93	}
94	if err == io.EOF {
95		close(m.msgs)
96	} else {
97		m.msgs <- msg
98	}
99}
100
101// Done signals an end to a mocked stream.
102func (m *MockCloudSpanner) Done() {
103	close(m.msgs)
104}
105
106// CreateSession is a placeholder for SpannerServer.CreateSession.
107func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) {
108	m.mu.Lock()
109	defer m.mu.Unlock()
110	name := fmt.Sprintf("session-%d", m.nextSession)
111	m.nextSession++
112	s := &sppb.Session{Name: name}
113	m.sessions[name] = s
114	return s, nil
115}
116
117// GetSession is a placeholder for SpannerServer.GetSession.
118func (m *MockCloudSpanner) GetSession(c context.Context, r *sppb.GetSessionRequest) (*sppb.Session, error) {
119	m.mu.Lock()
120	defer m.mu.Unlock()
121	if s, ok := m.sessions[r.Name]; ok {
122		return s, nil
123	}
124	return nil, status.Errorf(codes.NotFound, "not found")
125}
126
127// DeleteSession is a placeholder for SpannerServer.DeleteSession.
128func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) {
129	m.mu.Lock()
130	defer m.mu.Unlock()
131	delete(m.sessions, r.Name)
132	return &empty.Empty{}, nil
133}
134
// EncodeResumeToken returns the mock resume token for t: a 16-byte slice that
// starts with the unsigned varint encoding of t and is zero-padded to the end.
func EncodeResumeToken(t uint64) []byte {
	var token [16]byte
	binary.PutUvarint(token[:], t)
	return token[:]
}
141
// DecodeResumeToken reads the unsigned varint at the start of a mock resume
// token. It returns an error when t is empty, truncated, or overflows 64 bits.
func DecodeResumeToken(t []byte) (uint64, error) {
	if value, read := binary.Uvarint(t); read > 0 {
		return value, nil
	}
	return 0, fmt.Errorf("invalid resume token: %v", t)
}
150
151// ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql.
152func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error {
153	switch r.Sql {
154	case "SELECT * from t_unavailable":
155		return status.Errorf(codes.Unavailable, "mock table unavailable")
156
157	case "UPDATE t SET x = 2 WHERE x = 1":
158		err := s.Send(&sppb.PartialResultSet{
159			Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountLowerBound{3}},
160		})
161		if err != nil {
162			panic(err)
163		}
164		return nil
165
166	case "SELECT t.key key, t.value value FROM t_mock t":
167		if r.ResumeToken != nil {
168			s, err := DecodeResumeToken(r.ResumeToken)
169			if err != nil {
170				return err
171			}
172			m.mu.Lock()
173			m.next = int(s) + 1
174			m.mu.Unlock()
175		}
176		for {
177			msg, more := <-m.msgs
178			if !more {
179				break
180			}
181			if msg.Err == nil {
182				var rt []byte
183				if msg.ResumeToken {
184					m.mu.Lock()
185					rt = EncodeResumeToken(uint64(m.next))
186					m.mu.Unlock()
187				}
188				meta := KvMeta
189				meta.Transaction = &sppb.Transaction{
190					ReadTimestamp: &pbt.Timestamp{
191						Seconds: m.readTs.Unix(),
192						Nanos:   int32(m.readTs.Nanosecond()),
193					},
194				}
195				m.mu.Lock()
196				next := m.next
197				m.next++
198				m.mu.Unlock()
199				err := s.Send(&sppb.PartialResultSet{
200					Metadata: &meta,
201					Values: []*proto3.Value{
202						{Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("foo-%02d", next)}},
203						{Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("bar-%02d", next)}},
204					},
205					ResumeToken: rt,
206				})
207				if err != nil {
208					return err
209				}
210				continue
211			}
212			return msg.Err
213		}
214		return nil
215	default:
216		return fmt.Errorf("unsupported SQL: %v", r.Sql)
217	}
218}
219
220// StreamingRead is a placeholder for SpannerServer.StreamingRead.
221func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error {
222	return s.Send(&sppb.PartialResultSet{})
223}
224
225// Serve runs a MockCloudSpanner listening on a random localhost address.
226func (m *MockCloudSpanner) Serve() {
227	m.s = grpc.NewServer()
228	if m.addr == "" {
229		m.addr = "localhost:0"
230	}
231	lis, err := net.Listen("tcp", m.addr)
232	if err != nil {
233		m.t.Fatalf("Failed to listen: %v", err)
234	}
235	_, port, err := net.SplitHostPort(lis.Addr().String())
236	if err != nil {
237		m.t.Fatalf("Failed to parse listener address: %v", err)
238	}
239	sppb.RegisterSpannerServer(m.s, m)
240	m.addr = "localhost:" + port
241	go m.s.Serve(lis)
242}
243
244// BeginTransaction is a placeholder for SpannerServer.BeginTransaction.
245func (m *MockCloudSpanner) BeginTransaction(_ context.Context, r *sppb.BeginTransactionRequest) (*sppb.Transaction, error) {
246	m.mu.Lock()
247	defer m.mu.Unlock()
248	return &sppb.Transaction{}, nil
249}
250
251// Stop terminates MockCloudSpanner and closes the serving port.
252func (m *MockCloudSpanner) Stop() {
253	m.s.Stop()
254}
255
256// NewMockCloudSpanner creates a new MockCloudSpanner instance.
257func NewMockCloudSpanner(t *testing.T, ts time.Time) *MockCloudSpanner {
258	mcs := &MockCloudSpanner{
259		t:        t,
260		msgs:     make(chan MockCtlMsg, 1000),
261		readTs:   ts,
262		sessions: map[string]*sppb.Session{},
263	}
264	return mcs
265}
266