1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22	"net"
23	"strings"
24	"sync/atomic"
25	"time"
26
27	"github.com/golang/protobuf/proto"
28	"github.com/golang/protobuf/ptypes"
29	pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
30	"google.golang.org/grpc/metadata"
31	"google.golang.org/grpc/status"
32)
33
// callIDGenerator hands out monotonically increasing IDs, starting at 1. The
// zero value is ready to use.
type callIDGenerator struct {
	// id is the most recently issued ID. It is only accessed through the
	// sync/atomic functions.
	id uint64
}

// next atomically increments the counter and returns the new value.
func (g *callIDGenerator) next() uint64 {
	return atomic.AddUint64(&g.id, 1)
}

// reset rewinds the counter so that the following call to next returns 1. It
// is meant for tests only. The store is atomic so that it does not form a data
// race with a concurrent call to next.
func (g *callIDGenerator) reset() {
	atomic.StoreUint64(&g.id, 0)
}

// idGen is the package-level generator that newMethodLogger draws call IDs
// from.
var idGen callIDGenerator
49
50// MethodLogger is the sub-logger for each method.
51type MethodLogger struct {
52	headerMaxLen, messageMaxLen uint64
53
54	callID          uint64
55	idWithinCallGen *callIDGenerator
56
57	sink Sink // TODO(blog): make this plugable.
58}
59
60func newMethodLogger(h, m uint64) *MethodLogger {
61	return &MethodLogger{
62		headerMaxLen:  h,
63		messageMaxLen: m,
64
65		callID:          idGen.next(),
66		idWithinCallGen: &callIDGenerator{},
67
68		sink: DefaultSink, // TODO(blog): make it plugable.
69	}
70}
71
72// Log creates a proto binary log entry, and logs it to the sink.
73func (ml *MethodLogger) Log(c LogEntryConfig) {
74	m := c.toProto()
75	timestamp, _ := ptypes.TimestampProto(time.Now())
76	m.Timestamp = timestamp
77	m.CallId = ml.callID
78	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
79
80	switch pay := m.Payload.(type) {
81	case *pb.GrpcLogEntry_ClientHeader:
82		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
83	case *pb.GrpcLogEntry_ServerHeader:
84		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
85	case *pb.GrpcLogEntry_Message:
86		m.PayloadTruncated = ml.truncateMessage(pay.Message)
87	}
88
89	ml.sink.Write(m)
90}
91
92func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
93	if ml.headerMaxLen == maxUInt {
94		return false
95	}
96	var (
97		bytesLimit = ml.headerMaxLen
98		index      int
99	)
100	// At the end of the loop, index will be the first entry where the total
101	// size is greater than the limit:
102	//
103	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
104	for ; index < len(mdPb.Entry); index++ {
105		entry := mdPb.Entry[index]
106		if entry.Key == "grpc-trace-bin" {
107			// "grpc-trace-bin" is a special key. It's kept in the log entry,
108			// but not counted towards the size limit.
109			continue
110		}
111		currentEntryLen := uint64(len(entry.Value))
112		if currentEntryLen > bytesLimit {
113			break
114		}
115		bytesLimit -= currentEntryLen
116	}
117	truncated = index < len(mdPb.Entry)
118	mdPb.Entry = mdPb.Entry[:index]
119	return truncated
120}
121
122func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
123	if ml.messageMaxLen == maxUInt {
124		return false
125	}
126	if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
127		return false
128	}
129	msgPb.Data = msgPb.Data[:ml.messageMaxLen]
130	return true
131}
132
// LogEntryConfig represents the configuration for binary log entry.
//
// MethodLogger.Log accepts any LogEntryConfig and converts it with toProto.
type LogEntryConfig interface {
	// toProto builds the log entry described by the config. It does not have
	// to fill in every field: Log sets the timestamp, call ID, sequence ID
	// and truncation flag afterwards.
	toProto() *pb.GrpcLogEntry
}
137
138// ClientHeader configs the binary log entry to be a ClientHeader entry.
139type ClientHeader struct {
140	OnClientSide bool
141	Header       metadata.MD
142	MethodName   string
143	Authority    string
144	Timeout      time.Duration
145	// PeerAddr is required only when it's on server side.
146	PeerAddr net.Addr
147}
148
149func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
150	// This function doesn't need to set all the fields (e.g. seq ID). The Log
151	// function will set the fields when necessary.
152	clientHeader := &pb.ClientHeader{
153		Metadata:   mdToMetadataProto(c.Header),
154		MethodName: c.MethodName,
155		Authority:  c.Authority,
156	}
157	if c.Timeout > 0 {
158		clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
159	}
160	ret := &pb.GrpcLogEntry{
161		Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
162		Payload: &pb.GrpcLogEntry_ClientHeader{
163			ClientHeader: clientHeader,
164		},
165	}
166	if c.OnClientSide {
167		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
168	} else {
169		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
170	}
171	if c.PeerAddr != nil {
172		ret.Peer = addrToProto(c.PeerAddr)
173	}
174	return ret
175}
176
177// ServerHeader configs the binary log entry to be a ServerHeader entry.
178type ServerHeader struct {
179	OnClientSide bool
180	Header       metadata.MD
181	// PeerAddr is required only when it's on client side.
182	PeerAddr net.Addr
183}
184
185func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
186	ret := &pb.GrpcLogEntry{
187		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
188		Payload: &pb.GrpcLogEntry_ServerHeader{
189			ServerHeader: &pb.ServerHeader{
190				Metadata: mdToMetadataProto(c.Header),
191			},
192		},
193	}
194	if c.OnClientSide {
195		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
196	} else {
197		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
198	}
199	if c.PeerAddr != nil {
200		ret.Peer = addrToProto(c.PeerAddr)
201	}
202	return ret
203}
204
205// ClientMessage configs the binary log entry to be a ClientMessage entry.
206type ClientMessage struct {
207	OnClientSide bool
208	// Message can be a proto.Message or []byte. Other messages formats are not
209	// supported.
210	Message interface{}
211}
212
213func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
214	var (
215		data []byte
216		err  error
217	)
218	if m, ok := c.Message.(proto.Message); ok {
219		data, err = proto.Marshal(m)
220		if err != nil {
221			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
222		}
223	} else if b, ok := c.Message.([]byte); ok {
224		data = b
225	} else {
226		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
227	}
228	ret := &pb.GrpcLogEntry{
229		Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
230		Payload: &pb.GrpcLogEntry_Message{
231			Message: &pb.Message{
232				Length: uint32(len(data)),
233				Data:   data,
234			},
235		},
236	}
237	if c.OnClientSide {
238		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
239	} else {
240		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
241	}
242	return ret
243}
244
245// ServerMessage configs the binary log entry to be a ServerMessage entry.
246type ServerMessage struct {
247	OnClientSide bool
248	// Message can be a proto.Message or []byte. Other messages formats are not
249	// supported.
250	Message interface{}
251}
252
253func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
254	var (
255		data []byte
256		err  error
257	)
258	if m, ok := c.Message.(proto.Message); ok {
259		data, err = proto.Marshal(m)
260		if err != nil {
261			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
262		}
263	} else if b, ok := c.Message.([]byte); ok {
264		data = b
265	} else {
266		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
267	}
268	ret := &pb.GrpcLogEntry{
269		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
270		Payload: &pb.GrpcLogEntry_Message{
271			Message: &pb.Message{
272				Length: uint32(len(data)),
273				Data:   data,
274			},
275		},
276	}
277	if c.OnClientSide {
278		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
279	} else {
280		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
281	}
282	return ret
283}
284
285// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
286type ClientHalfClose struct {
287	OnClientSide bool
288}
289
290func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
291	ret := &pb.GrpcLogEntry{
292		Type:    pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
293		Payload: nil, // No payload here.
294	}
295	if c.OnClientSide {
296		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
297	} else {
298		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
299	}
300	return ret
301}
302
303// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
304type ServerTrailer struct {
305	OnClientSide bool
306	Trailer      metadata.MD
307	// Err is the status error.
308	Err error
309	// PeerAddr is required only when it's on client side and the RPC is trailer
310	// only.
311	PeerAddr net.Addr
312}
313
314func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
315	st, ok := status.FromError(c.Err)
316	if !ok {
317		grpclogLogger.Info("binarylogging: error in trailer is not a status error")
318	}
319	var (
320		detailsBytes []byte
321		err          error
322	)
323	stProto := st.Proto()
324	if stProto != nil && len(stProto.Details) != 0 {
325		detailsBytes, err = proto.Marshal(stProto)
326		if err != nil {
327			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
328		}
329	}
330	ret := &pb.GrpcLogEntry{
331		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
332		Payload: &pb.GrpcLogEntry_Trailer{
333			Trailer: &pb.Trailer{
334				Metadata:      mdToMetadataProto(c.Trailer),
335				StatusCode:    uint32(st.Code()),
336				StatusMessage: st.Message(),
337				StatusDetails: detailsBytes,
338			},
339		},
340	}
341	if c.OnClientSide {
342		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
343	} else {
344		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
345	}
346	if c.PeerAddr != nil {
347		ret.Peer = addrToProto(c.PeerAddr)
348	}
349	return ret
350}
351
352// Cancel configs the binary log entry to be a Cancel entry.
353type Cancel struct {
354	OnClientSide bool
355}
356
357func (c *Cancel) toProto() *pb.GrpcLogEntry {
358	ret := &pb.GrpcLogEntry{
359		Type:    pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
360		Payload: nil,
361	}
362	if c.OnClientSide {
363		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
364	} else {
365		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
366	}
367	return ret
368}
369
// metadataKeyOmit returns whether the metadata entry with this key should be
// omitted.
//
// A fixed list of keys and every key with the "grpc-" prefix are omitted,
// except "grpc-trace-bin", which is kept.
func metadataKeyOmit(key string) bool {
	switch {
	case key == "grpc-trace-bin":
		// grpc-trace-bin is special because it's visible to users.
		return false
	case strings.HasPrefix(key, "grpc-"):
		return true
	}
	switch key {
	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
		return true
	}
	return false
}
381
382func mdToMetadataProto(md metadata.MD) *pb.Metadata {
383	ret := &pb.Metadata{}
384	for k, vv := range md {
385		if metadataKeyOmit(k) {
386			continue
387		}
388		for _, v := range vv {
389			ret.Entry = append(ret.Entry,
390				&pb.MetadataEntry{
391					Key:   k,
392					Value: []byte(v),
393				},
394			)
395		}
396	}
397	return ret
398}
399
400func addrToProto(addr net.Addr) *pb.Address {
401	ret := &pb.Address{}
402	switch a := addr.(type) {
403	case *net.TCPAddr:
404		if a.IP.To4() != nil {
405			ret.Type = pb.Address_TYPE_IPV4
406		} else if a.IP.To16() != nil {
407			ret.Type = pb.Address_TYPE_IPV6
408		} else {
409			ret.Type = pb.Address_TYPE_UNKNOWN
410			// Do not set address and port fields.
411			break
412		}
413		ret.Address = a.IP.String()
414		ret.IpPort = uint32(a.Port)
415	case *net.UnixAddr:
416		ret.Type = pb.Address_TYPE_UNIX
417		ret.Address = a.String()
418	default:
419		ret.Type = pb.Address_TYPE_UNKNOWN
420	}
421	return ret
422}
423