1 use crate::event::metric::{Metric, MetricKind, MetricValue};
2 use lazy_static::lazy_static;
3 use regex::Regex;
4 use std::{
5     collections::BTreeMap,
6     error, fmt,
7     num::{ParseFloatError, ParseIntError},
8 };
9 
lazy_static! {
    // Matches a run of one or more whitespace characters. `sanitize_key`
    // replaces each such run with a single underscore.
    static ref WHITESPACE: Regex = Regex::new(r"\s+").unwrap();
    // Matches any single character that is not an ASCII letter, an ASCII
    // digit, `_`, `-` or `.`. `sanitize_key` removes every such character.
    static ref NONALPHANUM: Regex = Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap();
}
14 
parse(packet: &str) -> Result<Metric, ParseError>15 pub fn parse(packet: &str) -> Result<Metric, ParseError> {
16     // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#datagram-format
17     let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
18     if key_and_body.len() != 2 {
19         return Err(ParseError::Malformed(
20             "should be key and body with ':' separator",
21         ));
22     }
23     let (key, body) = (key_and_body[0], key_and_body[1]);
24 
25     let parts = body.split('|').collect::<Vec<_>>();
26     if parts.len() < 2 {
27         return Err(ParseError::Malformed(
28             "body should have at least two pipe separated components",
29         ));
30     }
31 
32     let name = sanitize_key(key);
33     let metric_type = parts[1];
34 
35     // sampling part is optional and comes after metric type part
36     let sampling = parts.get(2).filter(|s| s.starts_with('@'));
37     let sample_rate = if let Some(s) = sampling {
38         1.0 / sanitize_sampling(parse_sampling(s)?)
39     } else {
40         1.0
41     };
42 
43     // tags are optional and could be found either after sampling of after metric type part
44     let tags = if sampling.is_none() {
45         parts.get(2)
46     } else {
47         parts.get(3)
48     };
49     let tags = tags.filter(|s| s.starts_with('#'));
50     let tags = if let Some(t) = tags {
51         Some(parse_tags(t)?)
52     } else {
53         None
54     };
55 
56     let metric = match metric_type {
57         "c" => {
58             let val: f64 = parts[0].parse()?;
59             Metric {
60                 name,
61                 timestamp: None,
62                 tags,
63                 kind: MetricKind::Incremental,
64                 value: MetricValue::Counter {
65                     value: val * sample_rate,
66                 },
67             }
68         }
69         unit @ "h" | unit @ "ms" => {
70             let val: f64 = parts[0].parse()?;
71             Metric {
72                 name,
73                 timestamp: None,
74                 tags,
75                 kind: MetricKind::Incremental,
76                 value: MetricValue::Distribution {
77                     values: vec![convert_to_base_units(unit, val)],
78                     sample_rates: vec![sample_rate as u32],
79                 },
80             }
81         }
82         "g" => {
83             let value = if parts[0]
84                 .chars()
85                 .next()
86                 .map(|c| c.is_ascii_digit())
87                 .ok_or_else(|| ParseError::Malformed("empty first body component"))?
88             {
89                 parts[0].parse()?
90             } else {
91                 parts[0][1..].parse()?
92             };
93 
94             match parse_direction(parts[0])? {
95                 None => Metric {
96                     name,
97                     timestamp: None,
98                     tags,
99                     kind: MetricKind::Absolute,
100                     value: MetricValue::Gauge { value },
101                 },
102                 Some(sign) => Metric {
103                     name,
104                     timestamp: None,
105                     tags,
106                     kind: MetricKind::Incremental,
107                     value: MetricValue::Gauge {
108                         value: value * sign,
109                     },
110                 },
111             }
112         }
113         "s" => Metric {
114             name,
115             timestamp: None,
116             tags,
117             kind: MetricKind::Incremental,
118             value: MetricValue::Set {
119                 values: vec![parts[0].into()].into_iter().collect(),
120             },
121         },
122         other => return Err(ParseError::UnknownMetricType(other.into())),
123     };
124     Ok(metric)
125 }
126 
parse_sampling(input: &str) -> Result<f64, ParseError>127 fn parse_sampling(input: &str) -> Result<f64, ParseError> {
128     if !input.starts_with('@') || input.len() < 2 {
129         return Err(ParseError::Malformed(
130             "expected non empty '@'-prefixed sampling component",
131         ));
132     }
133 
134     let num: f64 = input[1..].parse()?;
135     if num.is_sign_positive() {
136         Ok(num)
137     } else {
138         Err(ParseError::Malformed("sample rate can't be negative"))
139     }
140 }
141 
parse_tags(input: &str) -> Result<BTreeMap<String, String>, ParseError>142 fn parse_tags(input: &str) -> Result<BTreeMap<String, String>, ParseError> {
143     if !input.starts_with('#') || input.len() < 2 {
144         return Err(ParseError::Malformed(
145             "expected non empty '#'-prefixed tags component",
146         ));
147     }
148 
149     let mut result = BTreeMap::new();
150 
151     let chunks = input[1..].split(',').collect::<Vec<_>>();
152     for chunk in chunks {
153         let pair: Vec<_> = chunk.split(':').collect();
154         let key = &pair[0];
155         // same as in telegraf plugin:
156         // if tag value is not provided, use "true"
157         // https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/datadog.go#L152
158         let value = pair.get(1).unwrap_or(&"true");
159         result.insert((*key).to_owned(), (*value).to_owned());
160     }
161 
162     Ok(result)
163 }
164 
parse_direction(input: &str) -> Result<Option<f64>, ParseError>165 fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
166     match input
167         .chars()
168         .next()
169         .ok_or_else(|| ParseError::Malformed("empty body component"))?
170     {
171         '+' => Ok(Some(1.0)),
172         '-' => Ok(Some(-1.0)),
173         c if c.is_ascii_digit() => Ok(None),
174         _other => Err(ParseError::Malformed("invalid gauge value prefix")),
175     }
176 }
177 
sanitize_key(key: &str) -> String178 fn sanitize_key(key: &str) -> String {
179     let s = key.replace("/", "-");
180     let s = WHITESPACE.replace_all(&s, "_");
181     let s = NONALPHANUM.replace_all(&s, "");
182     s.into()
183 }
184 
/// Replaces a zero sample rate with `1.0`; every other value, including
/// negative ones, is returned unchanged.
fn sanitize_sampling(sampling: f64) -> f64 {
    if sampling != 0.0 {
        sampling
    } else {
        1.0
    }
}
192 
/// Scales a value according to its unit marker.
///
/// Values tagged `"ms"` are divided by 1000 (milliseconds to seconds); values
/// with any other unit are returned as they are.
fn convert_to_base_units(unit: &str, val: f64) -> f64 {
    if unit == "ms" {
        val / 1000.0
    } else {
        val
    }
}
199 
/// Errors reported while parsing a statsd datagram.
#[derive(Debug, PartialEq)]
pub enum ParseError {
    /// The datagram does not have the expected structure; the message says
    /// which expectation was violated.
    Malformed(&'static str),
    /// The metric type component is not one the parser handles; carries the
    /// offending type string.
    UnknownMetricType(String),
    /// An integer could not be parsed; wraps the underlying error.
    InvalidInteger(ParseIntError),
    /// A float could not be parsed; wraps the underlying error.
    InvalidFloat(ParseFloatError),
}

impl fmt::Display for ParseError {
    /// Writes the fixed prefix `Statsd parse error: ` followed by the `Debug`
    /// rendering of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Statsd parse error: {:?}", self))
    }
}

