1 /*
2  *  Copyright (C) 2017-2019 Savoir-faire Linux Inc.
3  *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4  *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "dht_proxy_server.h"
21 
22 #include "thread_pool.h"
23 #include "default_types.h"
24 #include "dhtrunner.h"
25 
26 #include <msgpack.hpp>
27 #include <json/json.h>
28 
29 #include <chrono>
30 #include <functional>
31 #include <limits>
32 #include <iostream>
33 
34 using namespace std::placeholders;
35 
36 namespace dht {
37 
38 struct DhtProxyServer::PermanentPut {
39     time_point expiration;
40     std::string pushToken;
41     std::string clientId;
42     Sp<Scheduler::Job> expireJob;
43     Sp<Scheduler::Job> expireNotifyJob;
44 };
45 struct DhtProxyServer::SearchPuts {
46     std::map<dht::Value::Id, PermanentPut> puts;
47 };
48 
/// Delay between two runs of the periodic statistics job.
constexpr std::chrono::minutes PRINT_STATS_PERIOD {2};
/// Size given to the ThreadPool owned by the proxy server.
constexpr size_t IO_THREADS_MAX {64};
51 
52 
DhtProxyServer(std::shared_ptr<DhtRunner> dht,in_port_t port,const std::string & pushServer)53 DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , const std::string& pushServer)
54 : dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer)
55 {
56     if (not dht_)
57         throw std::invalid_argument("A DHT instance must be provided");
58     // NOTE in c++14, use make_unique
59     service_ = std::unique_ptr<restbed::Service>(new restbed::Service());
60 
61     std::cout << "Running DHT proxy server on port " << port << std::endl;
62     if (not pushServer.empty()) {
63 #ifdef OPENDHT_PUSH_NOTIFICATIONS
64         std::cout << "Using push notification server: " << pushServer << std::endl;
65 #else
66         std::cerr << "Push server defined but built OpenDHT built without push notification support" << std::endl;
67 #endif
68     }
69 
70     jsonBuilder_["commentStyle"] = "None";
71     jsonBuilder_["indentation"] = "";
72 
73     server_thread = std::thread([this, port]() {
74         // Create endpoints
75         auto resource = std::make_shared<restbed::Resource>();
76         resource->set_path("/");
77         resource->set_method_handler("GET", std::bind(&DhtProxyServer::getNodeInfo, this, _1));
78         resource->set_method_handler("STATS", std::bind(&DhtProxyServer::getStats, this, _1));
79         service_->publish(resource);
80         resource = std::make_shared<restbed::Resource>();
81         resource->set_path("/{hash: .*}");
82         resource->set_method_handler("GET", std::bind(&DhtProxyServer::get, this, _1));
83         resource->set_method_handler("LISTEN", [this](const Sp<restbed::Session>& session) mutable { listen(session); } );
84 #ifdef OPENDHT_PUSH_NOTIFICATIONS
85         resource->set_method_handler("SUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { subscribe(session); } );
86         resource->set_method_handler("UNSUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { unsubscribe(session); } );
87 #endif //OPENDHT_PUSH_NOTIFICATIONS
88         resource->set_method_handler("POST", [this](const Sp<restbed::Session>& session) mutable { put(session); });
89 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
90         resource->set_method_handler("SIGN", std::bind(&DhtProxyServer::putSigned, this, _1));
91         resource->set_method_handler("ENCRYPT", std::bind(&DhtProxyServer::putEncrypted, this, _1));
92 #endif // OPENDHT_PROXY_SERVER_IDENTITY
93         resource->set_method_handler("OPTIONS", std::bind(&DhtProxyServer::handleOptionsMethod, this, _1));
94         service_->publish(resource);
95         resource = std::make_shared<restbed::Resource>();
96         resource->set_path("/{hash: .*}/{value: .*}");
97         resource->set_method_handler("GET", std::bind(&DhtProxyServer::getFiltered, this, _1));
98         service_->publish(resource);
99 
100         // Start server
101         auto settings = std::make_shared<restbed::Settings>();
102         settings->set_default_header("Content-Type", "application/json");
103         settings->set_default_header("Connection", "keep-alive");
104         settings->set_default_header("Access-Control-Allow-Origin", "*");
105         std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
106         settings->set_connection_timeout(timeout); // there is a timeout, but really huge
107         settings->set_port(port);
108         auto maxThreads = std::thread::hardware_concurrency() - 1;
109         settings->set_worker_limit(maxThreads > 1 ? maxThreads : 1);
110         lastStatsReset_ = clock::now();
111         try {
112             service_->start(settings);
113         } catch(std::system_error& e) {
114             std::cerr << "Error running server on port " << port << ": " << e.what() << std::endl;
115         }
116     });
117 
118     listenThread_ = std::thread([this]() {
119         while (not service_->is_up() and not stopListeners) {
120             std::this_thread::sleep_for(std::chrono::seconds(1));
121         }
122         while (service_->is_up() and not stopListeners) {
123             removeClosedListeners();
124             std::this_thread::sleep_for(std::chrono::seconds(1));
125         }
126         // Remove last listeners
127         removeClosedListeners(false);
128     });
129     schedulerThread_ = std::thread([this]() {
130         while (not service_->is_up() and not stopListeners) {
131             std::this_thread::sleep_for(std::chrono::seconds(1));
132         }
133         while (service_->is_up()  and not stopListeners) {
134             std::unique_lock<std::mutex> lock(schedulerLock_);
135             auto next = scheduler_.run();
136             if (next == time_point::max())
137                 schedulerCv_.wait(lock);
138             else
139                 schedulerCv_.wait_until(lock, next);
140         }
141     });
142     dht->forwardAllMessages(true);
143     printStatsJob_ = scheduler_.add(scheduler_.time() + PRINT_STATS_PERIOD, [this] {
144         if (stopListeners) return;
145         if (service_->is_up())
146             updateStats();
147         // Refresh stats cache
148         auto newInfo = dht_->getNodeInfo();
149         {
150             std::lock_guard<std::mutex> lck(statsMutex_);
151             nodeInfo_ = std::move(newInfo);
152         }
153         scheduler_.edit(printStatsJob_, scheduler_.time() + PRINT_STATS_PERIOD);
154     });
155 }
156 
~DhtProxyServer()157 DhtProxyServer::~DhtProxyServer()
158 {
159     stop();
160 }
161 
162 void
stop()163 DhtProxyServer::stop()
164 {
165     if (printStatsJob_)
166         printStatsJob_->cancel();
167     service_->stop();
168     {
169         std::lock_guard<std::mutex> lock(lockListener_);
170         auto listener = currentListeners_.begin();
171         while (listener != currentListeners_.end()) {
172             listener->session->close();
173             ++listener;
174         }
175     }
176     stopListeners = true;
177     schedulerCv_.notify_all();
178     // listenThreads_ will stop because there is no more sessions
179     if (listenThread_.joinable())
180         listenThread_.join();
181     if (schedulerThread_.joinable())
182         schedulerThread_.join();
183     if (server_thread.joinable())
184         server_thread.join();
185     threadPool_->stop();
186 }
187 
188 void
updateStats() const189 DhtProxyServer::updateStats() const
190 {
191     auto now = clock::now();
192     auto last = lastStatsReset_.exchange(now);
193     auto count = requestNum_.exchange(0);
194     auto dt = std::chrono::duration<double>(now - last);
195     stats_.requestRate = count / dt.count();
196 #ifdef OPENDHT_PUSH_NOTIFICATIONS
197     stats_.pushListenersCount = pushListeners_.size();
198 #endif
199     stats_.putCount = puts_.size();
200     stats_.listenCount = currentListeners_.size();
201     stats_.nodeInfo = nodeInfo_;
202 }
203 
204 void
getNodeInfo(const Sp<restbed::Session> & session) const205 DhtProxyServer::getNodeInfo(const Sp<restbed::Session>& session) const
206 {
207     requestNum_++;
208     const auto request = session->get_request();
209     int content_length = std::stoi(request->get_header("Content-Length", "0"));
210     session->fetch(content_length,
211         [this](const Sp<restbed::Session>& s, const restbed::Bytes& /*b*/) mutable
212         {
213             try {
214                 if (dht_) {
215                     Json::Value result;
216                     {
217                         std::lock_guard<std::mutex> lck(statsMutex_);
218                         if (nodeInfo_.ipv4.good_nodes == 0 && nodeInfo_.ipv6.good_nodes == 0) {
219                             // NOTE: we want to avoid the disconnected state as much as possible
220                             // So, if the node is disconnected, we should force the update of the cache
221                             // and reconnect as soon as possible
222                             // This should not happen much
223                             nodeInfo_ = dht_->getNodeInfo();
224                         }
225                         result = nodeInfo_.toJson();
226                     }
227                     result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port
228                     auto output = Json::writeString(jsonBuilder_, result) + "\n";
229                     s->close(restbed::OK, output);
230                 }
231                 else
232                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
233             } catch (...) {
234                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
235             }
236         }
237     );
238 }
239 
240 void
getStats(const Sp<restbed::Session> & session) const241 DhtProxyServer::getStats(const Sp<restbed::Session>& session) const
242 {
243     requestNum_++;
244     const auto request = session->get_request();
245     int content_length = std::stoi(request->get_header("Content-Length", "0"));
246     session->fetch(content_length,
247         [this](const Sp<restbed::Session>& s, const restbed::Bytes& /*b*/) mutable
248         {
249             try {
250                 if (dht_) {
251 #ifdef OPENDHT_JSONCPP
252                     auto output = Json::writeString(jsonBuilder_, stats_.toJson()) + "\n";
253                     s->close(restbed::OK, output);
254 #else
255                     s->close(restbed::NotFound, "{\"err\":\"JSON not enabled on this instance\"}");
256 #endif
257                 }
258                 else
259                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
260             } catch (...) {
261                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
262             }
263         }
264     );
265 }
266 
267 void
get(const Sp<restbed::Session> & session) const268 DhtProxyServer::get(const Sp<restbed::Session>& session) const
269 {
270     requestNum_++;
271     const auto request = session->get_request();
272     int content_length = std::stoi(request->get_header("Content-Length", "0"));
273     auto hash = request->get_path_parameter("hash");
274     session->fetch(content_length,
275         [=](const Sp<restbed::Session>& s, const restbed::Bytes& /*b* */)
276         {
277             try {
278                 if (dht_) {
279                     InfoHash infoHash(hash);
280                     if (!infoHash) {
281                         infoHash = InfoHash::get(hash);
282                     }
283                     s->yield(restbed::OK, "", [=](const Sp<restbed::Session>&) {});
284                     dht_->get(infoHash, [this,s](const Sp<Value>& value) {
285                         if (s->is_closed()) return false;
286                         // Send values as soon as we get them
287                         auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
288                         s->yield(output, [](const Sp<restbed::Session>& /*session*/){ });
289                         return true;
290                     }, [s](bool /*ok* */) {
291                         // Communication is finished
292                         if (not s->is_closed()) {
293                             s->close();
294                         }
295                     });
296                 } else {
297                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
298                 }
299             } catch (...) {
300                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
301             }
302         }
303     );
304 }
305 
306 void
listen(const Sp<restbed::Session> & session)307 DhtProxyServer::listen(const Sp<restbed::Session>& session)
308 {
309     requestNum_++;
310     const auto request = session->get_request();
311     int content_length = std::stoi(request->get_header("Content-Length", "0"));
312     auto hash = request->get_path_parameter("hash");
313     InfoHash infoHash(hash);
314     if (!infoHash)
315         infoHash = InfoHash::get(hash);
316     session->fetch(content_length,
317         [=](const Sp<restbed::Session>& s, const restbed::Bytes& /*b* */)
318         {
319             try {
320                 if (dht_) {
321                     InfoHash infoHash(hash);
322                     if (!infoHash) {
323                         infoHash = InfoHash::get(hash);
324                     }
325                     s->yield(restbed::OK);
326                     // Handle client deconnection
327                     // NOTE: for now, there is no handler, so we test the session in a thread
328                     // will be the case in restbed 5.0
329                     SessionToHashToken listener;
330                     listener.session = session;
331                     listener.hash = infoHash;
332                     // cache the session to avoid an incrementation of the shared_ptr's counter
333                     // else, the session->close() will not close the socket.
334                     auto cacheSession = std::weak_ptr<restbed::Session>(s);
335                     listener.token = dht_->listen(infoHash, [this,cacheSession](const std::vector<Sp<Value>>& values, bool expired) {
336                         auto s = cacheSession.lock();
337                         if (!s) return false;
338                         // Send values as soon as we get them
339                         if (!s->is_closed()) {
340                             for (const auto& value : values) {
341                                 auto val = value->toJson();
342                                 if (expired)
343                                     val["expired"] = true;
344                                 auto output = Json::writeString(jsonBuilder_, val) + "\n";
345                                 s->yield(output, [](const Sp<restbed::Session>&){ });
346                             }
347                         }
348                         return !s->is_closed();
349                     });
350                     {
351                         std::lock_guard<std::mutex> lock(lockListener_);
352                         currentListeners_.emplace_back(std::move(listener));
353                     }
354                 } else {
355                     session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
356                 }
357             } catch (...) {
358                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
359             }
360         }
361     );
362 }
363 
364 #ifdef OPENDHT_PUSH_NOTIFICATIONS
365 
366 struct DhtProxyServer::Listener {
367     std::string clientId;
368     std::future<size_t> internalToken;
369     Sp<Scheduler::Job> expireJob;
370     Sp<Scheduler::Job> expireNotifyJob;
371 };
372 struct DhtProxyServer::PushListener {
373     std::map<InfoHash, std::vector<Listener>> listeners;
374     bool isAndroid;
375 };
376 
377 void
subscribe(const std::shared_ptr<restbed::Session> & session)378 DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
379 {
380     requestNum_++;
381     const auto request = session->get_request();
382     int content_length = std::stoi(request->get_header("Content-Length", "0"));
383     auto hash = request->get_path_parameter("hash");
384     InfoHash infoHash(hash);
385     if (!infoHash)
386         infoHash = InfoHash::get(hash);
387     session->fetch(content_length,
388         [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) mutable
389         {
390             try {
391                 std::string err;
392                 Json::Value root;
393                 Json::CharReaderBuilder rbuilder;
394                 auto* char_data = reinterpret_cast<const char*>(b.data());
395                 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
396                 if (!reader->parse(char_data, char_data + b.size(), &root, &err)) {
397                     s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
398                     return;
399                 }
400                 auto pushToken = root["key"].asString();
401                 if (pushToken.empty()) {
402                     s->close(restbed::BAD_REQUEST, "{\"err\":\"No token\"}");
403                     return;
404                 }
405                 auto platform = root["platform"].asString();
406                 auto isAndroid = platform == "android";
407                 auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string();
408 
409                 std::cout << "Subscribe " << infoHash << " client:" << clientId << std::endl;
410 
411                 {
412                     std::lock(schedulerLock_, lockListener_);
413                     std::lock_guard<std::mutex> lk1(lockListener_, std::adopt_lock);
414                     std::lock_guard<std::mutex> lk2(schedulerLock_, std::adopt_lock);
415                     scheduler_.syncTime();
416                     auto timeout = scheduler_.time() + proxy::OP_TIMEOUT;
417                     // Check if listener is already present and refresh timeout if launched
418                     // One push listener per pushToken.infoHash.clientId
419                     auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first;
420                     auto listeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first;
421                     for (auto& listener: listeners->second) {
422                         if (listener.clientId == clientId) {
423                             scheduler_.edit(listener.expireJob, timeout);
424                             scheduler_.edit(listener.expireNotifyJob, timeout - proxy::OP_MARGIN);
425                             s->yield(restbed::OK);
426 
427                             if (!root.isMember("refresh") or !root["refresh"].asBool()) {
428                                 dht_->get(
429                                     infoHash,
430                                     [this, s](const Sp<Value> &value) {
431                                         if (s->is_closed())
432                                             return false;
433                                         // Send values as soon as we get them
434                                         auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
435                                         s->yield(output, [](const Sp<restbed::Session>
436                                                                 & /*session*/) {});
437                                         return true;
438                                     },
439                                     [s](bool /*ok* */) {
440                                         // Communication is finished
441                                         if (not s->is_closed()) {
442                                             s->close("{}\n");
443                                         }
444                                     });
445                             } else {
446                                 // Communication is finished
447                                 if (not s->is_closed()) {
448                                     s->close("{}\n");
449                                 }
450                             }
451                             schedulerCv_.notify_one();
452                             return;
453                         }
454                     }
455                     listeners->second.emplace_back(Listener{});
456                     auto& listener = listeners->second.back();
457                     listener.clientId = clientId;
458 
459                     // New listener
460                     pushListener->second.isAndroid = isAndroid;
461 
462                     // The listener is not found, so add it.
463                     listener.internalToken = dht_->listen(infoHash,
464                         [this, infoHash, pushToken, isAndroid, clientId](const std::vector<std::shared_ptr<Value>>& values, bool expired) {
465                             threadPool_->run([this, infoHash, pushToken, isAndroid, clientId, values, expired]() {
466                                 // Build message content
467                                 Json::Value json;
468                                 json["key"] = infoHash.toString();
469                                 json["to"] = clientId;
470                                 if (expired and values.size() < 3) {
471                                     std::stringstream ss;
472                                     for(size_t i = 0; i < values.size(); ++i) {
473                                         if(i != 0) ss << ",";
474                                         ss << values[i]->id;
475                                     }
476                                     json["exp"] = ss.str();
477                                 }
478                                 sendPushNotification(pushToken, std::move(json), isAndroid);
479                             });
480                             return true;
481                         }
482                     );
483                     listener.expireJob = scheduler_.add(timeout,
484                         [this, clientId, infoHash, pushToken] {
485                             cancelPushListen(pushToken, infoHash, clientId);
486                         }
487                     );
488                     listener.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN,
489                         [this, infoHash, pushToken, isAndroid, clientId] {
490                             std::cout << "Listener: sending refresh " << infoHash << std::endl;
491                             Json::Value json;
492                             json["timeout"] = infoHash.toString();
493                             json["to"] = clientId;
494                             sendPushNotification(pushToken, std::move(json), isAndroid);
495                         }
496                     );
497                 }
498                 schedulerCv_.notify_one();
499                 s->close(restbed::OK, "{}\n");
500             } catch (...) {
501                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
502             }
503         }
504     );
505 }
506 
507 void
unsubscribe(const std::shared_ptr<restbed::Session> & session)508 DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
509 {
510     requestNum_++;
511     const auto request = session->get_request();
512     int content_length = std::stoi(request->get_header("Content-Length", "0"));
513     auto hash = request->get_path_parameter("hash");
514     InfoHash infoHash(hash);
515     if (!infoHash)
516         infoHash = InfoHash::get(hash);
517     session->fetch(content_length,
518         [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
519         {
520             try {
521                 std::string err;
522                 Json::Value root;
523                 Json::CharReaderBuilder rbuilder;
524                 auto* char_data = reinterpret_cast<const char*>(b.data());
525                 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
526                 if (!reader->parse(char_data, char_data + b.size(), &root, &err)) {
527                     s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
528                     return;
529                 }
530                 auto pushToken = root["key"].asString();
531                 if (pushToken.empty()) return;
532                 auto clientId = root["client_id"].asString();
533 
534                 cancelPushListen(pushToken, infoHash, clientId);
535                 s->close(restbed::OK);
536             } catch (...) {
537                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
538             }
539         }
540     );
541 }
542 
543 void
cancelPushListen(const std::string & pushToken,const dht::InfoHash & key,const std::string & clientId)544 DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, const std::string& clientId)
545 {
546     std::cout << "cancelPushListen: " << key << " clientId:" << clientId << std::endl;
547     std::lock_guard<std::mutex> lock(lockListener_);
548     auto pushListener = pushListeners_.find(pushToken);
549     if (pushListener == pushListeners_.end())
550         return;
551     auto listeners = pushListener->second.listeners.find(key);
552     if (listeners == pushListener->second.listeners.end())
553         return;
554     for (auto listener = listeners->second.begin(); listener != listeners->second.end();) {
555         if (listener->clientId == clientId) {
556             if (dht_)
557                 dht_->cancelListen(key, std::move(listener->internalToken));
558             listener = listeners->second.erase(listener);
559         } else {
560             ++listener;
561         }
562     }
563     if (listeners->second.empty()) {
564         pushListener->second.listeners.erase(listeners);
565     }
566     if (pushListener->second.listeners.empty()) {
567         pushListeners_.erase(pushListener);
568     }
569 }
570 
571 void
sendPushNotification(const std::string & token,Json::Value && json,bool isAndroid) const572 DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, bool isAndroid) const
573 {
574     if (pushServer_.empty())
575         return;
576     restbed::Uri uri(proxy::HTTP_PROTO + pushServer_ + "/api/push");
577     auto req = std::make_shared<restbed::Request>(uri);
578     req->set_method("POST");
579 
580     // NOTE: see https://github.com/appleboy/gorush
581     Json::Value notification(Json::objectValue);
582     Json::Value tokens(Json::arrayValue);
583     tokens[0] = token;
584     notification["tokens"] = std::move(tokens);
585     notification["platform"] = isAndroid ? 2 : 1;
586     notification["data"] = std::move(json);
587     notification["priority"] = "high";
588     notification["time_to_live"] = 600;
589 
590     Json::Value notifications(Json::arrayValue);
591     notifications[0] = notification;
592 
593     Json::Value content;
594     content["notifications"] = std::move(notifications);
595 
596     Json::StreamWriterBuilder wbuilder;
597     wbuilder["commentStyle"] = "None";
598     wbuilder["indentation"] = "";
599     auto valueStr = Json::writeString(wbuilder, content);
600 
601     req->set_header("Content-Type", "application/json");
602     req->set_header("Accept", "*/*");
603     req->set_header("Host", pushServer_);
604     req->set_header("Content-Length", std::to_string(valueStr.length()));
605     req->set_body(valueStr);
606 
607     // Send request.
608     restbed::Http::async(req, {});
609 }
610 
611 #endif //OPENDHT_PUSH_NOTIFICATIONS
612 
613 void
cancelPut(const InfoHash & key,Value::Id vid)614 DhtProxyServer::cancelPut(const InfoHash& key, Value::Id vid)
615 {
616     std::cout << "cancelPut " << key << " " << vid << std::endl;
617     auto sPuts = puts_.find(key);
618     if (sPuts == puts_.end())
619         return;
620     auto& sPutsMap = sPuts->second.puts;
621     auto put = sPutsMap.find(vid);
622     if (put == sPutsMap.end())
623         return;
624     if (dht_)
625         dht_->cancelPut(key, vid);
626     if (put->second.expireNotifyJob)
627         put->second.expireNotifyJob->cancel();
628     sPutsMap.erase(put);
629     if (sPutsMap.empty())
630         puts_.erase(sPuts);
631 }
632 
633 void
put(const std::shared_ptr<restbed::Session> & session)634 DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session)
635 {
636     requestNum_++;
637     const auto request = session->get_request();
638     int content_length = std::stoi(request->get_header("Content-Length", "0"));
639     auto hash = request->get_path_parameter("hash");
640     InfoHash infoHash(hash);
641     if (!infoHash)
642         infoHash = InfoHash::get(hash);
643 
644     session->fetch(content_length,
645         [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
646         {
647             try {
648                 if (dht_) {
649                     if(b.empty()) {
650                         std::string response("{\"err\":\"Missing parameters\"}");
651                         s->close(restbed::BAD_REQUEST, response);
652                     } else {
653                         std::string err;
654                         Json::Value root;
655                         Json::CharReaderBuilder rbuilder;
656                         auto* char_data = reinterpret_cast<const char*>(b.data());
657                         auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
658                         if (reader->parse(char_data, char_data + b.size(), &root, &err)) {
659                             // Build the Value from json
660                             auto value = std::make_shared<Value>(root);
661                             bool permanent = root.isMember("permanent");
662                             std::cout << "Got put " << infoHash << " " << *value << " " << (permanent ? "permanent" : "") << std::endl;
663 
664                             if (permanent) {
665                                 std::string pushToken, clientId, platform;
666                                 auto& pVal = root["permanent"];
667                                 if (pVal.isObject()) {
668                                     pushToken = pVal["key"].asString();
669                                     clientId = pVal["client_id"].asString();
670                                     platform = pVal["platform"].asString();
671                                 }
672                                 std::unique_lock<std::mutex> lock(schedulerLock_);
673                                 scheduler_.syncTime();
674                                 auto timeout = scheduler_.time() + proxy::OP_TIMEOUT;
675                                 auto vid = value->id;
676                                 auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first;
677                                 auto r = sPuts->second.puts.emplace(vid, PermanentPut{});
678                                 auto& pput = r.first->second;
679                                 if (r.second) {
680                                     pput.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{
681                                         std::cout << "Permanent put expired: " << infoHash << " " << vid << std::endl;
682                                         cancelPut(infoHash, vid);
683                                     });
684 #ifdef OPENDHT_PUSH_NOTIFICATIONS
685                                     if (not pushToken.empty()) {
686                                         bool isAndroid = platform == "android";
687                                         pput.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN,
688                                             [this, infoHash, vid, pushToken, clientId, isAndroid]
689                                         {
690                                             std::cout << "Permanent put refresh: " << infoHash << " " << vid << std::endl;
691                                             Json::Value json;
692                                             json["timeout"] = infoHash.toString();
693                                             json["to"] = clientId;
694                                             json["vid"] = std::to_string(vid);
695                                             sendPushNotification(pushToken, std::move(json), isAndroid);
696                                         });
697                                     }
698 #endif
699                                 } else {
700                                     scheduler_.edit(pput.expireJob, timeout);
701                                     if (pput.expireNotifyJob)
702                                         scheduler_.edit(pput.expireNotifyJob, timeout - proxy::OP_MARGIN);
703                                 }
704                                 lock.unlock();
705                                 schedulerCv_.notify_one();
706                             }
707 
708                             dht_->put(infoHash, value, [s, value](bool ok) {
709                                 if (ok) {
710                                     Json::StreamWriterBuilder wbuilder;
711                                     wbuilder["commentStyle"] = "None";
712                                     wbuilder["indentation"] = "";
713                                     if (s->is_open())
714                                         s->close(restbed::OK, Json::writeString(wbuilder, value->toJson()) + "\n");
715                                 } else {
716                                     if (s->is_open())
717                                         s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}");
718                                 }
719                             }, time_point::max(), permanent);
720                         } else {
721                             s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
722                         }
723                     }
724                 } else {
725                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
726                 }
727             } catch (const std::exception& e) {
728                 std::cout << "Error performing put: " << e.what() << std::endl;
729                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
730             }
731         }
732     );
733 }
734 
735 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
736 void
putSigned(const std::shared_ptr<restbed::Session> & session) const737 DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) const
738 {
739     requestNum_++;
740     const auto request = session->get_request();
741     int content_length = std::stoi(request->get_header("Content-Length", "0"));
742     auto hash = request->get_path_parameter("hash");
743     InfoHash infoHash(hash);
744     if (!infoHash)
745         infoHash = InfoHash::get(hash);
746 
747     session->fetch(content_length,
748         [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
749         {
750             try {
751                 if (dht_) {
752                     if(b.empty()) {
753                         std::string response("{\"err\":\"Missing parameters\"}");
754                         s->close(restbed::BAD_REQUEST, response);
755                     } else {
756                         std::string err;
757                         Json::Value root;
758                         Json::CharReaderBuilder rbuilder;
759                         auto* char_data = reinterpret_cast<const char*>(b.data());
760                         auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
761                         if (reader->parse(char_data, char_data + b.size(), &root, &err)) {
762                             auto value = std::make_shared<Value>(root);
763 
764                             Json::StreamWriterBuilder wbuilder;
765                             wbuilder["commentStyle"] = "None";
766                             wbuilder["indentation"] = "";
767                             auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
768                             dht_->putSigned(infoHash, value);
769                             s->close(restbed::OK, output);
770                         } else {
771                             s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
772                         }
773                     }
774                 } else {
775                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
776                 }
777             } catch (...) {
778                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
779             }
780         }
781     );
782 }
783 
784 void
putEncrypted(const std::shared_ptr<restbed::Session> & session) const785 DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) const
786 {
787     requestNum_++;
788     const auto request = session->get_request();
789     int content_length = std::stoi(request->get_header("Content-Length", "0"));
790     auto hash = request->get_path_parameter("hash");
791     InfoHash key(hash);
792     if (!key)
793         key = InfoHash::get(hash);
794 
795     session->fetch(content_length,
796         [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b)
797         {
798             try {
799                 if (dht_) {
800                     if(b.empty()) {
801                         std::string response("{\"err\":\"Missing parameters\"}");
802                         s->close(restbed::BAD_REQUEST, response);
803                     } else {
804                         std::string err;
805                         Json::Value root;
806                         Json::CharReaderBuilder rbuilder;
807                         auto* char_data = reinterpret_cast<const char*>(b.data());
808                         auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
809                         bool parsingSuccessful = reader->parse(char_data, char_data + b.size(), &root, &err);
810                         InfoHash to(root["to"].asString());
811                         if (parsingSuccessful && to) {
812                             auto value = std::make_shared<Value>(root);
813                             Json::StreamWriterBuilder wbuilder;
814                             wbuilder["commentStyle"] = "None";
815                             wbuilder["indentation"] = "";
816                             auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
817                             dht_->putEncrypted(key, to, value);
818                             s->close(restbed::OK, output);
819                         } else {
820                             if(!parsingSuccessful)
821                                 s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}");
822                             else
823                                 s->close(restbed::BAD_REQUEST, "{\"err\":\"No destination found\"}");
824                         }
825                     }
826                 } else {
827                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
828                 }
829             } catch (...) {
830                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
831             }
832         }
833     );
834 }
835 #endif // OPENDHT_PROXY_SERVER_IDENTITY
836 
837 void
handleOptionsMethod(const std::shared_ptr<restbed::Session> & session) const838 DhtProxyServer::handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const
839 {
840     requestNum_++;
841 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
842     const auto allowed = "OPTIONS, GET, POST, LISTEN, SIGN, ENCRYPT";
843 #else
844     const auto allowed = "OPTIONS, GET, POST, LISTEN";
845 #endif //OPENDHT_PROXY_SERVER_IDENTITY
846     session->close(restbed::OK, {{"Access-Control-Allow-Methods", allowed},
847                                  {"Access-Control-Allow-Headers", "content-type"},
848                                  {"Access-Control-Max-Age", "86400"}});
849 }
850 
851 void
getFiltered(const std::shared_ptr<restbed::Session> & session) const852 DhtProxyServer::getFiltered(const std::shared_ptr<restbed::Session>& session) const
853 {
854     requestNum_++;
855     const auto request = session->get_request();
856     int content_length = std::stoi(request->get_header("Content-Length", "0"));
857     auto hash = request->get_path_parameter("hash");
858     auto value = request->get_path_parameter("value");
859     session->fetch(content_length,
860         [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */)
861         {
862             try {
863                 if (dht_) {
864                     InfoHash infoHash(hash);
865                     if (!infoHash) {
866                         infoHash = InfoHash::get(hash);
867                     }
868                     s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) {
869                         dht_->get(infoHash, [s](std::shared_ptr<Value> v) {
870                             // Send values as soon as we get them
871                             Json::StreamWriterBuilder wbuilder;
872                             wbuilder["commentStyle"] = "None";
873                             wbuilder["indentation"] = "";
874                             auto output = Json::writeString(wbuilder, v->toJson()) + "\n";
875                             s->yield(output, [](const std::shared_ptr<restbed::Session> /*session*/){ });
876                             return true;
877                         }, [s](bool /*ok* */) {
878                             // Communication is finished
879                             s->close();
880                         }, {}, value);
881                     });
882                 } else {
883                     s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
884                 }
885             } catch (...) {
886                 s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
887             }
888         }
889     );
890 }
891 
892 void
removeClosedListeners(bool testSession)893 DhtProxyServer::removeClosedListeners(bool testSession)
894 {
895     // clean useless listeners
896     std::lock_guard<std::mutex> lock(lockListener_);
897     auto listener = currentListeners_.begin();
898     while (listener != currentListeners_.end()) {
899         auto cancel = dht_ and (not testSession or listener->session->is_closed());
900         if (cancel) {
901             dht_->cancelListen(listener->hash, std::move(listener->token));
902             // Remove listener if unused
903             listener = currentListeners_.erase(listener);
904         } else {
905              ++listener;
906         }
907     }
908 }
909 
910 }
911