1// Copyright 2019, OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package statsd 16 17// See https://github.com/b/statsd_spec for the best-available statsd 18// syntax specification. See also 19// https://github.com/statsd/statsd/edit/master/docs/metric_types.md 20 21import ( 22 "bytes" 23 "context" 24 "fmt" 25 "io" 26 "net" 27 "net/url" 28 "strconv" 29 30 "go.opentelemetry.io/otel/api/core" 31 "go.opentelemetry.io/otel/api/unit" 32 export "go.opentelemetry.io/otel/sdk/export/metric" 33 "go.opentelemetry.io/otel/sdk/export/metric/aggregator" 34) 35 36type ( 37 // Config supports common configuration that applies to statsd exporters. 38 Config struct { 39 // URL describes the destination for exporting statsd data. 40 // e.g., udp://host:port 41 // tcp://host:port 42 // unix:///socket/path 43 URL string 44 45 // Writer is an alternate to providing a URL. When Writer is 46 // non-nil, URL will be ignored and the exporter will write to 47 // the configured Writer interface. 48 Writer io.Writer 49 50 // MaxPacketSize this limits the packet size for packet-oriented transports. 51 MaxPacketSize int 52 53 // TODO support Dial and Write timeouts 54 } 55 56 // Exporter is common type meant to implement concrete statsd 57 // exporters. 58 Exporter struct { 59 adapter Adapter 60 config Config 61 conn net.Conn 62 writer io.Writer 63 buffer bytes.Buffer 64 } 65 66 // Adapter supports statsd syntax variations, primarily plain 67 // statsd vs. dogstatsd. 68 Adapter interface { 69 AppendName(export.Record, *bytes.Buffer) 70 AppendTags(export.Record, *bytes.Buffer) 71 } 72) 73 74const ( 75 formatCounter = "c" 76 formatHistogram = "h" 77 formatGauge = "g" 78 formatTiming = "ms" 79 80 MaxPacketSize = 1 << 16 81) 82 83var ( 84 _ export.Exporter = &Exporter{} 85 86 ErrInvalidScheme = fmt.Errorf("invalid statsd transport") 87) 88 89// NewExport returns a common implementation for exporters that Export 90// statsd syntax. 91func NewExporter(config Config, adapter Adapter) (*Exporter, error) { 92 if config.MaxPacketSize <= 0 { 93 config.MaxPacketSize = MaxPacketSize 94 } 95 var writer io.Writer 96 var conn net.Conn 97 var err error 98 if config.Writer != nil { 99 writer = config.Writer 100 } else { 101 conn, err = dial(config.URL) 102 if conn != nil { 103 writer = conn 104 } 105 } 106 // TODO: If err != nil, we return it _with_ a valid exporter; the 107 // exporter should attempt to re-dial if it's retryable. Add a 108 // Start() and Stop() API. 109 return &Exporter{ 110 adapter: adapter, 111 config: config, 112 conn: conn, 113 writer: writer, 114 }, err 115} 116 117// dial connects to a statsd service using several common network 118// types. Presently "udp" and "unix" datagram socket connections are 119// supported. 120func dial(endpoint string) (net.Conn, error) { 121 dest, err := url.Parse(endpoint) 122 if err != nil { 123 return nil, err 124 } 125 126 // TODO: Support tcp destination, need configurable timeouts first. 127 128 scheme := dest.Scheme 129 switch scheme { 130 case "udp", "udp4", "udp6": 131 udpAddr, err := net.ResolveUDPAddr(scheme, dest.Host) 132 locAddr := &net.UDPAddr{} 133 if err != nil { 134 return nil, err 135 } 136 conn, err := net.DialUDP(scheme, locAddr, udpAddr) 137 if err != nil { 138 return nil, err 139 } 140 return conn, err 141 case "unix", "unixgram": 142 scheme = "unixgram" 143 locAddr := &net.UnixAddr{} 144 145 sockAddr, err := net.ResolveUnixAddr(scheme, dest.Path) 146 if err != nil { 147 return nil, err 148 } 149 conn, err := net.DialUnix(scheme, locAddr, sockAddr) 150 if err != nil { 151 return nil, err 152 } 153 return conn, err 154 } 155 return nil, ErrInvalidScheme 156} 157 158// Export is common code for any statsd-based metric.Exporter implementation. 159func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { 160 buf := &e.buffer 161 buf.Reset() 162 163 var aggErr error 164 var sendErr error 165 166 checkpointSet.ForEach(func(rec export.Record) { 167 before := buf.Len() 168 169 if err := e.formatMetric(rec, buf); err != nil && aggErr == nil { 170 aggErr = err 171 return 172 } 173 174 if buf.Len() < e.config.MaxPacketSize { 175 return 176 } 177 if before == 0 { 178 // A single metric >= packet size 179 if err := e.send(buf.Bytes()); err != nil && sendErr == nil { 180 sendErr = err 181 } 182 buf.Reset() 183 return 184 } 185 186 // Send and copy the leftover 187 if err := e.send(buf.Bytes()[:before]); err != nil && sendErr == nil { 188 sendErr = err 189 } 190 191 leftover := buf.Len() - before 192 193 copy(buf.Bytes()[0:leftover], buf.Bytes()[before:]) 194 195 buf.Truncate(leftover) 196 }) 197 if err := e.send(buf.Bytes()); err != nil && sendErr == nil { 198 sendErr = err 199 } 200 if sendErr != nil { 201 return sendErr 202 } 203 return aggErr 204} 205 206// send writes a complete buffer to the writer as a blocking call. 207func (e *Exporter) send(buf []byte) error { 208 for len(buf) != 0 { 209 n, err := e.writer.Write(buf) 210 if err != nil { 211 return err 212 } 213 buf = buf[n:] 214 } 215 return nil 216} 217 218// formatMetric formats an individual export record. For some records 219// this will emit a single statistic, for some it will emit more than 220// one. 221func (e *Exporter) formatMetric(rec export.Record, buf *bytes.Buffer) error { 222 desc := rec.Descriptor() 223 agg := rec.Aggregator() 224 225 // TODO handle non-Points Distribution/MaxSumCount by 226 // formatting individual quantiles, the sum, and the count as 227 // single statistics. For the dogstatsd variation, assuming 228 // open-source systems like Veneur add support, figure out the 229 // proper encoding for "d"-type distribution data. 230 231 if pts, ok := agg.(aggregator.Points); ok { 232 var format string 233 if desc.Unit() == unit.Milliseconds { 234 format = formatTiming 235 } else { 236 format = formatHistogram 237 } 238 points, err := pts.Points() 239 if err != nil { 240 return err 241 } 242 for _, pt := range points { 243 e.formatSingleStat(rec, pt, format, buf) 244 } 245 246 } else if sum, ok := agg.(aggregator.Sum); ok { 247 sum, err := sum.Sum() 248 if err != nil { 249 return err 250 } 251 e.formatSingleStat(rec, sum, formatCounter, buf) 252 253 } else if lv, ok := agg.(aggregator.LastValue); ok { 254 lv, _, err := lv.LastValue() 255 if err != nil { 256 return err 257 } 258 e.formatSingleStat(rec, lv, formatGauge, buf) 259 } 260 return nil 261} 262 263// formatSingleStat encodes a single item of statsd data followed by a 264// newline. 265func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) { 266 e.adapter.AppendName(rec, buf) 267 _, _ = buf.WriteRune(':') 268 writeNumber(buf, val, rec.Descriptor().NumberKind()) 269 _, _ = buf.WriteRune('|') 270 _, _ = buf.WriteString(fmtStr) 271 e.adapter.AppendTags(rec, buf) 272 _, _ = buf.WriteRune('\n') 273} 274 275func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) { 276 var tmp [128]byte 277 var conv []byte 278 switch kind { 279 case core.Int64NumberKind: 280 conv = strconv.AppendInt(tmp[:0], num.AsInt64(), 10) 281 case core.Float64NumberKind: 282 conv = strconv.AppendFloat(tmp[:0], num.AsFloat64(), 'g', -1, 64) 283 case core.Uint64NumberKind: 284 conv = strconv.AppendUint(tmp[:0], num.AsUint64(), 10) 285 286 } 287 _, _ = buf.Write(conv) 288} 289