1 // Part of Measurement Kit <https://measurement-kit.github.io/>.
2 // Measurement Kit is free software under the BSD license. See AUTHORS
3 // and LICENSE for more information on the copying conditions.
4 #ifndef SRC_LIBMEASUREMENT_KIT_NET_LIBEVENT_HPP
5 #define SRC_LIBMEASUREMENT_KIT_NET_LIBEVENT_HPP
6 
7 #include "src/libmeasurement_kit/net/emitter.hpp"
8 #include "src/libmeasurement_kit/net/utils.hpp"
9 #include "src/libmeasurement_kit/common/utils.hpp"
10 #include "src/libmeasurement_kit/common/non_copyable.hpp"
11 #include "src/libmeasurement_kit/common/non_movable.hpp"
12 #include "src/libmeasurement_kit/common/reactor.hpp"
13 #include <cassert>
14 #include <cerrno>
15 #include <event2/buffer.h>
16 #include <event2/bufferevent.h>
17 #include <event2/bufferevent_ssl.h>
18 #include <event2/event.h>
19 #include <functional>
20 #include "src/libmeasurement_kit/common/error.hpp"
21 #include "src/libmeasurement_kit/common/error_or.hpp"
22 #include "src/libmeasurement_kit/common/logger.hpp"
23 #include "src/libmeasurement_kit/net/buffer.hpp"
24 #include "src/libmeasurement_kit/net/error.hpp"
25 #include "src/libmeasurement_kit/net/transport.hpp"
26 #include "src/libmeasurement_kit/net/utils.hpp"
27 #include <measurement_kit/common/aaa_base.h>
28 #include <new>
29 #include <openssl/err.h>
30 #include <ostream>
31 #include <stdexcept>
32 #include <utility>
33 
34 extern "C" {
35 
36 static inline void handle_libevent_read(bufferevent *, void *);
37 static inline void handle_libevent_write(bufferevent *, void *);
38 static inline void handle_libevent_event(bufferevent *, short, void *);
39 
40 } // extern "C"
41 
42 namespace mk {
43 namespace net {
44 
map_bufferevent_event(short what)45 static inline std::string map_bufferevent_event(short what) {
46     std::stringstream ss;
47     ss << ((what & BEV_EVENT_EOF) ? "Z" : "z")
48        << ((what & BEV_EVENT_TIMEOUT) ? "T" : "t")
49        << ((what & BEV_EVENT_ERROR) ? "F" : "f")
50        << ((what & BEV_EVENT_READING) ? "R" : "r")
51        << ((what & BEV_EVENT_WRITING) ? "W" : "w");
52     return ss.str();
53 }
54 
55 class LibeventEmitter : public EmitterBase, public NonMovable, public NonCopyable {
56   public:
make(bufferevent * bev,SharedPtr<Reactor> reactor,SharedPtr<Logger> logger)57     static SharedPtr<Transport> make(bufferevent *bev, SharedPtr<Reactor> reactor,
58                                SharedPtr<Logger> logger) {
59         LibeventEmitter *conn = new LibeventEmitter(bev, reactor, logger);
60         conn->self = SharedPtr<Transport>(conn);
61         return conn->self;
62     }
63 
~LibeventEmitter()64     ~LibeventEmitter() override {
65         if (bev != nullptr) {
66             bufferevent_free(bev);
67         }
68     }
69 
get_bufferevent()70     bufferevent *get_bufferevent() override {
71         return bev;
72     }
73 
set_bufferevent(bufferevent * new_bev)74     void set_bufferevent(bufferevent *new_bev) override {
75         bev = new_bev;
76     }
77 
78   protected:
adjust_timeout(double timeout)79     void adjust_timeout(double timeout) override {
80         timeval tv, *tvp = mk::timeval_init(&tv, timeout);
81         bufferevent *underlying = bufferevent_get_underlying(this->bev);
82         if (underlying) {
83             // When we have a underlying bufferevent (i.e., a socket) set the
84             // timeout to it rather than to the outer buffer because we have
85             // seen running a long download that setting the timeout of the SSL
86             // bufferevent leads to interrupted download due to timeout.
87             if (bufferevent_set_timeouts(underlying, tvp, tvp) != 0) {
88                 throw std::runtime_error("cannot set timeout");
89             }
90             return;
91         }
92         if (bufferevent_set_timeouts(this->bev, tvp, tvp) != 0) {
93             throw std::runtime_error("cannot set timeout");
94         }
95     }
96 
start_writing()97     void start_writing() override {
98         output_buff >> bufferevent_get_output(bev);
99     }
100 
start_reading()101     void start_reading() override {
102         if (bufferevent_enable(this->bev, EV_READ) != 0) {
103             throw std::runtime_error("cannot enable read");
104         }
105     }
106 
stop_reading()107     void stop_reading() override {
108         if (bufferevent_disable(this->bev, EV_READ) != 0) {
109             throw std::runtime_error("cannot disable read");
110         }
111     }
112 
shutdown()113     void shutdown() override {
114         if (shutdown_called) {
115             return; // Just for extra safety
116         }
117         shutdown_called = true;
118         bufferevent_setcb(bev, nullptr, nullptr, nullptr, nullptr);
119         reactor->call_soon([=]() { this->self = nullptr; });
120     }
121 
sockname_peername_()122     template <decltype(getsockname) func> Endpoint sockname_peername_() {
123         // Assumption: in the common case this operation won't fail. When it
124         // fails, we'll just return an empty endpoint.
125         assert(bev != nullptr);
126         auto fd = bufferevent_getfd(bev);
127         if (fd == -1) {
128             logger->warn("connection: bufferevent attached to invalid socket");
129             return {};
130         }
131         sockaddr_storage ss{};
132         socklen_t sslen = sizeof(ss);
133         if (func(fd, (sockaddr *)&ss, &sslen) != 0) {
134             logger->warn("connection: cannot get socket name / peer name");
135             return {};
136         }
137         ErrorOr<Endpoint> epnt = endpoint_from_sockaddr_storage(&ss);
138         if (!epnt) {
139             logger->warn("connection: cannot get endpoint from "
140                          "sockaddr_storage structure");
141             return {};
142         }
143         return *epnt;
144     }
145 
sockname()146     Endpoint sockname() override { return sockname_peername_<::getsockname>(); }
147 
peername()148     Endpoint peername() override { return sockname_peername_<::getpeername>(); }
149 
150   public:
151     // They MUST be public because they're called by C code
152 
handle_event_(short what)153     void handle_event_(short what) {
154 
155         logger->debug("connection: got bufferevent event: %s",
156                       map_bufferevent_event(what).c_str());
157 
158         if ((what & BEV_EVENT_EOF) != 0) {
159             auto input = bufferevent_get_input(bev);
160             if (evbuffer_get_length(input) > 0) {
161                 logger->debug(
162                       "Suppress EOF with data lingering in input buffer");
163                 suppressed_eof = true;
164                 return;
165             }
166             emit_error(EofError());
167             return;
168         }
169 
170         if ((what & BEV_EVENT_TIMEOUT) != 0) {
171             emit_error(TimeoutError());
172             return;
173         }
174 
175 #ifdef _WIN32
176         Error sys_error = net::map_errno(WSAGetLastError());
177 #else
178         Error sys_error = net::map_errno(errno);
179 #endif
180 
181         if (sys_error == NoError()) {
182             unsigned long openssl_error;
183             char buff[128];
184             while ((openssl_error = bufferevent_get_openssl_error(bev)) != 0) {
185                 if (sys_error == NoError()) {
186                     sys_error = SslError();
187                 }
188                 ERR_error_string_n(openssl_error, buff, sizeof(buff));
189                 sys_error.add_child_error(SslError(buff));
190             }
191             if (sys_error != SslError()) {
192                 /*
193                  * This is the case of the SSL dirty shutdown. The connection
194                  * was not closed cleanly from the other end and in theory this
195                  * could also be the effect of an attack.
196                  */
197                 logger->warn("libevent has detected an SSL dirty shutdown");
198                 sys_error = SslDirtyShutdownError();
199             }
200         }
201 
202         logger->warn("Got error: %s", sys_error.what());
203         emit_error(sys_error);
204     }
205 
handle_read_()206     void handle_read_() {
207         Buffer buff(bufferevent_get_input(bev));
208         try {
209             emit_data(buff);
210         } catch (Error &error) {
211             emit_error(error);
212             return;
213         }
214         if (suppressed_eof) {
215             suppressed_eof = false;
216             logger->debug("Deliver previously suppressed EOF");
217             emit_error(EofError());
218             return;
219         }
220     }
221 
handle_write_()222     void handle_write_() {
223         try {
224             emit_flush();
225         } catch (Error &error) {
226             emit_error(error);
227         }
228     }
229 
230   private:
LibeventEmitter(bufferevent * bev,SharedPtr<Reactor> reactor,SharedPtr<Logger> logger)231     LibeventEmitter(bufferevent *bev, SharedPtr<Reactor> reactor, SharedPtr<Logger> logger)
232         : EmitterBase{reactor, logger} {
233 
234         this->bev = bev;
235 
236         // The following makes this non copyable and non movable.
237         bufferevent_setcb(this->bev, handle_libevent_read,
238                           handle_libevent_write, handle_libevent_event, this);
239     }
240 
241     bufferevent *bev = nullptr;
242     SharedPtr<Transport> self;
243     Callback<> close_cb;
244     bool suppressed_eof = false;
245     bool shutdown_called = false;
246 };
247 
248 } // namespace net
249 } // namespace mk
250 #endif
251 
252 extern "C" {
253 
handle_libevent_read(bufferevent *,void * opaque)254 static inline void handle_libevent_read(bufferevent *, void *opaque) {
255     static_cast<mk::net::LibeventEmitter *>(opaque)->handle_read_();
256 }
257 
handle_libevent_write(bufferevent *,void * opaque)258 static inline void handle_libevent_write(bufferevent *, void *opaque) {
259     static_cast<mk::net::LibeventEmitter *>(opaque)->handle_write_();
260 }
261 
handle_libevent_event(bufferevent *,short what,void * opaque)262 static inline void handle_libevent_event(bufferevent *, short what,
263                                          void *opaque) {
264     static_cast<mk::net::LibeventEmitter *>(opaque)->handle_event_(what);
265 }
266 
267 } // extern "C"
268