1 /*
2 * Copyright (C) 2017-2019 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "dht_proxy_server.h"
21
22 #include "thread_pool.h"
23 #include "default_types.h"
24 #include "dhtrunner.h"
25
26 #include <msgpack.hpp>
27 #include <json/json.h>
28
29 #include <chrono>
30 #include <functional>
31 #include <limits>
32 #include <iostream>
33
34 using namespace std::placeholders;
35
36 namespace dht {
37
38 struct DhtProxyServer::PermanentPut {
39 time_point expiration;
40 std::string pushToken;
41 std::string clientId;
42 Sp<Scheduler::Job> expireJob;
43 Sp<Scheduler::Job> expireNotifyJob;
44 };
45 struct DhtProxyServer::SearchPuts {
46 std::map<dht::Value::Id, PermanentPut> puts;
47 };
48
/** Delay between two runs of the periodic statistics job. */
constexpr std::chrono::minutes PRINT_STATS_PERIOD {2};
/** Size given to the thread pool used for push-notification work. */
constexpr size_t IO_THREADS_MAX {64};
51
52
DhtProxyServer(std::shared_ptr<DhtRunner> dht,in_port_t port,const std::string & pushServer)53 DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , const std::string& pushServer)
54 : dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer)
55 {
56 if (not dht_)
57 throw std::invalid_argument("A DHT instance must be provided");
58 // NOTE in c++14, use make_unique
59 service_ = std::unique_ptr<restbed::Service>(new restbed::Service());
60
61 std::cout << "Running DHT proxy server on port " << port << std::endl;
62 if (not pushServer.empty()) {
63 #ifdef OPENDHT_PUSH_NOTIFICATIONS
64 std::cout << "Using push notification server: " << pushServer << std::endl;
65 #else
66 std::cerr << "Push server defined but built OpenDHT built without push notification support" << std::endl;
67 #endif
68 }
69
70 jsonBuilder_["commentStyle"] = "None";
71 jsonBuilder_["indentation"] = "";
72
73 server_thread = std::thread([this, port]() {
74 // Create endpoints
75 auto resource = std::make_shared<restbed::Resource>();
76 resource->set_path("/");
77 resource->set_method_handler("GET", std::bind(&DhtProxyServer::getNodeInfo, this, _1));
78 resource->set_method_handler("STATS", std::bind(&DhtProxyServer::getStats, this, _1));
79 service_->publish(resource);
80 resource = std::make_shared<restbed::Resource>();
81 resource->set_path("/{hash: .*}");
82 resource->set_method_handler("GET", std::bind(&DhtProxyServer::get, this, _1));
83 resource->set_method_handler("LISTEN", [this](const Sp<restbed::Session>& session) mutable { listen(session); } );
84 #ifdef OPENDHT_PUSH_NOTIFICATIONS
85 resource->set_method_handler("SUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { subscribe(session); } );
86 resource->set_method_handler("UNSUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { unsubscribe(session); } );
87 #endif //OPENDHT_PUSH_NOTIFICATIONS
88 resource->set_method_handler("POST", [this](const Sp<restbed::Session>& session) mutable { put(session); });
89 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
90 resource->set_method_handler("SIGN", std::bind(&DhtProxyServer::putSigned, this, _1));
91 resource->set_method_handler("ENCRYPT", std::bind(&DhtProxyServer::putEncrypted, this, _1));
92 #endif // OPENDHT_PROXY_SERVER_IDENTITY
93 resource->set_method_handler("OPTIONS", std::bind(&DhtProxyServer::handleOptionsMethod, this, _1));
94 service_->publish(resource);
95 resource = std::make_shared<restbed::Resource>();
96 resource->set_path("/{hash: .*}/{value: .*}");
97 resource->set_method_handler("GET", std::bind(&DhtProxyServer::getFiltered, this, _1));
98 service_->publish(resource);
99
100 // Start server
101 auto settings = std::make_shared<restbed::Settings>();
102 settings->set_default_header("Content-Type", "application/json");
103 settings->set_default_header("Connection", "keep-alive");
104 settings->set_default_header("Access-Control-Allow-Origin", "*");
105 std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
106 settings->set_connection_timeout(timeout); // there is a timeout, but really huge
107 settings->set_port(port);
108 auto maxThreads = std::thread::hardware_concurrency() - 1;
109 settings->set_worker_limit(maxThreads > 1 ? maxThreads : 1);
110 lastStatsReset_ = clock::now();
111 try {
112 service_->start(settings);
113 } catch(std::system_error& e) {
114 std::cerr << "Error running server on port " << port << ": " << e.what() << std::endl;
115 }
116 });
117
118 listenThread_ = std::thread([this]() {
119 while (not service_->is_up() and not stopListeners) {
120 std::this_thread::sleep_for(std::chrono::seconds(1));
121 }
122 while (service_->is_up() and not stopListeners) {
123 removeClosedListeners();
124 std::this_thread::sleep_for(std::chrono::seconds(1));
125 }
126 // Remove last listeners
127 removeClosedListeners(false);
128 });
129 schedulerThread_ = std::thread([this]() {
130 while (not service_->is_up() and not stopListeners) {
131 std::this_thread::sleep_for(std::chrono::seconds(1));
132 }
133 while (service_->is_up() and not stopListeners) {
134 std::unique_lock<std::mutex> lock(schedulerLock_);
135 auto next = scheduler_.run();
136 if (next == time_point::max())
137 schedulerCv_.wait(lock);
138 else
139 schedulerCv_.wait_until(lock, next);
140 }
141 });
142 dht->forwardAllMessages(true);
143 printStatsJob_ = scheduler_.add(scheduler_.time() + PRINT_STATS_PERIOD, [this] {
144 if (stopListeners) return;
145 if (service_->is_up())
146 updateStats();
147 // Refresh stats cache
148 auto newInfo = dht_->getNodeInfo();
149 {
150 std::lock_guard<std::mutex> lck(statsMutex_);
151 nodeInfo_ = std::move(newInfo);
152 }
153 scheduler_.edit(printStatsJob_, scheduler_.time() + PRINT_STATS_PERIOD);
154 });
155 }
156
~DhtProxyServer()157 DhtProxyServer::~DhtProxyServer()
158 {
159 stop();
160 }
161
162 void
stop()163 DhtProxyServer::stop()
164 {
165 if (printStatsJob_)
166 printStatsJob_->cancel();
167 service_->stop();
168 {
169 std::lock_guard<std::mutex> lock(lockListener_);
170 auto listener = currentListeners_.begin();
171 while (listener != currentListeners_.end()) {
172 listener->session->close();
173 ++listener;
174 }
175 }
176 stopListeners = true;
177 schedulerCv_.notify_all();
178 // listenThreads_ will stop because there is no more sessions
179 if (listenThread_.joinable())
180 listenThread_.join();
181 if (schedulerThread_.joinable())
182 schedulerThread_.join();
183 if (server_thread.joinable())
184 server_thread.join();
185 threadPool_->stop();
186 }
187
188 void
updateStats() const189 DhtProxyServer::updateStats() const
190 {
191 auto now = clock::now();
192 auto last = lastStatsReset_.exchange(now);
193 auto count = requestNum_.exchange(0);
194 auto dt = std::chrono::duration<double>(now - last);
195 stats_.requestRate = count / dt.count();
196 #ifdef OPENDHT_PUSH_NOTIFICATIONS
197 stats_.pushListenersCount = pushListeners_.size();
198 #endif
199 stats_.putCount = puts_.size();
200 stats_.listenCount = currentListeners_.size();
201 stats_.nodeInfo = nodeInfo_;
202 }
203
204 void
getNodeInfo(const Sp<restbed::Session> & session) const205 DhtProxyServer::getNodeInfo(const Sp<restbed::Session>& session) const
206 {
207 requestNum_++;
208 const auto request = session->get_request();
209 int content_length = std::stoi(request->get_header("Content-Length", "0"));
210 session->fetch(content_length,
211 [this](const Sp<restbed::Session>& s, const restbed::Bytes& /*b*/) mutable
212 {
213 try {
214 if (dht_) {
215 Json::Value result;
216 {
217 std::lock_guard<std::mutex> lck(statsMutex_);
218 if (nodeInfo_.ipv4.good_nodes == 0 && nodeInfo_.ipv6.good_nodes == 0) {
219 // NOTE: we want to avoid the disconnected state as much as possible
220 // So, if the node is disconnected, we should force the update of the cache
221 // and reconnect as soon as possible
222 // This should not happen much
223 nodeInfo_ = dht_->getNodeInfo();
224 }
225 result = nodeInfo_.toJson();
226 }
227 result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port
228 auto output = Json::writeString(jsonBuilder_, result) + "\n";
229 s->close(restbed::OK, output);
230 }
231 else
232 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
233 } catch (...) {
234 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
235 }
236 }
237 );
238 }
239
240 void
getStats(const Sp<restbed::Session> & session) const241 DhtProxyServer::getStats(const Sp<restbed::Session>& session) const
242 {
243 requestNum_++;
244 const auto request = session->get_request();
245 int content_length = std::stoi(request->get_header("Content-Length", "0"));
246 session->fetch(content_length,
247 [this](const Sp<restbed::Session>& s, const restbed::Bytes& /*b*/) mutable
248 {
249 try {
250 if (dht_) {
251 #ifdef OPENDHT_JSONCPP
252 auto output = Json::writeString(jsonBuilder_, stats_.toJson()) + "\n";
253 s->close(restbed::OK, output);
254 #else
255 s->close(restbed::NotFound, "{\"err\":\"JSON not enabled on this instance\"}");
256 #endif
257 }
258 else
259 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
260 } catch (...) {
261 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
262 }
263 }
264 );
265 }
266
267 void
get(const Sp<restbed::Session> & session) const268 DhtProxyServer::get(const Sp<restbed::Session>& session) const
269 {
270 requestNum_++;
271 const auto request = session->get_request();
272 int content_length = std::stoi(request->get_header("Content-Length", "0"));
273 auto hash = request->get_path_parameter("hash");
274 session->fetch(content_length,
275 [=](const Sp<restbed::Session>& s, const restbed::Bytes& /*b* */)
276 {
277 try {
278 if (dht_) {
279 InfoHash infoHash(hash);
280 if (!infoHash) {
281 infoHash = InfoHash::get(hash);
282 }
283 s->yield(restbed::OK, "", [=](const Sp<restbed::Session>&) {});
284 dht_->get(infoHash, [this,s](const Sp<Value>& value) {
285 if (s->is_closed()) return false;
286 // Send values as soon as we get them
287 auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
288 s->yield(output, [](const Sp<restbed::Session>& /*session*/){ });
289 return true;
290 }, [s](bool /*ok* */) {
291 // Communication is finished
292 if (not s->is_closed()) {
293 s->close();
294 }
295 });
296 } else {
297 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
298 }
299 } catch (...) {
300 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
301 }
302 }
303 );
304 }
305
306 void
listen(const Sp<restbed::Session> & session)307 DhtProxyServer::listen(const Sp<restbed::Session>& session)
308 {
309 requestNum_++;
310 const auto request = session->get_request();
311 int content_length = std::stoi(request->get_header("Content-Length", "0"));
312 auto hash = request->get_path_parameter("hash");
313 InfoHash infoHash(hash);
314 if (!infoHash)
315 infoHash = InfoHash::get(hash);
316 session->fetch(content_length,
317 [=](const Sp<restbed::Session>& s, const restbed::Bytes& /*b* */)
318 {
319 try {
320 if (dht_) {
321 InfoHash infoHash(hash);
322 if (!infoHash) {
323 infoHash = InfoHash::get(hash);
324 }
325 s->yield(restbed::OK);
326 // Handle client deconnection
327 // NOTE: for now, there is no handler, so we test the session in a thread
328 // will be the case in restbed 5.0
329 SessionToHashToken listener;
330 listener.session = session;
331 listener.hash = infoHash;
332 // cache the session to avoid an incrementation of the shared_ptr's counter
333 // else, the session->close() will not close the socket.
334 auto cacheSession = std::weak_ptr<restbed::Session>(s);
335 listener.token = dht_->listen(infoHash, [this,cacheSession](const std::vector<Sp<Value>>& values, bool expired) {
336 auto s = cacheSession.lock();
337 if (!s) return false;
338 // Send values as soon as we get them
339 if (!s->is_closed()) {
340 for (const auto& value : values) {
341 auto val = value->toJson();
342 if (expired)
343 val["expired"] = true;
344 auto output = Json::writeString(jsonBuilder_, val) + "\n";
345 s->yield(output, [](const Sp<restbed::Session>&){ });
346 }
347 }
348 return !s->is_closed();
349 });
350 {
351 std::lock_guard<std::mutex> lock(lockListener_);
352 currentListeners_.emplace_back(std::move(listener));
353 }
354 } else {
355 session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
356 }
357 } catch (...) {
358 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
359 }
360 }
361 );
362 }
363
364 #ifdef OPENDHT_PUSH_NOTIFICATIONS
365
366 struct DhtProxyServer::Listener {
367 std::string clientId;
368 std::future<size_t> internalToken;
369 Sp<Scheduler::Job> expireJob;
370 Sp<Scheduler::Job> expireNotifyJob;
371 };
372 struct DhtProxyServer::PushListener {
373 std::map<InfoHash, std::vector<Listener>> listeners;
374 bool isAndroid;
375 };
376
377 void
subscribe(const std::shared_ptr<restbed::Session> & session)378 DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
379 {
380 requestNum_++;
381 const auto request = session->get_request();
382 int content_length = std::stoi(request->get_header("Content-Length", "0"));
383 auto hash = request->get_path_parameter("hash");
384 InfoHash infoHash(hash);
385 if (!infoHash)
386 infoHash = InfoHash::get(hash);
387 session->fetch(content_length,
388 [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) mutable
389 {
390 try {
391 std::string err;
392 Json::Value root;
393 Json::CharReaderBuilder rbuilder;
394 auto* char_data = reinterpret_cast<const char*>(b.data());
395 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
396 if (!reader->parse(char_data, char_data + b.size(), &root, &err)) {
397 s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
398 return;
399 }
400 auto pushToken = root["key"].asString();
401 if (pushToken.empty()) {
402 s->close(restbed::BAD_REQUEST, "{\"err\":\"No token\"}");
403 return;
404 }
405 auto platform = root["platform"].asString();
406 auto isAndroid = platform == "android";
407 auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string();
408
409 std::cout << "Subscribe " << infoHash << " client:" << clientId << std::endl;
410
411 {
412 std::lock(schedulerLock_, lockListener_);
413 std::lock_guard<std::mutex> lk1(lockListener_, std::adopt_lock);
414 std::lock_guard<std::mutex> lk2(schedulerLock_, std::adopt_lock);
415 scheduler_.syncTime();
416 auto timeout = scheduler_.time() + proxy::OP_TIMEOUT;
417 // Check if listener is already present and refresh timeout if launched
418 // One push listener per pushToken.infoHash.clientId
419 auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first;
420 auto listeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first;
421 for (auto& listener: listeners->second) {
422 if (listener.clientId == clientId) {
423 scheduler_.edit(listener.expireJob, timeout);
424 scheduler_.edit(listener.expireNotifyJob, timeout - proxy::OP_MARGIN);
425 s->yield(restbed::OK);
426
427 if (!root.isMember("refresh") or !root["refresh"].asBool()) {
428 dht_->get(
429 infoHash,
430 [this, s](const Sp<Value> &value) {
431 if (s->is_closed())
432 return false;
433 // Send values as soon as we get them
434 auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
435 s->yield(output, [](const Sp<restbed::Session>
436 & /*session*/) {});
437 return true;
438 },
439 [s](bool /*ok* */) {
440 // Communication is finished
441 if (not s->is_closed()) {
442 s->close("{}\n");
443 }
444 });
445 } else {
446 // Communication is finished
447 if (not s->is_closed()) {
448 s->close("{}\n");
449 }
450 }
451 schedulerCv_.notify_one();
452 return;
453 }
454 }
455 listeners->second.emplace_back(Listener{});
456 auto& listener = listeners->second.back();
457 listener.clientId = clientId;
458
459 // New listener
460 pushListener->second.isAndroid = isAndroid;
461
462 // The listener is not found, so add it.
463 listener.internalToken = dht_->listen(infoHash,
464 [this, infoHash, pushToken, isAndroid, clientId](const std::vector<std::shared_ptr<Value>>& values, bool expired) {
465 threadPool_->run([this, infoHash, pushToken, isAndroid, clientId, values, expired]() {
466 // Build message content
467 Json::Value json;
468 json["key"] = infoHash.toString();
469 json["to"] = clientId;
470 if (expired and values.size() < 3) {
471 std::stringstream ss;
472 for(size_t i = 0; i < values.size(); ++i) {
473 if(i != 0) ss << ",";
474 ss << values[i]->id;
475 }
476 json["exp"] = ss.str();
477 }
478 sendPushNotification(pushToken, std::move(json), isAndroid);
479 });
480 return true;
481 }
482 );
483 listener.expireJob = scheduler_.add(timeout,
484 [this, clientId, infoHash, pushToken] {
485 cancelPushListen(pushToken, infoHash, clientId);
486 }
487 );
488 listener.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN,
489 [this, infoHash, pushToken, isAndroid, clientId] {
490 std::cout << "Listener: sending refresh " << infoHash << std::endl;
491 Json::Value json;
492 json["timeout"] = infoHash.toString();
493 json["to"] = clientId;
494 sendPushNotification(pushToken, std::move(json), isAndroid);
495 }
496 );
497 }
498 schedulerCv_.notify_one();
499 s->close(restbed::OK, "{}\n");
500 } catch (...) {
501 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
502 }
503 }
504 );
505 }
506
507 void
unsubscribe(const std::shared_ptr<restbed::Session> & session)508 DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
509 {
510 requestNum_++;
511 const auto request = session->get_request();
512 int content_length = std::stoi(request->get_header("Content-Length", "0"));
513 auto hash = request->get_path_parameter("hash");
514 InfoHash infoHash(hash);
515 if (!infoHash)
516 infoHash = InfoHash::get(hash);
517 session->fetch(content_length,
518 [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
519 {
520 try {
521 std::string err;
522 Json::Value root;
523 Json::CharReaderBuilder rbuilder;
524 auto* char_data = reinterpret_cast<const char*>(b.data());
525 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
526 if (!reader->parse(char_data, char_data + b.size(), &root, &err)) {
527 s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
528 return;
529 }
530 auto pushToken = root["key"].asString();
531 if (pushToken.empty()) return;
532 auto clientId = root["client_id"].asString();
533
534 cancelPushListen(pushToken, infoHash, clientId);
535 s->close(restbed::OK);
536 } catch (...) {
537 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
538 }
539 }
540 );
541 }
542
543 void
cancelPushListen(const std::string & pushToken,const dht::InfoHash & key,const std::string & clientId)544 DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, const std::string& clientId)
545 {
546 std::cout << "cancelPushListen: " << key << " clientId:" << clientId << std::endl;
547 std::lock_guard<std::mutex> lock(lockListener_);
548 auto pushListener = pushListeners_.find(pushToken);
549 if (pushListener == pushListeners_.end())
550 return;
551 auto listeners = pushListener->second.listeners.find(key);
552 if (listeners == pushListener->second.listeners.end())
553 return;
554 for (auto listener = listeners->second.begin(); listener != listeners->second.end();) {
555 if (listener->clientId == clientId) {
556 if (dht_)
557 dht_->cancelListen(key, std::move(listener->internalToken));
558 listener = listeners->second.erase(listener);
559 } else {
560 ++listener;
561 }
562 }
563 if (listeners->second.empty()) {
564 pushListener->second.listeners.erase(listeners);
565 }
566 if (pushListener->second.listeners.empty()) {
567 pushListeners_.erase(pushListener);
568 }
569 }
570
571 void
sendPushNotification(const std::string & token,Json::Value && json,bool isAndroid) const572 DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, bool isAndroid) const
573 {
574 if (pushServer_.empty())
575 return;
576 restbed::Uri uri(proxy::HTTP_PROTO + pushServer_ + "/api/push");
577 auto req = std::make_shared<restbed::Request>(uri);
578 req->set_method("POST");
579
580 // NOTE: see https://github.com/appleboy/gorush
581 Json::Value notification(Json::objectValue);
582 Json::Value tokens(Json::arrayValue);
583 tokens[0] = token;
584 notification["tokens"] = std::move(tokens);
585 notification["platform"] = isAndroid ? 2 : 1;
586 notification["data"] = std::move(json);
587 notification["priority"] = "high";
588 notification["time_to_live"] = 600;
589
590 Json::Value notifications(Json::arrayValue);
591 notifications[0] = notification;
592
593 Json::Value content;
594 content["notifications"] = std::move(notifications);
595
596 Json::StreamWriterBuilder wbuilder;
597 wbuilder["commentStyle"] = "None";
598 wbuilder["indentation"] = "";
599 auto valueStr = Json::writeString(wbuilder, content);
600
601 req->set_header("Content-Type", "application/json");
602 req->set_header("Accept", "*/*");
603 req->set_header("Host", pushServer_);
604 req->set_header("Content-Length", std::to_string(valueStr.length()));
605 req->set_body(valueStr);
606
607 // Send request.
608 restbed::Http::async(req, {});
609 }
610
611 #endif //OPENDHT_PUSH_NOTIFICATIONS
612
613 void
cancelPut(const InfoHash & key,Value::Id vid)614 DhtProxyServer::cancelPut(const InfoHash& key, Value::Id vid)
615 {
616 std::cout << "cancelPut " << key << " " << vid << std::endl;
617 auto sPuts = puts_.find(key);
618 if (sPuts == puts_.end())
619 return;
620 auto& sPutsMap = sPuts->second.puts;
621 auto put = sPutsMap.find(vid);
622 if (put == sPutsMap.end())
623 return;
624 if (dht_)
625 dht_->cancelPut(key, vid);
626 if (put->second.expireNotifyJob)
627 put->second.expireNotifyJob->cancel();
628 sPutsMap.erase(put);
629 if (sPutsMap.empty())
630 puts_.erase(sPuts);
631 }
632
633 void
put(const std::shared_ptr<restbed::Session> & session)634 DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session)
635 {
636 requestNum_++;
637 const auto request = session->get_request();
638 int content_length = std::stoi(request->get_header("Content-Length", "0"));
639 auto hash = request->get_path_parameter("hash");
640 InfoHash infoHash(hash);
641 if (!infoHash)
642 infoHash = InfoHash::get(hash);
643
644 session->fetch(content_length,
645 [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
646 {
647 try {
648 if (dht_) {
649 if(b.empty()) {
650 std::string response("{\"err\":\"Missing parameters\"}");
651 s->close(restbed::BAD_REQUEST, response);
652 } else {
653 std::string err;
654 Json::Value root;
655 Json::CharReaderBuilder rbuilder;
656 auto* char_data = reinterpret_cast<const char*>(b.data());
657 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
658 if (reader->parse(char_data, char_data + b.size(), &root, &err)) {
659 // Build the Value from json
660 auto value = std::make_shared<Value>(root);
661 bool permanent = root.isMember("permanent");
662 std::cout << "Got put " << infoHash << " " << *value << " " << (permanent ? "permanent" : "") << std::endl;
663
664 if (permanent) {
665 std::string pushToken, clientId, platform;
666 auto& pVal = root["permanent"];
667 if (pVal.isObject()) {
668 pushToken = pVal["key"].asString();
669 clientId = pVal["client_id"].asString();
670 platform = pVal["platform"].asString();
671 }
672 std::unique_lock<std::mutex> lock(schedulerLock_);
673 scheduler_.syncTime();
674 auto timeout = scheduler_.time() + proxy::OP_TIMEOUT;
675 auto vid = value->id;
676 auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first;
677 auto r = sPuts->second.puts.emplace(vid, PermanentPut{});
678 auto& pput = r.first->second;
679 if (r.second) {
680 pput.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{
681 std::cout << "Permanent put expired: " << infoHash << " " << vid << std::endl;
682 cancelPut(infoHash, vid);
683 });
684 #ifdef OPENDHT_PUSH_NOTIFICATIONS
685 if (not pushToken.empty()) {
686 bool isAndroid = platform == "android";
687 pput.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN,
688 [this, infoHash, vid, pushToken, clientId, isAndroid]
689 {
690 std::cout << "Permanent put refresh: " << infoHash << " " << vid << std::endl;
691 Json::Value json;
692 json["timeout"] = infoHash.toString();
693 json["to"] = clientId;
694 json["vid"] = std::to_string(vid);
695 sendPushNotification(pushToken, std::move(json), isAndroid);
696 });
697 }
698 #endif
699 } else {
700 scheduler_.edit(pput.expireJob, timeout);
701 if (pput.expireNotifyJob)
702 scheduler_.edit(pput.expireNotifyJob, timeout - proxy::OP_MARGIN);
703 }
704 lock.unlock();
705 schedulerCv_.notify_one();
706 }
707
708 dht_->put(infoHash, value, [s, value](bool ok) {
709 if (ok) {
710 Json::StreamWriterBuilder wbuilder;
711 wbuilder["commentStyle"] = "None";
712 wbuilder["indentation"] = "";
713 if (s->is_open())
714 s->close(restbed::OK, Json::writeString(wbuilder, value->toJson()) + "\n");
715 } else {
716 if (s->is_open())
717 s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}");
718 }
719 }, time_point::max(), permanent);
720 } else {
721 s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
722 }
723 }
724 } else {
725 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
726 }
727 } catch (const std::exception& e) {
728 std::cout << "Error performing put: " << e.what() << std::endl;
729 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
730 }
731 }
732 );
733 }
734
735 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
736 void
putSigned(const std::shared_ptr<restbed::Session> & session) const737 DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) const
738 {
739 requestNum_++;
740 const auto request = session->get_request();
741 int content_length = std::stoi(request->get_header("Content-Length", "0"));
742 auto hash = request->get_path_parameter("hash");
743 InfoHash infoHash(hash);
744 if (!infoHash)
745 infoHash = InfoHash::get(hash);
746
747 session->fetch(content_length,
748 [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
749 {
750 try {
751 if (dht_) {
752 if(b.empty()) {
753 std::string response("{\"err\":\"Missing parameters\"}");
754 s->close(restbed::BAD_REQUEST, response);
755 } else {
756 std::string err;
757 Json::Value root;
758 Json::CharReaderBuilder rbuilder;
759 auto* char_data = reinterpret_cast<const char*>(b.data());
760 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
761 if (reader->parse(char_data, char_data + b.size(), &root, &err)) {
762 auto value = std::make_shared<Value>(root);
763
764 Json::StreamWriterBuilder wbuilder;
765 wbuilder["commentStyle"] = "None";
766 wbuilder["indentation"] = "";
767 auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
768 dht_->putSigned(infoHash, value);
769 s->close(restbed::OK, output);
770 } else {
771 s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
772 }
773 }
774 } else {
775 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
776 }
777 } catch (...) {
778 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
779 }
780 }
781 );
782 }
783
784 void
putEncrypted(const std::shared_ptr<restbed::Session> & session) const785 DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) const
786 {
787 requestNum_++;
788 const auto request = session->get_request();
789 int content_length = std::stoi(request->get_header("Content-Length", "0"));
790 auto hash = request->get_path_parameter("hash");
791 InfoHash key(hash);
792 if (!key)
793 key = InfoHash::get(hash);
794
795 session->fetch(content_length,
796 [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
797 {
798 try {
799 if (dht_) {
800 if(b.empty()) {
801 std::string response("{\"err\":\"Missing parameters\"}");
802 s->close(restbed::BAD_REQUEST, response);
803 } else {
804 std::string err;
805 Json::Value root;
806 Json::CharReaderBuilder rbuilder;
807 auto* char_data = reinterpret_cast<const char*>(b.data());
808 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
809 bool parsingSuccessful = reader->parse(char_data, char_data + b.size(), &root, &err);
810 InfoHash to(root["to"].asString());
811 if (parsingSuccessful && to) {
812 auto value = std::make_shared<Value>(root);
813 Json::StreamWriterBuilder wbuilder;
814 wbuilder["commentStyle"] = "None";
815 wbuilder["indentation"] = "";
816 auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
817 dht_->putEncrypted(key, to, value);
818 s->close(restbed::OK, output);
819 } else {
820 if(!parsingSuccessful)
821 s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
822 else
823 s->close(restbed::BAD_REQUEST, "{\"err\":\"No destination found\"}");
824 }
825 }
826 } else {
827 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
828 }
829 } catch (...) {
830 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
831 }
832 }
833 );
834 }
835 #endif // OPENDHT_PROXY_SERVER_IDENTITY
836
837 void
handleOptionsMethod(const std::shared_ptr<restbed::Session> & session) const838 DhtProxyServer::handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const
839 {
840 requestNum_++;
841 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
842 const auto allowed = "OPTIONS, GET, POST, LISTEN, SIGN, ENCRYPT";
843 #else
844 const auto allowed = "OPTIONS, GET, POST, LISTEN";
845 #endif //OPENDHT_PROXY_SERVER_IDENTITY
846 session->close(restbed::OK, {{"Access-Control-Allow-Methods", allowed},
847 {"Access-Control-Allow-Headers", "content-type"},
848 {"Access-Control-Max-Age", "86400"}});
849 }
850
851 void
getFiltered(const std::shared_ptr<restbed::Session> & session) const852 DhtProxyServer::getFiltered(const std::shared_ptr<restbed::Session>& session) const
853 {
854 requestNum_++;
855 const auto request = session->get_request();
856 int content_length = std::stoi(request->get_header("Content-Length", "0"));
857 auto hash = request->get_path_parameter("hash");
858 auto value = request->get_path_parameter("value");
859 session->fetch(content_length,
860 [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */)
861 {
862 try {
863 if (dht_) {
864 InfoHash infoHash(hash);
865 if (!infoHash) {
866 infoHash = InfoHash::get(hash);
867 }
868 s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) {
869 dht_->get(infoHash, [s](std::shared_ptr<Value> v) {
870 // Send values as soon as we get them
871 Json::StreamWriterBuilder wbuilder;
872 wbuilder["commentStyle"] = "None";
873 wbuilder["indentation"] = "";
874 auto output = Json::writeString(wbuilder, v->toJson()) + "\n";
875 s->yield(output, [](const std::shared_ptr<restbed::Session> /*session*/){ });
876 return true;
877 }, [s](bool /*ok* */) {
878 // Communication is finished
879 s->close();
880 }, {}, value);
881 });
882 } else {
883 s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
884 }
885 } catch (...) {
886 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
887 }
888 }
889 );
890 }
891
892 void
removeClosedListeners(bool testSession)893 DhtProxyServer::removeClosedListeners(bool testSession)
894 {
895 // clean useless listeners
896 std::lock_guard<std::mutex> lock(lockListener_);
897 auto listener = currentListeners_.begin();
898 while (listener != currentListeners_.end()) {
899 auto cancel = dht_ and (not testSession or listener->session->is_closed());
900 if (cancel) {
901 dht_->cancelListen(listener->hash, std::move(listener->token));
902 // Remove listener if unused
903 listener = currentListeners_.erase(listener);
904 } else {
905 ++listener;
906 }
907 }
908 }
909
910 }
911