1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package binarylog 20 21import ( 22 "net" 23 "strings" 24 "sync/atomic" 25 "time" 26 27 "github.com/golang/protobuf/proto" 28 "github.com/golang/protobuf/ptypes" 29 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" 30 "google.golang.org/grpc/grpclog" 31 "google.golang.org/grpc/metadata" 32 "google.golang.org/grpc/status" 33) 34 35type callIDGenerator struct { 36 id uint64 37} 38 39func (g *callIDGenerator) next() uint64 { 40 id := atomic.AddUint64(&g.id, 1) 41 return id 42} 43 44// reset is for testing only, and doesn't need to be thread safe. 45func (g *callIDGenerator) reset() { 46 g.id = 0 47} 48 49var idGen callIDGenerator 50 51// MethodLogger is the sub-logger for each method. 52type MethodLogger struct { 53 headerMaxLen, messageMaxLen uint64 54 55 callID uint64 56 idWithinCallGen *callIDGenerator 57 58 sink Sink // TODO(blog): make this plugable. 59} 60 61func newMethodLogger(h, m uint64) *MethodLogger { 62 return &MethodLogger{ 63 headerMaxLen: h, 64 messageMaxLen: m, 65 66 callID: idGen.next(), 67 idWithinCallGen: &callIDGenerator{}, 68 69 sink: defaultSink, // TODO(blog): make it plugable. 70 } 71} 72 73// Log creates a proto binary log entry, and logs it to the sink. 74func (ml *MethodLogger) Log(c LogEntryConfig) { 75 m := c.toProto() 76 timestamp, _ := ptypes.TimestampProto(time.Now()) 77 m.Timestamp = timestamp 78 m.CallId = ml.callID 79 m.SequenceIdWithinCall = ml.idWithinCallGen.next() 80 81 switch pay := m.Payload.(type) { 82 case *pb.GrpcLogEntry_ClientHeader: 83 m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) 84 case *pb.GrpcLogEntry_ServerHeader: 85 m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) 86 case *pb.GrpcLogEntry_Message: 87 m.PayloadTruncated = ml.truncateMessage(pay.Message) 88 } 89 90 ml.sink.Write(m) 91} 92 93func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) { 94 if ml.headerMaxLen == maxUInt { 95 return false 96 } 97 var ( 98 bytesLimit = ml.headerMaxLen 99 index int 100 ) 101 // At the end of the loop, index will be the first entry where the total 102 // size is greater than the limit: 103 // 104 // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr. 105 for ; index < len(mdPb.Entry); index++ { 106 entry := mdPb.Entry[index] 107 if entry.Key == "grpc-trace-bin" { 108 // "grpc-trace-bin" is a special key. It's kept in the log entry, 109 // but not counted towards the size limit. 110 continue 111 } 112 currentEntryLen := uint64(len(entry.Value)) 113 if currentEntryLen > bytesLimit { 114 break 115 } 116 bytesLimit -= currentEntryLen 117 } 118 truncated = index < len(mdPb.Entry) 119 mdPb.Entry = mdPb.Entry[:index] 120 return truncated 121} 122 123func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) { 124 if ml.messageMaxLen == maxUInt { 125 return false 126 } 127 if ml.messageMaxLen >= uint64(len(msgPb.Data)) { 128 return false 129 } 130 msgPb.Data = msgPb.Data[:ml.messageMaxLen] 131 return true 132} 133 134// LogEntryConfig represents the configuration for binary log entry. 135type LogEntryConfig interface { 136 toProto() *pb.GrpcLogEntry 137} 138 139// ClientHeader configs the binary log entry to be a ClientHeader entry. 140type ClientHeader struct { 141 OnClientSide bool 142 Header metadata.MD 143 MethodName string 144 Authority string 145 Timeout time.Duration 146 // PeerAddr is required only when it's on server side. 147 PeerAddr net.Addr 148} 149 150func (c *ClientHeader) toProto() *pb.GrpcLogEntry { 151 // This function doesn't need to set all the fields (e.g. seq ID). The Log 152 // function will set the fields when necessary. 153 clientHeader := &pb.ClientHeader{ 154 Metadata: mdToMetadataProto(c.Header), 155 MethodName: c.MethodName, 156 Authority: c.Authority, 157 } 158 if c.Timeout > 0 { 159 clientHeader.Timeout = ptypes.DurationProto(c.Timeout) 160 } 161 ret := &pb.GrpcLogEntry{ 162 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, 163 Payload: &pb.GrpcLogEntry_ClientHeader{ 164 ClientHeader: clientHeader, 165 }, 166 } 167 if c.OnClientSide { 168 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 169 } else { 170 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 171 } 172 if c.PeerAddr != nil { 173 ret.Peer = addrToProto(c.PeerAddr) 174 } 175 return ret 176} 177 178// ServerHeader configs the binary log entry to be a ServerHeader entry. 179type ServerHeader struct { 180 OnClientSide bool 181 Header metadata.MD 182 // PeerAddr is required only when it's on client side. 183 PeerAddr net.Addr 184} 185 186func (c *ServerHeader) toProto() *pb.GrpcLogEntry { 187 ret := &pb.GrpcLogEntry{ 188 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, 189 Payload: &pb.GrpcLogEntry_ServerHeader{ 190 ServerHeader: &pb.ServerHeader{ 191 Metadata: mdToMetadataProto(c.Header), 192 }, 193 }, 194 } 195 if c.OnClientSide { 196 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 197 } else { 198 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 199 } 200 if c.PeerAddr != nil { 201 ret.Peer = addrToProto(c.PeerAddr) 202 } 203 return ret 204} 205 206// ClientMessage configs the binary log entry to be a ClientMessage entry. 207type ClientMessage struct { 208 OnClientSide bool 209 // Message can be a proto.Message or []byte. Other messages formats are not 210 // supported. 211 Message interface{} 212} 213 214func (c *ClientMessage) toProto() *pb.GrpcLogEntry { 215 var ( 216 data []byte 217 err error 218 ) 219 if m, ok := c.Message.(proto.Message); ok { 220 data, err = proto.Marshal(m) 221 if err != nil { 222 grpclog.Infof("binarylogging: failed to marshal proto message: %v", err) 223 } 224 } else if b, ok := c.Message.([]byte); ok { 225 data = b 226 } else { 227 grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte") 228 } 229 ret := &pb.GrpcLogEntry{ 230 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, 231 Payload: &pb.GrpcLogEntry_Message{ 232 Message: &pb.Message{ 233 Length: uint32(len(data)), 234 Data: data, 235 }, 236 }, 237 } 238 if c.OnClientSide { 239 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 240 } else { 241 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 242 } 243 return ret 244} 245 246// ServerMessage configs the binary log entry to be a ServerMessage entry. 247type ServerMessage struct { 248 OnClientSide bool 249 // Message can be a proto.Message or []byte. Other messages formats are not 250 // supported. 251 Message interface{} 252} 253 254func (c *ServerMessage) toProto() *pb.GrpcLogEntry { 255 var ( 256 data []byte 257 err error 258 ) 259 if m, ok := c.Message.(proto.Message); ok { 260 data, err = proto.Marshal(m) 261 if err != nil { 262 grpclog.Infof("binarylogging: failed to marshal proto message: %v", err) 263 } 264 } else if b, ok := c.Message.([]byte); ok { 265 data = b 266 } else { 267 grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte") 268 } 269 ret := &pb.GrpcLogEntry{ 270 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, 271 Payload: &pb.GrpcLogEntry_Message{ 272 Message: &pb.Message{ 273 Length: uint32(len(data)), 274 Data: data, 275 }, 276 }, 277 } 278 if c.OnClientSide { 279 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 280 } else { 281 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 282 } 283 return ret 284} 285 286// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry. 287type ClientHalfClose struct { 288 OnClientSide bool 289} 290 291func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry { 292 ret := &pb.GrpcLogEntry{ 293 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, 294 Payload: nil, // No payload here. 295 } 296 if c.OnClientSide { 297 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 298 } else { 299 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 300 } 301 return ret 302} 303 304// ServerTrailer configs the binary log entry to be a ServerTrailer entry. 305type ServerTrailer struct { 306 OnClientSide bool 307 Trailer metadata.MD 308 // Err is the status error. 309 Err error 310 // PeerAddr is required only when it's on client side and the RPC is trailer 311 // only. 312 PeerAddr net.Addr 313} 314 315func (c *ServerTrailer) toProto() *pb.GrpcLogEntry { 316 st, ok := status.FromError(c.Err) 317 if !ok { 318 grpclog.Info("binarylogging: error in trailer is not a status error") 319 } 320 var ( 321 detailsBytes []byte 322 err error 323 ) 324 stProto := st.Proto() 325 if stProto != nil && len(stProto.Details) != 0 { 326 detailsBytes, err = proto.Marshal(stProto) 327 if err != nil { 328 grpclog.Infof("binarylogging: failed to marshal status proto: %v", err) 329 } 330 } 331 ret := &pb.GrpcLogEntry{ 332 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, 333 Payload: &pb.GrpcLogEntry_Trailer{ 334 Trailer: &pb.Trailer{ 335 Metadata: mdToMetadataProto(c.Trailer), 336 StatusCode: uint32(st.Code()), 337 StatusMessage: st.Message(), 338 StatusDetails: detailsBytes, 339 }, 340 }, 341 } 342 if c.OnClientSide { 343 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 344 } else { 345 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 346 } 347 if c.PeerAddr != nil { 348 ret.Peer = addrToProto(c.PeerAddr) 349 } 350 return ret 351} 352 353// Cancel configs the binary log entry to be a Cancel entry. 354type Cancel struct { 355 OnClientSide bool 356} 357 358func (c *Cancel) toProto() *pb.GrpcLogEntry { 359 ret := &pb.GrpcLogEntry{ 360 Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL, 361 Payload: nil, 362 } 363 if c.OnClientSide { 364 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT 365 } else { 366 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER 367 } 368 return ret 369} 370 371// metadataKeyOmit returns whether the metadata entry with this key should be 372// omitted. 373func metadataKeyOmit(key string) bool { 374 switch key { 375 case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te": 376 return true 377 case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users. 378 return false 379 } 380 return strings.HasPrefix(key, "grpc-") 381} 382 383func mdToMetadataProto(md metadata.MD) *pb.Metadata { 384 ret := &pb.Metadata{} 385 for k, vv := range md { 386 if metadataKeyOmit(k) { 387 continue 388 } 389 for _, v := range vv { 390 ret.Entry = append(ret.Entry, 391 &pb.MetadataEntry{ 392 Key: k, 393 Value: []byte(v), 394 }, 395 ) 396 } 397 } 398 return ret 399} 400 401func addrToProto(addr net.Addr) *pb.Address { 402 ret := &pb.Address{} 403 switch a := addr.(type) { 404 case *net.TCPAddr: 405 if a.IP.To4() != nil { 406 ret.Type = pb.Address_TYPE_IPV4 407 } else if a.IP.To16() != nil { 408 ret.Type = pb.Address_TYPE_IPV6 409 } else { 410 ret.Type = pb.Address_TYPE_UNKNOWN 411 // Do not set address and port fields. 412 break 413 } 414 ret.Address = a.IP.String() 415 ret.IpPort = uint32(a.Port) 416 case *net.UnixAddr: 417 ret.Type = pb.Address_TYPE_UNIX 418 ret.Address = a.String() 419 default: 420 ret.Type = pb.Address_TYPE_UNKNOWN 421 } 422 return ret 423} 424