impl error::Error for ParseError {}

impl From<ParseIntError> for ParseError {
    /// Wraps an integer parse failure so it can be propagated with `?`.
    fn from(err: ParseIntError) -> Self {
        Self::InvalidInteger(err)
    }
}

impl From<ParseFloatError> for ParseError {
    /// Wraps a float parse failure so it can be propagated with `?`.
    fn from(err: ParseFloatError) -> Self {
        Self::InvalidFloat(err)
    }
}
227 
#[cfg(test)]
mod test {
    use super::{parse, sanitize_key, sanitize_sampling};
    use crate::event::metric::{Metric, MetricKind, MetricValue};
    use std::collections::BTreeMap;

    /// Assembles the metric a test expects back from `parse`; every expected
    /// metric in this module has no timestamp.
    fn expected(
        name: &str,
        tags: Option<BTreeMap<String, String>>,
        kind: MetricKind,
        value: MetricValue,
    ) -> Metric {
        Metric {
            name: name.into(),
            timestamp: None,
            tags,
            kind,
            value,
        }
    }

    /// Builds the expected tag map from borrowed key/value pairs.
    fn tag_map(pairs: &[(&str, &str)]) -> Option<BTreeMap<String, String>> {
        Some(
            pairs
                .iter()
                .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                .collect(),
        )
    }

