1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "bytes" 23 "io" 24 "time" 25 26 "golang.org/x/net/context" 27 "golang.org/x/net/trace" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/peer" 30 "google.golang.org/grpc/stats" 31 "google.golang.org/grpc/status" 32 "google.golang.org/grpc/transport" 33) 34 35// recvResponse receives and parses an RPC response. 36// On error, it returns the error and indicates whether the call should be retried. 37// 38// TODO(zhaoq): Check whether the received message sequence is valid. 39// TODO ctx is used for stats collection and processing. It is the context passed from the application. 40func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { 41 // Try to acquire header metadata from the server if there is any. 42 defer func() { 43 if err != nil { 44 if _, ok := err.(transport.ConnectionError); !ok { 45 t.CloseStream(stream, err) 46 } 47 } 48 }() 49 c.headerMD, err = stream.Header() 50 if err != nil { 51 return 52 } 53 p := &parser{r: stream} 54 var inPayload *stats.InPayload 55 if dopts.copts.StatsHandler != nil { 56 inPayload = &stats.InPayload{ 57 Client: true, 58 } 59 } 60 for { 61 if c.maxReceiveMessageSize == nil { 62 return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") 63 } 64 if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { 65 if err == io.EOF { 66 break 67 } 68 return 69 } 70 } 71 if inPayload != nil && err == io.EOF && stream.Status().Code() == codes.OK { 72 // TODO in the current implementation, inTrailer may be handled before inPayload in some cases. 73 // Fix the order if necessary. 74 dopts.copts.StatsHandler.HandleRPC(ctx, inPayload) 75 } 76 c.trailerMD = stream.Trailer() 77 return nil 78} 79 80// sendRequest writes out various information of an RPC such as Context and Message. 81func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { 82 defer func() { 83 if err != nil { 84 // If err is connection error, t will be closed, no need to close stream here. 85 if _, ok := err.(transport.ConnectionError); !ok { 86 t.CloseStream(stream, err) 87 } 88 } 89 }() 90 var ( 91 cbuf *bytes.Buffer 92 outPayload *stats.OutPayload 93 ) 94 if compressor != nil { 95 cbuf = new(bytes.Buffer) 96 } 97 if dopts.copts.StatsHandler != nil { 98 outPayload = &stats.OutPayload{ 99 Client: true, 100 } 101 } 102 hdr, data, err := encode(dopts.codec, args, compressor, cbuf, outPayload) 103 if err != nil { 104 return err 105 } 106 if c.maxSendMessageSize == nil { 107 return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") 108 } 109 if len(data) > *c.maxSendMessageSize { 110 return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize) 111 } 112 err = t.Write(stream, hdr, data, opts) 113 if err == nil && outPayload != nil { 114 outPayload.SentTime = time.Now() 115 dopts.copts.StatsHandler.HandleRPC(ctx, outPayload) 116 } 117 // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method 118 // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following 119 // recvResponse to get the final status. 120 if err != nil && err != io.EOF { 121 return err 122 } 123 // Sent successfully. 124 return nil 125} 126 127// Invoke sends the RPC request on the wire and returns after response is received. 128// Invoke is called by generated code. Also users can call Invoke directly when it 129// is really needed in their use cases. 130func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error { 131 if cc.dopts.unaryInt != nil { 132 return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) 133 } 134 return invoke(ctx, method, args, reply, cc, opts...) 135} 136 137func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { 138 c := defaultCallInfo 139 mc := cc.GetMethodConfig(method) 140 if mc.WaitForReady != nil { 141 c.failFast = !*mc.WaitForReady 142 } 143 144 if mc.Timeout != nil && *mc.Timeout >= 0 { 145 var cancel context.CancelFunc 146 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) 147 defer cancel() 148 } 149 150 opts = append(cc.dopts.callOptions, opts...) 151 for _, o := range opts { 152 if err := o.before(&c); err != nil { 153 return toRPCErr(err) 154 } 155 } 156 defer func() { 157 for _, o := range opts { 158 o.after(&c) 159 } 160 }() 161 162 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) 163 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 164 165 if EnableTracing { 166 c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) 167 defer c.traceInfo.tr.Finish() 168 c.traceInfo.firstLine.client = true 169 if deadline, ok := ctx.Deadline(); ok { 170 c.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) 171 } 172 c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false) 173 // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set. 174 defer func() { 175 if e != nil { 176 c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true) 177 c.traceInfo.tr.SetError() 178 } 179 }() 180 } 181 ctx = newContextWithRPCInfo(ctx) 182 sh := cc.dopts.copts.StatsHandler 183 if sh != nil { 184 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) 185 begin := &stats.Begin{ 186 Client: true, 187 BeginTime: time.Now(), 188 FailFast: c.failFast, 189 } 190 sh.HandleRPC(ctx, begin) 191 defer func() { 192 end := &stats.End{ 193 Client: true, 194 EndTime: time.Now(), 195 Error: e, 196 } 197 sh.HandleRPC(ctx, end) 198 }() 199 } 200 topts := &transport.Options{ 201 Last: true, 202 Delay: false, 203 } 204 for { 205 var ( 206 err error 207 t transport.ClientTransport 208 stream *transport.Stream 209 // Record the put handler from Balancer.Get(...). It is called once the 210 // RPC has completed or failed. 211 put func() 212 ) 213 // TODO(zhaoq): Need a formal spec of fail-fast. 214 callHdr := &transport.CallHdr{ 215 Host: cc.authority, 216 Method: method, 217 } 218 if cc.dopts.cp != nil { 219 callHdr.SendCompress = cc.dopts.cp.Type() 220 } 221 if c.creds != nil { 222 callHdr.Creds = c.creds 223 } 224 225 gopts := BalancerGetOptions{ 226 BlockingWait: !c.failFast, 227 } 228 t, put, err = cc.getTransport(ctx, gopts) 229 if err != nil { 230 // TODO(zhaoq): Probably revisit the error handling. 231 if _, ok := status.FromError(err); ok { 232 return err 233 } 234 if err == errConnClosing || err == errConnUnavailable { 235 if c.failFast { 236 return Errorf(codes.Unavailable, "%v", err) 237 } 238 continue 239 } 240 // All the other errors are treated as Internal errors. 241 return Errorf(codes.Internal, "%v", err) 242 } 243 if c.traceInfo.tr != nil { 244 c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) 245 } 246 stream, err = t.NewStream(ctx, callHdr) 247 if err != nil { 248 if put != nil { 249 if _, ok := err.(transport.ConnectionError); ok { 250 // If error is connection error, transport was sending data on wire, 251 // and we are not sure if anything has been sent on wire. 252 // If error is not connection error, we are sure nothing has been sent. 253 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) 254 } 255 put() 256 } 257 if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { 258 continue 259 } 260 return toRPCErr(err) 261 } 262 if peer, ok := peer.FromContext(stream.Context()); ok { 263 c.peer = peer 264 } 265 err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts) 266 if err != nil { 267 if put != nil { 268 updateRPCInfoInContext(ctx, rpcInfo{ 269 bytesSent: stream.BytesSent(), 270 bytesReceived: stream.BytesReceived(), 271 }) 272 put() 273 } 274 // Retry a non-failfast RPC when 275 // i) there is a connection error; or 276 // ii) the server started to drain before this RPC was initiated. 277 if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { 278 continue 279 } 280 return toRPCErr(err) 281 } 282 err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) 283 if err != nil { 284 if put != nil { 285 updateRPCInfoInContext(ctx, rpcInfo{ 286 bytesSent: stream.BytesSent(), 287 bytesReceived: stream.BytesReceived(), 288 }) 289 put() 290 } 291 if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { 292 continue 293 } 294 return toRPCErr(err) 295 } 296 if c.traceInfo.tr != nil { 297 c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) 298 } 299 t.CloseStream(stream, nil) 300 if put != nil { 301 updateRPCInfoInContext(ctx, rpcInfo{ 302 bytesSent: stream.BytesSent(), 303 bytesReceived: stream.BytesReceived(), 304 }) 305 put() 306 } 307 return stream.Status().Err() 308 } 309} 310