1 /***
2 This file is part of snapcast
3 Copyright (C) 2014-2021 Johannes Pohl
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 ***/
18
19 #include "control_server.hpp"
20 #include "common/aixlog.hpp"
21 #include "common/snap_exception.hpp"
22 #include "common/utils.hpp"
23 #include "config.hpp"
24 #include "control_session_http.hpp"
25 #include "control_session_tcp.hpp"
26 #include "jsonrpcpp.hpp"
27 #include "message/time.hpp"
28
29 #include <iostream>
30
31 using namespace std;
32 using json = nlohmann::json;
33
34 static constexpr auto LOG_TAG = "ControlServer";
35
36
ControlServer(boost::asio::io_context & io_context,const ServerSettings::Tcp & tcp_settings,const ServerSettings::Http & http_settings,ControlMessageReceiver * controlMessageReceiver)37 ControlServer::ControlServer(boost::asio::io_context& io_context, const ServerSettings::Tcp& tcp_settings, const ServerSettings::Http& http_settings,
38 ControlMessageReceiver* controlMessageReceiver)
39 : io_context_(io_context), tcp_settings_(tcp_settings), http_settings_(http_settings), controlMessageReceiver_(controlMessageReceiver)
40 {
41 }
42
43
~ControlServer()44 ControlServer::~ControlServer()
45 {
46 stop();
47 }
48
49
cleanup()50 void ControlServer::cleanup()
51 {
52 auto new_end = std::remove_if(sessions_.begin(), sessions_.end(), [](std::weak_ptr<ControlSession> session) { return session.expired(); });
53 auto count = distance(new_end, sessions_.end());
54 if (count > 0)
55 {
56 LOG(ERROR, LOG_TAG) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n";
57 sessions_.erase(new_end, sessions_.end());
58 }
59 }
60
61
send(const std::string & message,const ControlSession * excludeSession)62 void ControlServer::send(const std::string& message, const ControlSession* excludeSession)
63 {
64 std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
65 for (const auto& s : sessions_)
66 {
67 if (auto session = s.lock())
68 {
69 if (session.get() != excludeSession)
70 session->sendAsync(message);
71 }
72 }
73 cleanup();
74 }
75
76
onMessageReceived(ControlSession * session,const std::string & message)77 std::string ControlServer::onMessageReceived(ControlSession* session, const std::string& message)
78 {
79 // LOG(DEBUG, LOG_TAG) << "received: \"" << message << "\"\n";
80 if (controlMessageReceiver_ != nullptr)
81 return controlMessageReceiver_->onMessageReceived(session, message);
82 return "";
83 }
84
85
onNewSession(const shared_ptr<ControlSession> & session)86 void ControlServer::onNewSession(const shared_ptr<ControlSession>& session)
87 {
88 std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
89 session->start();
90 sessions_.emplace_back(session);
91 cleanup();
92 }
93
94
onNewSession(const std::shared_ptr<StreamSession> & session)95 void ControlServer::onNewSession(const std::shared_ptr<StreamSession>& session)
96 {
97 if (controlMessageReceiver_ != nullptr)
98 controlMessageReceiver_->onNewSession(session);
99 }
100
101
startAccept()102 void ControlServer::startAccept()
103 {
104 auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) {
105 if (!ec)
106 handleAccept<ControlSessionTcp>(std::move(socket));
107 else
108 LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
109 };
110
111 auto accept_handler_http = [this](error_code ec, tcp::socket socket) {
112 if (!ec)
113 handleAccept<ControlSessionHttp>(std::move(socket), http_settings_);
114 else
115 LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
116 };
117
118 for (auto& acceptor : acceptor_tcp_)
119 acceptor->async_accept(accept_handler_tcp);
120
121 for (auto& acceptor : acceptor_http_)
122 acceptor->async_accept(accept_handler_http);
123 }
124
125
126 template <typename SessionType, typename... Args>
handleAccept(tcp::socket socket,Args &&...args)127 void ControlServer::handleAccept(tcp::socket socket, Args&&... args)
128 {
129 try
130 {
131 struct timeval tv;
132 tv.tv_sec = 5;
133 tv.tv_usec = 0;
134 setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
135 setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
136 // socket->set_option(boost::asio::ip::tcp::no_delay(false));
137 LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
138 shared_ptr<SessionType> session = make_shared<SessionType>(this, io_context_, std::move(socket), std::forward<Args>(args)...);
139 onNewSession(session);
140 }
141 catch (const std::exception& e)
142 {
143 LOG(ERROR, LOG_TAG) << "Exception in ControlServer::handleAccept: " << e.what() << endl;
144 }
145 startAccept();
146 }
147
148
149
start()150 void ControlServer::start()
151 {
152 if (tcp_settings_.enabled)
153 {
154 for (const auto& address : tcp_settings_.bind_to_address)
155 {
156 try
157 {
158 LOG(INFO, LOG_TAG) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n";
159 acceptor_tcp_.emplace_back(
160 make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port)));
161 }
162 catch (const boost::system::system_error& e)
163 {
164 LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
165 }
166 }
167 }
168 if (http_settings_.enabled)
169 {
170 for (const auto& address : http_settings_.bind_to_address)
171 {
172 try
173 {
174 LOG(INFO, LOG_TAG) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n";
175 acceptor_http_.emplace_back(
176 make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port)));
177 }
178 catch (const boost::system::system_error& e)
179 {
180 LOG(ERROR, LOG_TAG) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n";
181 }
182 }
183 }
184
185 startAccept();
186 }
187
188
stop()189 void ControlServer::stop()
190 {
191 for (auto& acceptor : acceptor_tcp_)
192 acceptor->cancel();
193
194 for (auto& acceptor : acceptor_http_)
195 acceptor->cancel();
196
197 acceptor_tcp_.clear();
198 acceptor_http_.clear();
199
200 std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
201 cleanup();
202 for (const auto& s : sessions_)
203 {
204 if (auto session = s.lock())
205 session->stop();
206 }
207 }
208