1 // Part of Measurement Kit <https://measurement-kit.github.io/>.
2 // Measurement Kit is free software under the BSD license. See AUTHORS
3 // and LICENSE for more information on the copying conditions.
4 #ifndef SRC_LIBMEASUREMENT_KIT_NET_LIBEVENT_HPP
5 #define SRC_LIBMEASUREMENT_KIT_NET_LIBEVENT_HPP
6
#include "src/libmeasurement_kit/net/emitter.hpp"
#include "src/libmeasurement_kit/net/utils.hpp"
#include "src/libmeasurement_kit/common/utils.hpp"
#include "src/libmeasurement_kit/common/non_copyable.hpp"
#include "src/libmeasurement_kit/common/non_movable.hpp"
#include "src/libmeasurement_kit/common/reactor.hpp"
#include <cassert>
#include <cerrno>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <event2/event.h>
#include <functional>
#include "src/libmeasurement_kit/common/error.hpp"
#include "src/libmeasurement_kit/common/error_or.hpp"
#include "src/libmeasurement_kit/common/logger.hpp"
#include "src/libmeasurement_kit/net/buffer.hpp"
#include "src/libmeasurement_kit/net/error.hpp"
#include "src/libmeasurement_kit/net/transport.hpp"
#include "src/libmeasurement_kit/net/utils.hpp"
#include <measurement_kit/common/aaa_base.h>
#include <new>
#include <openssl/err.h>
#include <ostream>
#include <stdexcept>
#include <string>
#include <utility>
33
34 extern "C" {
35
36 static inline void handle_libevent_read(bufferevent *, void *);
37 static inline void handle_libevent_write(bufferevent *, void *);
38 static inline void handle_libevent_event(bufferevent *, short, void *);
39
40 } // extern "C"
41
42 namespace mk {
43 namespace net {
44
map_bufferevent_event(short what)45 static inline std::string map_bufferevent_event(short what) {
46 std::stringstream ss;
47 ss << ((what & BEV_EVENT_EOF) ? "Z" : "z")
48 << ((what & BEV_EVENT_TIMEOUT) ? "T" : "t")
49 << ((what & BEV_EVENT_ERROR) ? "F" : "f")
50 << ((what & BEV_EVENT_READING) ? "R" : "r")
51 << ((what & BEV_EVENT_WRITING) ? "W" : "w");
52 return ss.str();
53 }
54
55 class LibeventEmitter : public EmitterBase, public NonMovable, public NonCopyable {
56 public:
make(bufferevent * bev,SharedPtr<Reactor> reactor,SharedPtr<Logger> logger)57 static SharedPtr<Transport> make(bufferevent *bev, SharedPtr<Reactor> reactor,
58 SharedPtr<Logger> logger) {
59 LibeventEmitter *conn = new LibeventEmitter(bev, reactor, logger);
60 conn->self = SharedPtr<Transport>(conn);
61 return conn->self;
62 }
63
~LibeventEmitter()64 ~LibeventEmitter() override {
65 if (bev != nullptr) {
66 bufferevent_free(bev);
67 }
68 }
69
get_bufferevent()70 bufferevent *get_bufferevent() override {
71 return bev;
72 }
73
set_bufferevent(bufferevent * new_bev)74 void set_bufferevent(bufferevent *new_bev) override {
75 bev = new_bev;
76 }
77
78 protected:
adjust_timeout(double timeout)79 void adjust_timeout(double timeout) override {
80 timeval tv, *tvp = mk::timeval_init(&tv, timeout);
81 bufferevent *underlying = bufferevent_get_underlying(this->bev);
82 if (underlying) {
83 // When we have a underlying bufferevent (i.e., a socket) set the
84 // timeout to it rather than to the outer buffer because we have
85 // seen running a long download that setting the timeout of the SSL
86 // bufferevent leads to interrupted download due to timeout.
87 if (bufferevent_set_timeouts(underlying, tvp, tvp) != 0) {
88 throw std::runtime_error("cannot set timeout");
89 }
90 return;
91 }
92 if (bufferevent_set_timeouts(this->bev, tvp, tvp) != 0) {
93 throw std::runtime_error("cannot set timeout");
94 }
95 }
96
start_writing()97 void start_writing() override {
98 output_buff >> bufferevent_get_output(bev);
99 }
100
start_reading()101 void start_reading() override {
102 if (bufferevent_enable(this->bev, EV_READ) != 0) {
103 throw std::runtime_error("cannot enable read");
104 }
105 }
106
stop_reading()107 void stop_reading() override {
108 if (bufferevent_disable(this->bev, EV_READ) != 0) {
109 throw std::runtime_error("cannot disable read");
110 }
111 }
112
shutdown()113 void shutdown() override {
114 if (shutdown_called) {
115 return; // Just for extra safety
116 }
117 shutdown_called = true;
118 bufferevent_setcb(bev, nullptr, nullptr, nullptr, nullptr);
119 reactor->call_soon([=]() { this->self = nullptr; });
120 }
121
sockname_peername_()122 template <decltype(getsockname) func> Endpoint sockname_peername_() {
123 // Assumption: in the common case this operation won't fail. When it
124 // fails, we'll just return an empty endpoint.
125 assert(bev != nullptr);
126 auto fd = bufferevent_getfd(bev);
127 if (fd == -1) {
128 logger->warn("connection: bufferevent attached to invalid socket");
129 return {};
130 }
131 sockaddr_storage ss{};
132 socklen_t sslen = sizeof(ss);
133 if (func(fd, (sockaddr *)&ss, &sslen) != 0) {
134 logger->warn("connection: cannot get socket name / peer name");
135 return {};
136 }
137 ErrorOr<Endpoint> epnt = endpoint_from_sockaddr_storage(&ss);
138 if (!epnt) {
139 logger->warn("connection: cannot get endpoint from "
140 "sockaddr_storage structure");
141 return {};
142 }
143 return *epnt;
144 }
145
sockname()146 Endpoint sockname() override { return sockname_peername_<::getsockname>(); }
147
peername()148 Endpoint peername() override { return sockname_peername_<::getpeername>(); }
149
150 public:
151 // They MUST be public because they're called by C code
152
handle_event_(short what)153 void handle_event_(short what) {
154
155 logger->debug("connection: got bufferevent event: %s",
156 map_bufferevent_event(what).c_str());
157
158 if ((what & BEV_EVENT_EOF) != 0) {
159 auto input = bufferevent_get_input(bev);
160 if (evbuffer_get_length(input) > 0) {
161 logger->debug(
162 "Suppress EOF with data lingering in input buffer");
163 suppressed_eof = true;
164 return;
165 }
166 emit_error(EofError());
167 return;
168 }
169
170 if ((what & BEV_EVENT_TIMEOUT) != 0) {
171 emit_error(TimeoutError());
172 return;
173 }
174
175 #ifdef _WIN32
176 Error sys_error = net::map_errno(WSAGetLastError());
177 #else
178 Error sys_error = net::map_errno(errno);
179 #endif
180
181 if (sys_error == NoError()) {
182 unsigned long openssl_error;
183 char buff[128];
184 while ((openssl_error = bufferevent_get_openssl_error(bev)) != 0) {
185 if (sys_error == NoError()) {
186 sys_error = SslError();
187 }
188 ERR_error_string_n(openssl_error, buff, sizeof(buff));
189 sys_error.add_child_error(SslError(buff));
190 }
191 if (sys_error != SslError()) {
192 /*
193 * This is the case of the SSL dirty shutdown. The connection
194 * was not closed cleanly from the other end and in theory this
195 * could also be the effect of an attack.
196 */
197 logger->warn("libevent has detected an SSL dirty shutdown");
198 sys_error = SslDirtyShutdownError();
199 }
200 }
201
202 logger->warn("Got error: %s", sys_error.what());
203 emit_error(sys_error);
204 }
205
handle_read_()206 void handle_read_() {
207 Buffer buff(bufferevent_get_input(bev));
208 try {
209 emit_data(buff);
210 } catch (Error &error) {
211 emit_error(error);
212 return;
213 }
214 if (suppressed_eof) {
215 suppressed_eof = false;
216 logger->debug("Deliver previously suppressed EOF");
217 emit_error(EofError());
218 return;
219 }
220 }
221
handle_write_()222 void handle_write_() {
223 try {
224 emit_flush();
225 } catch (Error &error) {
226 emit_error(error);
227 }
228 }
229
230 private:
LibeventEmitter(bufferevent * bev,SharedPtr<Reactor> reactor,SharedPtr<Logger> logger)231 LibeventEmitter(bufferevent *bev, SharedPtr<Reactor> reactor, SharedPtr<Logger> logger)
232 : EmitterBase{reactor, logger} {
233
234 this->bev = bev;
235
236 // The following makes this non copyable and non movable.
237 bufferevent_setcb(this->bev, handle_libevent_read,
238 handle_libevent_write, handle_libevent_event, this);
239 }
240
241 bufferevent *bev = nullptr;
242 SharedPtr<Transport> self;
243 Callback<> close_cb;
244 bool suppressed_eof = false;
245 bool shutdown_called = false;
246 };
247
248 } // namespace net
249 } // namespace mk
250 #endif
251
252 extern "C" {
253
handle_libevent_read(bufferevent *,void * opaque)254 static inline void handle_libevent_read(bufferevent *, void *opaque) {
255 static_cast<mk::net::LibeventEmitter *>(opaque)->handle_read_();
256 }
257
handle_libevent_write(bufferevent *,void * opaque)258 static inline void handle_libevent_write(bufferevent *, void *opaque) {
259 static_cast<mk::net::LibeventEmitter *>(opaque)->handle_write_();
260 }
261
handle_libevent_event(bufferevent *,short what,void * opaque)262 static inline void handle_libevent_event(bufferevent *, short what,
263 void *opaque) {
264 static_cast<mk::net::LibeventEmitter *>(opaque)->handle_event_(what);
265 }
266
267 } // extern "C"
268