1 use criterion::{criterion_group, criterion_main, Benchmark, Criterion, Throughput};
2 use futures::{
3 compat::{Future01CompatExt, Stream01CompatExt},
4 stream::StreamExt,
5 };
6 use futures01::{stream, AsyncSink, Poll, Sink, StartSend, Stream};
7 use tempfile::tempdir;
8 use vector::{
9 buffers::disk::{leveldb_buffer, DiskBuffer},
10 runtime,
11 sinks::util::StreamSink,
12 Event,
13 };
14
15 struct NullSink;
16
17 impl Sink for NullSink {
18 type SinkItem = Event;
19 type SinkError = ();
20
start_send(&mut self, _item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>21 fn start_send(&mut self, _item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
22 Ok(AsyncSink::Ready)
23 }
24
poll_complete(&mut self) -> Poll<(), Self::SinkError>25 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
26 Ok(().into())
27 }
28 }
29
/// Criterion benchmark comparing buffering strategies on a fixed workload:
/// `num_lines` events of `line_size` random bytes each. Four cases are
/// measured: a futures 0.1 channel, a tokio channel, and leveldb disk
/// buffers (write-only, read-only, and concurrent read+write).
fn benchmark_buffers(c: &mut Criterion) {
    let num_lines: usize = 100_000;
    let line_size: usize = 200;

    // One temp dir shared by the leveldb cases. Each benchmark closure is a
    // `move` closure, so every case needs its own owned copy of the path.
    let data_dir = tempdir().unwrap();
    let data_dir = data_dir.path().to_path_buf();
    let data_dir2 = data_dir.clone();
    let data_dir3 = data_dir.clone();

    c.bench(
        "buffers",
        // Baseline: in-memory futures 0.1 mpsc channel drained by a no-op
        // reader. Setup (runtime/channel creation) is excluded from timing
        // via `iter_with_setup`.
        Benchmark::new("channels/futures01", move |b| {
            b.iter_with_setup(
                || {
                    let rt = runtime::Runtime::new().unwrap();

                    let (writer, reader) = futures01::sync::mpsc::channel(100);
                    let writer = writer.sink_map_err(|e| panic!(e));

                    // Consume events as fast as they arrive, doing no work.
                    let read_loop = reader.for_each(move |_| Ok(()));

                    (rt, writer, read_loop)
                },
                |(mut rt, writer, read_loop)| {
                    let send = writer.send_all(random_events(line_size).take(num_lines as u64));

                    let read_handle = rt.spawn_handle_std(read_loop.compat());
                    let write_handle = rt.spawn_handle_std(send.compat());

                    // Wait for all writes to finish, then drop the writer so
                    // the reader observes end-of-stream and can complete.
                    let (writer, _stream) = rt.block_on_std(write_handle).unwrap().unwrap();
                    drop(writer);

                    rt.block_on_std(read_handle).unwrap().unwrap();
                },
            );
        })
        // Same shape as above but using a tokio (std-futures) channel; the
        // futures 0.1 event stream is bridged with `.compat()`.
        .with_function("channels/tokio", move |b| {
            b.iter_with_setup(
                || {
                    let mut rt = runtime::Runtime::new().unwrap();

                    let (writer, mut reader) = tokio::sync::mpsc::channel(100);

                    let read_handle =
                        rt.spawn_handle_std(async move { while reader.next().await.is_some() {} });

                    (rt, writer, read_handle)
                },
                |(mut rt, mut writer, read_handle)| {
                    let write_handle = rt.spawn_handle_std(async move {
                        let mut stream = random_events(line_size).take(num_lines as u64).compat();
                        while let Some(e) = stream.next().await {
                            writer.send(e).await.unwrap();
                        }
                    });

                    // Dropping `writer` at the end of the task closes the
                    // channel, letting the read task terminate.
                    rt.block_on_std(write_handle).unwrap();
                    rt.block_on_std(read_handle).unwrap();
                },
            );
        })
        // Disk buffer, write path only: nothing drains the reader.
        .with_function("leveldb/writing", move |b| {
            b.iter_with_setup(
                || {
                    let rt = runtime::Runtime::new().unwrap();

                    let path = data_dir.join("basic_sink");

                    // Clear out any existing data
                    if std::fs::metadata(&path).is_ok() {
                        std::fs::remove_dir_all(&path).unwrap();
                    }

                    // Size the buffer so writes never block on capacity.
                    let plenty_of_room = num_lines * line_size * 2;
                    let (writer, _reader, _acker) =
                        leveldb_buffer::Buffer::build(path, plenty_of_room).unwrap();

                    (rt, writer)
                },
                |(mut rt, writer)| {
                    let send = writer.send_all(random_events(line_size).take(num_lines as u64));
                    let write_handle = rt.spawn_handle_std(send.compat());
                    let _ = rt.block_on_std(write_handle).unwrap().unwrap();
                },
            );
        })
        // Disk buffer, read path only: the buffer is pre-populated during
        // (untimed) setup, so only the drain is measured.
        .with_function("leveldb/reading", move |b| {
            b.iter_with_setup(
                || {
                    let mut rt = runtime::Runtime::new().unwrap();

                    let path = data_dir2.join("basic_sink");

                    // Clear out any existing data
                    if std::fs::metadata(&path).is_ok() {
                        std::fs::remove_dir_all(&path).unwrap();
                    }

                    let plenty_of_room = num_lines * line_size * 2;
                    let (writer, reader, acker) =
                        leveldb_buffer::Buffer::build(path, plenty_of_room).unwrap();

                    // Fill the buffer up front, then drop the writer so the
                    // reader will hit end-of-stream once it has drained.
                    let send = writer.send_all(random_events(line_size).take(num_lines as u64));
                    let write_handle = rt.spawn_handle_std(send.compat());
                    let (writer, _stream) = rt.block_on_std(write_handle).unwrap().unwrap();
                    drop(writer);

                    // NullSink discards events; the acker advances the
                    // buffer's read offset as events are processed.
                    let read_loop = StreamSink::new(NullSink, acker).send_all(reader);

                    (rt, read_loop)
                },
                |(mut rt, read_loop)| {
                    let read_handle = rt.spawn_handle_std(read_loop.compat());
                    rt.block_on_std(read_handle).unwrap().unwrap();
                },
            );
        })
        // Disk buffer, concurrent read + write: closest to production use.
        .with_function("leveldb/both", move |b| {
            b.iter_with_setup(
                || {
                    let rt = runtime::Runtime::new().unwrap();

                    let path = data_dir3.join("basic_sink");

                    // Clear out any existing data
                    if std::fs::metadata(&path).is_ok() {
                        std::fs::remove_dir_all(&path).unwrap();
                    }

                    let plenty_of_room = num_lines * line_size * 2;
                    let (writer, reader, acker) =
                        leveldb_buffer::Buffer::build(path, plenty_of_room).unwrap();

                    let read_loop = StreamSink::new(NullSink, acker).send_all(reader);

                    (rt, writer, read_loop)
                },
                |(mut rt, writer, read_loop)| {
                    let send = writer.send_all(random_events(line_size).take(num_lines as u64));

                    // Reader and writer run concurrently on the runtime.
                    let read_handle = rt.spawn_handle_std(read_loop.compat());
                    let write_handle = rt.spawn_handle_std(send.compat());

                    let _ = rt.block_on_std(write_handle).unwrap().unwrap();
                    rt.block_on_std(read_handle).unwrap().unwrap();
                },
            );
        })
        // Disk I/O is slow and noisy: keep the sample count small and
        // tolerate a bit more run-to-run variance than the default.
        .sample_size(10)
        .noise_threshold(0.05)
        // Report throughput in bytes so the cases are directly comparable.
        .throughput(Throughput::Bytes((num_lines * line_size) as u64)),
    );
}
183
// Register the benchmark group and generate the `main` entry point for the
// Criterion harness.
criterion_group!(buffers, benchmark_buffers);
criterion_main!(buffers);
186
random_events(size: usize) -> impl Stream<Item = Event, Error = ()>187 fn random_events(size: usize) -> impl Stream<Item = Event, Error = ()> {
188 use rand::distributions::Alphanumeric;
189 use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
190
191 let mut rng = SmallRng::from_rng(thread_rng()).unwrap();
192
193 let lines = std::iter::repeat(()).map(move |_| {
194 rng.sample_iter(&Alphanumeric)
195 .take(size)
196 .collect::<String>()
197 });
198 stream::iter_ok(lines).map(Event::from)
199 }
200