1 use super::Transform;
2 use crate::{
3     event::{self, Event, PathComponent, PathIter},
4     topology::config::{DataType, TransformConfig, TransformContext, TransformDescription},
5     types::{parse_conversion_map_no_atoms, Conversion},
6 };
7 use grok::Pattern;
8 use serde::{Deserialize, Serialize};
9 use snafu::{ResultExt, Snafu};
10 use std::collections::HashMap;
11 use std::str;
12 use string_cache::DefaultAtom as Atom;
13 
14 #[derive(Debug, Snafu)]
15 enum BuildError {
16     #[snafu(display("Invalid grok pattern: {}", source))]
17     InvalidGrok { source: grok::Error },
18 }
19 
20 #[derive(Deserialize, Serialize, Debug, Derivative)]
21 #[serde(deny_unknown_fields, default)]
22 #[derivative(Default)]
23 pub struct GrokParserConfig {
24     pub pattern: String,
25     pub field: Option<Atom>,
26     #[derivative(Default(value = "true"))]
27     pub drop_field: bool,
28     pub types: HashMap<String, String>,
29 }
30 
31 inventory::submit! {
32     TransformDescription::new::<GrokParserConfig>("grok_parser")
33 }
34 
35 #[typetag::serde(name = "grok_parser")]
36 impl TransformConfig for GrokParserConfig {
build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>>37     fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
38         let field = self
39             .field
40             .as_ref()
41             .unwrap_or(&event::log_schema().message_key());
42 
43         let mut grok = grok::Grok::with_patterns();
44 
45         let types = parse_conversion_map_no_atoms(&self.types)?;
46 
47         Ok(grok
48             .compile(&self.pattern, true)
49             .map::<Box<dyn Transform>, _>(|p| {
50                 Box::new(GrokParser {
51                     pattern: p,
52                     field: field.clone(),
53                     drop_field: self.drop_field,
54                     types,
55                     paths: HashMap::new(),
56                 })
57             })
58             .context(InvalidGrok)?)
59     }
60 
input_type(&self) -> DataType61     fn input_type(&self) -> DataType {
62         DataType::Log
63     }
64 
output_type(&self) -> DataType65     fn output_type(&self) -> DataType {
66         DataType::Log
67     }
68 
transform_type(&self) -> &'static str69     fn transform_type(&self) -> &'static str {
70         "grok_parser"
71     }
72 }
73 
74 pub struct GrokParser {
75     pattern: Pattern,
76     field: Atom,
77     drop_field: bool,
78     types: HashMap<String, Conversion>,
79     paths: HashMap<String, Vec<PathComponent>>,
80 }
81 
82 impl Transform for GrokParser {
transform(&mut self, event: Event) -> Option<Event>83     fn transform(&mut self, event: Event) -> Option<Event> {
84         let mut event = event.into_log();
85         let value = event.get(&self.field).map(|s| s.to_string_lossy());
86 
87         if let Some(value) = value {
88             if let Some(matches) = self.pattern.match_against(&value) {
89                 let drop_field = self.drop_field && matches.get(&self.field).is_none();
90                 for (name, value) in matches.iter() {
91                     let conv = self.types.get(name).unwrap_or(&Conversion::Bytes);
92                     match conv.convert(value.into()) {
93                         Ok(value) => {
94                             if let Some(path) = self.paths.get(name) {
95                                 event.insert_path(path.to_vec(), value);
96                             } else {
97                                 let path = PathIter::new(name).collect::<Vec<_>>();
98                                 self.paths.insert(name.to_string(), path.clone());
99                                 event.insert_path(path, value);
100                             }
101                         }
102                         Err(error) => {
103                             debug!(
104                                 message = "Could not convert types.",
105                                 %name,
106                                 %error,
107                                 rate_limit_secs = 30,
108                             );
109                         }
110                     }
111                 }
112 
113                 if drop_field {
114                     event.remove(&self.field);
115                 }
116             } else {
117                 debug!(message = "No fields captured from grok pattern.");
118             }
119         } else {
120             debug!(
121                 message = "Field does not exist.",
122                 field = self.field.as_ref(),
123                 rate_limit_secs = 30,
124             );
125         }
126 
127         Some(Event::Log(event))
128     }
129 }
130 
#[cfg(test)]
mod tests {
    use super::GrokParserConfig;
    use crate::{
        event::{self, LogEvent},
        topology::config::{TransformConfig, TransformContext},
        Event,
    };
    use pretty_assertions::assert_eq;
    use serde_json::json;

