1// Copyright 2017 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rpcreplay
16
17import (
18	"context"
19	"io"
20	"log"
21	"net"
22
23	pb "cloud.google.com/go/rpcreplay/proto/intstore"
24	"google.golang.org/grpc"
25	"google.golang.org/grpc/codes"
26	"google.golang.org/grpc/status"
27)
28
29// intStoreServer is an in-memory implementation of IntStore.
30type intStoreServer struct {
31	pb.IntStoreServer
32
33	Addr string
34	l    net.Listener
35	gsrv *grpc.Server
36
37	items map[string]int32
38}
39
40func newIntStoreServer() *intStoreServer {
41	l, err := net.Listen("tcp", "127.0.0.1:0")
42	if err != nil {
43		log.Fatal(err)
44	}
45	s := &intStoreServer{
46		Addr: l.Addr().String(),
47		l:    l,
48		gsrv: grpc.NewServer(),
49	}
50	pb.RegisterIntStoreServer(s.gsrv, s)
51	go s.gsrv.Serve(s.l)
52	return s
53}
54
55func (s *intStoreServer) stop() {
56	s.gsrv.Stop()
57	s.l.Close()
58}
59
60func (s *intStoreServer) Set(_ context.Context, item *pb.Item) (*pb.SetResponse, error) {
61	old := s.setItem(item)
62	return &pb.SetResponse{PrevValue: old}, nil
63}
64
65func (s *intStoreServer) setItem(item *pb.Item) int32 {
66	if s.items == nil {
67		s.items = map[string]int32{}
68	}
69	old := s.items[item.Name]
70	s.items[item.Name] = item.Value
71	return old
72}
73
74func (s *intStoreServer) Get(_ context.Context, req *pb.GetRequest) (*pb.Item, error) {
75	val, ok := s.items[req.Name]
76	if !ok {
77		return nil, status.Errorf(codes.NotFound, "%q", req.Name)
78	}
79	return &pb.Item{Name: req.Name, Value: val}, nil
80}
81
82func (s *intStoreServer) ListItems(req *pb.ListItemsRequest, ss pb.IntStore_ListItemsServer) error {
83	for name, val := range s.items {
84		if val > req.GreaterThan {
85			if err := ss.Send(&pb.Item{Name: name, Value: val}); err != nil {
86				return err
87			}
88		}
89	}
90	return nil
91}
92
93func (s *intStoreServer) SetStream(ss pb.IntStore_SetStreamServer) error {
94	n := 0
95	for {
96		item, err := ss.Recv()
97		if err == io.EOF {
98			break
99		}
100		if err != nil {
101			return err
102		}
103		s.setItem(item)
104		n++
105	}
106	return ss.SendAndClose(&pb.Summary{Count: int32(n)})
107}
108
109func (s *intStoreServer) StreamChat(ss pb.IntStore_StreamChatServer) error {
110	for {
111		item, err := ss.Recv()
112		if err == io.EOF {
113			break
114		}
115		if err != nil {
116			return err
117		}
118		if err := ss.Send(item); err != nil {
119			return err
120		}
121	}
122	return nil
123}
124