1 use criterion::{criterion_group, criterion_main, Benchmark, Criterion, Throughput};
2 use futures::{
3     compat::{Future01CompatExt, Stream01CompatExt},
4     stream::StreamExt,
5 };
6 use futures01::{stream, AsyncSink, Poll, Sink, StartSend, Stream};
7 use tempfile::tempdir;
8 use vector::{
9     buffers::disk::{leveldb_buffer, DiskBuffer},
10     runtime,
11     sinks::util::StreamSink,
12     Event,
13 };
14 
15 struct NullSink;
16 
17 impl Sink for NullSink {
18     type SinkItem = Event;
19     type SinkError = ();
20 
start_send(&mut self, _item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>21     fn start_send(&mut self, _item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
22         Ok(AsyncSink::Ready)
23     }
24 
poll_complete(&mut self) -> Poll<(), Self::SinkError>25     fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
26         Ok(().into())
27     }
28 }
29 
benchmark_buffers(c: &mut Criterion)30 fn benchmark_buffers(c: &mut Criterion) {
31     let num_lines: usize = 100_000;
32     let line_size: usize = 200;
33 
34     let data_dir = tempdir().unwrap();
35     let data_dir = data_dir.path().to_path_buf();
36     let data_dir2 = data_dir.clone();
37     let data_dir3 = data_dir.clone();
38 
39     c.bench(
40         "buffers",
41         Benchmark::new("channels/futures01", move |b| {
42             b.iter_with_setup(
43                 || {
44                     let rt = runtime::Runtime::new().unwrap();
45 
46                     let (writer, reader) = futures01::sync::mpsc::channel(100);
47                     let writer = writer.sink_map_err(|e| panic!(e));
48 
49                     let read_loop = reader.for_each(move |_| Ok(()));
50 
51                     (rt, writer, read_loop)
52                 },
53                 |(mut rt, writer, read_loop)| {
54                     let send = writer.send_all(random_events(line_size).take(num_lines as u64));
55 
56                     let read_handle = rt.spawn_handle_std(read_loop.compat());
57                     let write_handle = rt.spawn_handle_std(send.compat());
58 
59                     let (writer, _stream) = rt.block_on_std(write_handle).unwrap().unwrap();
60                     drop(writer);
61 
62                     rt.block_on_std(read_handle).unwrap().unwrap();
63                 },
64             );
65         })
66         .with_function("channels/tokio", move |b| {
67             b.iter_with_setup(
68                 || {
69                     let mut rt = runtime::Runtime::new().unwrap();
70 
71                     let (writer, mut reader) = tokio::sync::mpsc::channel(100);
72 
73                     let read_handle =
74                         rt.spawn_handle_std(async move { while reader.next().await.is_some() {} });
75 
76                     (rt, writer, read_handle)
77                 },
78                 |(mut rt, mut writer, read_handle)| {
79                     let write_handle = rt.spawn_handle_std(async move {
80                         let mut stream = random_events(line_size).take(num_lines as u64).compat();
81                         while let Some(e) = stream.next().await {
82                             writer.send(e).await.unwrap();
83                         }
84                     });
85 
86                     rt.block_on_std(write_handle).unwrap();
87                     rt.block_on_std(read_handle).unwrap();
88                 },
89             );
90         })
91         .with_function("leveldb/writing", move |b| {
92             b.iter_with_setup(
93                 || {
94                     let rt = runtime::Runtime::new().unwrap();
95 
96                     let path = data_dir.join("basic_sink");
97 
98                     // Clear out any existing data
99                     if std::fs::metadata(&path).is_ok() {
100                         std::fs::remove_dir_all(&path).unwrap();
101                     }
102 
103                     let plenty_of_room = num_lines * line_size * 2;
104                     let (writer, _reader, _acker) =
105                         leveldb_buffer::Buffer::build(path, plenty_of_room).unwrap();
106 
107                     (rt, writer)
108                 },
109                 |(mut rt, writer)| {
110                     let send = writer.send_all(random_events(line_size).take(num_lines as u64));
111                     let write_handle = rt.spawn_handle_std(send.compat());
112                     let _ = rt.block_on_std(write_handle).unwrap().unwrap();
113                 },
114             );
115         })
116         .with_function("leveldb/reading", move |b| {
117             b.iter_with_setup(
118                 || {
119                     let mut rt = runtime::Runtime::new().unwrap();
120 
121                     let path = data_dir2.join("basic_sink");
122 
123                     // Clear out any existing data
124                     if std::fs::metadata(&path).is_ok() {
125                         std::fs::remove_dir_all(&path).unwrap();
126                     }
127 
128                     let plenty_of_room = num_lines * line_size * 2;
129                     let (writer, reader, acker) =
130                         leveldb_buffer::Buffer::build(path, plenty_of_room).unwrap();
131 
132                     let send = writer.send_all(random_events(line_size).take(num_lines as u64));
133                     let write_handle = rt.spawn_handle_std(send.compat());
134                     let (writer, _stream) = rt.block_on_std(write_handle).unwrap().unwrap();
135                     drop(writer);
136 
137                     let read_loop = StreamSink::new(NullSink, acker).send_all(reader);
138 
139                     (rt, read_loop)
140                 },
141                 |(mut rt, read_loop)| {
142                     let read_handle = rt.spawn_handle_std(read_loop.compat());
143                     rt.block_on_std(read_handle).unwrap().unwrap();
144                 },
145             );
146         })
147         .with_function("leveldb/both", move |b| {
148             b.iter_with_setup(
149                 || {
150                     let rt = runtime::Runtime::new().unwrap();
151 
152                     let path = data_dir3.join("basic_sink");
153 
154                     // Clear out any existing data
155                     if std::fs::metadata(&path).is_ok() {
156                         std::fs::remove_dir_all(&path).unwrap();
157                     }
158 
159                     let plenty_of_room = num_lines * line_size * 2;
160                     let (writer, reader, acker) =
161                         leveldb_buffer::Buffer::build(path, plenty_of_room).unwrap();
162 
163                     let read_loop = StreamSink::new(NullSink, acker).send_all(reader);
164 
165                     (rt, writer, read_loop)
166                 },
167                 |(mut rt, writer, read_loop)| {
168                     let send = writer.send_all(random_events(line_size).take(num_lines as u64));
169 
170                     let read_handle = rt.spawn_handle_std(read_loop.compat());
171                     let write_handle = rt.spawn_handle_std(send.compat());
172 
173                     let _ = rt.block_on_std(write_handle).unwrap().unwrap();
174                     rt.block_on_std(read_handle).unwrap().unwrap();
175                 },
176             );
177         })
178         .sample_size(10)
179         .noise_threshold(0.05)
180         .throughput(Throughput::Bytes((num_lines * line_size) as u64)),
181     );
182 }
183 
// Register `benchmark_buffers` under the Criterion group named `buffers` and
// pass that group to `criterion_main!`, which provides the benchmark
// binary's entry point.
criterion_group!(buffers, benchmark_buffers);
criterion_main!(buffers);
186 
random_events(size: usize) -> impl Stream<Item = Event, Error = ()>187 fn random_events(size: usize) -> impl Stream<Item = Event, Error = ()> {
188     use rand::distributions::Alphanumeric;
189     use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
190 
191     let mut rng = SmallRng::from_rng(thread_rng()).unwrap();
192 
193     let lines = std::iter::repeat(()).map(move |_| {
194         rng.sample_iter(&Alphanumeric)
195             .take(size)
196             .collect::<String>()
197     });
198     stream::iter_ok(lines).map(Event::from)
199 }
200