1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger"
16
17import (
18	"bytes"
19	"errors"
20	"fmt"
21	"io"
22	"io/ioutil"
23	"log"
24	"net/http"
25	"time"
26
27	"github.com/apache/thrift/lib/go/thrift"
28
29	gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
30)
31
// batchUploader sends a batch of spans to Jaeger.
//
// Implementations in this file are agentUploader and collectorUploader.
type batchUploader interface {
	// upload delivers batch to Jaeger and returns an error if delivery failed.
	upload(batch *gen.Batch) error
}
36
// EndpointOption builds the batchUploader used to deliver batches, or returns
// an error when the uploader cannot be constructed. Values of this type are
// produced by WithAgentEndpoint and WithCollectorEndpoint.
type EndpointOption func() (batchUploader, error)
38
39// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
40// For example, localhost:6831.
41func WithAgentEndpoint(agentEndpoint string, options ...AgentEndpointOption) EndpointOption {
42	return func() (batchUploader, error) {
43		if agentEndpoint == "" {
44			return nil, errors.New("agentEndpoint must not be empty")
45		}
46
47		o := &AgentEndpointOptions{
48			agentClientUDPParams{
49				HostPort:            agentEndpoint,
50				AttemptReconnecting: true,
51			},
52		}
53
54		for _, opt := range options {
55			opt(o)
56		}
57
58		client, err := newAgentClientUDP(o.agentClientUDPParams)
59		if err != nil {
60			return nil, err
61		}
62
63		return &agentUploader{client: client}, nil
64	}
65}
66
// AgentEndpointOption modifies the AgentEndpointOptions that WithAgentEndpoint
// uses to create the agent client.
type AgentEndpointOption func(o *AgentEndpointOptions)
68
// AgentEndpointOptions holds the settings adjusted by AgentEndpointOption
// values. It embeds agentClientUDPParams, which WithAgentEndpoint passes to
// newAgentClientUDP.
type AgentEndpointOptions struct {
	agentClientUDPParams
}
72
73// WithLogger sets a logger to be used by agent client.
74func WithLogger(logger *log.Logger) AgentEndpointOption {
75	return func(o *AgentEndpointOptions) {
76		o.Logger = logger
77	}
78}
79
80// WithDisableAttemptReconnecting sets option to disable reconnecting udp client.
81func WithDisableAttemptReconnecting() AgentEndpointOption {
82	return func(o *AgentEndpointOptions) {
83		o.AttemptReconnecting = false
84	}
85}
86
87// WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint.
88func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption {
89	return func(o *AgentEndpointOptions) {
90		o.AttemptReconnectInterval = interval
91	}
92}
93
94// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector.
95// For example, http://localhost:14268/api/traces
96func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) EndpointOption {
97	return func() (batchUploader, error) {
98		// Overwrite collector endpoint if environment variables are available.
99		if e := CollectorEndpointFromEnv(); e != "" {
100			collectorEndpoint = e
101		}
102
103		if collectorEndpoint == "" {
104			return nil, errors.New("collectorEndpoint must not be empty")
105		}
106
107		o := &CollectorEndpointOptions{
108			httpClient: http.DefaultClient,
109		}
110
111		options = append(options, WithCollectorEndpointOptionFromEnv())
112		for _, opt := range options {
113			opt(o)
114		}
115
116		return &collectorUploader{
117			endpoint:   collectorEndpoint,
118			username:   o.username,
119			password:   o.password,
120			httpClient: o.httpClient,
121		}, nil
122	}
123}
124
// CollectorEndpointOption modifies the CollectorEndpointOptions that
// WithCollectorEndpoint uses to build the collector uploader.
type CollectorEndpointOption func(o *CollectorEndpointOptions)
126
// CollectorEndpointOptions holds the settings adjusted by
// CollectorEndpointOption values; WithCollectorEndpoint copies them into the
// collectorUploader it returns.
type CollectorEndpointOptions struct {
	// username to be used if basic auth is required.
	username string

	// password to be used if basic auth is required.
	password string

	// httpClient to be used to make requests to the collector endpoint.
	httpClient *http.Client
}
137
138// WithUsername sets the username to be used if basic auth is required.
139func WithUsername(username string) CollectorEndpointOption {
140	return func(o *CollectorEndpointOptions) {
141		o.username = username
142	}
143}
144
145// WithPassword sets the password to be used if basic auth is required.
146func WithPassword(password string) CollectorEndpointOption {
147	return func(o *CollectorEndpointOptions) {
148		o.password = password
149	}
150}
151
152// WithHTTPClient sets the http client to be used to make request to the collector endpoint.
153func WithHTTPClient(client *http.Client) CollectorEndpointOption {
154	return func(o *CollectorEndpointOptions) {
155		o.httpClient = client
156	}
157}
158
// agentUploader implements the batchUploader interface, sending batches to
// Jaeger through the UDP agent.
type agentUploader struct {
	// client is the agent client that batches are emitted through.
	client *agentClientUDP
}

// Compile-time check that *agentUploader satisfies batchUploader.
var _ batchUploader = (*agentUploader)(nil)
166
167func (a *agentUploader) upload(batch *gen.Batch) error {
168	return a.client.EmitBatch(batch)
169}
170
// collectorUploader implements the batchUploader interface, sending batches to
// Jaeger through the collector HTTP endpoint.
type collectorUploader struct {
	// endpoint is the URL batches are POSTed to.
	endpoint string
	// username and password are sent as basic auth credentials when both are
	// non-empty.
	username string
	password string
	// httpClient performs the upload requests.
	httpClient *http.Client
}

// Compile-time check that *collectorUploader satisfies batchUploader.
var _ batchUploader = (*collectorUploader)(nil)
181
182func (c *collectorUploader) upload(batch *gen.Batch) error {
183	body, err := serialize(batch)
184	if err != nil {
185		return err
186	}
187	req, err := http.NewRequest("POST", c.endpoint, body)
188	if err != nil {
189		return err
190	}
191	if c.username != "" && c.password != "" {
192		req.SetBasicAuth(c.username, c.password)
193	}
194	req.Header.Set("Content-Type", "application/x-thrift")
195
196	resp, err := c.httpClient.Do(req)
197	if err != nil {
198		return err
199	}
200
201	_, _ = io.Copy(ioutil.Discard, resp.Body)
202	resp.Body.Close()
203
204	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
205		return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
206	}
207	return nil
208}
209
210func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
211	buf := thrift.NewTMemoryBuffer()
212	if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
213		return nil, err
214	}
215	return buf.Buffer, nil
216}
217