1 use crate::event::metric::{Metric, MetricKind, MetricValue};
2 use lazy_static::lazy_static;
3 use regex::Regex;
4 use std::{
5 collections::BTreeMap,
6 error, fmt,
7 num::{ParseFloatError, ParseIntError},
8 };
9
10 lazy_static! {
11 static ref WHITESPACE: Regex = Regex::new(r"\s+").unwrap();
12 static ref NONALPHANUM: Regex = Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap();
13 }
14
parse(packet: &str) -> Result<Metric, ParseError>15 pub fn parse(packet: &str) -> Result<Metric, ParseError> {
16 // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#datagram-format
17 let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
18 if key_and_body.len() != 2 {
19 return Err(ParseError::Malformed(
20 "should be key and body with ':' separator",
21 ));
22 }
23 let (key, body) = (key_and_body[0], key_and_body[1]);
24
25 let parts = body.split('|').collect::<Vec<_>>();
26 if parts.len() < 2 {
27 return Err(ParseError::Malformed(
28 "body should have at least two pipe separated components",
29 ));
30 }
31
32 let name = sanitize_key(key);
33 let metric_type = parts[1];
34
35 // sampling part is optional and comes after metric type part
36 let sampling = parts.get(2).filter(|s| s.starts_with('@'));
37 let sample_rate = if let Some(s) = sampling {
38 1.0 / sanitize_sampling(parse_sampling(s)?)
39 } else {
40 1.0
41 };
42
43 // tags are optional and could be found either after sampling of after metric type part
44 let tags = if sampling.is_none() {
45 parts.get(2)
46 } else {
47 parts.get(3)
48 };
49 let tags = tags.filter(|s| s.starts_with('#'));
50 let tags = if let Some(t) = tags {
51 Some(parse_tags(t)?)
52 } else {
53 None
54 };
55
56 let metric = match metric_type {
57 "c" => {
58 let val: f64 = parts[0].parse()?;
59 Metric {
60 name,
61 timestamp: None,
62 tags,
63 kind: MetricKind::Incremental,
64 value: MetricValue::Counter {
65 value: val * sample_rate,
66 },
67 }
68 }
69 unit @ "h" | unit @ "ms" => {
70 let val: f64 = parts[0].parse()?;
71 Metric {
72 name,
73 timestamp: None,
74 tags,
75 kind: MetricKind::Incremental,
76 value: MetricValue::Distribution {
77 values: vec![convert_to_base_units(unit, val)],
78 sample_rates: vec![sample_rate as u32],
79 },
80 }
81 }
82 "g" => {
83 let value = if parts[0]
84 .chars()
85 .next()
86 .map(|c| c.is_ascii_digit())
87 .ok_or_else(|| ParseError::Malformed("empty first body component"))?
88 {
89 parts[0].parse()?
90 } else {
91 parts[0][1..].parse()?
92 };
93
94 match parse_direction(parts[0])? {
95 None => Metric {
96 name,
97 timestamp: None,
98 tags,
99 kind: MetricKind::Absolute,
100 value: MetricValue::Gauge { value },
101 },
102 Some(sign) => Metric {
103 name,
104 timestamp: None,
105 tags,
106 kind: MetricKind::Incremental,
107 value: MetricValue::Gauge {
108 value: value * sign,
109 },
110 },
111 }
112 }
113 "s" => Metric {
114 name,
115 timestamp: None,
116 tags,
117 kind: MetricKind::Incremental,
118 value: MetricValue::Set {
119 values: vec![parts[0].into()].into_iter().collect(),
120 },
121 },
122 other => return Err(ParseError::UnknownMetricType(other.into())),
123 };
124 Ok(metric)
125 }
126
parse_sampling(input: &str) -> Result<f64, ParseError>127 fn parse_sampling(input: &str) -> Result<f64, ParseError> {
128 if !input.starts_with('@') || input.len() < 2 {
129 return Err(ParseError::Malformed(
130 "expected non empty '@'-prefixed sampling component",
131 ));
132 }
133
134 let num: f64 = input[1..].parse()?;
135 if num.is_sign_positive() {
136 Ok(num)
137 } else {
138 Err(ParseError::Malformed("sample rate can't be negative"))
139 }
140 }
141
parse_tags(input: &str) -> Result<BTreeMap<String, String>, ParseError>142 fn parse_tags(input: &str) -> Result<BTreeMap<String, String>, ParseError> {
143 if !input.starts_with('#') || input.len() < 2 {
144 return Err(ParseError::Malformed(
145 "expected non empty '#'-prefixed tags component",
146 ));
147 }
148
149 let mut result = BTreeMap::new();
150
151 let chunks = input[1..].split(',').collect::<Vec<_>>();
152 for chunk in chunks {
153 let pair: Vec<_> = chunk.split(':').collect();
154 let key = &pair[0];
155 // same as in telegraf plugin:
156 // if tag value is not provided, use "true"
157 // https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/datadog.go#L152
158 let value = pair.get(1).unwrap_or(&"true");
159 result.insert((*key).to_owned(), (*value).to_owned());
160 }
161
162 Ok(result)
163 }
164
parse_direction(input: &str) -> Result<Option<f64>, ParseError>165 fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
166 match input
167 .chars()
168 .next()
169 .ok_or_else(|| ParseError::Malformed("empty body component"))?
170 {
171 '+' => Ok(Some(1.0)),
172 '-' => Ok(Some(-1.0)),
173 c if c.is_ascii_digit() => Ok(None),
174 _other => Err(ParseError::Malformed("invalid gauge value prefix")),
175 }
176 }
177
sanitize_key(key: &str) -> String178 fn sanitize_key(key: &str) -> String {
179 let s = key.replace("/", "-");
180 let s = WHITESPACE.replace_all(&s, "_");
181 let s = NONALPHANUM.replace_all(&s, "");
182 s.into()
183 }
184
/// Replaces a sampling rate of exactly zero with 1.0 and returns every other
/// value unchanged.
///
/// The caller in this file takes the reciprocal of the result, so zero would
/// otherwise produce an infinite scale factor.
fn sanitize_sampling(sampling: f64) -> f64 {
    if sampling != 0.0 {
        sampling
    } else {
        1.0
    }
}
192
/// Converts a distribution sample to base units.
///
/// Values tagged "ms" are divided by 1000 (milliseconds to seconds); values
/// with any other unit are returned as-is.
fn convert_to_base_units(unit: &str, val: f64) -> f64 {
    if unit == "ms" {
        val / 1000.0
    } else {
        val
    }
}
199
/// Errors produced while parsing a statsd/DogStatsD datagram.
#[derive(Debug, PartialEq)]
pub enum ParseError {
    /// The datagram, or one of its components, does not have the expected shape.
    Malformed(&'static str),
    /// The metric type component is not a supported type.
    UnknownMetricType(String),
    /// Wraps a `ParseIntError` from an integer component.
    InvalidInteger(ParseIntError),
    /// Wraps a `ParseFloatError` from a floating point component.
    InvalidFloat(ParseFloatError),
}

impl fmt::Display for ParseError {
    // Human-readable text. The derived `Debug` form stays available for
    // diagnostics instead of leaking into user-facing messages.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Statsd parse error: ")?;
        match self {
            ParseError::Malformed(reason) => write!(f, "malformed packet: {}", reason),
            ParseError::UnknownMetricType(kind) => write!(f, "unknown metric type {:?}", kind),
            ParseError::InvalidInteger(e) => write!(f, "invalid integer: {}", e),
            ParseError::InvalidFloat(e) => write!(f, "invalid float: {}", e),
        }
    }
}

