/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

//go:generate ./regenerate.sh

// Package service provides an implementation for channelz service server.
package service

import (
	"context"
	"net"

	"github.com/golang/protobuf/ptypes"
	wrpb "github.com/golang/protobuf/ptypes/wrappers"
	"google.golang.org/grpc"
	channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/status"
)

// init runs when this package is initialized (i.e. as a side effect of
// importing it) and calls channelz.TurnOn.
// NOTE(review): presumably TurnOn enables channelz data collection for the
// whole process — confirm against the internal/channelz package.
func init() {
	channelz.TurnOn()
}

// RegisterChannelzServiceToServer registers the channelz service to the given server.
45func RegisterChannelzServiceToServer(s *grpc.Server) { 46 channelzgrpc.RegisterChannelzServer(s, newCZServer()) 47} 48 49func newCZServer() channelzgrpc.ChannelzServer { 50 return &serverImpl{} 51} 52 53type serverImpl struct{} 54 55func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState { 56 switch s { 57 case connectivity.Idle: 58 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE} 59 case connectivity.Connecting: 60 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING} 61 case connectivity.Ready: 62 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY} 63 case connectivity.TransientFailure: 64 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE} 65 case connectivity.Shutdown: 66 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN} 67 default: 68 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} 69 } 70} 71 72func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { 73 pbt := &channelzpb.ChannelTrace{} 74 pbt.NumEventsLogged = ct.EventNum 75 if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { 76 pbt.CreationTimestamp = ts 77 } 78 var events []*channelzpb.ChannelTraceEvent 79 for _, e := range ct.Events { 80 cte := &channelzpb.ChannelTraceEvent{ 81 Description: e.Desc, 82 Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), 83 } 84 if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { 85 cte.Timestamp = ts 86 } 87 if e.RefID != 0 { 88 switch e.RefType { 89 case channelz.RefChannel: 90 cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} 91 case channelz.RefSubChannel: 92 cte.ChildRef = 
&channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} 93 } 94 } 95 events = append(events, cte) 96 } 97 pbt.Events = events 98 return pbt 99} 100 101func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel { 102 c := &channelzpb.Channel{} 103 c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} 104 105 c.Data = &channelzpb.ChannelData{ 106 State: connectivityStateToProto(cm.ChannelData.State), 107 Target: cm.ChannelData.Target, 108 CallsStarted: cm.ChannelData.CallsStarted, 109 CallsSucceeded: cm.ChannelData.CallsSucceeded, 110 CallsFailed: cm.ChannelData.CallsFailed, 111 } 112 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 113 c.Data.LastCallStartedTimestamp = ts 114 } 115 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 116 for id, ref := range cm.NestedChans { 117 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 118 } 119 c.ChannelRef = nestedChans 120 121 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 122 for id, ref := range cm.SubChans { 123 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 124 } 125 c.SubchannelRef = subChans 126 127 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 128 for id, ref := range cm.Sockets { 129 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 130 } 131 c.SocketRef = sockets 132 c.Data.Trace = channelTraceToProto(cm.Trace) 133 return c 134} 135 136func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel { 137 sc := &channelzpb.Subchannel{} 138 sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName} 139 140 sc.Data = &channelzpb.ChannelData{ 141 State: connectivityStateToProto(cm.ChannelData.State), 142 Target: cm.ChannelData.Target, 143 CallsStarted: cm.ChannelData.CallsStarted, 144 CallsSucceeded: 
cm.ChannelData.CallsSucceeded, 145 CallsFailed: cm.ChannelData.CallsFailed, 146 } 147 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 148 sc.Data.LastCallStartedTimestamp = ts 149 } 150 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 151 for id, ref := range cm.NestedChans { 152 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 153 } 154 sc.ChannelRef = nestedChans 155 156 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 157 for id, ref := range cm.SubChans { 158 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 159 } 160 sc.SubchannelRef = subChans 161 162 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 163 for id, ref := range cm.Sockets { 164 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 165 } 166 sc.SocketRef = sockets 167 sc.Data.Trace = channelTraceToProto(cm.Trace) 168 return sc 169} 170 171func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security { 172 switch v := se.(type) { 173 case *credentials.TLSChannelzSecurityValue: 174 return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{ 175 CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName}, 176 LocalCertificate: v.LocalCertificate, 177 RemoteCertificate: v.RemoteCertificate, 178 }}} 179 case *credentials.OtherChannelzSecurityValue: 180 otherSecurity := &channelzpb.Security_OtherSecurity{ 181 Name: v.Name, 182 } 183 if anyval, err := ptypes.MarshalAny(v.Value); err == nil { 184 otherSecurity.Value = anyval 185 } 186 return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}} 187 } 188 return nil 189} 190 191func addrToProto(a net.Addr) *channelzpb.Address { 192 switch a.Network() { 193 case "udp": 194 // TODO: Address_OtherAddress{}. Need proto def for Value. 
195 case "ip": 196 // Note zone info is discarded through the conversion. 197 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}} 198 case "ip+net": 199 // Note mask info is discarded through the conversion. 200 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}} 201 case "tcp": 202 // Note zone info is discarded through the conversion. 203 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}} 204 case "unix", "unixgram", "unixpacket": 205 return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}} 206 default: 207 } 208 return &channelzpb.Address{} 209} 210 211func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket { 212 s := &channelzpb.Socket{} 213 s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName} 214 215 s.Data = &channelzpb.SocketData{ 216 StreamsStarted: sm.SocketData.StreamsStarted, 217 StreamsSucceeded: sm.SocketData.StreamsSucceeded, 218 StreamsFailed: sm.SocketData.StreamsFailed, 219 MessagesSent: sm.SocketData.MessagesSent, 220 MessagesReceived: sm.SocketData.MessagesReceived, 221 KeepAlivesSent: sm.SocketData.KeepAlivesSent, 222 } 223 if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil { 224 s.Data.LastLocalStreamCreatedTimestamp = ts 225 } 226 if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil { 227 s.Data.LastRemoteStreamCreatedTimestamp = ts 228 } 229 if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil { 230 s.Data.LastMessageSentTimestamp = ts 231 } 232 if ts, err := 
ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil { 233 s.Data.LastMessageReceivedTimestamp = ts 234 } 235 s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow} 236 s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow} 237 238 if sm.SocketData.SocketOptions != nil { 239 s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions) 240 } 241 if sm.SocketData.Security != nil { 242 s.Security = securityToProto(sm.SocketData.Security) 243 } 244 245 if sm.SocketData.LocalAddr != nil { 246 s.Local = addrToProto(sm.SocketData.LocalAddr) 247 } 248 if sm.SocketData.RemoteAddr != nil { 249 s.Remote = addrToProto(sm.SocketData.RemoteAddr) 250 } 251 s.RemoteName = sm.SocketData.RemoteName 252 return s 253} 254 255func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) { 256 metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults()) 257 resp := &channelzpb.GetTopChannelsResponse{} 258 for _, m := range metrics { 259 resp.Channel = append(resp.Channel, channelMetricToProto(m)) 260 } 261 resp.End = end 262 return resp, nil 263} 264 265func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server { 266 s := &channelzpb.Server{} 267 s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName} 268 269 s.Data = &channelzpb.ServerData{ 270 CallsStarted: sm.ServerData.CallsStarted, 271 CallsSucceeded: sm.ServerData.CallsSucceeded, 272 CallsFailed: sm.ServerData.CallsFailed, 273 } 274 275 if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil { 276 s.Data.LastCallStartedTimestamp = ts 277 } 278 sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets)) 279 for id, ref := range sm.ListenSockets { 280 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 281 } 282 s.ListenSocket = sockets 283 return s 
284} 285 286func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) { 287 metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults()) 288 resp := &channelzpb.GetServersResponse{} 289 for _, m := range metrics { 290 resp.Server = append(resp.Server, serverMetricToProto(m)) 291 } 292 resp.End = end 293 return resp, nil 294} 295 296func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) { 297 metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults()) 298 resp := &channelzpb.GetServerSocketsResponse{} 299 for _, m := range metrics { 300 resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName}) 301 } 302 resp.End = end 303 return resp, nil 304} 305 306func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) { 307 var metric *channelz.ChannelMetric 308 if metric = channelz.GetChannel(req.GetChannelId()); metric == nil { 309 return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId()) 310 } 311 resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)} 312 return resp, nil 313} 314 315func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) { 316 var metric *channelz.SubChannelMetric 317 if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil { 318 return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId()) 319 } 320 resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)} 321 return resp, nil 322} 323 324func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) 
(*channelzpb.GetSocketResponse, error) { 325 var metric *channelz.SocketMetric 326 if metric = channelz.GetSocket(req.GetSocketId()); metric == nil { 327 return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId()) 328 } 329 resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)} 330 return resp, nil 331} 332 333func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) { 334 var metric *channelz.ServerMetric 335 if metric = channelz.GetServer(req.GetServerId()); metric == nil { 336 return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId()) 337 } 338 resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)} 339 return resp, nil 340} 341