1// Copyright 2017 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rpcreplay 16 17import ( 18 "context" 19 "io" 20 "log" 21 "net" 22 23 pb "cloud.google.com/go/rpcreplay/proto/intstore" 24 "google.golang.org/grpc" 25 "google.golang.org/grpc/codes" 26 "google.golang.org/grpc/status" 27) 28 29// intStoreServer is an in-memory implementation of IntStore. 30type intStoreServer struct { 31 pb.IntStoreServer 32 33 Addr string 34 l net.Listener 35 gsrv *grpc.Server 36 37 items map[string]int32 38} 39 40func newIntStoreServer() *intStoreServer { 41 l, err := net.Listen("tcp", "127.0.0.1:0") 42 if err != nil { 43 log.Fatal(err) 44 } 45 s := &intStoreServer{ 46 Addr: l.Addr().String(), 47 l: l, 48 gsrv: grpc.NewServer(), 49 } 50 pb.RegisterIntStoreServer(s.gsrv, s) 51 go s.gsrv.Serve(s.l) 52 return s 53} 54 55func (s *intStoreServer) stop() { 56 s.gsrv.Stop() 57 s.l.Close() 58} 59 60func (s *intStoreServer) Set(_ context.Context, item *pb.Item) (*pb.SetResponse, error) { 61 old := s.setItem(item) 62 return &pb.SetResponse{PrevValue: old}, nil 63} 64 65func (s *intStoreServer) setItem(item *pb.Item) int32 { 66 if s.items == nil { 67 s.items = map[string]int32{} 68 } 69 old := s.items[item.Name] 70 s.items[item.Name] = item.Value 71 return old 72} 73 74func (s *intStoreServer) Get(_ context.Context, req *pb.GetRequest) (*pb.Item, error) { 75 val, ok := s.items[req.Name] 76 if !ok { 77 return nil, status.Errorf(codes.NotFound, "%q", req.Name) 78 } 79 return &pb.Item{Name: req.Name, Value: val}, nil 80} 81 82func (s *intStoreServer) ListItems(req *pb.ListItemsRequest, ss pb.IntStore_ListItemsServer) error { 83 for name, val := range s.items { 84 if val > req.GreaterThan { 85 if err := ss.Send(&pb.Item{Name: name, Value: val}); err != nil { 86 return err 87 } 88 } 89 } 90 return nil 91} 92 93func (s *intStoreServer) SetStream(ss pb.IntStore_SetStreamServer) error { 94 n := 0 95 for { 96 item, err := ss.Recv() 97 if err == io.EOF { 98 break 99 } 100 if err != nil { 101 return err 102 } 103 s.setItem(item) 104 n++ 105 } 106 return ss.SendAndClose(&pb.Summary{Count: int32(n)}) 107} 108 109func (s *intStoreServer) StreamChat(ss pb.IntStore_StreamChatServer) error { 110 for { 111 item, err := ss.Recv() 112 if err == io.EOF { 113 break 114 } 115 if err != nil { 116 return err 117 } 118 if err := ss.Send(item); err != nil { 119 return err 120 } 121 } 122 return nil 123} 124