1 use super::Transform;
2 use crate::{
3 event::{self, Event},
4 topology::config::{DataType, TransformConfig, TransformContext, TransformDescription},
5 types::{parse_check_conversion_map, Conversion},
6 };
7 use serde::{Deserialize, Serialize};
8 use std::collections::HashMap;
9 use std::str;
10 use string_cache::DefaultAtom as Atom;
11
12 #[derive(Deserialize, Serialize, Debug, Default)]
13 #[serde(default, deny_unknown_fields)]
14 pub struct SplitConfig {
15 pub field_names: Vec<Atom>,
16 pub separator: Option<String>,
17 pub field: Option<Atom>,
18 pub drop_field: bool,
19 pub types: HashMap<Atom, String>,
20 }
21
22 inventory::submit! {
23 TransformDescription::new::<SplitConfig>("split")
24 }
25
26 #[typetag::serde(name = "split")]
27 impl TransformConfig for SplitConfig {
build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>>28 fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
29 let field = self
30 .field
31 .as_ref()
32 .unwrap_or(&event::log_schema().message_key());
33
34 let types = parse_check_conversion_map(&self.types, &self.field_names)
35 .map_err(|err| format!("{}", err))?;
36
37 // don't drop the source field if it's getting overwritten by a parsed value
38 let drop_field = self.drop_field && !self.field_names.iter().any(|f| f == field);
39
40 Ok(Box::new(Split::new(
41 self.field_names.clone(),
42 self.separator.clone(),
43 field.clone(),
44 drop_field,
45 types,
46 )))
47 }
48
input_type(&self) -> DataType49 fn input_type(&self) -> DataType {
50 DataType::Log
51 }
52
output_type(&self) -> DataType53 fn output_type(&self) -> DataType {
54 DataType::Log
55 }
56
transform_type(&self) -> &'static str57 fn transform_type(&self) -> &'static str {
58 "split"
59 }
60 }
61
62 pub struct Split {
63 field_names: Vec<(Atom, Conversion)>,
64 separator: Option<String>,
65 field: Atom,
66 drop_field: bool,
67 }
68
69 impl Split {
new( field_names: Vec<Atom>, separator: Option<String>, field: Atom, drop_field: bool, types: HashMap<Atom, Conversion>, ) -> Self70 pub fn new(
71 field_names: Vec<Atom>,
72 separator: Option<String>,
73 field: Atom,
74 drop_field: bool,
75 types: HashMap<Atom, Conversion>,
76 ) -> Self {
77 let field_names = field_names
78 .into_iter()
79 .map(|name| {
80 let conversion = types.get(&name).unwrap_or(&Conversion::Bytes).clone();
81 (name, conversion)
82 })
83 .collect();
84
85 Self {
86 field_names,
87 separator,
88 field,
89 drop_field,
90 }
91 }
92 }
93
94 impl Transform for Split {
transform(&mut self, mut event: Event) -> Option<Event>95 fn transform(&mut self, mut event: Event) -> Option<Event> {
96 let value = event.as_log().get(&self.field).map(|s| s.to_string_lossy());
97
98 if let Some(value) = &value {
99 for ((name, conversion), value) in self
100 .field_names
101 .iter()
102 .zip(split(value, self.separator.clone()).into_iter())
103 {
104 match conversion.convert(value.as_bytes().into()) {
105 Ok(value) => {
106 event.as_mut_log().insert(name.clone(), value);
107 }
108 Err(error) => {
109 debug!(
110 message = "Could not convert types.",
111 name = &name[..],
112 %error
113 );
114 }
115 }
116 }
117 if self.drop_field {
118 event.as_mut_log().remove(&self.field);
119 }
120 } else {
121 debug!(
122 message = "Field does not exist.",
123 field = self.field.as_ref(),
124 );
125 };
126
127 Some(event)
128 }
129 }
130
/// Splits `input` into pieces.
///
/// With `Some(separator)` the input is cut at every occurrence of that
/// separator, so empty pieces are kept. With `None` the input is cut on runs
/// of whitespace, so no empty pieces are produced.
pub fn split(input: &str, separator: Option<String>) -> Vec<&str> {
    match separator.as_deref() {
        None => input.split_whitespace().collect(),
        Some(delimiter) => input.split(delimiter).collect(),
    }
}
139
#[cfg(test)]
mod tests {
    use super::{split, SplitConfig};
    use crate::{
        event::{LogEvent, Value},
        topology::config::{TransformConfig, TransformContext},
        Event,
    };
    use string_cache::DefaultAtom as Atom;

