1 /***
2     This file is part of snapcast
3     Copyright (C) 2014-2021  Johannes Pohl
4 
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9 
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 ***/
18 
19 #include "control_server.hpp"
20 #include "common/aixlog.hpp"
21 #include "common/snap_exception.hpp"
22 #include "common/utils.hpp"
23 #include "config.hpp"
24 #include "control_session_http.hpp"
25 #include "control_session_tcp.hpp"
26 #include "jsonrpcpp.hpp"
27 #include "message/time.hpp"
28 
29 #include <iostream>
30 
31 using namespace std;
32 using json = nlohmann::json;
33 
/// Tag handed to the LOG macro by every log statement in this file.
static constexpr const char* LOG_TAG = "ControlServer";
35 
36 
ControlServer(boost::asio::io_context & io_context,const ServerSettings::Tcp & tcp_settings,const ServerSettings::Http & http_settings,ControlMessageReceiver * controlMessageReceiver)37 ControlServer::ControlServer(boost::asio::io_context& io_context, const ServerSettings::Tcp& tcp_settings, const ServerSettings::Http& http_settings,
38                              ControlMessageReceiver* controlMessageReceiver)
39     : io_context_(io_context), tcp_settings_(tcp_settings), http_settings_(http_settings), controlMessageReceiver_(controlMessageReceiver)
40 {
41 }
42 
43 
~ControlServer()44 ControlServer::~ControlServer()
45 {
46     stop();
47 }
48 
49 
cleanup()50 void ControlServer::cleanup()
51 {
52     auto new_end = std::remove_if(sessions_.begin(), sessions_.end(), [](std::weak_ptr<ControlSession> session) { return session.expired(); });
53     auto count = distance(new_end, sessions_.end());
54     if (count > 0)
55     {
56         LOG(ERROR, LOG_TAG) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n";
57         sessions_.erase(new_end, sessions_.end());
58     }
59 }
60 
61 
send(const std::string & message,const ControlSession * excludeSession)62 void ControlServer::send(const std::string& message, const ControlSession* excludeSession)
63 {
64     std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
65     for (const auto& s : sessions_)
66     {
67         if (auto session = s.lock())
68         {
69             if (session.get() != excludeSession)
70                 session->sendAsync(message);
71         }
72     }
73     cleanup();
74 }
75 
76 
onMessageReceived(ControlSession * session,const std::string & message)77 std::string ControlServer::onMessageReceived(ControlSession* session, const std::string& message)
78 {
79     // LOG(DEBUG, LOG_TAG) << "received: \"" << message << "\"\n";
80     if (controlMessageReceiver_ != nullptr)
81         return controlMessageReceiver_->onMessageReceived(session, message);
82     return "";
83 }
84 
85 
onNewSession(const shared_ptr<ControlSession> & session)86 void ControlServer::onNewSession(const shared_ptr<ControlSession>& session)
87 {
88     std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
89     session->start();
90     sessions_.emplace_back(session);
91     cleanup();
92 }
93 
94 
onNewSession(const std::shared_ptr<StreamSession> & session)95 void ControlServer::onNewSession(const std::shared_ptr<StreamSession>& session)
96 {
97     if (controlMessageReceiver_ != nullptr)
98         controlMessageReceiver_->onNewSession(session);
99 }
100 
101 
startAccept()102 void ControlServer::startAccept()
103 {
104     auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) {
105         if (!ec)
106             handleAccept<ControlSessionTcp>(std::move(socket));
107         else
108             LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
109     };
110 
111     auto accept_handler_http = [this](error_code ec, tcp::socket socket) {
112         if (!ec)
113             handleAccept<ControlSessionHttp>(std::move(socket), http_settings_);
114         else
115             LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
116     };
117 
118     for (auto& acceptor : acceptor_tcp_)
119         acceptor->async_accept(accept_handler_tcp);
120 
121     for (auto& acceptor : acceptor_http_)
122         acceptor->async_accept(accept_handler_http);
123 }
124 
125 
126 template <typename SessionType, typename... Args>
handleAccept(tcp::socket socket,Args &&...args)127 void ControlServer::handleAccept(tcp::socket socket, Args&&... args)
128 {
129     try
130     {
131         struct timeval tv;
132         tv.tv_sec = 5;
133         tv.tv_usec = 0;
134         setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
135         setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
136         //	socket->set_option(boost::asio::ip::tcp::no_delay(false));
137         LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
138         shared_ptr<SessionType> session = make_shared<SessionType>(this, io_context_, std::move(socket), std::forward<Args>(args)...);
139         onNewSession(session);
140     }
141     catch (const std::exception& e)
142     {
143         LOG(ERROR, LOG_TAG) << "Exception in ControlServer::handleAccept: " << e.what() << endl;
144     }
145     startAccept();
146 }
147 
148 
149 
start()150 void ControlServer::start()
151 {
152     if (tcp_settings_.enabled)
153     {
154         for (const auto& address : tcp_settings_.bind_to_address)
155         {
156             try
157             {
158                 LOG(INFO, LOG_TAG) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n";
159                 acceptor_tcp_.emplace_back(
160                     make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port)));
161             }
162             catch (const boost::system::system_error& e)
163             {
164                 LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
165             }
166         }
167     }
168     if (http_settings_.enabled)
169     {
170         for (const auto& address : http_settings_.bind_to_address)
171         {
172             try
173             {
174                 LOG(INFO, LOG_TAG) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n";
175                 acceptor_http_.emplace_back(
176                     make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port)));
177             }
178             catch (const boost::system::system_error& e)
179             {
180                 LOG(ERROR, LOG_TAG) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n";
181             }
182         }
183     }
184 
185     startAccept();
186 }
187 
188 
stop()189 void ControlServer::stop()
190 {
191     for (auto& acceptor : acceptor_tcp_)
192         acceptor->cancel();
193 
194     for (auto& acceptor : acceptor_http_)
195         acceptor->cancel();
196 
197     acceptor_tcp_.clear();
198     acceptor_http_.clear();
199 
200     std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
201     cleanup();
202     for (const auto& s : sessions_)
203     {
204         if (auto session = s.lock())
205             session->stop();
206     }
207 }
208