// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"time"

	"github.com/uber/jaeger-client-go/log"
	"github.com/uber/jaeger-client-go/thrift"

	"github.com/uber/jaeger-client-go/thrift-gen/agent"
	"github.com/uber/jaeger-client-go/thrift-gen/jaeger"
	"github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
)

// UDPPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent.
// NewAgentClientUDPWithParams falls back to this value when MaxPacketSize is zero.
const UDPPacketMaxLength = 65000

// AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
//
// The agent.Agent and io.Closer interfaces are embedded in the struct; the
// methods callers actually reach (EmitBatch, EmitZipkinBatch, Close) are
// defined directly on *AgentClientUDP in this file.
type AgentClientUDP struct {
	agent.Agent
	io.Closer

	connUDP       udpConn               // connection the serialized batches are written to
	client        *agent.AgentClient    // thrift client that serializes batches into thriftBuffer
	maxPacketSize int                   // max size of datagram in bytes
	thriftBuffer  *thrift.TMemoryBuffer // buffer the client serializes into; its size is checked against maxPacketSize before sending
}

// udpConn is the set of connection operations this client relies on: writing
// a datagram, sizing the write buffer, and closing. The constructor assigns it
// either the result of net.DialUDP or of newReconnectingUDPConn.
type udpConn interface {
	Write([]byte) (int, error)
	SetWriteBuffer(int) error
	Close() error
}

// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should
// be passed to NewAgentClientUDPWithParams.
55type AgentClientUDPParams struct { 56 HostPort string 57 MaxPacketSize int 58 Logger log.Logger 59 DisableAttemptReconnecting bool 60 AttemptReconnectInterval time.Duration 61} 62 63// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP. 64func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) { 65 // validate hostport 66 if _, _, err := net.SplitHostPort(params.HostPort); err != nil { 67 return nil, err 68 } 69 70 if params.MaxPacketSize == 0 { 71 params.MaxPacketSize = UDPPacketMaxLength 72 } 73 74 if params.Logger == nil { 75 params.Logger = log.StdLogger 76 } 77 78 if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 { 79 params.AttemptReconnectInterval = time.Second * 30 80 } 81 82 thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) 83 protocolFactory := thrift.NewTCompactProtocolFactory() 84 client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory) 85 86 var connUDP udpConn 87 var err error 88 89 if params.DisableAttemptReconnecting { 90 destAddr, err := net.ResolveUDPAddr("udp", params.HostPort) 91 if err != nil { 92 return nil, err 93 } 94 95 connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) 96 if err != nil { 97 return nil, err 98 } 99 } else { 100 // host is hostname, setup resolver loop in case host record changes during operation 101 connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) 102 if err != nil { 103 return nil, err 104 } 105 } 106 107 if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { 108 return nil, err 109 } 110 111 return &AgentClientUDP{ 112 connUDP: connUDP, 113 client: client, 114 maxPacketSize: params.MaxPacketSize, 115 thriftBuffer: thriftBuffer, 116 }, nil 117} 118 119// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. 
120func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { 121 return NewAgentClientUDPWithParams(AgentClientUDPParams{ 122 HostPort: hostPort, 123 MaxPacketSize: maxPacketSize, 124 }) 125} 126 127// EmitZipkinBatch implements EmitZipkinBatch() of Agent interface 128func (a *AgentClientUDP) EmitZipkinBatch(context.Context, []*zipkincore.Span) error { 129 return errors.New("Not implemented") 130} 131 132// EmitBatch implements EmitBatch() of Agent interface 133func (a *AgentClientUDP) EmitBatch(ctx context.Context, batch *jaeger.Batch) error { 134 a.thriftBuffer.Reset() 135 if err := a.client.EmitBatch(ctx, batch); err != nil { 136 return err 137 } 138 if a.thriftBuffer.Len() > a.maxPacketSize { 139 return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d", 140 a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans)) 141 } 142 _, err := a.connUDP.Write(a.thriftBuffer.Bytes()) 143 return err 144} 145 146// Close implements Close() of io.Closer and closes the underlying UDP connection. 147func (a *AgentClientUDP) Close() error { 148 return a.connUDP.Close() 149} 150