1 use crate::{
2     sinks::http::{HttpMethod, HttpSinkConfig},
3     sinks::util::{
4         encoding::{EncodingConfigWithDefault, EncodingConfiguration},
5         service2::TowerRequestConfig,
6         BatchConfig, Compression,
7     },
8     topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
9 };
10 use http::Uri;
11 use indexmap::IndexMap;
12 use serde::{Deserialize, Serialize};
13 use snafu::Snafu;
14 
15 #[derive(Debug, Snafu)]
16 enum BuildError {
17     #[snafu(display(
18         "Missing authentication key, must provide either 'license_key' or 'insert_key'"
19     ))]
20     MissingAuthParam,
21 }
22 
23 #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
24 #[serde(rename_all = "snake_case")]
25 #[derivative(Default)]
26 pub enum NewRelicLogsRegion {
27     #[derivative(Default)]
28     Us,
29     Eu,
30 }
31 
32 #[derive(Deserialize, Serialize, Debug, Derivative, Clone)]
33 #[derivative(Default)]
34 pub struct NewRelicLogsConfig {
35     pub license_key: Option<String>,
36     pub insert_key: Option<String>,
37     pub region: Option<NewRelicLogsRegion>,
38     #[serde(skip_serializing_if = "skip_serializing_if_default", default)]
39     pub encoding: EncodingConfigWithDefault<Encoding>,
40     #[serde(default)]
41     pub compression: Compression,
42     #[serde(default)]
43     pub batch: BatchConfig,
44 
45     #[serde(default)]
46     pub request: TowerRequestConfig,
47 }
48 
49 inventory::submit! {
50     SinkDescription::new::<NewRelicLogsConfig>("new_relic_logs")
51 }
52 
53 #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
54 #[serde(rename_all = "snake_case")]
55 #[derivative(Default)]
56 pub enum Encoding {
57     #[derivative(Default)]
58     Json,
59 }
60 
61 impl From<Encoding> for crate::sinks::http::Encoding {
from(v: Encoding) -> crate::sinks::http::Encoding62     fn from(v: Encoding) -> crate::sinks::http::Encoding {
63         match v {
64             Encoding::Json => crate::sinks::http::Encoding::Json,
65         }
66     }
67 }
68 
69 // There is another one of these in `util::encoding`, but this one is specialized for New Relic.
70 /// For encodings, answers "Is it possible to skip serializing this value, because it's the
71 /// default?"
skip_serializing_if_default(e: &EncodingConfigWithDefault<Encoding>) -> bool72 pub(crate) fn skip_serializing_if_default(e: &EncodingConfigWithDefault<Encoding>) -> bool {
73     e.codec() == &Encoding::default()
74 }
75 
76 #[typetag::serde(name = "new_relic_logs")]
77 impl SinkConfig for NewRelicLogsConfig {
build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)>78     fn build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
79         let http_conf = self.create_config()?;
80         http_conf.build(cx)
81     }
82 
input_type(&self) -> DataType83     fn input_type(&self) -> DataType {
84         DataType::Log
85     }
86 
sink_type(&self) -> &'static str87     fn sink_type(&self) -> &'static str {
88         "new_relic_logs"
89     }
90 }
91 
92 impl NewRelicLogsConfig {
create_config(&self) -> crate::Result<HttpSinkConfig>93     fn create_config(&self) -> crate::Result<HttpSinkConfig> {
94         let mut headers: IndexMap<String, String> = IndexMap::new();
95 
96         if let Some(license_key) = &self.license_key {
97             headers.insert("X-License-Key".to_owned(), license_key.clone());
98         } else if let Some(insert_key) = &self.insert_key {
99             headers.insert("X-Insert-Key".to_owned(), insert_key.clone());
100         } else {
101             return Err(Box::new(BuildError::MissingAuthParam));
102         }
103 
104         let uri = match self.region.as_ref().unwrap_or(&NewRelicLogsRegion::Us) {
105             NewRelicLogsRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
106             NewRelicLogsRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
107         };
108 
109         let batch = self.batch.use_size_as_bytes()?;
110         let batch = BatchConfig {
111             // The max request size is 10MiB, so in order to be comfortably
112             // within this we batch up to 5MiB.
113             max_bytes: Some(batch.max_bytes.unwrap_or(bytesize::mib(5u64) as usize)),
114             max_events: None,
115             ..batch
116         };
117 
118         let request = TowerRequestConfig {
119             // The default throughput ceiling defaults are relatively
120             // conservative so we crank them up for New Relic.
121             in_flight_limit: Some(self.request.in_flight_limit.unwrap_or(100)),
122             rate_limit_num: Some(self.request.rate_limit_num.unwrap_or(100)),
123             ..self.request
124         };
125 
126         Ok(HttpSinkConfig {
127             uri: uri.into(),
128             method: Some(HttpMethod::Post),
129             healthcheck_uri: None,
130             auth: None,
131             headers: Some(headers),
132             compression: self.compression,
133             encoding: self.encoding.clone().without_default(),
134 
135             batch,
136             request,
137 
138             tls: None,
139         })
140     }
141 }
142 
143 #[cfg(test)]
144 mod tests {
145     use super::*;
146     use crate::{
147         event::Event,
148         sinks::util::test::build_test_server,
149         test_util::{next_addr, runtime, shutdown_on_idle},
150         topology::config::SinkConfig,
151     };
152     use bytes05::buf::BufExt;
153     use futures01::{stream, Sink, Stream};
154     use hyper::Method;
155     use serde_json::Value;
156     use std::io::BufRead;
157 
158     #[test]
new_relic_logs_check_config_no_auth()159     fn new_relic_logs_check_config_no_auth() {
160         assert_eq!(
161             format!(
162                 "{}",
163                 NewRelicLogsConfig::default().create_config().unwrap_err()
164             ),
165             "Missing authentication key, must provide either 'license_key' or 'insert_key'"
166                 .to_owned(),
167         );
168     }
169 
170     #[test]
new_relic_logs_check_config_defaults()171     fn new_relic_logs_check_config_defaults() {
172         let mut nr_config = NewRelicLogsConfig::default();
173         nr_config.license_key = Some("foo".to_owned());
174         let http_config = nr_config.create_config().unwrap();
175 
176         assert_eq!(
177             format!("{}", http_config.uri),
178             "https://log-api.newrelic.com/log/v1".to_string()
179         );
180         assert_eq!(http_config.method, Some(HttpMethod::Post));
181         assert_eq!(http_config.encoding.codec(), &Encoding::Json.into());
182         assert_eq!(
183             http_config.batch.max_bytes,
184             Some(bytesize::mib(5u64) as usize)
185         );
186         assert_eq!(http_config.request.in_flight_limit, Some(100));
187         assert_eq!(http_config.request.rate_limit_num, Some(100));
188         assert_eq!(
189             http_config.headers.unwrap()["X-License-Key"],
190             "foo".to_owned()
191         );
192         assert!(http_config.tls.is_none());
193         assert!(http_config.auth.is_none());
194     }
195 
196     #[test]
new_relic_logs_check_config_custom()197     fn new_relic_logs_check_config_custom() {
198         let mut nr_config = NewRelicLogsConfig::default();
199         nr_config.insert_key = Some("foo".to_owned());
200         nr_config.region = Some(NewRelicLogsRegion::Eu);
201         nr_config.batch.max_size = Some(bytesize::mib(8u64) as usize);
202         nr_config.request.in_flight_limit = Some(12);
203         nr_config.request.rate_limit_num = Some(24);
204 
205         let http_config = nr_config.create_config().unwrap();
206 
207         assert_eq!(
208             format!("{}", http_config.uri),
209             "https://log-api.eu.newrelic.com/log/v1".to_string()
210         );
211         assert_eq!(http_config.method, Some(HttpMethod::Post));
212         assert_eq!(http_config.encoding.codec(), &Encoding::Json.into());
213         assert_eq!(
214             http_config.batch.max_bytes,
215             Some(bytesize::mib(8u64) as usize)
216         );
217         assert_eq!(http_config.request.in_flight_limit, Some(12));
218         assert_eq!(http_config.request.rate_limit_num, Some(24));
219         assert_eq!(
220             http_config.headers.unwrap()["X-Insert-Key"],
221             "foo".to_owned()
222         );
223         assert!(http_config.tls.is_none());
224         assert!(http_config.auth.is_none());
225     }
226 
227     #[test]
new_relic_logs_check_config_custom_from_toml()228     fn new_relic_logs_check_config_custom_from_toml() {
229         let config = r#"
230         insert_key = "foo"
231         region = "eu"
232 
233         [batch]
234         max_size = 8388608
235 
236         [request]
237         in_flight_limit = 12
238         rate_limit_num = 24
239     "#;
240         let nr_config: NewRelicLogsConfig = toml::from_str(&config).unwrap();
241 
242         let http_config = nr_config.create_config().unwrap();
243 
244         assert_eq!(
245             format!("{}", http_config.uri),
246             "https://log-api.eu.newrelic.com/log/v1".to_string()
247         );
248         assert_eq!(http_config.method, Some(HttpMethod::Post));
249         assert_eq!(http_config.encoding.codec(), &Encoding::Json.into());
250         assert_eq!(
251             http_config.batch.max_bytes,
252             Some(bytesize::mib(8u64) as usize)
253         );
254         assert_eq!(http_config.request.in_flight_limit, Some(12));
255         assert_eq!(http_config.request.rate_limit_num, Some(24));
256         assert_eq!(
257             http_config.headers.unwrap()["X-Insert-Key"],
258             "foo".to_owned()
259         );
260         assert!(http_config.tls.is_none());
261         assert!(http_config.auth.is_none());
262     }
263 
264     #[test]
new_relic_logs_happy_path()265     fn new_relic_logs_happy_path() {
266         let in_addr = next_addr();
267 
268         let mut nr_config = NewRelicLogsConfig::default();
269         nr_config.license_key = Some("foo".to_owned());
270         let mut http_config = nr_config.create_config().unwrap();
271         http_config.uri = format!("http://{}/fake_nr", in_addr)
272             .parse::<http::Uri>()
273             .unwrap()
274             .into();
275 
276         let mut rt = runtime();
277 
278         let (sink, _healthcheck) = http_config.build(SinkContext::new_test()).unwrap();
279         let (rx, trigger, server) = build_test_server(in_addr, &mut rt);
280 
281         let input_lines = (0..100).map(|i| format!("msg {}", i)).collect::<Vec<_>>();
282         let events = stream::iter_ok(input_lines.clone().into_iter().map(Event::from));
283 
284         let pump = sink.send_all(events);
285 
286         rt.spawn(server);
287 
288         let _ = rt.block_on(pump).unwrap();
289         drop(trigger);
290 
291         let output_lines = rx
292             .wait()
293             .map(Result::unwrap)
294             .map(|(parts, body)| {
295                 assert_eq!(Method::POST, parts.method);
296                 assert_eq!("/fake_nr", parts.uri.path());
297                 assert_eq!(
298                     parts
299                         .headers
300                         .get("X-License-Key")
301                         .and_then(|v| v.to_str().ok()),
302                     Some("foo")
303                 );
304                 body.reader()
305             })
306             .flat_map(BufRead::lines)
307             .map(Result::unwrap)
308             .flat_map(|s| -> Vec<String> {
309                 let vals: Vec<Value> = serde_json::from_str(&s).unwrap();
310                 vals.iter()
311                     .map(|v| v.get("message").unwrap().as_str().unwrap().to_owned())
312                     .collect()
313             })
314             .collect::<Vec<_>>();
315 
316         shutdown_on_idle(rt);
317 
318         assert_eq!(input_lines, output_lines);
319     }
320 }
321