1// Copyright 2019, OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package statsd
16
// See https://github.com/b/statsd_spec for the best-available statsd
// syntax specification.  See also
// https://github.com/statsd/statsd/blob/master/docs/metric_types.md
20
21import (
22	"bytes"
23	"context"
24	"fmt"
25	"io"
26	"net"
27	"net/url"
28	"strconv"
29
30	"go.opentelemetry.io/otel/api/core"
31	"go.opentelemetry.io/otel/api/unit"
32	export "go.opentelemetry.io/otel/sdk/export/metric"
33	"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
34)
35
36type (
37	// Config supports common configuration that applies to statsd exporters.
38	Config struct {
39		// URL describes the destination for exporting statsd data.
40		// e.g., udp://host:port
41		//       tcp://host:port
42		//       unix:///socket/path
43		URL string
44
45		// Writer is an alternate to providing a URL.  When Writer is
46		// non-nil, URL will be ignored and the exporter will write to
47		// the configured Writer interface.
48		Writer io.Writer
49
50		// MaxPacketSize this limits the packet size for packet-oriented transports.
51		MaxPacketSize int
52
53		// TODO support Dial and Write timeouts
54	}
55
56	// Exporter is common type meant to implement concrete statsd
57	// exporters.
58	Exporter struct {
59		adapter Adapter
60		config  Config
61		conn    net.Conn
62		writer  io.Writer
63		buffer  bytes.Buffer
64	}
65
66	// Adapter supports statsd syntax variations, primarily plain
67	// statsd vs. dogstatsd.
68	Adapter interface {
69		AppendName(export.Record, *bytes.Buffer)
70		AppendTags(export.Record, *bytes.Buffer)
71	}
72)
73
// Statsd type suffixes, written after the '|' separator of a statistic.
const (
	formatCounter   = "c"  // used for Sum aggregators
	formatGauge     = "g"  // used for LastValue aggregators
	formatHistogram = "h"  // used for Points aggregators
	formatTiming    = "ms" // used for Points aggregators measured in milliseconds
)