    /// Access-log line in common log format used by most tests below.
    const ACCESS_LINE: &str = r#"109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "GET /administrator/ HTTP/1.1" 200 4263"#;

    /// Builds a grok parser from the given settings, runs it on a log event
    /// created from `event`, and returns the resulting log event.
    fn parse_log(
        event: &str,
        pattern: &str,
        field: Option<&str>,
        drop_field: bool,
        types: &[(&str, &str)],
    ) -> LogEvent {
        let config = GrokParserConfig {
            pattern: pattern.into(),
            field: field.map(|name| name.into()),
            drop_field,
            types: types
                .iter()
                .map(|&(name, kind)| (name.into(), kind.into()))
                .collect(),
        };
        let mut parser = config.build(TransformContext::new_test()).unwrap();

        parser.transform(Event::from(event)).unwrap().into_log()
    }

    /// Fields captured from `ACCESS_LINE` by `%{HTTPD_COMMONLOG}` when no
    /// type conversions are configured.
    fn common_log_fields() -> serde_json::Value {
        json!({
            "clientip": "109.184.11.34",
            "ident": "-",
            "auth": "-",
            "timestamp": "12/Dec/2015:18:32:56 +0100",
            "verb": "GET",
            "request": "/administrator/",
            "httpversion": "1.1",
            "rawrequest": "",
            "response": "200",
            "bytes": "4263",
        })
    }

    /// Asserts that the serialized fields of `log` equal `expected`.
    fn assert_fields(expected: serde_json::Value, log: LogEvent) {
        assert_eq!(expected, serde_json::to_value(&log.all_fields()).unwrap());
    }

    /// Asserts that `log` holds only the original message and a non-empty
    /// timestamp.
    fn assert_untouched(log: LogEvent, message: &str) {
        assert_eq!(2, log.keys().count());
        assert_eq!(
            event::Value::from(message),
            log[&event::log_schema().message_key()]
        );
        assert!(!log[&event::log_schema().timestamp_key()]
            .to_string_lossy()
            .is_empty());
    }

    #[test]
    fn grok_parser_adds_parsed_fields_to_event() {
        let log = parse_log(ACCESS_LINE, "%{HTTPD_COMMONLOG}", None, true, &[]);

        assert_fields(common_log_fields(), log);
    }

    #[test]
    fn grok_parser_does_nothing_on_no_match() {
        let message = r#"help i'm stuck in an http server"#;
        let log = parse_log(message, "%{HTTPD_COMMONLOG}", None, true, &[]);

        assert_untouched(log, message);
    }

    #[test]
    fn grok_parser_can_not_drop_parsed_field() {
        let log = parse_log(ACCESS_LINE, "%{HTTPD_COMMONLOG}", None, false, &[]);

        // With `drop_field` disabled the source message stays in the event.
        let mut expected = common_log_fields();
        expected["message"] = json!(ACCESS_LINE);

        assert_fields(expected, log);
    }

    #[test]
    fn grok_parser_does_nothing_on_missing_field() {
        let message = "i am the only field";
        let log = parse_log(message, "^(?<foo>.*)", Some("bar"), false, &[]);

        assert_untouched(log, message);
    }

    #[test]
    fn grok_parser_coerces_types() {
        let log = parse_log(
            ACCESS_LINE,
            "%{HTTPD_COMMONLOG}",
            None,
            true,
            &[("response", "int"), ("bytes", "int")],
        );

        // The two converted captures become integers instead of strings.
        let mut expected = common_log_fields();
        expected["response"] = json!(200);
        expected["bytes"] = json!(4263);

        assert_fields(expected, log);
    }

    #[test]
    fn grok_parser_does_not_drop_parsed_message_field() {
        let log = parse_log(
            "12/Dec/2015:18:32:56 +0100 42",
            "%{HTTPDATE:timestamp} %{NUMBER:message}",
            None,
            true,
            &[],
        );

        let expected = json!({
            "timestamp": "12/Dec/2015:18:32:56 +0100",
            "message": "42",
        });

        assert_fields(expected, log);
    }
}
299