1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19//go:generate ./regenerate.sh
20
21// Package service provides an implementation for channelz service server.
22package service
23
24import (
25	"net"
26	"time"
27
28	"github.com/golang/protobuf/ptypes"
29	durpb "github.com/golang/protobuf/ptypes/duration"
30	wrpb "github.com/golang/protobuf/ptypes/wrappers"
31	"golang.org/x/net/context"
32	"google.golang.org/grpc"
33	channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
34	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
35	"google.golang.org/grpc/connectivity"
36	"google.golang.org/grpc/credentials"
37	"google.golang.org/grpc/internal/channelz"
38)
39
40func init() {
41	channelz.TurnOn()
42}
43
44func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration {
45	return ptypes.DurationProto(time.Duration(sec*1e9 + usec*1e3))
46}
47
48// RegisterChannelzServiceToServer registers the channelz service to the given server.
49func RegisterChannelzServiceToServer(s *grpc.Server) {
50	channelzgrpc.RegisterChannelzServer(s, newCZServer())
51}
52
53func newCZServer() channelzgrpc.ChannelzServer {
54	return &serverImpl{}
55}
56
57type serverImpl struct{}
58
59func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
60	switch s {
61	case connectivity.Idle:
62		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
63	case connectivity.Connecting:
64		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
65	case connectivity.Ready:
66		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
67	case connectivity.TransientFailure:
68		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
69	case connectivity.Shutdown:
70		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
71	default:
72		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
73	}
74}
75
76func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
77	pbt := &channelzpb.ChannelTrace{}
78	pbt.NumEventsLogged = ct.EventNum
79	if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
80		pbt.CreationTimestamp = ts
81	}
82	var events []*channelzpb.ChannelTraceEvent
83	for _, e := range ct.Events {
84		cte := &channelzpb.ChannelTraceEvent{
85			Description: e.Desc,
86			Severity:    channelzpb.ChannelTraceEvent_Severity(e.Severity),
87		}
88		if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
89			cte.Timestamp = ts
90		}
91		if e.RefID != 0 {
92			switch e.RefType {
93			case channelz.RefChannel:
94				cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
95			case channelz.RefSubChannel:
96				cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
97			}
98		}
99		events = append(events, cte)
100	}
101	pbt.Events = events
102	return pbt
103}
104
105func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
106	c := &channelzpb.Channel{}
107	c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
108
109	c.Data = &channelzpb.ChannelData{
110		State:          connectivityStateToProto(cm.ChannelData.State),
111		Target:         cm.ChannelData.Target,
112		CallsStarted:   cm.ChannelData.CallsStarted,
113		CallsSucceeded: cm.ChannelData.CallsSucceeded,
114		CallsFailed:    cm.ChannelData.CallsFailed,
115	}
116	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
117		c.Data.LastCallStartedTimestamp = ts
118	}
119	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
120	for id, ref := range cm.NestedChans {
121		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
122	}
123	c.ChannelRef = nestedChans
124
125	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
126	for id, ref := range cm.SubChans {
127		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
128	}
129	c.SubchannelRef = subChans
130
131	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
132	for id, ref := range cm.Sockets {
133		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
134	}
135	c.SocketRef = sockets
136	c.Data.Trace = channelTraceToProto(cm.Trace)
137	return c
138}
139
140func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel {
141	sc := &channelzpb.Subchannel{}
142	sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
143
144	sc.Data = &channelzpb.ChannelData{
145		State:          connectivityStateToProto(cm.ChannelData.State),
146		Target:         cm.ChannelData.Target,
147		CallsStarted:   cm.ChannelData.CallsStarted,
148		CallsSucceeded: cm.ChannelData.CallsSucceeded,
149		CallsFailed:    cm.ChannelData.CallsFailed,
150	}
151	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
152		sc.Data.LastCallStartedTimestamp = ts
153	}
154	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
155	for id, ref := range cm.NestedChans {
156		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
157	}
158	sc.ChannelRef = nestedChans
159
160	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
161	for id, ref := range cm.SubChans {
162		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
163	}
164	sc.SubchannelRef = subChans
165
166	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
167	for id, ref := range cm.Sockets {
168		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
169	}
170	sc.SocketRef = sockets
171	sc.Data.Trace = channelTraceToProto(cm.Trace)
172	return sc
173}
174
175func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
176	switch v := se.(type) {
177	case *credentials.TLSChannelzSecurityValue:
178		return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
179			CipherSuite:       &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
180			LocalCertificate:  v.LocalCertificate,
181			RemoteCertificate: v.RemoteCertificate,
182		}}}
183	case *credentials.OtherChannelzSecurityValue:
184		otherSecurity := &channelzpb.Security_OtherSecurity{
185			Name: v.Name,
186		}
187		if anyval, err := ptypes.MarshalAny(v.Value); err == nil {
188			otherSecurity.Value = anyval
189		}
190		return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
191	}
192	return nil
193}
194
195func addrToProto(a net.Addr) *channelzpb.Address {
196	switch a.Network() {
197	case "udp":
198		// TODO: Address_OtherAddress{}. Need proto def for Value.
199	case "ip":
200		// Note zone info is discarded through the conversion.
201		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
202	case "ip+net":
203		// Note mask info is discarded through the conversion.
204		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
205	case "tcp":
206		// Note zone info is discarded through the conversion.
207		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
208	case "unix", "unixgram", "unixpacket":
209		return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
210	default:
211	}
212	return &channelzpb.Address{}
213}
214
215func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
216	s := &channelzpb.Socket{}
217	s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
218
219	s.Data = &channelzpb.SocketData{
220		StreamsStarted:   sm.SocketData.StreamsStarted,
221		StreamsSucceeded: sm.SocketData.StreamsSucceeded,
222		StreamsFailed:    sm.SocketData.StreamsFailed,
223		MessagesSent:     sm.SocketData.MessagesSent,
224		MessagesReceived: sm.SocketData.MessagesReceived,
225		KeepAlivesSent:   sm.SocketData.KeepAlivesSent,
226	}
227	if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
228		s.Data.LastLocalStreamCreatedTimestamp = ts
229	}
230	if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
231		s.Data.LastRemoteStreamCreatedTimestamp = ts
232	}
233	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
234		s.Data.LastMessageSentTimestamp = ts
235	}
236	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
237		s.Data.LastMessageReceivedTimestamp = ts
238	}
239	s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
240	s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
241
242	if sm.SocketData.SocketOptions != nil {
243		s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
244	}
245	if sm.SocketData.Security != nil {
246		s.Security = securityToProto(sm.SocketData.Security)
247	}
248
249	if sm.SocketData.LocalAddr != nil {
250		s.Local = addrToProto(sm.SocketData.LocalAddr)
251	}
252	if sm.SocketData.RemoteAddr != nil {
253		s.Remote = addrToProto(sm.SocketData.RemoteAddr)
254	}
255	s.RemoteName = sm.SocketData.RemoteName
256	return s
257}
258
259func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
260	metrics, end := channelz.GetTopChannels(req.GetStartChannelId())
261	resp := &channelzpb.GetTopChannelsResponse{}
262	for _, m := range metrics {
263		resp.Channel = append(resp.Channel, channelMetricToProto(m))
264	}
265	resp.End = end
266	return resp, nil
267}
268
269func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
270	s := &channelzpb.Server{}
271	s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
272
273	s.Data = &channelzpb.ServerData{
274		CallsStarted:   sm.ServerData.CallsStarted,
275		CallsSucceeded: sm.ServerData.CallsSucceeded,
276		CallsFailed:    sm.ServerData.CallsFailed,
277	}
278
279	if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
280		s.Data.LastCallStartedTimestamp = ts
281	}
282	sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets))
283	for id, ref := range sm.ListenSockets {
284		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
285	}
286	s.ListenSocket = sockets
287	return s
288}
289
290func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
291	metrics, end := channelz.GetServers(req.GetStartServerId())
292	resp := &channelzpb.GetServersResponse{}
293	for _, m := range metrics {
294		resp.Server = append(resp.Server, serverMetricToProto(m))
295	}
296	resp.End = end
297	return resp, nil
298}
299
300func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
301	metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId())
302	resp := &channelzpb.GetServerSocketsResponse{}
303	for _, m := range metrics {
304		resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
305	}
306	resp.End = end
307	return resp, nil
308}
309
310func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) {
311	var metric *channelz.ChannelMetric
312	if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
313		return &channelzpb.GetChannelResponse{}, nil
314	}
315	resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)}
316	return resp, nil
317}
318
319func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) {
320	var metric *channelz.SubChannelMetric
321	if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
322		return &channelzpb.GetSubchannelResponse{}, nil
323	}
324	resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
325	return resp, nil
326}
327
328func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) {
329	var metric *channelz.SocketMetric
330	if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
331		return &channelzpb.GetSocketResponse{}, nil
332	}
333	resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)}
334	return resp, nil
335}
336