1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate ./regenerate.sh 20 21// Package service provides an implementation for channelz service server. 22package service 23 24import ( 25 "net" 26 "time" 27 28 "github.com/golang/protobuf/ptypes" 29 durpb "github.com/golang/protobuf/ptypes/duration" 30 wrpb "github.com/golang/protobuf/ptypes/wrappers" 31 "golang.org/x/net/context" 32 "google.golang.org/grpc" 33 channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1" 34 channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" 35 "google.golang.org/grpc/connectivity" 36 "google.golang.org/grpc/credentials" 37 "google.golang.org/grpc/internal/channelz" 38) 39 40func init() { 41 channelz.TurnOn() 42} 43 44func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration { 45 return ptypes.DurationProto(time.Duration(sec*1e9 + usec*1e3)) 46} 47 48// RegisterChannelzServiceToServer registers the channelz service to the given server. 49func RegisterChannelzServiceToServer(s *grpc.Server) { 50 channelzgrpc.RegisterChannelzServer(s, newCZServer()) 51} 52 53func newCZServer() channelzgrpc.ChannelzServer { 54 return &serverImpl{} 55} 56 57type serverImpl struct{} 58 59func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState { 60 switch s { 61 case connectivity.Idle: 62 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE} 63 case connectivity.Connecting: 64 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING} 65 case connectivity.Ready: 66 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY} 67 case connectivity.TransientFailure: 68 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE} 69 case connectivity.Shutdown: 70 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN} 71 default: 72 return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} 73 } 74} 75 76func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { 77 pbt := &channelzpb.ChannelTrace{} 78 pbt.NumEventsLogged = ct.EventNum 79 if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { 80 pbt.CreationTimestamp = ts 81 } 82 var events []*channelzpb.ChannelTraceEvent 83 for _, e := range ct.Events { 84 cte := &channelzpb.ChannelTraceEvent{ 85 Description: e.Desc, 86 Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), 87 } 88 if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { 89 cte.Timestamp = ts 90 } 91 if e.RefID != 0 { 92 switch e.RefType { 93 case channelz.RefChannel: 94 cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} 95 case channelz.RefSubChannel: 96 cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} 97 } 98 } 99 events = append(events, cte) 100 } 101 pbt.Events = events 102 return pbt 103} 104 105func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel { 106 c := &channelzpb.Channel{} 107 c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} 108 109 c.Data = &channelzpb.ChannelData{ 110 State: connectivityStateToProto(cm.ChannelData.State), 111 Target: cm.ChannelData.Target, 112 CallsStarted: cm.ChannelData.CallsStarted, 113 CallsSucceeded: cm.ChannelData.CallsSucceeded, 114 CallsFailed: cm.ChannelData.CallsFailed, 115 } 116 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 117 c.Data.LastCallStartedTimestamp = ts 118 } 119 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 120 for id, ref := range cm.NestedChans { 121 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 122 } 123 c.ChannelRef = nestedChans 124 125 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 126 for id, ref := range cm.SubChans { 127 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 128 } 129 c.SubchannelRef = subChans 130 131 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 132 for id, ref := range cm.Sockets { 133 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 134 } 135 c.SocketRef = sockets 136 c.Data.Trace = channelTraceToProto(cm.Trace) 137 return c 138} 139 140func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel { 141 sc := &channelzpb.Subchannel{} 142 sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName} 143 144 sc.Data = &channelzpb.ChannelData{ 145 State: connectivityStateToProto(cm.ChannelData.State), 146 Target: cm.ChannelData.Target, 147 CallsStarted: cm.ChannelData.CallsStarted, 148 CallsSucceeded: cm.ChannelData.CallsSucceeded, 149 CallsFailed: cm.ChannelData.CallsFailed, 150 } 151 if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil { 152 sc.Data.LastCallStartedTimestamp = ts 153 } 154 nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans)) 155 for id, ref := range cm.NestedChans { 156 nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) 157 } 158 sc.ChannelRef = nestedChans 159 160 subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans)) 161 for id, ref := range cm.SubChans { 162 subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) 163 } 164 sc.SubchannelRef = subChans 165 166 sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets)) 167 for id, ref := range cm.Sockets { 168 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 169 } 170 sc.SocketRef = sockets 171 sc.Data.Trace = channelTraceToProto(cm.Trace) 172 return sc 173} 174 175func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security { 176 switch v := se.(type) { 177 case *credentials.TLSChannelzSecurityValue: 178 return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{ 179 CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName}, 180 LocalCertificate: v.LocalCertificate, 181 RemoteCertificate: v.RemoteCertificate, 182 }}} 183 case *credentials.OtherChannelzSecurityValue: 184 otherSecurity := &channelzpb.Security_OtherSecurity{ 185 Name: v.Name, 186 } 187 if anyval, err := ptypes.MarshalAny(v.Value); err == nil { 188 otherSecurity.Value = anyval 189 } 190 return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}} 191 } 192 return nil 193} 194 195func addrToProto(a net.Addr) *channelzpb.Address { 196 switch a.Network() { 197 case "udp": 198 // TODO: Address_OtherAddress{}. Need proto def for Value. 199 case "ip": 200 // Note zone info is discarded through the conversion. 201 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}} 202 case "ip+net": 203 // Note mask info is discarded through the conversion. 204 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}} 205 case "tcp": 206 // Note zone info is discarded through the conversion. 207 return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}} 208 case "unix", "unixgram", "unixpacket": 209 return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}} 210 default: 211 } 212 return &channelzpb.Address{} 213} 214 215func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket { 216 s := &channelzpb.Socket{} 217 s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName} 218 219 s.Data = &channelzpb.SocketData{ 220 StreamsStarted: sm.SocketData.StreamsStarted, 221 StreamsSucceeded: sm.SocketData.StreamsSucceeded, 222 StreamsFailed: sm.SocketData.StreamsFailed, 223 MessagesSent: sm.SocketData.MessagesSent, 224 MessagesReceived: sm.SocketData.MessagesReceived, 225 KeepAlivesSent: sm.SocketData.KeepAlivesSent, 226 } 227 if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil { 228 s.Data.LastLocalStreamCreatedTimestamp = ts 229 } 230 if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil { 231 s.Data.LastRemoteStreamCreatedTimestamp = ts 232 } 233 if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil { 234 s.Data.LastMessageSentTimestamp = ts 235 } 236 if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil { 237 s.Data.LastMessageReceivedTimestamp = ts 238 } 239 s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow} 240 s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow} 241 242 if sm.SocketData.SocketOptions != nil { 243 s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions) 244 } 245 if sm.SocketData.Security != nil { 246 s.Security = securityToProto(sm.SocketData.Security) 247 } 248 249 if sm.SocketData.LocalAddr != nil { 250 s.Local = addrToProto(sm.SocketData.LocalAddr) 251 } 252 if sm.SocketData.RemoteAddr != nil { 253 s.Remote = addrToProto(sm.SocketData.RemoteAddr) 254 } 255 s.RemoteName = sm.SocketData.RemoteName 256 return s 257} 258 259func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) { 260 metrics, end := channelz.GetTopChannels(req.GetStartChannelId()) 261 resp := &channelzpb.GetTopChannelsResponse{} 262 for _, m := range metrics { 263 resp.Channel = append(resp.Channel, channelMetricToProto(m)) 264 } 265 resp.End = end 266 return resp, nil 267} 268 269func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server { 270 s := &channelzpb.Server{} 271 s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName} 272 273 s.Data = &channelzpb.ServerData{ 274 CallsStarted: sm.ServerData.CallsStarted, 275 CallsSucceeded: sm.ServerData.CallsSucceeded, 276 CallsFailed: sm.ServerData.CallsFailed, 277 } 278 279 if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil { 280 s.Data.LastCallStartedTimestamp = ts 281 } 282 sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets)) 283 for id, ref := range sm.ListenSockets { 284 sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) 285 } 286 s.ListenSocket = sockets 287 return s 288} 289 290func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) { 291 metrics, end := channelz.GetServers(req.GetStartServerId()) 292 resp := &channelzpb.GetServersResponse{} 293 for _, m := range metrics { 294 resp.Server = append(resp.Server, serverMetricToProto(m)) 295 } 296 resp.End = end 297 return resp, nil 298} 299 300func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) { 301 metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId()) 302 resp := &channelzpb.GetServerSocketsResponse{} 303 for _, m := range metrics { 304 resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName}) 305 } 306 resp.End = end 307 return resp, nil 308} 309 310func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) { 311 var metric *channelz.ChannelMetric 312 if metric = channelz.GetChannel(req.GetChannelId()); metric == nil { 313 return &channelzpb.GetChannelResponse{}, nil 314 } 315 resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)} 316 return resp, nil 317} 318 319func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) { 320 var metric *channelz.SubChannelMetric 321 if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil { 322 return &channelzpb.GetSubchannelResponse{}, nil 323 } 324 resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)} 325 return resp, nil 326} 327 328func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) { 329 var metric *channelz.SocketMetric 330 if metric = channelz.GetSocket(req.GetSocketId()); metric == nil { 331 return &channelzpb.GetSocketResponse{}, nil 332 } 333 resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)} 334 return resp, nil 335} 336