1 use super::Transform;
2 use crate::{
3     event::{self, Event},
4     topology::config::{DataType, TransformConfig, TransformContext, TransformDescription},
5     types::{parse_check_conversion_map, Conversion},
6 };
7 use serde::{Deserialize, Serialize};
8 use std::collections::HashMap;
9 use std::str;
10 use string_cache::DefaultAtom as Atom;
11 
12 #[derive(Deserialize, Serialize, Debug, Default)]
13 #[serde(default, deny_unknown_fields)]
14 pub struct SplitConfig {
15     pub field_names: Vec<Atom>,
16     pub separator: Option<String>,
17     pub field: Option<Atom>,
18     pub drop_field: bool,
19     pub types: HashMap<Atom, String>,
20 }
21 
22 inventory::submit! {
23     TransformDescription::new::<SplitConfig>("split")
24 }
25 
26 #[typetag::serde(name = "split")]
27 impl TransformConfig for SplitConfig {
build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>>28     fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
29         let field = self
30             .field
31             .as_ref()
32             .unwrap_or(&event::log_schema().message_key());
33 
34         let types = parse_check_conversion_map(&self.types, &self.field_names)
35             .map_err(|err| format!("{}", err))?;
36 
37         // don't drop the source field if it's getting overwritten by a parsed value
38         let drop_field = self.drop_field && !self.field_names.iter().any(|f| f == field);
39 
40         Ok(Box::new(Split::new(
41             self.field_names.clone(),
42             self.separator.clone(),
43             field.clone(),
44             drop_field,
45             types,
46         )))
47     }
48 
input_type(&self) -> DataType49     fn input_type(&self) -> DataType {
50         DataType::Log
51     }
52 
output_type(&self) -> DataType53     fn output_type(&self) -> DataType {
54         DataType::Log
55     }
56 
transform_type(&self) -> &'static str57     fn transform_type(&self) -> &'static str {
58         "split"
59     }
60 }
61 
62 pub struct Split {
63     field_names: Vec<(Atom, Conversion)>,
64     separator: Option<String>,
65     field: Atom,
66     drop_field: bool,
67 }
68 
69 impl Split {
new( field_names: Vec<Atom>, separator: Option<String>, field: Atom, drop_field: bool, types: HashMap<Atom, Conversion>, ) -> Self70     pub fn new(
71         field_names: Vec<Atom>,
72         separator: Option<String>,
73         field: Atom,
74         drop_field: bool,
75         types: HashMap<Atom, Conversion>,
76     ) -> Self {
77         let field_names = field_names
78             .into_iter()
79             .map(|name| {
80                 let conversion = types.get(&name).unwrap_or(&Conversion::Bytes).clone();
81                 (name, conversion)
82             })
83             .collect();
84 
85         Self {
86             field_names,
87             separator,
88             field,
89             drop_field,
90         }
91     }
92 }
93 
94 impl Transform for Split {
transform(&mut self, mut event: Event) -> Option<Event>95     fn transform(&mut self, mut event: Event) -> Option<Event> {
96         let value = event.as_log().get(&self.field).map(|s| s.to_string_lossy());
97 
98         if let Some(value) = &value {
99             for ((name, conversion), value) in self
100                 .field_names
101                 .iter()
102                 .zip(split(value, self.separator.clone()).into_iter())
103             {
104                 match conversion.convert(value.as_bytes().into()) {
105                     Ok(value) => {
106                         event.as_mut_log().insert(name.clone(), value);
107                     }
108                     Err(error) => {
109                         debug!(
110                             message = "Could not convert types.",
111                             name = &name[..],
112                             %error
113                         );
114                     }
115                 }
116             }
117             if self.drop_field {
118                 event.as_mut_log().remove(&self.field);
119             }
120         } else {
121             debug!(
122                 message = "Field does not exist.",
123                 field = self.field.as_ref(),
124             );
125         };
126 
127         Some(event)
128     }
129 }
130 
131 // Splits the given input by a separator.
132 // If the separator is `None`, then it will split on whitespace.
split(input: &str, separator: Option<String>) -> Vec<&str>133 pub fn split(input: &str, separator: Option<String>) -> Vec<&str> {
134     match separator {
135         Some(separator) => input.split(&separator).collect(),
136         None => input.split_whitespace().collect(),
137     }
138 }
139 
140 #[cfg(test)]
141 mod tests {
142     use super::split;
143     use super::SplitConfig;
144     use crate::event::{LogEvent, Value};
145     use crate::{
146         topology::config::{TransformConfig, TransformContext},
147         Event,
148     };
149     use string_cache::DefaultAtom as Atom;
150 
151     #[test]
split_whitespace()152     fn split_whitespace() {
153         assert_eq!(split("foo bar", None), &["foo", "bar"]);
154         assert_eq!(split("foo\t bar", None), &["foo", "bar"]);
155         assert_eq!(split("foo  \t bar     baz", None), &["foo", "bar", "baz"]);
156     }
157 
158     #[test]
split_comma()159     fn split_comma() {
160         assert_eq!(split("foo", Some(",".to_string())), &["foo"]);
161         assert_eq!(split("foo,bar", Some(",".to_string())), &["foo", "bar"]);
162     }
163 
164     #[test]
split_semicolon()165     fn split_semicolon() {
166         assert_eq!(
167             split("foo,bar;baz", Some(";".to_string())),
168             &["foo,bar", "baz"]
169         );
170     }
171 
parse_log( text: &str, fields: &str, separator: Option<String>, field: Option<&str>, drop_field: bool, types: &[(&str, &str)], ) -> LogEvent172     fn parse_log(
173         text: &str,
174         fields: &str,
175         separator: Option<String>,
176         field: Option<&str>,
177         drop_field: bool,
178         types: &[(&str, &str)],
179     ) -> LogEvent {
180         let event = Event::from(text);
181         let field_names = fields.split(' ').map(|s| s.into()).collect::<Vec<Atom>>();
182         let field = field.map(|f| f.into());
183         let mut parser = SplitConfig {
184             field_names,
185             separator,
186             field,
187             drop_field,
188             types: types.iter().map(|&(k, v)| (k.into(), v.into())).collect(),
189         }
190         .build(TransformContext::new_test())
191         .unwrap();
192 
193         parser.transform(event).unwrap().into_log()
194     }
195 
196     #[test]
split_adds_parsed_field_to_event()197     fn split_adds_parsed_field_to_event() {
198         let log = parse_log("1234 5678", "status time", None, None, false, &[]);
199 
200         assert_eq!(log[&"status".into()], "1234".into());
201         assert_eq!(log[&"time".into()], "5678".into());
202         assert!(log.get(&"message".into()).is_some());
203     }
204 
205     #[test]
split_does_drop_parsed_field()206     fn split_does_drop_parsed_field() {
207         let log = parse_log("1234 5678", "status time", None, Some("message"), true, &[]);
208 
209         assert_eq!(log[&"status".into()], "1234".into());
210         assert_eq!(log[&"time".into()], "5678".into());
211         assert!(log.get(&"message".into()).is_none());
212     }
213 
214     #[test]
split_does_not_drop_same_name_parsed_field()215     fn split_does_not_drop_same_name_parsed_field() {
216         let log = parse_log(
217             "1234 yes",
218             "status message",
219             None,
220             Some("message"),
221             true,
222             &[],
223         );
224 
225         assert_eq!(log[&"status".into()], "1234".into());
226         assert_eq!(log[&"message".into()], "yes".into());
227     }
228 
229     #[test]
split_coerces_fields_to_types()230     fn split_coerces_fields_to_types() {
231         let log = parse_log(
232             "1234 yes 42.3 word",
233             "code flag number rest",
234             None,
235             None,
236             false,
237             &[("flag", "bool"), ("code", "integer"), ("number", "float")],
238         );
239 
240         assert_eq!(log[&"number".into()], Value::Float(42.3));
241         assert_eq!(log[&"flag".into()], Value::Boolean(true));
242         assert_eq!(log[&"code".into()], Value::Integer(1234));
243         assert_eq!(log[&"rest".into()], Value::Bytes("word".into()));
244     }
245 
246     #[test]
split_works_with_different_separator()247     fn split_works_with_different_separator() {
248         let log = parse_log(
249             "1234,foo,bar",
250             "code who why",
251             Some(",".into()),
252             None,
253             false,
254             &[("code", "integer"), ("who", "string"), ("why", "string")],
255         );
256         assert_eq!(log[&"code".into()], Value::Integer(1234));
257         assert_eq!(log[&"who".into()], Value::Bytes("foo".into()));
258         assert_eq!(log[&"why".into()], Value::Bytes("bar".into()));
259     }
260 }
261