1 use super::Transform; 2 use crate::{ 3 event::{self, Event, PathComponent, PathIter}, 4 topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, 5 types::{parse_conversion_map_no_atoms, Conversion}, 6 }; 7 use grok::Pattern; 8 use serde::{Deserialize, Serialize}; 9 use snafu::{ResultExt, Snafu}; 10 use std::collections::HashMap; 11 use std::str; 12 use string_cache::DefaultAtom as Atom; 13 14 #[derive(Debug, Snafu)] 15 enum BuildError { 16 #[snafu(display("Invalid grok pattern: {}", source))] 17 InvalidGrok { source: grok::Error }, 18 } 19 20 #[derive(Deserialize, Serialize, Debug, Derivative)] 21 #[serde(deny_unknown_fields, default)] 22 #[derivative(Default)] 23 pub struct GrokParserConfig { 24 pub pattern: String, 25 pub field: Option<Atom>, 26 #[derivative(Default(value = "true"))] 27 pub drop_field: bool, 28 pub types: HashMap<String, String>, 29 } 30 31 inventory::submit! { 32 TransformDescription::new::<GrokParserConfig>("grok_parser") 33 } 34 35 #[typetag::serde(name = "grok_parser")] 36 impl TransformConfig for GrokParserConfig { build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>>37 fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> { 38 let field = self 39 .field 40 .as_ref() 41 .unwrap_or(&event::log_schema().message_key()); 42 43 let mut grok = grok::Grok::with_patterns(); 44 45 let types = parse_conversion_map_no_atoms(&self.types)?; 46 47 Ok(grok 48 .compile(&self.pattern, true) 49 .map::<Box<dyn Transform>, _>(|p| { 50 Box::new(GrokParser { 51 pattern: p, 52 field: field.clone(), 53 drop_field: self.drop_field, 54 types, 55 paths: HashMap::new(), 56 }) 57 }) 58 .context(InvalidGrok)?) 59 } 60 input_type(&self) -> DataType61 fn input_type(&self) -> DataType { 62 DataType::Log 63 } 64 output_type(&self) -> DataType65 fn output_type(&self) -> DataType { 66 DataType::Log 67 } 68 transform_type(&self) -> &'static str69 fn transform_type(&self) -> &'static str { 70 "grok_parser" 71 } 72 } 73 74 pub struct GrokParser { 75 pattern: Pattern, 76 field: Atom, 77 drop_field: bool, 78 types: HashMap<String, Conversion>, 79 paths: HashMap<String, Vec<PathComponent>>, 80 } 81 82 impl Transform for GrokParser { transform(&mut self, event: Event) -> Option<Event>83 fn transform(&mut self, event: Event) -> Option<Event> { 84 let mut event = event.into_log(); 85 let value = event.get(&self.field).map(|s| s.to_string_lossy()); 86 87 if let Some(value) = value { 88 if let Some(matches) = self.pattern.match_against(&value) { 89 let drop_field = self.drop_field && matches.get(&self.field).is_none(); 90 for (name, value) in matches.iter() { 91 let conv = self.types.get(name).unwrap_or(&Conversion::Bytes); 92 match conv.convert(value.into()) { 93 Ok(value) => { 94 if let Some(path) = self.paths.get(name) { 95 event.insert_path(path.to_vec(), value); 96 } else { 97 let path = PathIter::new(name).collect::<Vec<_>>(); 98 self.paths.insert(name.to_string(), path.clone()); 99 event.insert_path(path, value); 100 } 101 } 102 Err(error) => { 103 debug!( 104 message = "Could not convert types.", 105 %name, 106 %error, 107 rate_limit_secs = 30, 108 ); 109 } 110 } 111 } 112 113 if drop_field { 114 event.remove(&self.field); 115 } 116 } else { 117 debug!(message = "No fields captured from grok pattern."); 118 } 119 } else { 120 debug!( 121 message = "Field does not exist.", 122 field = self.field.as_ref(), 123 rate_limit_secs = 30, 124 ); 125 } 126 127 Some(Event::Log(event)) 128 } 129 } 130 131 #[cfg(test)] 132 mod tests { 133 use super::GrokParserConfig; 134 use crate::event::LogEvent; 135 use crate::{ 136 event, 137 topology::config::{TransformConfig, TransformContext}, 138 Event, 139 }; 140 use pretty_assertions::assert_eq; 141 use serde_json::json; 142 parse_log( event: &str, pattern: &str, field: Option<&str>, drop_field: bool, types: &[(&str, &str)], ) -> LogEvent143 fn parse_log( 144 event: &str, 145 pattern: &str, 146 field: Option<&str>, 147 drop_field: bool, 148 types: &[(&str, &str)], 149 ) -> LogEvent { 150 let event = Event::from(event); 151 let mut parser = GrokParserConfig { 152 pattern: pattern.into(), 153 field: field.map(|s| s.into()), 154 drop_field, 155 types: types.iter().map(|&(k, v)| (k.into(), v.into())).collect(), 156 } 157 .build(TransformContext::new_test()) 158 .unwrap(); 159 parser.transform(event).unwrap().into_log() 160 } 161 162 #[test] grok_parser_adds_parsed_fields_to_event()163 fn grok_parser_adds_parsed_fields_to_event() { 164 let event = parse_log( 165 r#"109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "GET /administrator/ HTTP/1.1" 200 4263"#, 166 "%{HTTPD_COMMONLOG}", 167 None, 168 true, 169 &[], 170 ); 171 172 let expected = json!({ 173 "clientip": "109.184.11.34", 174 "ident": "-", 175 "auth": "-", 176 "timestamp": "12/Dec/2015:18:32:56 +0100", 177 "verb": "GET", 178 "request": "/administrator/", 179 "httpversion": "1.1", 180 "rawrequest": "", 181 "response": "200", 182 "bytes": "4263", 183 }); 184 185 assert_eq!(expected, serde_json::to_value(&event.all_fields()).unwrap()); 186 } 187 188 #[test] grok_parser_does_nothing_on_no_match()189 fn grok_parser_does_nothing_on_no_match() { 190 let event = parse_log( 191 r#"help i'm stuck in an http server"#, 192 "%{HTTPD_COMMONLOG}", 193 None, 194 true, 195 &[], 196 ); 197 198 assert_eq!(2, event.keys().count()); 199 assert_eq!( 200 event::Value::from("help i'm stuck in an http server"), 201 event[&event::log_schema().message_key()] 202 ); 203 assert!(!event[&event::log_schema().timestamp_key()] 204 .to_string_lossy() 205 .is_empty()); 206 } 207 208 #[test] grok_parser_can_not_drop_parsed_field()209 fn grok_parser_can_not_drop_parsed_field() { 210 let event = parse_log( 211 r#"109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "GET /administrator/ HTTP/1.1" 200 4263"#, 212 "%{HTTPD_COMMONLOG}", 213 None, 214 false, 215 &[], 216 ); 217 218 let expected = json!({ 219 "clientip": "109.184.11.34", 220 "ident": "-", 221 "auth": "-", 222 "timestamp": "12/Dec/2015:18:32:56 +0100", 223 "verb": "GET", 224 "request": "/administrator/", 225 "httpversion": "1.1", 226 "rawrequest": "", 227 "response": "200", 228 "bytes": "4263", 229 "message": r#"109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "GET /administrator/ HTTP/1.1" 200 4263"#, 230 }); 231 232 assert_eq!(expected, serde_json::to_value(&event.all_fields()).unwrap()); 233 } 234 235 #[test] grok_parser_does_nothing_on_missing_field()236 fn grok_parser_does_nothing_on_missing_field() { 237 let event = parse_log( 238 "i am the only field", 239 "^(?<foo>.*)", 240 Some("bar"), 241 false, 242 &[], 243 ); 244 245 assert_eq!(2, event.keys().count()); 246 assert_eq!( 247 event::Value::from("i am the only field"), 248 event[&event::log_schema().message_key()] 249 ); 250 assert!(!event[&event::log_schema().timestamp_key()] 251 .to_string_lossy() 252 .is_empty()); 253 } 254 255 #[test] grok_parser_coerces_types()256 fn grok_parser_coerces_types() { 257 let event = parse_log( 258 r#"109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "GET /administrator/ HTTP/1.1" 200 4263"#, 259 "%{HTTPD_COMMONLOG}", 260 None, 261 true, 262 &[("response", "int"), ("bytes", "int")], 263 ); 264 265 let expected = json!({ 266 "clientip": "109.184.11.34", 267 "ident": "-", 268 "auth": "-", 269 "timestamp": "12/Dec/2015:18:32:56 +0100", 270 "verb": "GET", 271 "request": "/administrator/", 272 "httpversion": "1.1", 273 "rawrequest": "", 274 "response": 200, 275 "bytes": 4263, 276 }); 277 278 assert_eq!(expected, serde_json::to_value(&event.all_fields()).unwrap()); 279 } 280 281 #[test] grok_parser_does_not_drop_parsed_message_field()282 fn grok_parser_does_not_drop_parsed_message_field() { 283 let event = parse_log( 284 "12/Dec/2015:18:32:56 +0100 42", 285 "%{HTTPDATE:timestamp} %{NUMBER:message}", 286 None, 287 true, 288 &[], 289 ); 290 291 let expected = json!({ 292 "timestamp": "12/Dec/2015:18:32:56 +0100", 293 "message": "42", 294 }); 295 296 assert_eq!(expected, serde_json::to_value(&event.all_fields()).unwrap()); 297 } 298 } 299