// MaxPacketSize is the packet size limit that NewExporter applies
// when Config.MaxPacketSize is not positive.
const MaxPacketSize = 1 << 16
82
83var (
84	_ export.Exporter = &Exporter{}
85
86	ErrInvalidScheme = fmt.Errorf("invalid statsd transport")
87)
88
89// NewExport returns a common implementation for exporters that Export
90// statsd syntax.
91func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
92	if config.MaxPacketSize <= 0 {
93		config.MaxPacketSize = MaxPacketSize
94	}
95	var writer io.Writer
96	var conn net.Conn
97	var err error
98	if config.Writer != nil {
99		writer = config.Writer
100	} else {
101		conn, err = dial(config.URL)
102		if conn != nil {
103			writer = conn
104		}
105	}
106	// TODO: If err != nil, we return it _with_ a valid exporter; the
107	// exporter should attempt to re-dial if it's retryable.  Add a
108	// Start() and Stop() API.
109	return &Exporter{
110		adapter: adapter,
111		config:  config,
112		conn:    conn,
113		writer:  writer,
114	}, err
115}
116
117// dial connects to a statsd service using several common network
118// types.  Presently "udp" and "unix" datagram socket connections are
119// supported.
120func dial(endpoint string) (net.Conn, error) {
121	dest, err := url.Parse(endpoint)
122	if err != nil {
123		return nil, err
124	}
125
126	// TODO: Support tcp destination, need configurable timeouts first.
127
128	scheme := dest.Scheme
129	switch scheme {
130	case "udp", "udp4", "udp6":
131		udpAddr, err := net.ResolveUDPAddr(scheme, dest.Host)
132		locAddr := &net.UDPAddr{}
133		if err != nil {
134			return nil, err
135		}
136		conn, err := net.DialUDP(scheme, locAddr, udpAddr)
137		if err != nil {
138			return nil, err
139		}
140		return conn, err
141	case "unix", "unixgram":
142		scheme = "unixgram"
143		locAddr := &net.UnixAddr{}
144
145		sockAddr, err := net.ResolveUnixAddr(scheme, dest.Path)
146		if err != nil {
147			return nil, err
148		}
149		conn, err := net.DialUnix(scheme, locAddr, sockAddr)
150		if err != nil {
151			return nil, err
152		}
153		return conn, err
154	}
155	return nil, ErrInvalidScheme
156}
157
158// Export is common code for any statsd-based metric.Exporter implementation.
159func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
160	buf := &e.buffer
161	buf.Reset()
162
163	var aggErr error
164	var sendErr error
165
166	checkpointSet.ForEach(func(rec export.Record) {
167		before := buf.Len()
168
169		if err := e.formatMetric(rec, buf); err != nil && aggErr == nil {
170			aggErr = err
171			return
172		}
173
174		if buf.Len() < e.config.MaxPacketSize {
175			return
176		}
177		if before == 0 {
178			// A single metric >= packet size
179			if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
180				sendErr = err
181			}
182			buf.Reset()
183			return
184		}
185
186		// Send and copy the leftover
187		if err := e.send(buf.Bytes()[:before]); err != nil && sendErr == nil {
188			sendErr = err
189		}
190
191		leftover := buf.Len() - before
192
193		copy(buf.Bytes()[0:leftover], buf.Bytes()[before:])
194
195		buf.Truncate(leftover)
196	})
197	if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
198		sendErr = err
199	}
200	if sendErr != nil {
201		return sendErr
202	}
203	return aggErr
204}
205
206// send writes a complete buffer to the writer as a blocking call.
207func (e *Exporter) send(buf []byte) error {
208	for len(buf) != 0 {
209		n, err := e.writer.Write(buf)
210		if err != nil {
211			return err
212		}
213		buf = buf[n:]
214	}
215	return nil
216}
217
218// formatMetric formats an individual export record.  For some records
219// this will emit a single statistic, for some it will emit more than
220// one.
221func (e *Exporter) formatMetric(rec export.Record, buf *bytes.Buffer) error {
222	desc := rec.Descriptor()
223	agg := rec.Aggregator()
224
225	// TODO handle non-Points Distribution/MaxSumCount by
226	// formatting individual quantiles, the sum, and the count as
227	// single statistics.  For the dogstatsd variation, assuming
228	// open-source systems like Veneur add support, figure out the
229	// proper encoding for "d"-type distribution data.
230
231	if pts, ok := agg.(aggregator.Points); ok {
232		var format string
233		if desc.Unit() == unit.Milliseconds {
234			format = formatTiming
235		} else {
236			format = formatHistogram
237		}
238		points, err := pts.Points()
239		if err != nil {
240			return err
241		}
242		for _, pt := range points {
243			e.formatSingleStat(rec, pt, format, buf)
244		}
245
246	} else if sum, ok := agg.(aggregator.Sum); ok {
247		sum, err := sum.Sum()
248		if err != nil {
249			return err
250		}
251		e.formatSingleStat(rec, sum, formatCounter, buf)
252
253	} else if lv, ok := agg.(aggregator.LastValue); ok {
254		lv, _, err := lv.LastValue()
255		if err != nil {
256			return err
257		}
258		e.formatSingleStat(rec, lv, formatGauge, buf)
259	}
260	return nil
261}
262
263// formatSingleStat encodes a single item of statsd data followed by a
264// newline.
265func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) {
266	e.adapter.AppendName(rec, buf)
267	_, _ = buf.WriteRune(':')
268	writeNumber(buf, val, rec.Descriptor().NumberKind())
269	_, _ = buf.WriteRune('|')
270	_, _ = buf.WriteString(fmtStr)
271	e.adapter.AppendTags(rec, buf)
272	_, _ = buf.WriteRune('\n')
273}
274
275func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) {
276	var tmp [128]byte
277	var conv []byte
278	switch kind {
279	case core.Int64NumberKind:
280		conv = strconv.AppendInt(tmp[:0], num.AsInt64(), 10)
281	case core.Float64NumberKind:
282		conv = strconv.AppendFloat(tmp[:0], num.AsFloat64(), 'g', -1, 64)
283	case core.Uint64NumberKind:
284		conv = strconv.AppendUint(tmp[:0], num.AsUint64(), 10)
285
286	}
287	_, _ = buf.Write(conv)
288}
289