1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19//go:generate ./regenerate.sh
20
21// Package service provides an implementation for channelz service server.
22package service
23
24import (
25	"context"
26	"net"
27	"time"
28
29	"github.com/golang/protobuf/ptypes"
30	durpb "github.com/golang/protobuf/ptypes/duration"
31	wrpb "github.com/golang/protobuf/ptypes/wrappers"
32	"google.golang.org/grpc"
33	channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
34	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/internal/channelz"
39	"google.golang.org/grpc/status"
40)
41
42func init() {
43	channelz.TurnOn()
44}
45
46func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration {
47	return ptypes.DurationProto(time.Duration(sec*1e9 + usec*1e3))
48}
49
50// RegisterChannelzServiceToServer registers the channelz service to the given server.
51func RegisterChannelzServiceToServer(s *grpc.Server) {
52	channelzgrpc.RegisterChannelzServer(s, newCZServer())
53}
54
55func newCZServer() channelzgrpc.ChannelzServer {
56	return &serverImpl{}
57}
58
59type serverImpl struct{}
60
61func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
62	switch s {
63	case connectivity.Idle:
64		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
65	case connectivity.Connecting:
66		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
67	case connectivity.Ready:
68		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
69	case connectivity.TransientFailure:
70		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
71	case connectivity.Shutdown:
72		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
73	default:
74		return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
75	}
76}
77
78func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
79	pbt := &channelzpb.ChannelTrace{}
80	pbt.NumEventsLogged = ct.EventNum
81	if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
82		pbt.CreationTimestamp = ts
83	}
84	var events []*channelzpb.ChannelTraceEvent
85	for _, e := range ct.Events {
86		cte := &channelzpb.ChannelTraceEvent{
87			Description: e.Desc,
88			Severity:    channelzpb.ChannelTraceEvent_Severity(e.Severity),
89		}
90		if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
91			cte.Timestamp = ts
92		}
93		if e.RefID != 0 {
94			switch e.RefType {
95			case channelz.RefChannel:
96				cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
97			case channelz.RefSubChannel:
98				cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
99			}
100		}
101		events = append(events, cte)
102	}
103	pbt.Events = events
104	return pbt
105}
106
107func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
108	c := &channelzpb.Channel{}
109	c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
110
111	c.Data = &channelzpb.ChannelData{
112		State:          connectivityStateToProto(cm.ChannelData.State),
113		Target:         cm.ChannelData.Target,
114		CallsStarted:   cm.ChannelData.CallsStarted,
115		CallsSucceeded: cm.ChannelData.CallsSucceeded,
116		CallsFailed:    cm.ChannelData.CallsFailed,
117	}
118	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
119		c.Data.LastCallStartedTimestamp = ts
120	}
121	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
122	for id, ref := range cm.NestedChans {
123		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
124	}
125	c.ChannelRef = nestedChans
126
127	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
128	for id, ref := range cm.SubChans {
129		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
130	}
131	c.SubchannelRef = subChans
132
133	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
134	for id, ref := range cm.Sockets {
135		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
136	}
137	c.SocketRef = sockets
138	c.Data.Trace = channelTraceToProto(cm.Trace)
139	return c
140}
141
142func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel {
143	sc := &channelzpb.Subchannel{}
144	sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
145
146	sc.Data = &channelzpb.ChannelData{
147		State:          connectivityStateToProto(cm.ChannelData.State),
148		Target:         cm.ChannelData.Target,
149		CallsStarted:   cm.ChannelData.CallsStarted,
150		CallsSucceeded: cm.ChannelData.CallsSucceeded,
151		CallsFailed:    cm.ChannelData.CallsFailed,
152	}
153	if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
154		sc.Data.LastCallStartedTimestamp = ts
155	}
156	nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
157	for id, ref := range cm.NestedChans {
158		nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
159	}
160	sc.ChannelRef = nestedChans
161
162	subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
163	for id, ref := range cm.SubChans {
164		subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
165	}
166	sc.SubchannelRef = subChans
167
168	sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
169	for id, ref := range cm.Sockets {
170		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
171	}
172	sc.SocketRef = sockets
173	sc.Data.Trace = channelTraceToProto(cm.Trace)
174	return sc
175}
176
177func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
178	switch v := se.(type) {
179	case *credentials.TLSChannelzSecurityValue:
180		return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
181			CipherSuite:       &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
182			LocalCertificate:  v.LocalCertificate,
183			RemoteCertificate: v.RemoteCertificate,
184		}}}
185	case *credentials.OtherChannelzSecurityValue:
186		otherSecurity := &channelzpb.Security_OtherSecurity{
187			Name: v.Name,
188		}
189		if anyval, err := ptypes.MarshalAny(v.Value); err == nil {
190			otherSecurity.Value = anyval
191		}
192		return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
193	}
194	return nil
195}
196
197func addrToProto(a net.Addr) *channelzpb.Address {
198	switch a.Network() {
199	case "udp":
200		// TODO: Address_OtherAddress{}. Need proto def for Value.
201	case "ip":
202		// Note zone info is discarded through the conversion.
203		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
204	case "ip+net":
205		// Note mask info is discarded through the conversion.
206		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
207	case "tcp":
208		// Note zone info is discarded through the conversion.
209		return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
210	case "unix", "unixgram", "unixpacket":
211		return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
212	default:
213	}
214	return &channelzpb.Address{}
215}
216
217func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
218	s := &channelzpb.Socket{}
219	s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
220
221	s.Data = &channelzpb.SocketData{
222		StreamsStarted:   sm.SocketData.StreamsStarted,
223		StreamsSucceeded: sm.SocketData.StreamsSucceeded,
224		StreamsFailed:    sm.SocketData.StreamsFailed,
225		MessagesSent:     sm.SocketData.MessagesSent,
226		MessagesReceived: sm.SocketData.MessagesReceived,
227		KeepAlivesSent:   sm.SocketData.KeepAlivesSent,
228	}
229	if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
230		s.Data.LastLocalStreamCreatedTimestamp = ts
231	}
232	if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
233		s.Data.LastRemoteStreamCreatedTimestamp = ts
234	}
235	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
236		s.Data.LastMessageSentTimestamp = ts
237	}
238	if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
239		s.Data.LastMessageReceivedTimestamp = ts
240	}
241	s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
242	s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
243
244	if sm.SocketData.SocketOptions != nil {
245		s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
246	}
247	if sm.SocketData.Security != nil {
248		s.Security = securityToProto(sm.SocketData.Security)
249	}
250
251	if sm.SocketData.LocalAddr != nil {
252		s.Local = addrToProto(sm.SocketData.LocalAddr)
253	}
254	if sm.SocketData.RemoteAddr != nil {
255		s.Remote = addrToProto(sm.SocketData.RemoteAddr)
256	}
257	s.RemoteName = sm.SocketData.RemoteName
258	return s
259}
260
261func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
262	metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults())
263	resp := &channelzpb.GetTopChannelsResponse{}
264	for _, m := range metrics {
265		resp.Channel = append(resp.Channel, channelMetricToProto(m))
266	}
267	resp.End = end
268	return resp, nil
269}
270
271func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
272	s := &channelzpb.Server{}
273	s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
274
275	s.Data = &channelzpb.ServerData{
276		CallsStarted:   sm.ServerData.CallsStarted,
277		CallsSucceeded: sm.ServerData.CallsSucceeded,
278		CallsFailed:    sm.ServerData.CallsFailed,
279	}
280
281	if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
282		s.Data.LastCallStartedTimestamp = ts
283	}
284	sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets))
285	for id, ref := range sm.ListenSockets {
286		sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
287	}
288	s.ListenSocket = sockets
289	return s
290}
291
292func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
293	metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults())
294	resp := &channelzpb.GetServersResponse{}
295	for _, m := range metrics {
296		resp.Server = append(resp.Server, serverMetricToProto(m))
297	}
298	resp.End = end
299	return resp, nil
300}
301
302func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
303	metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults())
304	resp := &channelzpb.GetServerSocketsResponse{}
305	for _, m := range metrics {
306		resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
307	}
308	resp.End = end
309	return resp, nil
310}
311
312func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) {
313	var metric *channelz.ChannelMetric
314	if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
315		return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId())
316	}
317	resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)}
318	return resp, nil
319}
320
321func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) {
322	var metric *channelz.SubChannelMetric
323	if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
324		return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId())
325	}
326	resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
327	return resp, nil
328}
329
330func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) {
331	var metric *channelz.SocketMetric
332	if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
333		return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId())
334	}
335	resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)}
336	return resp, nil
337}
338
339func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) {
340	var metric *channelz.ServerMetric
341	if metric = channelz.GetServer(req.GetServerId()); metric == nil {
342		return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId())
343	}
344	resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)}
345	return resp, nil
346}
347