1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package adapter 16 17import ( 18 "context" 19 20 "google.golang.org/grpc" 21 "google.golang.org/grpc/codes" 22 "google.golang.org/grpc/metadata" 23 "google.golang.org/grpc/status" 24) 25 26// chanServerStream implements grpc.ServerStream with a chanStream 27type chanServerStream struct { 28 headerc chan<- metadata.MD 29 trailerc chan<- metadata.MD 30 grpc.Stream 31 32 headers []metadata.MD 33} 34 35func (ss *chanServerStream) SendHeader(md metadata.MD) error { 36 if ss.headerc == nil { 37 return errAlreadySentHeader 38 } 39 outmd := make(map[string][]string) 40 for _, h := range append(ss.headers, md) { 41 for k, v := range h { 42 outmd[k] = v 43 } 44 } 45 select { 46 case ss.headerc <- outmd: 47 ss.headerc = nil 48 ss.headers = nil 49 return nil 50 case <-ss.Context().Done(): 51 } 52 return ss.Context().Err() 53} 54 55func (ss *chanServerStream) SetHeader(md metadata.MD) error { 56 if ss.headerc == nil { 57 return errAlreadySentHeader 58 } 59 ss.headers = append(ss.headers, md) 60 return nil 61} 62 63func (ss *chanServerStream) SetTrailer(md metadata.MD) { 64 ss.trailerc <- md 65} 66 67// chanClientStream implements grpc.ClientStream with a chanStream 68type chanClientStream struct { 69 headerc <-chan metadata.MD 70 trailerc <-chan metadata.MD 71 *chanStream 72} 73 74func (cs *chanClientStream) Header() (metadata.MD, error) { 75 select { 76 case md := <-cs.headerc: 77 return md, nil 78 case <-cs.Context().Done(): 79 } 80 return nil, cs.Context().Err() 81} 82 83func (cs *chanClientStream) Trailer() metadata.MD { 84 select { 85 case md := <-cs.trailerc: 86 return md 87 case <-cs.Context().Done(): 88 return nil 89 } 90} 91 92func (cs *chanClientStream) CloseSend() error { 93 close(cs.chanStream.sendc) 94 return nil 95} 96 97// chanStream implements grpc.Stream using channels 98type chanStream struct { 99 recvc <-chan interface{} 100 sendc chan<- interface{} 101 ctx context.Context 102 cancel context.CancelFunc 103} 104 105func (s *chanStream) Context() context.Context { return s.ctx } 106 107func (s *chanStream) SendMsg(m interface{}) error { 108 select { 109 case s.sendc <- m: 110 if err, ok := m.(error); ok { 111 return err 112 } 113 return nil 114 case <-s.ctx.Done(): 115 } 116 return s.ctx.Err() 117} 118 119func (s *chanStream) RecvMsg(m interface{}) error { 120 v := m.(*interface{}) 121 for { 122 select { 123 case msg, ok := <-s.recvc: 124 if !ok { 125 return status.Error(codes.Canceled, "the client connection is closing") 126 } 127 if err, ok := msg.(error); ok { 128 return err 129 } 130 *v = msg 131 return nil 132 case <-s.ctx.Done(): 133 } 134 if len(s.recvc) == 0 { 135 // prioritize any pending recv messages over canceled context 136 break 137 } 138 } 139 return s.ctx.Err() 140} 141 142func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream { 143 // ch1 is buffered so server can send error on close 144 ch1, ch2 := make(chan interface{}, 1), make(chan interface{}) 145 headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) 146 147 cctx, ccancel := context.WithCancel(ctx) 148 cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel} 149 cs := chanClientStream{headerc, trailerc, cli} 150 151 sctx, scancel := context.WithCancel(ctx) 152 srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel} 153 ss := chanServerStream{headerc, trailerc, srv, nil} 154 155 go func() { 156 if err := ssHandler(ss); err != nil { 157 select { 158 case srv.sendc <- err: 159 case <-sctx.Done(): 160 case <-cctx.Done(): 161 } 162 } 163 scancel() 164 ccancel() 165 }() 166 return cs 167} 168