1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22	"net"
23	"strings"
24	"sync/atomic"
25	"time"
26
27	"github.com/golang/protobuf/proto"
28	"github.com/golang/protobuf/ptypes"
29	pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
30	"google.golang.org/grpc/grpclog"
31	"google.golang.org/grpc/metadata"
32	"google.golang.org/grpc/status"
33)
34
35type callIDGenerator struct {
36	id uint64
37}
38
39func (g *callIDGenerator) next() uint64 {
40	id := atomic.AddUint64(&g.id, 1)
41	return id
42}
43
44// reset is for testing only, and doesn't need to be thread safe.
45func (g *callIDGenerator) reset() {
46	g.id = 0
47}
48
49var idGen callIDGenerator
50
51// MethodLogger is the sub-logger for each method.
52type MethodLogger struct {
53	headerMaxLen, messageMaxLen uint64
54
55	callID          uint64
56	idWithinCallGen *callIDGenerator
57
58	sink Sink // TODO(blog): make this plugable.
59}
60
61func newMethodLogger(h, m uint64) *MethodLogger {
62	return &MethodLogger{
63		headerMaxLen:  h,
64		messageMaxLen: m,
65
66		callID:          idGen.next(),
67		idWithinCallGen: &callIDGenerator{},
68
69		sink: defaultSink, // TODO(blog): make it plugable.
70	}
71}
72
73// Log creates a proto binary log entry, and logs it to the sink.
74func (ml *MethodLogger) Log(c LogEntryConfig) {
75	m := c.toProto()
76	timestamp, _ := ptypes.TimestampProto(time.Now())
77	m.Timestamp = timestamp
78	m.CallId = ml.callID
79	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
80
81	switch pay := m.Payload.(type) {
82	case *pb.GrpcLogEntry_ClientHeader:
83		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
84	case *pb.GrpcLogEntry_ServerHeader:
85		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
86	case *pb.GrpcLogEntry_Message:
87		m.PayloadTruncated = ml.truncateMessage(pay.Message)
88	}
89
90	ml.sink.Write(m)
91}
92
93func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
94	if ml.headerMaxLen == maxUInt {
95		return false
96	}
97	var (
98		bytesLimit = ml.headerMaxLen
99		index      int
100	)
101	// At the end of the loop, index will be the first entry where the total
102	// size is greater than the limit:
103	//
104	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
105	for ; index < len(mdPb.Entry); index++ {
106		entry := mdPb.Entry[index]
107		if entry.Key == "grpc-trace-bin" {
108			// "grpc-trace-bin" is a special key. It's kept in the log entry,
109			// but not counted towards the size limit.
110			continue
111		}
112		currentEntryLen := uint64(len(entry.Value))
113		if currentEntryLen > bytesLimit {
114			break
115		}
116		bytesLimit -= currentEntryLen
117	}
118	truncated = index < len(mdPb.Entry)
119	mdPb.Entry = mdPb.Entry[:index]
120	return truncated
121}
122
123func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
124	if ml.messageMaxLen == maxUInt {
125		return false
126	}
127	if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
128		return false
129	}
130	msgPb.Data = msgPb.Data[:ml.messageMaxLen]
131	return true
132}
133
134// LogEntryConfig represents the configuration for binary log entry.
135type LogEntryConfig interface {
136	toProto() *pb.GrpcLogEntry
137}
138
139// ClientHeader configs the binary log entry to be a ClientHeader entry.
140type ClientHeader struct {
141	OnClientSide bool
142	Header       metadata.MD
143	MethodName   string
144	Authority    string
145	Timeout      time.Duration
146	// PeerAddr is required only when it's on server side.
147	PeerAddr net.Addr
148}
149
150func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
151	// This function doesn't need to set all the fields (e.g. seq ID). The Log
152	// function will set the fields when necessary.
153	clientHeader := &pb.ClientHeader{
154		Metadata:   mdToMetadataProto(c.Header),
155		MethodName: c.MethodName,
156		Authority:  c.Authority,
157	}
158	if c.Timeout > 0 {
159		clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
160	}
161	ret := &pb.GrpcLogEntry{
162		Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
163		Payload: &pb.GrpcLogEntry_ClientHeader{
164			ClientHeader: clientHeader,
165		},
166	}
167	if c.OnClientSide {
168		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
169	} else {
170		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
171	}
172	if c.PeerAddr != nil {
173		ret.Peer = addrToProto(c.PeerAddr)
174	}
175	return ret
176}
177
178// ServerHeader configs the binary log entry to be a ServerHeader entry.
179type ServerHeader struct {
180	OnClientSide bool
181	Header       metadata.MD
182	// PeerAddr is required only when it's on client side.
183	PeerAddr net.Addr
184}
185
186func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
187	ret := &pb.GrpcLogEntry{
188		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
189		Payload: &pb.GrpcLogEntry_ServerHeader{
190			ServerHeader: &pb.ServerHeader{
191				Metadata: mdToMetadataProto(c.Header),
192			},
193		},
194	}
195	if c.OnClientSide {
196		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
197	} else {
198		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
199	}
200	if c.PeerAddr != nil {
201		ret.Peer = addrToProto(c.PeerAddr)
202	}
203	return ret
204}
205
206// ClientMessage configs the binary log entry to be a ClientMessage entry.
207type ClientMessage struct {
208	OnClientSide bool
209	// Message can be a proto.Message or []byte. Other messages formats are not
210	// supported.
211	Message interface{}
212}
213
214func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
215	var (
216		data []byte
217		err  error
218	)
219	if m, ok := c.Message.(proto.Message); ok {
220		data, err = proto.Marshal(m)
221		if err != nil {
222			grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
223		}
224	} else if b, ok := c.Message.([]byte); ok {
225		data = b
226	} else {
227		grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
228	}
229	ret := &pb.GrpcLogEntry{
230		Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
231		Payload: &pb.GrpcLogEntry_Message{
232			Message: &pb.Message{
233				Length: uint32(len(data)),
234				Data:   data,
235			},
236		},
237	}
238	if c.OnClientSide {
239		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
240	} else {
241		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
242	}
243	return ret
244}
245
246// ServerMessage configs the binary log entry to be a ServerMessage entry.
247type ServerMessage struct {
248	OnClientSide bool
249	// Message can be a proto.Message or []byte. Other messages formats are not
250	// supported.
251	Message interface{}
252}
253
254func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
255	var (
256		data []byte
257		err  error
258	)
259	if m, ok := c.Message.(proto.Message); ok {
260		data, err = proto.Marshal(m)
261		if err != nil {
262			grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
263		}
264	} else if b, ok := c.Message.([]byte); ok {
265		data = b
266	} else {
267		grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
268	}
269	ret := &pb.GrpcLogEntry{
270		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
271		Payload: &pb.GrpcLogEntry_Message{
272			Message: &pb.Message{
273				Length: uint32(len(data)),
274				Data:   data,
275			},
276		},
277	}
278	if c.OnClientSide {
279		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
280	} else {
281		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
282	}
283	return ret
284}
285
286// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
287type ClientHalfClose struct {
288	OnClientSide bool
289}
290
291func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
292	ret := &pb.GrpcLogEntry{
293		Type:    pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
294		Payload: nil, // No payload here.
295	}
296	if c.OnClientSide {
297		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
298	} else {
299		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
300	}
301	return ret
302}
303
304// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
305type ServerTrailer struct {
306	OnClientSide bool
307	Trailer      metadata.MD
308	// Err is the status error.
309	Err error
310	// PeerAddr is required only when it's on client side and the RPC is trailer
311	// only.
312	PeerAddr net.Addr
313}
314
315func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
316	st, ok := status.FromError(c.Err)
317	if !ok {
318		grpclog.Info("binarylogging: error in trailer is not a status error")
319	}
320	var (
321		detailsBytes []byte
322		err          error
323	)
324	stProto := st.Proto()
325	if stProto != nil && len(stProto.Details) != 0 {
326		detailsBytes, err = proto.Marshal(stProto)
327		if err != nil {
328			grpclog.Infof("binarylogging: failed to marshal status proto: %v", err)
329		}
330	}
331	ret := &pb.GrpcLogEntry{
332		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
333		Payload: &pb.GrpcLogEntry_Trailer{
334			Trailer: &pb.Trailer{
335				Metadata:      mdToMetadataProto(c.Trailer),
336				StatusCode:    uint32(st.Code()),
337				StatusMessage: st.Message(),
338				StatusDetails: detailsBytes,
339			},
340		},
341	}
342	if c.OnClientSide {
343		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
344	} else {
345		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
346	}
347	if c.PeerAddr != nil {
348		ret.Peer = addrToProto(c.PeerAddr)
349	}
350	return ret
351}
352
353// Cancel configs the binary log entry to be a Cancel entry.
354type Cancel struct {
355	OnClientSide bool
356}
357
358func (c *Cancel) toProto() *pb.GrpcLogEntry {
359	ret := &pb.GrpcLogEntry{
360		Type:    pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
361		Payload: nil,
362	}
363	if c.OnClientSide {
364		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
365	} else {
366		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
367	}
368	return ret
369}
370
371// metadataKeyOmit returns whether the metadata entry with this key should be
372// omitted.
373func metadataKeyOmit(key string) bool {
374	switch key {
375	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
376		return true
377	case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
378		return false
379	}
380	return strings.HasPrefix(key, "grpc-")
381}
382
383func mdToMetadataProto(md metadata.MD) *pb.Metadata {
384	ret := &pb.Metadata{}
385	for k, vv := range md {
386		if metadataKeyOmit(k) {
387			continue
388		}
389		for _, v := range vv {
390			ret.Entry = append(ret.Entry,
391				&pb.MetadataEntry{
392					Key:   k,
393					Value: []byte(v),
394				},
395			)
396		}
397	}
398	return ret
399}
400
401func addrToProto(addr net.Addr) *pb.Address {
402	ret := &pb.Address{}
403	switch a := addr.(type) {
404	case *net.TCPAddr:
405		if a.IP.To4() != nil {
406			ret.Type = pb.Address_TYPE_IPV4
407		} else if a.IP.To16() != nil {
408			ret.Type = pb.Address_TYPE_IPV6
409		} else {
410			ret.Type = pb.Address_TYPE_UNKNOWN
411			// Do not set address and port fields.
412			break
413		}
414		ret.Address = a.IP.String()
415		ret.IpPort = uint32(a.Port)
416	case *net.UnixAddr:
417		ret.Type = pb.Address_TYPE_UNIX
418		ret.Address = a.String()
419	default:
420		ret.Type = pb.Address_TYPE_UNKNOWN
421	}
422	return ret
423}
424