impl error::Error for ParseError {}

impl From<ParseIntError> for ParseError {
    fn from(e: ParseIntError) -> ParseError {
        ParseError::InvalidInteger(e)
    }
}

impl From<ParseFloatError> for ParseError {
    fn from(e: ParseFloatError) -> ParseError {
        ParseError::InvalidFloat(e)
    }
}
227
#[cfg(test)]
mod test {
    use super::{parse, sanitize_key, sanitize_sampling};
    use crate::event::metric::{Metric, MetricKind, MetricValue};
    use std::collections::BTreeMap;

    /// Builds the metric `parse` is expected to return: the given name, tags,
    /// kind and value, with no timestamp.
    fn metric(
        name: &str,
        tags: Option<BTreeMap<String, String>>,
        kind: MetricKind,
        value: MetricValue,
    ) -> Metric {
        Metric {
            name: name.into(),
            timestamp: None,
            tags,
            kind,
            value,
        }
    }

    /// Converts borrowed `(key, value)` pairs into an owned tag map.
    fn tag_map(pairs: &[(&str, &str)]) -> Option<BTreeMap<String, String>> {
        Some(
            pairs
                .iter()
                .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                .collect(),
        )
    }

    #[test]
    fn basic_counter() {
        let expected = metric(
            "foo",
            None,
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.0 },
        );
        assert_eq!(parse("foo:1|c"), Ok(expected));
    }

    #[test]
    fn tagged_counter() {
        let expected = metric(
            "foo",
            tag_map(&[("tag1", "true"), ("tag2", "value")]),
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.0 },
        );
        assert_eq!(parse("foo:1|c|#tag1,tag2:value"), Ok(expected));
    }

    #[test]
    fn sampled_counter() {
        let expected = metric(
            "bar",
            None,
            MetricKind::Incremental,
            MetricValue::Counter { value: 20.0 },
        );
        assert_eq!(parse("bar:2|c|@0.1"), Ok(expected));
    }

    #[test]
    fn zero_sampled_counter() {
        let expected = metric(
            "bar",
            None,
            MetricKind::Incremental,
            MetricValue::Counter { value: 2.0 },
        );
        assert_eq!(parse("bar:2|c|@0"), Ok(expected));
    }

    #[test]
    fn sampled_timer() {
        let expected = metric(
            "glork",
            None,
            MetricKind::Incremental,
            MetricValue::Distribution {
                values: vec![0.320],
                sample_rates: vec![10],
            },
        );
        assert_eq!(parse("glork:320|ms|@0.1"), Ok(expected));
    }

    #[test]
    fn sampled_tagged_histogram() {
        let expected = metric(
            "glork",
            tag_map(&[
                ("region", "us-west1"),
                ("production", "true"),
                ("e", ""),
            ]),
            MetricKind::Incremental,
            MetricValue::Distribution {
                values: vec![320.0],
                sample_rates: vec![10],
            },
        );
        assert_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(expected)
        );
    }

    #[test]
    fn simple_gauge() {
        let expected = metric(
            "gaugor",
            None,
            MetricKind::Absolute,
            MetricValue::Gauge { value: 333.0 },
        );
        assert_eq!(parse("gaugor:333|g"), Ok(expected));
    }

    #[test]
    fn signed_gauge() {
        let cases = [("gaugor:-4|g", -4.0), ("gaugor:+10|g", 10.0)];
        for &(packet, value) in cases.iter() {
            let expected = metric(
                "gaugor",
                None,
                MetricKind::Incremental,
                MetricValue::Gauge { value },
            );
            assert_eq!(parse(packet), Ok(expected));
        }
    }

    #[test]
    fn sets() {
        let expected = metric(
            "uniques",
            None,
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["765".into()].into_iter().collect(),
            },
        );
        assert_eq!(parse("uniques:765|s"), Ok(expected));
    }

    #[test]
    fn sanitizing_keys() {
        let cases = [
            ("foo/bar/baz", "foo-bar-baz"),
            ("foo bar baz", "foo_bar_baz"),
            ("foo. @& bar_$!#.baz", "foo.__bar_.baz"),
        ];
        for &(raw, sanitized) in cases.iter() {
            assert_eq!(sanitized, sanitize_key(raw));
        }
    }

    #[test]
    fn sanitizing_sampling() {
        let cases = [(0.0, 1.0), (2.5, 2.5), (-5.0, -5.0)];
        for &(input, output) in cases.iter() {
            assert_eq!(output, sanitize_sampling(input));
        }
    }
}
406