1 use super::{LogEvent, Value};
2 use std::{
3     collections::BTreeMap,
4     hash::{Hash, Hasher},
5 };
6 use string_cache::DefaultAtom as Atom;
7 
8 // TODO: if we had `Value` implement `Eq` and `Hash`, the implementation here
9 // would be much easier. The issue is with `f64` type. We should consider using
// a newtype for `f64` there that'd implement `Eq` and `Hash` if it's safe, for
11 // example `NormalF64`, and guard the values with `val.is_normal() == true`
12 // invariant.
13 // See also: https://internals.rust-lang.org/t/f32-f64-should-implement-hash/5436/32
14 
/// An event discriminant identifies a distinguishable subset of events.
/// Intended for dissecting streams of events into substreams, for instance to
/// be able to allocate a buffer per substream.
/// Implements `PartialEq`, `Eq` and `Hash` to enable use as a `HashMap` key.
#[derive(Debug, Clone)]
pub struct Discriminant {
    // One slot per requested discriminant field, in the order the fields were
    // passed to `from_log_event`; a slot is `None` when the event lookup for
    // that field returned nothing.
    values: Vec<Option<Value>>,
}
23 
24 impl Discriminant {
25     /// Create a new Discriminant from the `LogEvent` and an ordered slice of
26     /// fields to include into a discriminant value.
from_log_event(event: &LogEvent, discriminant_fields: &[Atom]) -> Self27     pub fn from_log_event(event: &LogEvent, discriminant_fields: &[Atom]) -> Self {
28         let values: Vec<Option<Value>> = discriminant_fields
29             .iter()
30             .map(|discriminant_field| event.get(discriminant_field).cloned())
31             .collect();
32         Self { values }
33     }
34 }
35 
36 impl PartialEq for Discriminant {
eq(&self, other: &Self) -> bool37     fn eq(&self, other: &Self) -> bool {
38         self.values
39             .iter()
40             .zip(other.values.iter())
41             .all(|(this, other)| match (this, other) {
42                 (None, None) => true,
43                 (Some(this), Some(other)) => value_eq(this, other),
44                 _ => false,
45             })
46     }
47 }
48 
49 impl Eq for Discriminant {}
50 
51 // Equality check for for discriminant purposes.
value_eq(this: &Value, other: &Value) -> bool52 fn value_eq(this: &Value, other: &Value) -> bool {
53     match (this, other) {
54         // Trivial.
55         (Value::Bytes(this), Value::Bytes(other)) => this.eq(other),
56         (Value::Boolean(this), Value::Boolean(other)) => this.eq(other),
57         (Value::Integer(this), Value::Integer(other)) => this.eq(other),
58         (Value::Timestamp(this), Value::Timestamp(other)) => this.eq(other),
59         (Value::Null, Value::Null) => true,
60         // Non-trivial.
61         (Value::Float(this), Value::Float(other)) => f64_eq(this, other),
62         (Value::Array(this), Value::Array(other)) => array_eq(this, other),
63         (Value::Map(this), Value::Map(other)) => map_eq(this, other),
64         // Type mismatch.
65         _ => false,
66     }
67 }
68 
// Compares two f64 values in a way that is suitable for discriminant purposes:
// any two NaNs are considered equal, a NaN never equals a non-NaN, and two
// numerically equal values are only equal when their sign bits agree (so
// `0.0` and `-0.0` are different).
fn f64_eq(this: &f64, other: &f64) -> bool {
    match (this.is_nan(), other.is_nan()) {
        (true, true) => true,
        (false, false) => {
            this == other && this.is_sign_negative() == other.is_sign_negative()
        }
        // Exactly one side is NaN.
        _ => false,
    }
}
84 
array_eq(this: &Vec<Value>, other: &Vec<Value>) -> bool85 fn array_eq(this: &Vec<Value>, other: &Vec<Value>) -> bool {
86     if this.len() != other.len() {
87         return false;
88     }
89 
90     this.iter()
91         .zip(other.iter())
92         .all(|(first, second)| value_eq(first, second))
93 }
94 
map_eq(this: &BTreeMap<String, Value>, other: &BTreeMap<String, Value>) -> bool95 fn map_eq(this: &BTreeMap<String, Value>, other: &BTreeMap<String, Value>) -> bool {
96     if this.len() != other.len() {
97         return false;
98     }
99 
100     this.iter()
101         .zip(other.iter())
102         .all(|((key1, value1), (key2, value2))| key1 == key2 && value_eq(value1, value2))
103 }
104 
105 impl Hash for Discriminant {
hash<H: Hasher>(&self, state: &mut H)106     fn hash<H: Hasher>(&self, state: &mut H) {
107         for value in &self.values {
108             match value {
109                 Some(value) => {
110                     state.write_u8(1);
111                     hash_value(state, value);
112                 }
113                 None => state.write_u8(0),
114             }
115         }
116     }
117 }
118 
119 // Hashes value for discriminant purposes.
hash_value<H: Hasher>(hasher: &mut H, value: &Value)120 fn hash_value<H: Hasher>(hasher: &mut H, value: &Value) {
121     match value {
122         // Trivial.
123         Value::Bytes(val) => val.hash(hasher),
124         Value::Boolean(val) => val.hash(hasher),
125         Value::Integer(val) => val.hash(hasher),
126         Value::Timestamp(val) => val.hash(hasher),
127         // Non-trivial.
128         Value::Float(val) => hash_f64(hasher, val),
129         Value::Array(val) => hash_array(hasher, val),
130         Value::Map(val) => hash_map(hasher, val),
131         Value::Null => hash_null(hasher),
132     }
133 }
134 
// Does f64 hashing that is suitable for discriminant purposes.
//
// `f64_eq` treats any two NaNs as equal regardless of their sign bit or
// payload, so every NaN has to produce the same hash; otherwise equal
// discriminants could hash differently and break the `Hash`/`Eq` contract.
// All NaNs are therefore replaced with one canonical NaN before hashing.
// Every other value is hashed by its native-endian bytes, which keeps `0.0`
// and `-0.0` distinct, matching `f64_eq`.
fn hash_f64<H: Hasher>(hasher: &mut H, value: &f64) {
    let canonical = if value.is_nan() { f64::NAN } else { *value };
    hasher.write(&canonical.to_ne_bytes());
}
139 
hash_array<H: Hasher>(hasher: &mut H, array: &Vec<Value>)140 fn hash_array<H: Hasher>(hasher: &mut H, array: &Vec<Value>) {
141     for val in array.iter() {
142         hash_value(hasher, val);
143     }
144 }
145 
hash_map<H: Hasher>(hasher: &mut H, map: &BTreeMap<String, Value>)146 fn hash_map<H: Hasher>(hasher: &mut H, map: &BTreeMap<String, Value>) {
147     for (key, val) in map.iter() {
148         hasher.write(key.as_bytes());
149         hash_value(hasher, val);
150     }
151 }
152 
// Hashes the null value for discriminant purposes by feeding a single zero
// byte to the hasher.
fn hash_null<H: Hasher>(hasher: &mut H) {
    0_u8.hash(hasher);
}
156 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::LogEvent;
    use std::collections::{hash_map::DefaultHasher, HashMap};

    // Hashes any `Hash` value with a fresh `DefaultHasher` and returns the
    // resulting digest, so two values can be compared by hash.
    fn hash<H: Hash>(hash: H) -> u64 {
        let mut hasher = DefaultHasher::new();
        hash.hash(&mut hasher);
        hasher.finish()
    }

    // Events that differ only in a field that is not part of the discriminant
    // fields produce equal discriminants with equal hashes. Note that
    // `container_id` is never inserted into either event.
    #[test]
    fn equal() {
        let mut event_1 = LogEvent::default();
        event_1.insert("hostname", "localhost");
        event_1.insert("irrelevant", "not even used");
        let mut event_2 = event_1.clone();
        event_2.insert("irrelevant", "does not matter if it's different");

        let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    // Events that differ in one of the discriminant fields produce different
    // discriminants with different hashes.
    #[test]
    fn not_equal() {
        let mut event_1 = LogEvent::default();
        event_1.insert("hostname", "localhost");
        event_1.insert("container_id", "abc");
        let mut event_2 = event_1.clone();
        event_2.insert("container_id", "def");

        let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_ne!(discriminant_1, discriminant_2);
        assert_ne!(hash(discriminant_1), hash(discriminant_2));
    }

    // The order in which fields were inserted into the event does not affect
    // the discriminant.
    #[test]
    fn field_order() {
        let mut event_1 = LogEvent::default();
        event_1.insert("a", "a");
        event_1.insert("b", "b");
        let mut event_2 = LogEvent::default();
        event_2.insert("b", "b");
        event_2.insert("a", "a");

        let discriminant_fields = vec![Atom::from("a"), Atom::from("b")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    // The order in which the keys under `nested` were inserted does not
    // affect the discriminant taken over `nested` itself.
    #[test]
    fn map_values_key_order() {
        let mut event_1 = LogEvent::default();
        event_1.insert("nested.a", "a");
        event_1.insert("nested.b", "b");
        let mut event_2 = LogEvent::default();
        event_2.insert("nested.b", "b");
        event_2.insert("nested.a", "a");

        let discriminant_fields = vec![Atom::from("nested")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    // The same two `array[..]` entries inserted in a different order yield the
    // same discriminant over `array`.
    // NOTE(review): presumably `array[N]` paths address elements of an array
    // value -- confirm against `LogEvent::insert`.
    #[test]
    fn array_values_insertion_order() {
        let mut event_1 = LogEvent::default();
        event_1.insert("array[0]", "a");
        event_1.insert("array[1]", "b");
        let mut event_2 = LogEvent::default();
        event_2.insert("array[1]", "b");
        event_2.insert("array[0]", "a");

        let discriminant_fields = vec![Atom::from("array")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    // A map value differs from a missing value.
    #[test]
    fn map_values_matter_1() {
        let mut event_1 = LogEvent::default();
        event_1.insert("nested.a", "a"); // `nested` is a `Value::Map`
        let event_2 = LogEvent::default(); // empty event

        let discriminant_fields = vec![Atom::from("nested")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_ne!(discriminant_1, discriminant_2);
        assert_ne!(hash(discriminant_1), hash(discriminant_2));
    }

    // A map value differs from a non-map value stored under the same field.
    #[test]
    fn map_values_matter_2() {
        let mut event_1 = LogEvent::default();
        event_1.insert("nested.a", "a"); // `nested` is a `Value::Map`
        let mut event_2 = LogEvent::default();
        event_2.insert("nested", "x"); // `nested` is a `Value::String`

        let discriminant_fields = vec![Atom::from("nested")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_ne!(discriminant_1, discriminant_2);
        assert_ne!(hash(discriminant_1), hash(discriminant_2));
    }

    // Uses `Discriminant` as a `HashMap` key to keep one counter per
    // substream, with three substreams: two with distinct
    // `hostname`/`container_id` pairs and one with neither field set.
    #[test]
    fn with_hash_map() {
        let mut map: HashMap<Discriminant, usize> = HashMap::new();

        let event_stream_1 = {
            let mut event = LogEvent::default();
            event.insert("hostname", "a.test");
            event.insert("container_id", "abc");
            event
        };

        let event_stream_2 = {
            let mut event = LogEvent::default();
            event.insert("hostname", "b.test");
            event.insert("container_id", "def");
            event
        };

        let event_stream_3 = {
            // no `hostname` or `container_id`
            LogEvent::default()
        };

        let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")];

        // Returns the counter stored for the event's discriminant after this
        // event has been accounted for: `0` the first time a discriminant is
        // seen, then one more on each later occurrence.
        let mut process_event = |event| {
            let discriminant = Discriminant::from_log_event(&event, &discriminant_fields);
            *map.entry(discriminant).and_modify(|e| *e += 1).or_insert(0)
        };

        {
            let mut event = event_stream_1.clone();
            event.insert("message", "a");
            assert_eq!(process_event(event), 0);
        }

        {
            let mut event = event_stream_1.clone();
            event.insert("message", "b");
            event.insert("irrelevant", "c");
            assert_eq!(process_event(event), 1);
        }

        {
            let mut event = event_stream_2.clone();
            event.insert("message", "d");
            assert_eq!(process_event(event), 0);
        }

        {
            let mut event = event_stream_2.clone();
            event.insert("message", "e");
            event.insert("irrelevant", "d");
            assert_eq!(process_event(event), 1);
        }

        {
            let mut event = event_stream_3.clone();
            event.insert("message", "f");
            assert_eq!(process_event(event), 0);
        }

        {
            let mut event = event_stream_3.clone();
            event.insert("message", "g");
            event.insert("irrelevant", "d");
            assert_eq!(process_event(event), 1);
        }

        // Now assert the amount of events processed per discriminant. Each of
        // these calls is itself the third event of its substream, hence `2`.
        assert_eq!(process_event(event_stream_1), 2);
        assert_eq!(process_event(event_stream_2), 2);
        assert_eq!(process_event(event_stream_3), 2);
    }
}
363