1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package service provides an implementation for channelz service server.
20package service
21
22import (
23	"context"
24	"net"
25
26	"github.com/golang/protobuf/ptypes"
27	wrpb "github.com/golang/protobuf/ptypes/wrappers"
28	"google.golang.org/grpc"
29	channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
30	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
31	"google.golang.org/grpc/codes"
32	"google.golang.org/grpc/connectivity"
33	"google.golang.org/grpc/credentials"
34	"google.golang.org/grpc/internal/channelz"
35	"google.golang.org/grpc/status"
36)
37
38func init() {
39	channelz.TurnOn()
40}
41
42// RegisterChannelzServiceToServer registers the channelz service to the given server.
43func RegisterChannelzServiceToServer(s *grpc.Server) {
44	channelzgrpc.RegisterChannelzServer(s, newCZServer())
45}
46
47func newCZServer() channelzgrpc.ChannelzServer {
48	return &serverImpl{}
49}
50
51type serverImpl struct {
52	channelzgrpc.UnimplementedChannelzServer
53}
54
55func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
56	switch s {
57	case connectivity.Idle:
58		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
59	case connectivity.Connecting:
60		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
61	case connectivity.Ready:
62		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
63	case connectivity.TransientFailure:
64		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
65	case connectivity.Shutdown:
66		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
67	default:
68		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
69	}
70}
71
72func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
73	pbt := &channelzpb.ChannelTrace{}
74	pbt.NumEventsLogged = ct.EventNum
75	if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
76		pbt.CreationTimestamp = ts
77	}
78	var events []*channelzpb.ChannelTraceEvent
79	for _, e := range ct.Events {
80		cte := &channelzpb.ChannelTraceEvent{
81			Description: e.Desc,
82			Severity:    channelzpb.ChannelTraceEvent_Severity(e.Severity),
83		}
84		if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
85			cte.Timestamp = ts
86		}
87		if e.RefID != 0 {
88			switch e.RefType {
89			case channelz.RefChannel:
90				cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
91			case channelz.RefSubChannel:
92				cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
93			}
94		}
95		events = append(events, cte)
96	}
97	pbt.Events = events
98	return pbt
99}
100
101func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
102	c := &channelzpb.Channel{}
103	c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
104
105	c.Data = &channelzpb.ChannelData{
106		State:          connectivityStateToProto(cm.ChannelData.State),
107		Target:         cm.ChannelData.Target,
108		CallsStarted:   cm.ChannelData.CallsStarted,
109		CallsSucceeded: cm.ChannelData.CallsSucceeded,
110		CallsFailed:    cm.ChannelData.CallsFailed,
111	}
112	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
113		c.Data.LastCallStartedTimestamp = ts
114	}
115	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
116	for id, ref := range cm.NestedChans {
117		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
118	}
119	c.ChannelRef = nestedChans
120
121	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
122	for id, ref := range cm.SubChans {
123		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
124	}
125	c.SubchannelRef = subChans
126
127	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
128	for id, ref := range cm.Sockets {
129		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
130	}
131	c.SocketRef = sockets
132	c.Data.Trace = channelTraceToProto(cm.Trace)
133	return c
134}
135
136func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel {
137	sc := &channelzpb.Subchannel{}
138	sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
139
140	sc.Data = &channelzpb.ChannelData{
141		State:          connectivityStateToProto(cm.ChannelData.State),
142		Target:         cm.ChannelData.Target,
143		CallsStarted:   cm.ChannelData.CallsStarted,
144		CallsSucceeded: cm.ChannelData.CallsSucceeded,
145		CallsFailed:    cm.ChannelData.CallsFailed,
146	}
147	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
148		sc.Data.LastCallStartedTimestamp = ts
149	}
150	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
151	for id, ref := range cm.NestedChans {
152		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
153	}
154	sc.ChannelRef = nestedChans
155
156	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
157	for id, ref := range cm.SubChans {
158		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
159	}
160	sc.SubchannelRef = subChans
161
162	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
163	for id, ref := range cm.Sockets {
164		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
165	}
166	sc.SocketRef = sockets
167	sc.Data.Trace = channelTraceToProto(cm.Trace)
168	return sc
169}
170
171func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
172	switch v := se.(type) {
173	case *credentials.TLSChannelzSecurityValue:
174		return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
175			CipherSuite:       &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
176			LocalCertificate:  v.LocalCertificate,
177			RemoteCertificate: v.RemoteCertificate,
178		}}}
179	case *credentials.OtherChannelzSecurityValue:
180		otherSecurity := &channelzpb.Security_OtherSecurity{
181			Name: v.Name,
182		}
183		if anyval, err := ptypes.MarshalAny(v.Value); err == nil {
184			otherSecurity.Value = anyval
185		}
186		return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
187	}
188	return nil
189}
190
191func addrToProto(a net.Addr) *channelzpb.Address {
192	switch a.Network() {
193	case "udp":
194		// TODO: Address_OtherAddress{}. Need proto def for Value.
195	case "ip":
196		// Note zone info is discarded through the conversion.
197		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
198	case "ip+net":
199		// Note mask info is discarded through the conversion.
200		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
201	case "tcp":
202		// Note zone info is discarded through the conversion.
203		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
204	case "unix", "unixgram", "unixpacket":
205		return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
206	default:
207	}
208	return &channelzpb.Address{}
209}
210
211func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
212	s := &channelzpb.Socket{}
213	s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
214
215	s.Data = &channelzpb.SocketData{
216		StreamsStarted:   sm.SocketData.StreamsStarted,
217		StreamsSucceeded: sm.SocketData.StreamsSucceeded,
218		StreamsFailed:    sm.SocketData.StreamsFailed,
219		MessagesSent:     sm.SocketData.MessagesSent,
220		MessagesReceived: sm.SocketData.MessagesReceived,
221		KeepAlivesSent:   sm.SocketData.KeepAlivesSent,
222	}
223	if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
224		s.Data.LastLocalStreamCreatedTimestamp = ts
225	}
226	if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
227		s.Data.LastRemoteStreamCreatedTimestamp = ts
228	}
229	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
230		s.Data.LastMessageSentTimestamp = ts
231	}
232	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
233		s.Data.LastMessageReceivedTimestamp = ts
234	}
235	s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
236	s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
237
238	if sm.SocketData.SocketOptions != nil {
239		s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
240	}
241	if sm.SocketData.Security != nil {
242		s.Security = securityToProto(sm.SocketData.Security)
243	}
244
245	if sm.SocketData.LocalAddr != nil {
246		s.Local = addrToProto(sm.SocketData.LocalAddr)
247	}
248	if sm.SocketData.RemoteAddr != nil {
249		s.Remote = addrToProto(sm.SocketData.RemoteAddr)
250	}
251	s.RemoteName = sm.SocketData.RemoteName
252	return s
253}
254
255func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
256	metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults())
257	resp := &channelzpb.GetTopChannelsResponse{}
258	for _, m := range metrics {
259		resp.Channel = append(resp.Channel, channelMetricToProto(m))
260	}
261	resp.End = end
262	return resp, nil
263}
264
265func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
266	s := &channelzpb.Server{}
267	s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
268
269	s.Data = &channelzpb.ServerData{
270		CallsStarted:   sm.ServerData.CallsStarted,
271		CallsSucceeded: sm.ServerData.CallsSucceeded,
272		CallsFailed:    sm.ServerData.CallsFailed,
273	}
274
275	if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
276		s.Data.LastCallStartedTimestamp = ts
277	}
278	sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets))
279	for id, ref := range sm.ListenSockets {
280		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
281	}
282	s.ListenSocket = sockets
283	return s
284}
285
286func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
287	metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults())
288	resp := &channelzpb.GetServersResponse{}
289	for _, m := range metrics {
290		resp.Server = append(resp.Server, serverMetricToProto(m))
291	}
292	resp.End = end
293	return resp, nil
294}
295
296func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
297	metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults())
298	resp := &channelzpb.GetServerSocketsResponse{}
299	for _, m := range metrics {
300		resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
301	}
302	resp.End = end
303	return resp, nil
304}
305
306func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) {
307	var metric *channelz.ChannelMetric
308	if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
309		return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId())
310	}
311	resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)}
312	return resp, nil
313}
314
315func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) {
316	var metric *channelz.SubChannelMetric
317	if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
318		return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId())
319	}
320	resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
321	return resp, nil
322}
323
324func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) {
325	var metric *channelz.SocketMetric
326	if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
327		return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId())
328	}
329	resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)}
330	return resp, nil
331}
332
333func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) {
334	var metric *channelz.ServerMetric
335	if metric = channelz.GetServer(req.GetServerId()); metric == nil {
336		return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId())
337	}
338	resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)}
339	return resp, nil
340}
341