1// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package utils
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"io"
22	"net"
23	"time"
24
25	"github.com/uber/jaeger-client-go/log"
26	"github.com/uber/jaeger-client-go/thrift"
27
28	"github.com/uber/jaeger-client-go/thrift-gen/agent"
29	"github.com/uber/jaeger-client-go/thrift-gen/jaeger"
30	"github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
31)
32
// UDPPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent.
// NewAgentClientUDPWithParams uses it as the default when
// AgentClientUDPParams.MaxPacketSize is left at zero.
const UDPPacketMaxLength = 65000
35
// AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
//
// EmitBatch resets thriftBuffer, runs the batch through client (which was
// constructed over that same buffer), checks the buffered size against
// maxPacketSize and then hands the buffered bytes to connUDP in one Write call.
type AgentClientUDP struct {
	// Embedded interfaces. EmitBatch, EmitZipkinBatch and Close are declared
	// directly on *AgentClientUDP elsewhere in this file.
	agent.Agent
	io.Closer

	connUDP       udpConn               // connection the serialized batches are written to
	client        *agent.AgentClient    // Thrift agent client constructed over thriftBuffer
	maxPacketSize int                   // max size of datagram in bytes
	thriftBuffer  *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
}
46
// udpConn is the subset of connection behavior AgentClientUDP relies on:
// sizing the write buffer, writing a payload and closing the connection.
// Keeping it as an interface lets the client work with either a directly
// dialed connection or a reconnecting wrapper.
type udpConn interface {
	// SetWriteBuffer requests a write buffer of the given size in bytes.
	SetWriteBuffer(bytes int) error
	// Write sends p and reports how many bytes were written.
	Write(p []byte) (n int, err error)
	// Close releases the connection.
	Close() error
}
52
// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should
// be passed to NewAgentClientUDPWithParams.
type AgentClientUDPParams struct {
	// HostPort is the agent address. NewAgentClientUDPWithParams rejects values
	// that net.SplitHostPort cannot parse.
	HostPort string
	// MaxPacketSize is the max size of a datagram in bytes. Zero selects
	// UDPPacketMaxLength.
	MaxPacketSize int
	// Logger is handed to the reconnecting connection. Nil selects log.StdLogger.
	Logger log.Logger
	// DisableAttemptReconnecting, when true, makes the constructor resolve the
	// address once and dial it directly instead of creating a reconnecting
	// connection.
	DisableAttemptReconnecting bool
	// AttemptReconnectInterval is passed to the reconnecting connection. Zero
	// selects 30 seconds when reconnecting is not disabled.
	AttemptReconnectInterval time.Duration
}
62
63// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP.
64func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) {
65	// validate hostport
66	if _, _, err := net.SplitHostPort(params.HostPort); err != nil {
67		return nil, err
68	}
69
70	if params.MaxPacketSize == 0 {
71		params.MaxPacketSize = UDPPacketMaxLength
72	}
73
74	if params.Logger == nil {
75		params.Logger = log.StdLogger
76	}
77
78	if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 {
79		params.AttemptReconnectInterval = time.Second * 30
80	}
81
82	thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
83	protocolFactory := thrift.NewTCompactProtocolFactory()
84	client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory)
85
86	var connUDP udpConn
87	var err error
88
89	if params.DisableAttemptReconnecting {
90		destAddr, err := net.ResolveUDPAddr("udp", params.HostPort)
91		if err != nil {
92			return nil, err
93		}
94
95		connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
96		if err != nil {
97			return nil, err
98		}
99	} else {
100		// host is hostname, setup resolver loop in case host record changes during operation
101		connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
102		if err != nil {
103			return nil, err
104		}
105	}
106
107	if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
108		return nil, err
109	}
110
111	return &AgentClientUDP{
112		connUDP:       connUDP,
113		client:        client,
114		maxPacketSize: params.MaxPacketSize,
115		thriftBuffer:  thriftBuffer,
116	}, nil
117}
118
119// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
120func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
121	return NewAgentClientUDPWithParams(AgentClientUDPParams{
122		HostPort:      hostPort,
123		MaxPacketSize: maxPacketSize,
124	})
125}
126
127// EmitZipkinBatch implements EmitZipkinBatch() of Agent interface
128func (a *AgentClientUDP) EmitZipkinBatch(context.Context, []*zipkincore.Span) error {
129	return errors.New("Not implemented")
130}
131
132// EmitBatch implements EmitBatch() of Agent interface
133func (a *AgentClientUDP) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
134	a.thriftBuffer.Reset()
135	if err := a.client.EmitBatch(ctx, batch); err != nil {
136		return err
137	}
138	if a.thriftBuffer.Len() > a.maxPacketSize {
139		return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
140			a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
141	}
142	_, err := a.connUDP.Write(a.thriftBuffer.Bytes())
143	return err
144}
145
146// Close implements Close() of io.Closer and closes the underlying UDP connection.
147func (a *AgentClientUDP) Close() error {
148	return a.connUDP.Close()
149}
150