use crate::{
    event::{self, Event, LogEvent, Value},
    shutdown::ShutdownSignal,
    tls::{MaybeTlsSettings, TlsConfig},
    topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes05::{buf::BufExt, Bytes};
use chrono::{DateTime, TimeZone, Utc};
use flate2::read::GzDecoder;
use futures::{
    compat::{AsyncRead01CompatExt, Future01CompatExt, Stream01CompatExt},
    FutureExt, TryFutureExt, TryStreamExt,
};
use futures01::{sync::mpsc, Async, Future, Sink, Stream};
use http::StatusCode;
use lazy_static::lazy_static;
use serde::{de, Deserialize, Serialize};
use serde_json::{de::IoRead, json, Deserializer, Value as JsonValue};
use snafu::Snafu;
use std::{
    io::Read,
    net::{Ipv4Addr, SocketAddr},
};
use string_cache::DefaultAtom as Atom;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply};

// Event fields unique to splunk_hec source
lazy_static! {
    pub static ref CHANNEL: Atom = Atom::from("splunk_channel");
    pub static ref INDEX: Atom = Atom::from("splunk_index");
    pub static ref SOURCE: Atom = Atom::from("splunk_source");
    pub static ref SOURCETYPE: Atom = Atom::from("splunk_sourcetype");
}

/// Accepts HTTP requests.
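///
/// This source speaks (a subset of) the Splunk HTTP Event Collector (HEC)
/// protocol. An illustrative configuration sketch follows; the component name
/// and token value are placeholders, not part of this module:
///
/// ```toml
/// [sources.my_splunk_hec]
/// type = "splunk_hec"
/// address = "0.0.0.0:8088"
/// token = "your-hec-token"
/// ```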
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    /// Local address on which to listen
    #[serde(default = "default_socket_address")]
    address: SocketAddr,
    /// Splunk HEC token
    token: Option<String>,
    tls: Option<TlsConfig>,
}

impl SplunkConfig {
    #[cfg(test)]
    pub fn on(address: SocketAddr) -> Self {
        SplunkConfig {
            address,
            ..Self::default()
        }
    }
}

impl Default for SplunkConfig {
    fn default() -> Self {
        SplunkConfig {
            address: default_socket_address(),
            token: None,
            tls: None,
        }
    }
}

fn default_socket_address() -> SocketAddr {
    SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 8088)
}

#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    fn build(
        &self,
        _: &str,
        _: &GlobalOptions,
        shutdown: ShutdownSignal,
        out: mpsc::Sender<Event>,
    ) -> crate::Result<super::Source> {
        let source = SplunkSource::new(self);

        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out.clone());
        let health_service = source.health_service(out);
        let options = SplunkSource::options();

        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);

        let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
        let incoming = tls.bind(&self.address)?.incoming();

        let fut = async move {
            let _ = warp::serve(services)
                .serve_incoming_with_graceful_shutdown(
                    incoming.compat().map_ok(|s| s.compat().compat()),
                    shutdown.clone().compat().map(|_| ()),
                )
                .await;
            // We need to drop the last copy of ShutdownSignalToken only after the server has shut down.
            drop(shutdown);
            Ok(())
        };
        Ok(Box::new(fut.boxed().compat()))
    }

    fn output_type(&self) -> DataType {
        DataType::Log
    }

    fn source_type(&self) -> &'static str {
        "splunk_hec"
    }
}

/// Shared data for responding to requests.
struct SplunkSource {
    credentials: Option<Bytes>,
}

impl SplunkSource {
    fn new(config: &SplunkConfig) -> Self {
        SplunkSource {
            credentials: config
                .token
                .as_ref()
                .map(|token| format!("Splunk {}", token).into()),
        }
    }

    fn event_service(&self, out: mpsc::Sender<Event>) -> BoxedFilter<(Response,)> {
        warp::post()
            .and(path!("event").or(path!("event" / "1.0")))
            .and(self.authorization())
            .and(warp::header::optional::<String>("x-splunk-request-channel"))
            .and(warp::header::optional::<String>("host"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and_then(
                move |_,
                      _,
                      channel: Option<String>,
                      host: Option<String>,
                      gzip: bool,
                      body: Bytes| {
                    let out = out.clone();
                    async move {
                        // Construct event parser
                        if gzip {
                            EventStream::new(GzDecoder::new(body.reader()), channel, host)
                                .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
                                .map(|_| ())
                                .compat()
                                .await
                        } else {
                            EventStream::new(body.reader(), channel, host)
                                .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
                                .map(|_| ())
                                .compat()
                                .await
                        }
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }

    fn raw_service(&self, out: mpsc::Sender<Event>) -> BoxedFilter<(Response,)> {
        warp::post()
            .and(path!("raw" / "1.0").or(path!("raw")))
            .and(self.authorization())
            .and(
                warp::header::optional::<String>("x-splunk-request-channel").and_then(
                    |channel: Option<String>| async {
                        if let Some(channel) = channel {
                            Ok(channel)
                        } else {
                            Err(Rejection::from(ApiError::MissingChannel))
                        }
                    },
                ),
            )
            .and(warp::header::optional::<String>("host"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and_then(
                move |_, _, channel: String, host: Option<String>, gzip: bool, body: Bytes| {
                    let out = out.clone();
                    async move {
                        // Construct event parser
                        futures01::stream::once(raw_event(body, gzip, channel, host))
                            .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
                            .map(|_| ())
                            .compat()
                            .await
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }

    fn health_service(&self, out: mpsc::Sender<Event>) -> BoxedFilter<(Response,)> {
        let credentials = self.credentials.clone();
        let authorize =
            warp::header::optional("Authorization").and_then(move |token: Option<String>| {
                let credentials = credentials.clone();
                async move {
                    match (token, credentials) {
                        (_, None) => Ok(()),
                        (Some(token), Some(password)) if token.as_bytes() == password.as_ref() => {
                            Ok(())
                        }
                        _ => Err(Rejection::from(ApiError::BadRequest)),
                    }
                }
            });

        warp::get()
            .and(path!("health" / "1.0").or(path!("health")))
            .and(authorize)
            .and_then(move |_, _| {
                let out = out.clone();
                async move {
                    match out.clone().poll_ready() {
                        Ok(Async::Ready(())) => Ok(warp::reply().into_response()),
                        // Since the capacity of an mpsc channel increases by one with each
                        // sender, the channel will technically never be full, so this will
                        // never be returned. That behavior doesn't fulfill one of the
                        // purposes of a healthcheck.
                        Ok(Async::NotReady) => Ok(warp::reply::with_status(
                            warp::reply(),
                            StatusCode::SERVICE_UNAVAILABLE,
                        )
                        .into_response()),
                        Err(_) => Err(Rejection::from(ApiError::ServerShutdown)),
                    }
                }
            })
            .boxed()
    }

    fn options() -> BoxedFilter<(Response,)> {
        let post = warp::options()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(path!("raw" / "1.0"))
                    .or(path!("raw")),
            )
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());

        let get = warp::options()
            .and(path!("health").or(path!("health" / "1.0")))
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());

        post.or(get).unify().boxed()
    }

    /// Authorize the request
    fn authorization(&self) -> BoxedFilter<((),)> {
        let credentials = self.credentials.clone();
        warp::header::optional("Authorization")
            .and_then(move |token: Option<String>| {
                let credentials = credentials.clone();
                async move {
                    match (token, credentials) {
                        (_, None) => Ok(()),
                        (Some(token), Some(password)) if token.as_bytes() == password.as_ref() => {
                            Ok(())
                        }
                        (Some(_), Some(_)) => Err(Rejection::from(ApiError::InvalidAuthorization)),
                        (None, Some(_)) => Err(Rejection::from(ApiError::MissingAuthorization)),
                    }
                }
            })
            .boxed()
    }

    /// Is the body encoded with gzip?
    fn gzip(&self) -> BoxedFilter<(bool,)> {
        warp::header::optional::<String>("Content-Encoding")
            .and_then(|encoding: Option<String>| async move {
                match encoding {
                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
                    None => Ok(false),
                }
            })
            .boxed()
    }
}

/// Constructs one or more events from the JSON objects in the request body.
/// On error, it stops consuming the input.
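///
/// Each object is expected to resemble the Splunk HEC event JSON handled
/// below. The sketch is illustrative only (the values are placeholders), and
/// every key other than `event` is optional:
///
/// ```json
/// {
///     "event": "a log line or a JSON object",
///     "time": 1500000000,
///     "host": "example-host",
///     "source": "my-app",
///     "sourcetype": "_json",
///     "fields": { "env": "prod" }
/// }
/// ```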
struct EventStream<R: Read> {
    /// Remaining request with JSON events
    data: R,
    /// Count of sent events
    events: usize,
    /// Optional channel from headers
    channel: Option<Value>,
    /// Default time
    time: Time,
    /// Remaining extracted default values
    extractors: [DefaultExtractor; 4],
}

impl<R: Read> EventStream<R> {
    fn new(data: R, channel: Option<String>, host: Option<String>) -> Self {
        EventStream {
            data,
            events: 0,
            channel: channel.map(|value| value.as_bytes().into()),
            time: Time::Now(Utc::now()),
            extractors: [
                DefaultExtractor::new_with(
                    "host",
                    &event::log_schema().host_key(),
                    host.map(|value| value.as_bytes().into()),
                ),
                DefaultExtractor::new("index", &INDEX),
                DefaultExtractor::new("source", &SOURCE),
                DefaultExtractor::new("sourcetype", &SOURCETYPE),
            ],
        }
    }

    /// Like serde_json::from_reader, but does not require that all of the data
    /// be consumed, nor that any data exist at all.
    fn from_reader_take<T>(&mut self) -> Result<Option<T>, serde_json::Error>
    where
        T: de::DeserializeOwned,
    {
        use serde_json::de::Read;
        let mut reader = IoRead::new(&mut self.data);
        match reader.peek()? {
            None => Ok(None),
            Some(_) => Deserialize::deserialize(&mut Deserializer::new(reader)).map(Some),
        }
    }
}

impl<R: Read> Stream for EventStream<R> {
    type Item = Event;
    type Error = Rejection;
    fn poll(&mut self) -> Result<Async<Option<Event>>, Rejection> {
        // Parse JSON object
        let mut json = match self.from_reader_take::<JsonValue>() {
            Ok(Some(json)) => json,
            Ok(None) => {
                return if self.events == 0 {
                    Err(ApiError::NoData.into())
                } else {
                    Ok(Async::Ready(None))
                };
            }
            Err(error) => {
                error!(message = "Malformed request body", %error);
                return Err(ApiError::InvalidDataFormat { event: self.events }.into());
            }
        };

        // Construct an Event from the parsed JSON event
        let mut event = Event::new_empty_log();
        let log = event.as_mut_log();

        // Add source type
        log.insert(event::log_schema().source_type_key(), "splunk_hec");

        // Process the `event` field
        match json.get_mut("event") {
            Some(event) => match event.take() {
                JsonValue::String(string) => {
                    if string.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }
                    log.insert(event::log_schema().message_key().clone(), string);
                }
                JsonValue::Object(mut object) => {
                    if object.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }

                    // Add the 'line' value as 'event::log_schema().message_key'
                    if let Some(line) = object.remove("line") {
                        match line {
                            // These don't quite fit the meaning of event::log_schema().message_key
                            JsonValue::Array(_) | JsonValue::Object(_) => {
                                log.insert("line", line);
                            }
                            _ => {
                                log.insert(event::log_schema().message_key(), line);
                            }
                        }
                    }

                    for (key, value) in object {
                        log.insert(key, value);
                    }
                }
                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
            },
            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
        }

        // Process the `channel` field
        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
            log.insert(CHANNEL.clone(), guid);
        } else if let Some(guid) = self.channel.as_ref() {
            log.insert(CHANNEL.clone(), guid.clone());
        }

        // Process the `fields` field
        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
            for (key, value) in object {
                log.insert(key, value);
            }
        }

        // Process the `time` field
        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
            Some(JsonValue::Number(time)) => Some(Some(time)),
            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
            _ => None,
        };
        match parsed_time {
            None => (),
            Some(Some(t)) => {
                if let Some(t) = t.as_u64() {
                    let time = parse_timestamp(t as i64)
                        .ok_or_else(|| ApiError::InvalidDataFormat { event: self.events })?;

                    self.time = Time::Provided(time);
                } else if let Some(t) = t.as_f64() {
                    self.time = Time::Provided(Utc.timestamp(
                        t.floor() as i64,
                        (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
                    ));
                } else {
                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
                }
            }
            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
        }

        // Add the time field
        match self.time.clone() {
            Time::Provided(time) => log.insert(event::log_schema().timestamp_key().clone(), time),
            Time::Now(time) => log.insert(event::log_schema().timestamp_key().clone(), time),
        };

        // Apply default extracted fields
        for de in self.extractors.iter_mut() {
            de.extract(log, &mut json);
        }

        self.events += 1;

        Ok(Async::Ready(Some(event)))
    }
}

/// Parse an `i64` unix timestamp that can be in seconds, milliseconds or
/// nanoseconds.
///
/// This attempts to parse timestamps based on what cutoff range they fall into.
/// For seconds to be parsed the timestamp must be less than the unix epoch of
/// the year `2400`. For milliseconds to be parsed the value must be smaller
/// than the year `10,000` in unix epoch milliseconds. If the value is larger
/// than both we attempt to parse it as nanoseconds.
///
/// Returns `None` if `t` is negative.
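///
/// An illustrative (not compiled) example: both inputs below denote the same
/// instant, 2017-07-14T02:40:00Z, taking the seconds and milliseconds
/// branches respectively.
///
/// ```ignore
/// assert_eq!(parse_timestamp(1_500_000_000), parse_timestamp(1_500_000_000_000));
/// ```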
fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
    // Utc.ymd(2400, 1, 1).and_hms(0,0,0).timestamp();
    const SEC_CUTOFF: i64 = 13569465600;
    // Utc.ymd(10_000, 1, 1).and_hms(0,0,0).timestamp_millis();
    const MILLISEC_CUTOFF: i64 = 253402300800000;

    // Timestamps can't be negative!
    if t < 0 {
        return None;
    }

    let ts = if t < SEC_CUTOFF {
        Utc.timestamp(t, 0)
    } else if t < MILLISEC_CUTOFF {
        Utc.timestamp_millis(t)
    } else {
        Utc.timestamp_nanos(t)
    };

    Some(ts)
}

/// Maintains the last known extracted value of a field and uses it when the field is absent.
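///
/// For example (illustrative, mirroring the `default` test below): if the first
/// event in a request carries `"source": "main"`, later events in the same
/// request without a `source` key still receive `splunk_source = "main"`.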
struct DefaultExtractor {
    field: &'static str,
    to_field: &'static Atom,
    value: Option<Value>,
}

impl DefaultExtractor {
    fn new(field: &'static str, to_field: &'static Atom) -> Self {
        DefaultExtractor {
            field,
            to_field,
            value: None,
        }
    }

    fn new_with(
        field: &'static str,
        to_field: &'static Atom,
        value: impl Into<Option<Value>>,
    ) -> Self {
        DefaultExtractor {
            field,
            to_field,
            value: value.into(),
        }
    }

    fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
        // Process the JSON field
        if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
            self.value = Some(new_value.into());
        }

        // Add the data field
        if let Some(index) = self.value.as_ref() {
            log.insert(self.to_field.clone(), index.clone());
        }
    }
}

/// For tracking the origin of the timestamp
#[derive(Clone, Debug)]
enum Time {
    /// Fallback, used when the request does not provide a time
    Now(DateTime<Utc>),
    /// Provided in the request
    Provided(DateTime<Utc>),
}

/// Creates an event from a raw request
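///
/// An illustrative request that exercises this path (the address, token, and
/// channel value are placeholders; the channel header is required):
///
/// ```text
/// curl -X POST http://127.0.0.1:8088/services/collector/raw \
///     -H "Authorization: Splunk <token>" \
///     -H "x-splunk-request-channel: <guid>" \
///     -d 'a raw log line'
/// ```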
fn raw_event(
    bytes: Bytes,
    gzip: bool,
    channel: String,
    host: Option<String>,
) -> Result<Event, Rejection> {
    // Process gzip
    let message: Value = if gzip {
        let mut data = Vec::new();
        match GzDecoder::new(bytes.reader()).read_to_end(&mut data) {
            Ok(0) => return Err(ApiError::NoData.into()),
            Ok(_) => data.into(),
            Err(error) => {
                error!(message = "Malformed request body", %error);
                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
            }
        }
    } else {
        bytes.into()
    };

    // Construct event
    let mut event = Event::new_empty_log();
    let log = event.as_mut_log();

    // Add message
    log.insert(event::log_schema().message_key().clone(), message);

    // Add channel
    log.insert(CHANNEL.clone(), channel.as_bytes());

    // Add host
    if let Some(host) = host {
        log.insert(event::log_schema().host_key().clone(), host.as_bytes());
    }

    // Add timestamp
    log.insert(event::log_schema().timestamp_key().clone(), Utc::now());

    // Add source type
    event
        .as_mut_log()
        .try_insert(event::log_schema().source_type_key(), "splunk_hec");

    Ok(event)
}

#[derive(Debug, Snafu)]
enum ApiError {
    MissingAuthorization,
    InvalidAuthorization,
    UnsupportedEncoding,
    MissingChannel,
    NoData,
    InvalidDataFormat { event: usize },
    ServerShutdown,
    EmptyEventField { event: usize },
    MissingEventField { event: usize },
    BadRequest,
}

impl From<ApiError> for Rejection {
    fn from(error: ApiError) -> Self {
        warp::reject::custom(error)
    }
}

impl warp::reject::Reject for ApiError {}

/// Cached bodies for common responses
mod splunk_response {
    use bytes::Bytes;
    use lazy_static::lazy_static;
    use serde_json::{json, Value};

    fn json_to_bytes(value: Value) -> Bytes {
        serde_json::to_string(&value).unwrap().into()
    }

    lazy_static! {
        pub static ref INVALID_AUTHORIZATION: Bytes =
            json_to_bytes(json!({"text":"Invalid authorization","code":3}));
        pub static ref MISSING_CREDENTIALS: Bytes =
            json_to_bytes(json!({"text":"Token is required","code":2}));
        pub static ref NO_DATA: Bytes = json_to_bytes(json!({"text":"No data","code":5}));
        pub static ref SUCCESS: Bytes = json_to_bytes(json!({"text":"Success","code":0}));
        pub static ref SERVER_ERROR: Bytes =
            json_to_bytes(json!({"text":"Internal server error","code":8}));
        pub static ref SERVER_SHUTDOWN: Bytes =
            json_to_bytes(json!({"text":"Server is shutting down","code":9}));
        pub static ref UNSUPPORTED_MEDIA_TYPE: Bytes =
            json_to_bytes(json!({"text":"unsupported content encoding"}));
        pub static ref NO_CHANNEL: Bytes =
            json_to_bytes(json!({"text":"Data channel is missing","code":10}));
    }
}

fn finish_ok(_: ()) -> Response {
    response_json(StatusCode::OK, splunk_response::SUCCESS.as_ref())
}

async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
    if let Some(error) = rejection.find::<ApiError>() {
        Ok((match error {
            ApiError::MissingAuthorization => response_json(
                StatusCode::UNAUTHORIZED,
                splunk_response::MISSING_CREDENTIALS.as_ref(),
            ),
            ApiError::InvalidAuthorization => response_json(
                StatusCode::UNAUTHORIZED,
                splunk_response::INVALID_AUTHORIZATION.as_ref(),
            ),
            ApiError::UnsupportedEncoding => response_json(
                StatusCode::UNSUPPORTED_MEDIA_TYPE,
                splunk_response::UNSUPPORTED_MEDIA_TYPE.as_ref(),
            ),
            ApiError::MissingChannel => response_json(
                StatusCode::BAD_REQUEST,
                splunk_response::NO_CHANNEL.as_ref(),
            ),
            ApiError::NoData => {
                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA.as_ref())
            }
            ApiError::ServerShutdown => response_json(
                StatusCode::SERVICE_UNAVAILABLE,
                splunk_response::SERVER_SHUTDOWN.as_ref(),
            ),
            ApiError::InvalidDataFormat { event } => event_error("Invalid data format", 6, *event),
            ApiError::EmptyEventField { event } => {
                event_error("Event field cannot be blank", 13, *event)
            }
            ApiError::MissingEventField { event } => {
                event_error("Event field is required", 12, *event)
            }
            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
        },))
    } else {
        Err(rejection)
    }
}

/// Response without a body
fn empty_response(code: StatusCode) -> Response {
    let mut res = Response::default();
    *res.status_mut() = code;
    res
}

/// Response with a JSON body
fn response_json(code: StatusCode, body: impl Serialize) -> Response {
    warp::reply::with_status(warp::reply::json(&body), code).into_response()
}

/// Response for an error that happened during parsing of events
fn event_error(text: &str, code: u16, event: usize) -> Response {
    let body = json!({
        "text":text,
        "code":code,
        "invalid-event-number":event
    });
    match serde_json::to_string(&body) {
        Ok(string) => response_json(StatusCode::BAD_REQUEST, string),
        Err(error) => {
            error!("Error encoding json body: {}", error);
            response_json(
                StatusCode::INTERNAL_SERVER_ERROR,
                splunk_response::SERVER_ERROR.clone(),
            )
        }
    }
}

#[cfg(feature = "sinks-splunk_hec")]
#[cfg(test)]
mod tests {
    use super::{parse_timestamp, SplunkConfig};
    use crate::runtime::Runtime;
    use crate::test_util::{self, collect_n, runtime};
    use crate::{
        event::{self, Event},
        shutdown::ShutdownSignal,
        sinks::{
            splunk_hec::{Encoding, HecSinkConfig},
            util::{encoding::EncodingConfigWithDefault, Compression},
            Healthcheck, RouterSink,
        },
        topology::config::{GlobalOptions, SinkConfig, SinkContext, SourceConfig},
    };
    use chrono::{TimeZone, Utc};
    use futures::compat::Future01CompatExt;
    use futures01::{stream, sync::mpsc, Sink};
    use std::net::SocketAddr;

    /// Splunk token
    const TOKEN: &str = "token";

    const CHANNEL_CAPACITY: usize = 1000;

    fn source(rt: &mut Runtime) -> (mpsc::Receiver<Event>, SocketAddr) {
        source_with(rt, Some(TOKEN.to_owned()))
    }

    fn source_with(rt: &mut Runtime, token: Option<String>) -> (mpsc::Receiver<Event>, SocketAddr) {
        test_util::trace_init();
        let (sender, recv) = mpsc::channel(CHANNEL_CAPACITY);
        let address = test_util::next_addr();
        rt.spawn(
            SplunkConfig {
                address,
                token,
                tls: None,
            }
            .build(
                "default",
                &GlobalOptions::default(),
                ShutdownSignal::noop(),
                sender,
            )
            .unwrap(),
        );
        (recv, address)
    }

    fn sink(
        address: SocketAddr,
        encoding: impl Into<EncodingConfigWithDefault<Encoding>>,
        compression: Compression,
    ) -> (RouterSink, Healthcheck) {
        HecSinkConfig {
            host: format!("http://{}", address),
            token: TOKEN.to_owned(),
            encoding: encoding.into(),
            compression,
            ..HecSinkConfig::default()
        }
        .build(SinkContext::new_test())
        .unwrap()
    }

    fn start(
        encoding: impl Into<EncodingConfigWithDefault<Encoding>>,
        compression: Compression,
    ) -> (Runtime, RouterSink, mpsc::Receiver<Event>) {
        let mut rt = runtime();
        let (source, address) = source(&mut rt);
        let (sink, health) = sink(address, encoding, compression);
        assert!(rt.block_on(health).is_ok());
        (rt, sink, source)
    }

    fn channel_n(
        messages: Vec<impl Into<Event> + Send + 'static>,
        sink: RouterSink,
        source: mpsc::Receiver<Event>,
        rt: &mut Runtime,
    ) -> Vec<Event> {
        let n = messages.len();
        assert!(
            n <= CHANNEL_CAPACITY,
            "Too many messages for the sink channel"
        );
        let pump = sink.send_all(stream::iter_ok(messages.into_iter().map(Into::into)));
        let _ = rt.block_on(pump).unwrap();
        let events = rt.block_on(collect_n(source, n)).unwrap();

        assert_eq!(n, events.len());

        events
    }

    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
        send_with(address, api, message, TOKEN).await
    }

    async fn send_with(address: SocketAddr, api: &str, message: &str, token: &str) -> u16 {
        reqwest::Client::new()
            .post(&format!("http://{}/{}", address, api))
            .header("Authorization", format!("Splunk {}", token))
            .header("x-splunk-request-channel", "guid")
            .body(message.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }

    #[test]
    fn no_compression_text_event() {
        let message = "no_compression_text_event";
        let (mut rt, sink, source) = start(Encoding::Text, Compression::None);

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn one_simple_text_event() {
        let message = "one_simple_text_event";
        let (mut rt, sink, source) = start(Encoding::Text, Compression::Gzip);

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn multiple_simple_text_event() {
        let n = 200;
        let (mut rt, sink, source) = start(Encoding::Text, Compression::None);

        let messages = (0..n)
            .map(|i| format!("multiple_simple_text_event_{}", i))
            .collect::<Vec<_>>();
        let events = channel_n(messages.clone(), sink, source, &mut rt);

        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                msg.into()
            );
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        }
    }

    #[test]
    fn one_simple_json_event() {
        let message = "one_simple_json_event";
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn multiple_simple_json_event() {
        let n = 200;
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let messages = (0..n)
            .map(|i| format!("multiple_simple_json_event{}", i))
            .collect::<Vec<_>>();
        let events = channel_n(messages.clone(), sink, source, &mut rt);

        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                msg.into()
            );
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        }
    }

    #[test]
    fn json_event() {
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let mut event = Event::new_empty_log();
        event.as_mut_log().insert("greeting", "hello");
        event.as_mut_log().insert("name", "bob");

        let pump = sink.send(event);
        let _ = rt.block_on(pump).unwrap();
        let event = rt.block_on(collect_n(source, 1)).unwrap().remove(0);

        assert_eq!(event.as_log()[&"greeting".into()], "hello".into());
        assert_eq!(event.as_log()[&"name".into()], "bob".into());
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn line_to_message() {
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let mut event = Event::new_empty_log();
        event.as_mut_log().insert("line", "hello");

        let pump = sink.send(event);
        let _ = rt.block_on(pump).unwrap();
        let event = rt.block_on(collect_n(source, 1)).unwrap().remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            "hello".into()
        );
    }

    #[test]
    fn raw() {
        let message = "raw";
        let mut rt = runtime();
        let (source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(200, post(address, "services/collector/raw", message).await);

            let event = collect_n(source, 1).compat().await.unwrap().remove(0);
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                message.into()
            );
            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        });
    }

    #[test]
    fn no_data() {
        let mut rt = runtime();
        let (_source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(400, post(address, "services/collector/event", "").await);
        });
    }

    #[test]
    fn invalid_token() {
        let mut rt = runtime();
        let (_source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(
                401,
                send_with(address, "services/collector/event", "", "nope").await
            );
        });
    }

    #[test]
    fn no_authorization() {
        let message = "no_authorization";
        let mut rt = runtime();
        let (source, address) = source_with(&mut rt, None);
        let (sink, health) = sink(address, Encoding::Text, Compression::Gzip);
        assert!(rt.block_on(health).is_ok());

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
    }

    #[test]
    fn partial() {
        let message = r#"{"event":"first"}{"event":"second""#;
        let mut rt = runtime();
        let (source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(
                400,
                post(address, "services/collector/event", message).await
            );

            let event = collect_n(source, 1).compat().await.unwrap().remove(0);
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                "first".into()
            );
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        });
    }

    #[test]
    fn default() {
        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
        let mut rt = runtime();
        let (source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );

            let events = collect_n(source, 3).compat().await.unwrap();

            assert_eq!(
                events[0].as_log()[&event::log_schema().message_key()],
                "first".into()
            );
            assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());

            assert_eq!(
                events[1].as_log()[&event::log_schema().message_key()],
                "second".into()
            );
            assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());

            assert_eq!(
                events[2].as_log()[&event::log_schema().message_key()],
                "third".into()
            );
            assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
        });
    }

    #[test]
    fn parse_timestamps() {
        let cases = vec![
            Utc::now(),
            Utc.ymd(1971, 11, 7).and_hms(1, 1, 1),
            Utc.ymd(2011, 8, 5).and_hms(1, 1, 1),
            Utc.ymd(2189, 11, 4).and_hms(2, 2, 2),
        ];

        for case in cases {
            let sec = case.timestamp();
            let millis = case.timestamp_millis();
            let nano = case.timestamp_nanos();

            assert_eq!(
                parse_timestamp(sec as i64).unwrap().timestamp(),
                case.timestamp()
            );
            assert_eq!(
                parse_timestamp(millis as i64).unwrap().timestamp_millis(),
                case.timestamp_millis()
            );
            assert_eq!(
                parse_timestamp(nano as i64).unwrap().timestamp_nanos(),
                case.timestamp_nanos()
            );
        }

        assert!(parse_timestamp(-1).is_none());
    }
}