1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package service provides an implementation for channelz service server.
20package service
21
22import (
23	"context"
24	"net"
25
26	"github.com/golang/protobuf/ptypes"
27	wrpb "github.com/golang/protobuf/ptypes/wrappers"
28	"google.golang.org/grpc"
29	channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
30	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
31	"google.golang.org/grpc/codes"
32	"google.golang.org/grpc/connectivity"
33	"google.golang.org/grpc/credentials"
34	"google.golang.org/grpc/grpclog"
35	"google.golang.org/grpc/internal/channelz"
36	"google.golang.org/grpc/status"
37)
38
39func init() {
40	channelz.TurnOn()
41}
42
43var logger = grpclog.Component("channelz")
44
45// RegisterChannelzServiceToServer registers the channelz service to the given server.
46func RegisterChannelzServiceToServer(s grpc.ServiceRegistrar) {
47	channelzgrpc.RegisterChannelzServer(s, newCZServer())
48}
49
50func newCZServer() channelzgrpc.ChannelzServer {
51	return &serverImpl{}
52}
53
54type serverImpl struct {
55	channelzgrpc.UnimplementedChannelzServer
56}
57
58func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
59	switch s {
60	case connectivity.Idle:
61		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
62	case connectivity.Connecting:
63		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
64	case connectivity.Ready:
65		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
66	case connectivity.TransientFailure:
67		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
68	case connectivity.Shutdown:
69		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
70	default:
71		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
72	}
73}
74
75func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
76	pbt := &channelzpb.ChannelTrace{}
77	pbt.NumEventsLogged = ct.EventNum
78	if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
79		pbt.CreationTimestamp = ts
80	}
81	var events []*channelzpb.ChannelTraceEvent
82	for _, e := range ct.Events {
83		cte := &channelzpb.ChannelTraceEvent{
84			Description: e.Desc,
85			Severity:    channelzpb.ChannelTraceEvent_Severity(e.Severity),
86		}
87		if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
88			cte.Timestamp = ts
89		}
90		if e.RefID != 0 {
91			switch e.RefType {
92			case channelz.RefChannel:
93				cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
94			case channelz.RefSubChannel:
95				cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
96			}
97		}
98		events = append(events, cte)
99	}
100	pbt.Events = events
101	return pbt
102}
103
104func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
105	c := &channelzpb.Channel{}
106	c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
107
108	c.Data = &channelzpb.ChannelData{
109		State:          connectivityStateToProto(cm.ChannelData.State),
110		Target:         cm.ChannelData.Target,
111		CallsStarted:   cm.ChannelData.CallsStarted,
112		CallsSucceeded: cm.ChannelData.CallsSucceeded,
113		CallsFailed:    cm.ChannelData.CallsFailed,
114	}
115	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
116		c.Data.LastCallStartedTimestamp = ts
117	}
118	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
119	for id, ref := range cm.NestedChans {
120		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
121	}
122	c.ChannelRef = nestedChans
123
124	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
125	for id, ref := range cm.SubChans {
126		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
127	}
128	c.SubchannelRef = subChans
129
130	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
131	for id, ref := range cm.Sockets {
132		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
133	}
134	c.SocketRef = sockets
135	c.Data.Trace = channelTraceToProto(cm.Trace)
136	return c
137}
138
139func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel {
140	sc := &channelzpb.Subchannel{}
141	sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
142
143	sc.Data = &channelzpb.ChannelData{
144		State:          connectivityStateToProto(cm.ChannelData.State),
145		Target:         cm.ChannelData.Target,
146		CallsStarted:   cm.ChannelData.CallsStarted,
147		CallsSucceeded: cm.ChannelData.CallsSucceeded,
148		CallsFailed:    cm.ChannelData.CallsFailed,
149	}
150	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
151		sc.Data.LastCallStartedTimestamp = ts
152	}
153	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
154	for id, ref := range cm.NestedChans {
155		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
156	}
157	sc.ChannelRef = nestedChans
158
159	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
160	for id, ref := range cm.SubChans {
161		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
162	}
163	sc.SubchannelRef = subChans
164
165	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
166	for id, ref := range cm.Sockets {
167		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
168	}
169	sc.SocketRef = sockets
170	sc.Data.Trace = channelTraceToProto(cm.Trace)
171	return sc
172}
173
174func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
175	switch v := se.(type) {
176	case *credentials.TLSChannelzSecurityValue:
177		return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
178			CipherSuite:       &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
179			LocalCertificate:  v.LocalCertificate,
180			RemoteCertificate: v.RemoteCertificate,
181		}}}
182	case *credentials.OtherChannelzSecurityValue:
183		otherSecurity := &channelzpb.Security_OtherSecurity{
184			Name: v.Name,
185		}
186		if anyval, err := ptypes.MarshalAny(v.Value); err == nil {
187			otherSecurity.Value = anyval
188		}
189		return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
190	}
191	return nil
192}
193
194func addrToProto(a net.Addr) *channelzpb.Address {
195	switch a.Network() {
196	case "udp":
197		// TODO: Address_OtherAddress{}. Need proto def for Value.
198	case "ip":
199		// Note zone info is discarded through the conversion.
200		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
201	case "ip+net":
202		// Note mask info is discarded through the conversion.
203		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
204	case "tcp":
205		// Note zone info is discarded through the conversion.
206		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
207	case "unix", "unixgram", "unixpacket":
208		return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
209	default:
210	}
211	return &channelzpb.Address{}
212}
213
214func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
215	s := &channelzpb.Socket{}
216	s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
217
218	s.Data = &channelzpb.SocketData{
219		StreamsStarted:   sm.SocketData.StreamsStarted,
220		StreamsSucceeded: sm.SocketData.StreamsSucceeded,
221		StreamsFailed:    sm.SocketData.StreamsFailed,
222		MessagesSent:     sm.SocketData.MessagesSent,
223		MessagesReceived: sm.SocketData.MessagesReceived,
224		KeepAlivesSent:   sm.SocketData.KeepAlivesSent,
225	}
226	if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
227		s.Data.LastLocalStreamCreatedTimestamp = ts
228	}
229	if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
230		s.Data.LastRemoteStreamCreatedTimestamp = ts
231	}
232	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
233		s.Data.LastMessageSentTimestamp = ts
234	}
235	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
236		s.Data.LastMessageReceivedTimestamp = ts
237	}
238	s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
239	s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
240
241	if sm.SocketData.SocketOptions != nil {
242		s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
243	}
244	if sm.SocketData.Security != nil {
245		s.Security = securityToProto(sm.SocketData.Security)
246	}
247
248	if sm.SocketData.LocalAddr != nil {
249		s.Local = addrToProto(sm.SocketData.LocalAddr)
250	}
251	if sm.SocketData.RemoteAddr != nil {
252		s.Remote = addrToProto(sm.SocketData.RemoteAddr)
253	}
254	s.RemoteName = sm.SocketData.RemoteName
255	return s
256}
257
258func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
259	metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults())
260	resp := &channelzpb.GetTopChannelsResponse{}
261	for _, m := range metrics {
262		resp.Channel = append(resp.Channel, channelMetricToProto(m))
263	}
264	resp.End = end
265	return resp, nil
266}
267
268func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
269	s := &channelzpb.Server{}
270	s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
271
272	s.Data = &channelzpb.ServerData{
273		CallsStarted:   sm.ServerData.CallsStarted,
274		CallsSucceeded: sm.ServerData.CallsSucceeded,
275		CallsFailed:    sm.ServerData.CallsFailed,
276	}
277
278	if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
279		s.Data.LastCallStartedTimestamp = ts
280	}
281	sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets))
282	for id, ref := range sm.ListenSockets {
283		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
284	}
285	s.ListenSocket = sockets
286	return s
287}
288
289func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
290	metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults())
291	resp := &channelzpb.GetServersResponse{}
292	for _, m := range metrics {
293		resp.Server = append(resp.Server, serverMetricToProto(m))
294	}
295	resp.End = end
296	return resp, nil
297}
298
299func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
300	metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults())
301	resp := &channelzpb.GetServerSocketsResponse{}
302	for _, m := range metrics {
303		resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
304	}
305	resp.End = end
306	return resp, nil
307}
308
309func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) {
310	var metric *channelz.ChannelMetric
311	if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
312		return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId())
313	}
314	resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)}
315	return resp, nil
316}
317
318func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) {
319	var metric *channelz.SubChannelMetric
320	if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
321		return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId())
322	}
323	resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
324	return resp, nil
325}
326
327func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) {
328	var metric *channelz.SocketMetric
329	if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
330		return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId())
331	}
332	resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)}
333	return resp, nil
334}
335
336func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) {
337	var metric *channelz.ServerMetric
338	if metric = channelz.GetServer(req.GetServerId()); metric == nil {
339		return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId())
340	}
341	resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)}
342	return resp, nil
343}
344