/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package service provides an implementation for channelz service server.
package service

import (
	"context"
	"net"

	"github.com/golang/protobuf/ptypes"
	wrpb "github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/status"
)

// init calls channelz.TurnOn as a side effect of this package being imported.
// NOTE(review): presumably this enables channelz data collection process-wide;
// confirm against the internal channelz package.
func init() {
	channelz.TurnOn()
}

// logger is the grpclog component logger obtained under the name "channelz".
var logger = grpclog.Component("channelz")

// RegisterChannelzServiceToServer registers the channelz service to the given server.
46func RegisterChannelzServiceToServer(s grpc.ServiceRegistrar) { 47 channelzgrpc.RegisterChannelzServer(s, newCZServer()) 48} 49 50func newCZServer() channelzgrpc.ChannelzServer { 51 return &serverImpl{} 52} 53 54type serverImpl struct { 55 channelzgrpc.UnimplementedChannelzServer 56} 57 58func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState { 59 switch s { 60 case connectivity.Idle: 61 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE} 62 case connectivity.Connecting: 63 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING} 64 case connectivity.Ready: 65 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY} 66 case connectivity.TransientFailure: 67 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE} 68 case connectivity.Shutdown: 69 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN} 70 default: 71 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} 72 } 73} 74 75func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { 76 pbt := &channelzpb.ChannelTrace{} 77 pbt.NumEventsLogged = ct.EventNum 78 if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { 79 pbt.CreationTimestamp = ts 80 } 81 var events []*channelzpb.ChannelTraceEvent 82 for _, e := range ct.Events { 83 cte := &channelzpb.ChannelTraceEvent{ 84 Description: e.Desc, 85 Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), 86 } 87 if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { 88 cte.Timestamp = ts 89 } 90 if e.RefID != 0 { 91 switch e.RefType { 92 case channelz.RefChannel: 93 cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} 94 case channelz.RefSubChannel: 95 cte.ChildRef 
= &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} 96 } 97 } 98 events = append(events, cte) 99 } 100 pbt.Events = events 101 return pbt 102} 103 104func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel { 105 c := &channelzpb.Channel{} 106 c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} 107 108 c.Data = &channelzpb.ChannelData{ 109 State: connectivityStateToProto(cm.ChannelData.State), 110 Target: cm.ChannelData.Target, 111 CallsStarted: cm.ChannelData.CallsStarted, 112 CallsSucceeded: cm.ChannelData.CallsSucceeded, 113 CallsFailed: cm.ChannelData.CallsFailed, 114 } 115 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 116 c.Data.LastCallStartedTimestamp = ts 117 } 118 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 119 for id, ref := range cm.NestedChans { 120 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 121 } 122 c.ChannelRef = nestedChans 123 124 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 125 for id, ref := range cm.SubChans { 126 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 127 } 128 c.SubchannelRef = subChans 129 130 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 131 for id, ref := range cm.Sockets { 132 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 133 } 134 c.SocketRef = sockets 135 c.Data.Trace = channelTraceToProto(cm.Trace) 136 return c 137} 138 139func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel { 140 sc := &channelzpb.Subchannel{} 141 sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName} 142 143 sc.Data = &channelzpb.ChannelData{ 144 State: connectivityStateToProto(cm.ChannelData.State), 145 Target: cm.ChannelData.Target, 146 CallsStarted: cm.ChannelData.CallsStarted, 147 
CallsSucceeded: cm.ChannelData.CallsSucceeded, 148 CallsFailed: cm.ChannelData.CallsFailed, 149 } 150 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 151 sc.Data.LastCallStartedTimestamp = ts 152 } 153 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 154 for id, ref := range cm.NestedChans { 155 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 156 } 157 sc.ChannelRef = nestedChans 158 159 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 160 for id, ref := range cm.SubChans { 161 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 162 } 163 sc.SubchannelRef = subChans 164 165 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 166 for id, ref := range cm.Sockets { 167 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 168 } 169 sc.SocketRef = sockets 170 sc.Data.Trace = channelTraceToProto(cm.Trace) 171 return sc 172} 173 174func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security { 175 switch v := se.(type) { 176 case *credentials.TLSChannelzSecurityValue: 177 return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{ 178 CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName}, 179 LocalCertificate: v.LocalCertificate, 180 RemoteCertificate: v.RemoteCertificate, 181 }}} 182 case *credentials.OtherChannelzSecurityValue: 183 otherSecurity := &channelzpb.Security_OtherSecurity{ 184 Name: v.Name, 185 } 186 if anyval, err := ptypes.MarshalAny(v.Value); err == nil { 187 otherSecurity.Value = anyval 188 } 189 return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}} 190 } 191 return nil 192} 193 194func addrToProto(a net.Addr) *channelzpb.Address { 195 switch a.Network() { 196 case "udp": 197 // TODO: Address_OtherAddress{}. Need proto def for Value. 
198 case "ip": 199 // Note zone info is discarded through the conversion. 200 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}} 201 case "ip+net": 202 // Note mask info is discarded through the conversion. 203 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}} 204 case "tcp": 205 // Note zone info is discarded through the conversion. 206 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}} 207 case "unix", "unixgram", "unixpacket": 208 return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}} 209 default: 210 } 211 return &channelzpb.Address{} 212} 213 214func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket { 215 s := &channelzpb.Socket{} 216 s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName} 217 218 s.Data = &channelzpb.SocketData{ 219 StreamsStarted: sm.SocketData.StreamsStarted, 220 StreamsSucceeded: sm.SocketData.StreamsSucceeded, 221 StreamsFailed: sm.SocketData.StreamsFailed, 222 MessagesSent: sm.SocketData.MessagesSent, 223 MessagesReceived: sm.SocketData.MessagesReceived, 224 KeepAlivesSent: sm.SocketData.KeepAlivesSent, 225 } 226 if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil { 227 s.Data.LastLocalStreamCreatedTimestamp = ts 228 } 229 if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil { 230 s.Data.LastRemoteStreamCreatedTimestamp = ts 231 } 232 if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil { 233 s.Data.LastMessageSentTimestamp = ts 234 } 235 if ts, err := 
ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil { 236 s.Data.LastMessageReceivedTimestamp = ts 237 } 238 s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow} 239 s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow} 240 241 if sm.SocketData.SocketOptions != nil { 242 s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions) 243 } 244 if sm.SocketData.Security != nil { 245 s.Security = securityToProto(sm.SocketData.Security) 246 } 247 248 if sm.SocketData.LocalAddr != nil { 249 s.Local = addrToProto(sm.SocketData.LocalAddr) 250 } 251 if sm.SocketData.RemoteAddr != nil { 252 s.Remote = addrToProto(sm.SocketData.RemoteAddr) 253 } 254 s.RemoteName = sm.SocketData.RemoteName 255 return s 256} 257 258func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) { 259 metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults()) 260 resp := &channelzpb.GetTopChannelsResponse{} 261 for _, m := range metrics { 262 resp.Channel = append(resp.Channel, channelMetricToProto(m)) 263 } 264 resp.End = end 265 return resp, nil 266} 267 268func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server { 269 s := &channelzpb.Server{} 270 s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName} 271 272 s.Data = &channelzpb.ServerData{ 273 CallsStarted: sm.ServerData.CallsStarted, 274 CallsSucceeded: sm.ServerData.CallsSucceeded, 275 CallsFailed: sm.ServerData.CallsFailed, 276 } 277 278 if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil { 279 s.Data.LastCallStartedTimestamp = ts 280 } 281 sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets)) 282 for id, ref := range sm.ListenSockets { 283 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 284 } 285 s.ListenSocket = sockets 286 return s 
287} 288 289func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) { 290 metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults()) 291 resp := &channelzpb.GetServersResponse{} 292 for _, m := range metrics { 293 resp.Server = append(resp.Server, serverMetricToProto(m)) 294 } 295 resp.End = end 296 return resp, nil 297} 298 299func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) { 300 metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults()) 301 resp := &channelzpb.GetServerSocketsResponse{} 302 for _, m := range metrics { 303 resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName}) 304 } 305 resp.End = end 306 return resp, nil 307} 308 309func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) { 310 var metric *channelz.ChannelMetric 311 if metric = channelz.GetChannel(req.GetChannelId()); metric == nil { 312 return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId()) 313 } 314 resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)} 315 return resp, nil 316} 317 318func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) { 319 var metric *channelz.SubChannelMetric 320 if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil { 321 return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId()) 322 } 323 resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)} 324 return resp, nil 325} 326 327func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) 
(*channelzpb.GetSocketResponse, error) { 328 var metric *channelz.SocketMetric 329 if metric = channelz.GetSocket(req.GetSocketId()); metric == nil { 330 return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId()) 331 } 332 resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)} 333 return resp, nil 334} 335 336func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) { 337 var metric *channelz.ServerMetric 338 if metric = channelz.GetServer(req.GetServerId()); metric == nil { 339 return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId()) 340 } 341 resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)} 342 return resp, nil 343} 344