1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger" 16 17import ( 18 "bytes" 19 "errors" 20 "fmt" 21 "io" 22 "io/ioutil" 23 "log" 24 "net/http" 25 "time" 26 27 "github.com/apache/thrift/lib/go/thrift" 28 29 gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger" 30) 31 32// batchUploader send a batch of spans to Jaeger 33type batchUploader interface { 34 upload(batch *gen.Batch) error 35} 36 37type EndpointOption func() (batchUploader, error) 38 39// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address. 40// For example, localhost:6831. 41func WithAgentEndpoint(agentEndpoint string, options ...AgentEndpointOption) EndpointOption { 42 return func() (batchUploader, error) { 43 if agentEndpoint == "" { 44 return nil, errors.New("agentEndpoint must not be empty") 45 } 46 47 o := &AgentEndpointOptions{ 48 agentClientUDPParams{ 49 HostPort: agentEndpoint, 50 AttemptReconnecting: true, 51 }, 52 } 53 54 for _, opt := range options { 55 opt(o) 56 } 57 58 client, err := newAgentClientUDP(o.agentClientUDPParams) 59 if err != nil { 60 return nil, err 61 } 62 63 return &agentUploader{client: client}, nil 64 } 65} 66 67type AgentEndpointOption func(o *AgentEndpointOptions) 68 69type AgentEndpointOptions struct { 70 agentClientUDPParams 71} 72 73// WithLogger sets a logger to be used by agent client. 74func WithLogger(logger *log.Logger) AgentEndpointOption { 75 return func(o *AgentEndpointOptions) { 76 o.Logger = logger 77 } 78} 79 80// WithDisableAttemptReconnecting sets option to disable reconnecting udp client. 81func WithDisableAttemptReconnecting() AgentEndpointOption { 82 return func(o *AgentEndpointOptions) { 83 o.AttemptReconnecting = false 84 } 85} 86 87// WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint. 88func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption { 89 return func(o *AgentEndpointOptions) { 90 o.AttemptReconnectInterval = interval 91 } 92} 93 94// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. 95// For example, http://localhost:14268/api/traces 96func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) EndpointOption { 97 return func() (batchUploader, error) { 98 // Overwrite collector endpoint if environment variables are available. 99 if e := CollectorEndpointFromEnv(); e != "" { 100 collectorEndpoint = e 101 } 102 103 if collectorEndpoint == "" { 104 return nil, errors.New("collectorEndpoint must not be empty") 105 } 106 107 o := &CollectorEndpointOptions{ 108 httpClient: http.DefaultClient, 109 } 110 111 options = append(options, WithCollectorEndpointOptionFromEnv()) 112 for _, opt := range options { 113 opt(o) 114 } 115 116 return &collectorUploader{ 117 endpoint: collectorEndpoint, 118 username: o.username, 119 password: o.password, 120 httpClient: o.httpClient, 121 }, nil 122 } 123} 124 125type CollectorEndpointOption func(o *CollectorEndpointOptions) 126 127type CollectorEndpointOptions struct { 128 // username to be used if basic auth is required. 129 username string 130 131 // password to be used if basic auth is required. 132 password string 133 134 // httpClient to be used to make requests to the collector endpoint. 135 httpClient *http.Client 136} 137 138// WithUsername sets the username to be used if basic auth is required. 139func WithUsername(username string) CollectorEndpointOption { 140 return func(o *CollectorEndpointOptions) { 141 o.username = username 142 } 143} 144 145// WithPassword sets the password to be used if basic auth is required. 146func WithPassword(password string) CollectorEndpointOption { 147 return func(o *CollectorEndpointOptions) { 148 o.password = password 149 } 150} 151 152// WithHTTPClient sets the http client to be used to make request to the collector endpoint. 153func WithHTTPClient(client *http.Client) CollectorEndpointOption { 154 return func(o *CollectorEndpointOptions) { 155 o.httpClient = client 156 } 157} 158 159// agentUploader implements batchUploader interface sending batches to 160// Jaeger through the UDP agent. 161type agentUploader struct { 162 client *agentClientUDP 163} 164 165var _ batchUploader = (*agentUploader)(nil) 166 167func (a *agentUploader) upload(batch *gen.Batch) error { 168 return a.client.EmitBatch(batch) 169} 170 171// collectorUploader implements batchUploader interface sending batches to 172// Jaeger through the collector http endpoint. 173type collectorUploader struct { 174 endpoint string 175 username string 176 password string 177 httpClient *http.Client 178} 179 180var _ batchUploader = (*collectorUploader)(nil) 181 182func (c *collectorUploader) upload(batch *gen.Batch) error { 183 body, err := serialize(batch) 184 if err != nil { 185 return err 186 } 187 req, err := http.NewRequest("POST", c.endpoint, body) 188 if err != nil { 189 return err 190 } 191 if c.username != "" && c.password != "" { 192 req.SetBasicAuth(c.username, c.password) 193 } 194 req.Header.Set("Content-Type", "application/x-thrift") 195 196 resp, err := c.httpClient.Do(req) 197 if err != nil { 198 return err 199 } 200 201 _, _ = io.Copy(ioutil.Discard, resp.Body) 202 resp.Body.Close() 203 204 if resp.StatusCode < 200 || resp.StatusCode >= 300 { 205 return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) 206 } 207 return nil 208} 209 210func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { 211 buf := thrift.NewTMemoryBuffer() 212 if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil { 213 return nil, err 214 } 215 return buf.Buffer, nil 216} 217