    // A bare counter without sampling or tags.
    #[test]
    fn basic_counter() {
        let want = expected(
            "foo",
            None,
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.0 },
        );
        assert_eq!(parse("foo:1|c"), Ok(want));
    }

    // A tag without a value is stored as "true".
    #[test]
    fn tagged_counter() {
        let want = expected(
            "foo",
            tag_map(&[("tag1", "true"), ("tag2", "value")]),
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.0 },
        );
        assert_eq!(parse("foo:1|c|#tag1,tag2:value"), Ok(want));
    }

    // The counter value is scaled by the inverse of the sample rate.
    #[test]
    fn sampled_counter() {
        let want = expected(
            "bar",
            None,
            MetricKind::Incremental,
            MetricValue::Counter { value: 20.0 },
        );
        assert_eq!(parse("bar:2|c|@0.1"), Ok(want));
    }

    // A zero sample rate leaves the counter value unscaled.
    #[test]
    fn zero_sampled_counter() {
        let want = expected(
            "bar",
            None,
            MetricKind::Incremental,
            MetricValue::Counter { value: 2.0 },
        );
        assert_eq!(parse("bar:2|c|@0"), Ok(want));
    }

    // Timer values are divided by 1000.
    #[test]
    fn sampled_timer() {
        let want = expected(
            "glork",
            None,
            MetricKind::Incremental,
            MetricValue::Distribution {
                values: vec![0.320],
                sample_rates: vec![10],
            },
        );
        assert_eq!(parse("glork:320|ms|@0.1"), Ok(want));
    }

    // Histogram values are kept as-is; a tag ending in ':' has an empty value.
    #[test]
    fn sampled_tagged_histogram() {
        let want = expected(
            "glork",
            tag_map(&[("region", "us-west1"), ("production", "true"), ("e", "")]),
            MetricKind::Incremental,
            MetricValue::Distribution {
                values: vec![320.0],
                sample_rates: vec![10],
            },
        );
        assert_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(want)
        );
    }

    // An unsigned gauge is absolute.
    #[test]
    fn simple_gauge() {
        let want = expected(
            "gaugor",
            None,
            MetricKind::Absolute,
            MetricValue::Gauge { value: 333.0 },
        );
        assert_eq!(parse("gaugor:333|g"), Ok(want));
    }

    // A gauge with an explicit sign is incremental.
    #[test]
    fn signed_gauge() {
        let decrement = expected(
            "gaugor",
            None,
            MetricKind::Incremental,
            MetricValue::Gauge { value: -4.0 },
        );
        assert_eq!(parse("gaugor:-4|g"), Ok(decrement));

        let increment = expected(
            "gaugor",
            None,
            MetricKind::Incremental,
            MetricValue::Gauge { value: 10.0 },
        );
        assert_eq!(parse("gaugor:+10|g"), Ok(increment));
    }

    // Set members are kept as raw strings.
    #[test]
    fn sets() {
        let want = expected(
            "uniques",
            None,
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["765".into()].into_iter().collect(),
            },
        );
        assert_eq!(parse("uniques:765|s"), Ok(want));
    }

    #[test]
    fn sanitizing_keys() {
        let cases = [
            ("foo/bar/baz", "foo-bar-baz"),
            ("foo bar  baz", "foo_bar_baz"),
            ("foo. @& bar_$!#.baz", "foo.__bar_.baz"),
        ];
        for &(raw, clean) in &cases {
            assert_eq!(clean, sanitize_key(raw));
        }
    }

    #[test]
    fn sanitizing_sampling() {
        let cases = [(0.0, 1.0), (2.5, 2.5), (-5.0, -5.0)];
        for &(raw, clean) in &cases {
            assert_eq!(clean, sanitize_sampling(raw));
        }
    }
}
406