1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate ./regenerate.sh 20 21// Package service provides an implementation for channelz service server. 22package service 23 24import ( 25 "context" 26 "net" 27 "time" 28 29 "github.com/golang/protobuf/ptypes" 30 durpb "github.com/golang/protobuf/ptypes/duration" 31 wrpb "github.com/golang/protobuf/ptypes/wrappers" 32 "google.golang.org/grpc" 33 channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1" 34 channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/connectivity" 37 "google.golang.org/grpc/credentials" 38 "google.golang.org/grpc/internal/channelz" 39 "google.golang.org/grpc/status" 40) 41 42func init() { 43 channelz.TurnOn() 44} 45 46func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration { 47 return ptypes.DurationProto(time.Duration(sec*1e9 + usec*1e3)) 48} 49 50// RegisterChannelzServiceToServer registers the channelz service to the given server. 51func RegisterChannelzServiceToServer(s *grpc.Server) { 52 channelzgrpc.RegisterChannelzServer(s, newCZServer()) 53} 54 55func newCZServer() channelzgrpc.ChannelzServer { 56 return &serverImpl{} 57} 58 59type serverImpl struct{} 60 61func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState { 62 switch s { 63 case connectivity.Idle: 64 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE} 65 case connectivity.Connecting: 66 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING} 67 case connectivity.Ready: 68 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY} 69 case connectivity.TransientFailure: 70 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE} 71 case connectivity.Shutdown: 72 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN} 73 default: 74 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} 75 } 76} 77 78func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { 79 pbt := &channelzpb.ChannelTrace{} 80 pbt.NumEventsLogged = ct.EventNum 81 if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { 82 pbt.CreationTimestamp = ts 83 } 84 var events []*channelzpb.ChannelTraceEvent 85 for _, e := range ct.Events { 86 cte := &channelzpb.ChannelTraceEvent{ 87 Description: e.Desc, 88 Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), 89 } 90 if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { 91 cte.Timestamp = ts 92 } 93 if e.RefID != 0 { 94 switch e.RefType { 95 case channelz.RefChannel: 96 cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} 97 case channelz.RefSubChannel: 98 cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} 99 } 100 } 101 events = append(events, cte) 102 } 103 pbt.Events = events 104 return pbt 105} 106 107func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel { 108 c := &channelzpb.Channel{} 109 c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} 110 111 c.Data = &channelzpb.ChannelData{ 112 State: connectivityStateToProto(cm.ChannelData.State), 113 Target: cm.ChannelData.Target, 114 CallsStarted: cm.ChannelData.CallsStarted, 115 CallsSucceeded: cm.ChannelData.CallsSucceeded, 116 CallsFailed: cm.ChannelData.CallsFailed, 117 } 118 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 119 c.Data.LastCallStartedTimestamp = ts 120 } 121 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 122 for id, ref := range cm.NestedChans { 123 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 124 } 125 c.ChannelRef = nestedChans 126 127 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 128 for id, ref := range cm.SubChans { 129 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 130 } 131 c.SubchannelRef = subChans 132 133 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 134 for id, ref := range cm.Sockets { 135 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 136 } 137 c.SocketRef = sockets 138 c.Data.Trace = channelTraceToProto(cm.Trace) 139 return c 140} 141 142func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel { 143 sc := &channelzpb.Subchannel{} 144 sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName} 145 146 sc.Data = &channelzpb.ChannelData{ 147 State: connectivityStateToProto(cm.ChannelData.State), 148 Target: cm.ChannelData.Target, 149 CallsStarted: cm.ChannelData.CallsStarted, 150 CallsSucceeded: cm.ChannelData.CallsSucceeded, 151 CallsFailed: cm.ChannelData.CallsFailed, 152 } 153 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 154 sc.Data.LastCallStartedTimestamp = ts 155 } 156 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 157 for id, ref := range cm.NestedChans { 158 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 159 } 160 sc.ChannelRef = nestedChans 161 162 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 163 for id, ref := range cm.SubChans { 164 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 165 } 166 sc.SubchannelRef = subChans 167 168 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 169 for id, ref := range cm.Sockets { 170 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 171 } 172 sc.SocketRef = sockets 173 sc.Data.Trace = channelTraceToProto(cm.Trace) 174 return sc 175} 176 177func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security { 178 switch v := se.(type) { 179 case *credentials.TLSChannelzSecurityValue: 180 return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{ 181 CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName}, 182 LocalCertificate: v.LocalCertificate, 183 RemoteCertificate: v.RemoteCertificate, 184 }}} 185 case *credentials.OtherChannelzSecurityValue: 186 otherSecurity := &channelzpb.Security_OtherSecurity{ 187 Name: v.Name, 188 } 189 if anyval, err := ptypes.MarshalAny(v.Value); err == nil { 190 otherSecurity.Value = anyval 191 } 192 return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}} 193 } 194 return nil 195} 196 197func addrToProto(a net.Addr) *channelzpb.Address { 198 switch a.Network() { 199 case "udp": 200 // TODO: Address_OtherAddress{}. Need proto def for Value. 201 case "ip": 202 // Note zone info is discarded through the conversion. 203 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}} 204 case "ip+net": 205 // Note mask info is discarded through the conversion. 206 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}} 207 case "tcp": 208 // Note zone info is discarded through the conversion. 209 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}} 210 case "unix", "unixgram", "unixpacket": 211 return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}} 212 default: 213 } 214 return &channelzpb.Address{} 215} 216 217func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket { 218 s := &channelzpb.Socket{} 219 s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName} 220 221 s.Data = &channelzpb.SocketData{ 222 StreamsStarted: sm.SocketData.StreamsStarted, 223 StreamsSucceeded: sm.SocketData.StreamsSucceeded, 224 StreamsFailed: sm.SocketData.StreamsFailed, 225 MessagesSent: sm.SocketData.MessagesSent, 226 MessagesReceived: sm.SocketData.MessagesReceived, 227 KeepAlivesSent: sm.SocketData.KeepAlivesSent, 228 } 229 if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil { 230 s.Data.LastLocalStreamCreatedTimestamp = ts 231 } 232 if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil { 233 s.Data.LastRemoteStreamCreatedTimestamp = ts 234 } 235 if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil { 236 s.Data.LastMessageSentTimestamp = ts 237 } 238 if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil { 239 s.Data.LastMessageReceivedTimestamp = ts 240 } 241 s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow} 242 s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow} 243 244 if sm.SocketData.SocketOptions != nil { 245 s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions) 246 } 247 if sm.SocketData.Security != nil { 248 s.Security = securityToProto(sm.SocketData.Security) 249 } 250 251 if sm.SocketData.LocalAddr != nil { 252 s.Local = addrToProto(sm.SocketData.LocalAddr) 253 } 254 if sm.SocketData.RemoteAddr != nil { 255 s.Remote = addrToProto(sm.SocketData.RemoteAddr) 256 } 257 s.RemoteName = sm.SocketData.RemoteName 258 return s 259} 260 261func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) { 262 metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults()) 263 resp := &channelzpb.GetTopChannelsResponse{} 264 for _, m := range metrics { 265 resp.Channel = append(resp.Channel, channelMetricToProto(m)) 266 } 267 resp.End = end 268 return resp, nil 269} 270 271func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server { 272 s := &channelzpb.Server{} 273 s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName} 274 275 s.Data = &channelzpb.ServerData{ 276 CallsStarted: sm.ServerData.CallsStarted, 277 CallsSucceeded: sm.ServerData.CallsSucceeded, 278 CallsFailed: sm.ServerData.CallsFailed, 279 } 280 281 if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil { 282 s.Data.LastCallStartedTimestamp = ts 283 } 284 sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets)) 285 for id, ref := range sm.ListenSockets { 286 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 287 } 288 s.ListenSocket = sockets 289 return s 290} 291 292func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) { 293 metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults()) 294 resp := &channelzpb.GetServersResponse{} 295 for _, m := range metrics { 296 resp.Server = append(resp.Server, serverMetricToProto(m)) 297 } 298 resp.End = end 299 return resp, nil 300} 301 302func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) { 303 metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults()) 304 resp := &channelzpb.GetServerSocketsResponse{} 305 for _, m := range metrics { 306 resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName}) 307 } 308 resp.End = end 309 return resp, nil 310} 311 312func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) { 313 var metric *channelz.ChannelMetric 314 if metric = channelz.GetChannel(req.GetChannelId()); metric == nil { 315 return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId()) 316 } 317 resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)} 318 return resp, nil 319} 320 321func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) { 322 var metric *channelz.SubChannelMetric 323 if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil { 324 return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId()) 325 } 326 resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)} 327 return resp, nil 328} 329 330func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) { 331 var metric *channelz.SocketMetric 332 if metric = channelz.GetSocket(req.GetSocketId()); metric == nil { 333 return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId()) 334 } 335 resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)} 336 return resp, nil 337} 338 339func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) { 340 var metric *channelz.ServerMetric 341 if metric = channelz.GetServer(req.GetServerId()); metric == nil { 342 return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId()) 343 } 344 resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)} 345 return resp, nil 346} 347