    #[test]
    fn split_whitespace() {
        // Without a separator, any run of whitespace is a single delimiter.
        assert_eq!(split("foo bar", None), vec!["foo", "bar"]);
        assert_eq!(split("foo\t bar", None), vec!["foo", "bar"]);
        assert_eq!(split("foo \t bar baz", None), vec!["foo", "bar", "baz"]);
    }

    #[test]
    fn split_comma() {
        let comma = || Some(",".to_string());

        assert_eq!(split("foo", comma()), vec!["foo"]);
        assert_eq!(split("foo,bar", comma()), vec!["foo", "bar"]);
    }

    #[test]
    fn split_semicolon() {
        // Only the configured separator splits; the comma is left alone.
        let pieces = split("foo,bar;baz", Some(";".to_string()));

        assert_eq!(pieces, vec!["foo,bar", "baz"]);
    }

    // Builds a `split` transform from the given settings, runs it over a
    // single event created from `text`, and returns the resulting log event.
    // `fields` is a space-separated list of output field names.
    fn parse_log(
        text: &str,
        fields: &str,
        separator: Option<String>,
        field: Option<&str>,
        drop_field: bool,
        types: &[(&str, &str)],
    ) -> LogEvent {
        let config = SplitConfig {
            field_names: fields
                .split(' ')
                .map(|name| name.into())
                .collect::<Vec<Atom>>(),
            separator,
            field: field.map(|name| name.into()),
            drop_field,
            types: types
                .iter()
                .map(|&(name, kind)| (name.into(), kind.into()))
                .collect(),
        };
        let mut transform = config.build(TransformContext::new_test()).unwrap();

        transform.transform(Event::from(text)).unwrap().into_log()
    }

    #[test]
    fn split_adds_parsed_field_to_event() {
        let log = parse_log("1234 5678", "status time", None, None, false, &[]);

        assert_eq!(log[&Atom::from("status")], Value::from("1234"));
        assert_eq!(log[&Atom::from("time")], Value::from("5678"));
        // The source field is kept when `drop_field` is false.
        assert!(log.get(&Atom::from("message")).is_some());
    }

    #[test]
    fn split_does_drop_parsed_field() {
        let log = parse_log("1234 5678", "status time", None, Some("message"), true, &[]);

        assert_eq!(log[&Atom::from("status")], Value::from("1234"));
        assert_eq!(log[&Atom::from("time")], Value::from("5678"));
        assert!(log.get(&Atom::from("message")).is_none());
    }

    #[test]
    fn split_does_not_drop_same_name_parsed_field() {
        // "message" is both the source field and an output field, so the
        // parsed value must survive even though `drop_field` is set.
        let log = parse_log(
            "1234 yes",
            "status message",
            None,
            Some("message"),
            true,
            &[],
        );

        assert_eq!(log[&Atom::from("status")], Value::from("1234"));
        assert_eq!(log[&Atom::from("message")], Value::from("yes"));
    }

    #[test]
    fn split_coerces_fields_to_types() {
        let conversions = [("flag", "bool"), ("code", "integer"), ("number", "float")];
        let log = parse_log(
            "1234 yes 42.3 word",
            "code flag number rest",
            None,
            None,
            false,
            &conversions,
        );

        assert_eq!(log[&Atom::from("number")], Value::Float(42.3));
        assert_eq!(log[&Atom::from("flag")], Value::Boolean(true));
        assert_eq!(log[&Atom::from("code")], Value::Integer(1234));
        // No conversion configured for "rest": it stays as raw bytes.
        assert_eq!(log[&Atom::from("rest")], Value::Bytes("word".into()));
    }

    #[test]
    fn split_works_with_different_separator() {
        let conversions = [("code", "integer"), ("who", "string"), ("why", "string")];
        let log = parse_log(
            "1234,foo,bar",
            "code who why",
            Some(",".into()),
            None,
            false,
            &conversions,
        );

        assert_eq!(log[&Atom::from("code")], Value::Integer(1234));
        assert_eq!(log[&Atom::from("who")], Value::Bytes("foo".into()));
        assert_eq!(log[&Atom::from("why")], Value::Bytes("bar".into()));
    }
}
261