1 /***
2 This file is part of snapcast
3 Copyright (C) 2014-2021 Johannes Pohl
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 ***/
18
19 #ifndef NOMINMAX
20 #define NOMINMAX
21 #endif // NOMINMAX
22
23 #include "controller.hpp"
24 #include "decoder/pcm_decoder.hpp"
25 #if defined(HAS_OGG) && (defined(HAS_TREMOR) || defined(HAS_VORBIS))
26 #include "decoder/ogg_decoder.hpp"
27 #endif
28 #if defined(HAS_FLAC)
29 #include "decoder/flac_decoder.hpp"
30 #endif
31 #if defined(HAS_OPUS)
32 #include "decoder/opus_decoder.hpp"
33 #endif
34
35 #ifdef HAS_ALSA
36 #include "player/alsa_player.hpp"
37 #endif
38 #ifdef HAS_PULSE
39 #include "player/pulse_player.hpp"
40 #endif
41 #ifdef HAS_OPENSL
42 #include "player/opensl_player.hpp"
43 #endif
44 #ifdef HAS_OBOE
45 #include "player/oboe_player.hpp"
46 #endif
47 #ifdef HAS_COREAUDIO
48 #include "player/coreaudio_player.hpp"
49 #endif
50 #ifdef HAS_WASAPI
51 #include "player/wasapi_player.hpp"
52 #endif
53 #include "player/file_player.hpp"
54
55 #include "browseZeroConf/browse_mdns.hpp"
56 #include "common/aixlog.hpp"
57 #include "common/snap_exception.hpp"
58 #include "message/client_info.hpp"
59 #include "message/hello.hpp"
60 #include "message/time.hpp"
61 #include "time_provider.hpp"
62
63 #include <algorithm>
64 #include <iostream>
65 #include <memory>
66 #include <string>
67
68 using namespace std;
69 using namespace player;
70
71 static constexpr auto LOG_TAG = "Controller";
72 static constexpr auto TIME_SYNC_INTERVAL = 1s;
73
Controller(boost::asio::io_context & io_context,const ClientSettings & settings,std::unique_ptr<MetadataAdapter> meta)74 Controller::Controller(boost::asio::io_context& io_context, const ClientSettings& settings, std::unique_ptr<MetadataAdapter> meta)
75 : io_context_(io_context), timer_(io_context), settings_(settings), stream_(nullptr), decoder_(nullptr), player_(nullptr), meta_(std::move(meta)),
76 serverSettings_(nullptr)
77 {
78 }
79
80
81 template <typename PlayerType>
createPlayer(ClientSettings::Player & settings,const std::string & player_name)82 std::unique_ptr<Player> Controller::createPlayer(ClientSettings::Player& settings, const std::string& player_name)
83 {
84 if (settings.player_name.empty() || settings.player_name == player_name)
85 {
86 settings.player_name = player_name;
87 return make_unique<PlayerType>(io_context_, settings, stream_);
88 }
89 return nullptr;
90 }
91
getSupportedPlayerNames()92 std::vector<std::string> Controller::getSupportedPlayerNames()
93 {
94 std::vector<std::string> result;
95 #ifdef HAS_ALSA
96 result.emplace_back(player::ALSA);
97 #endif
98 #ifdef HAS_PULSE
99 result.emplace_back(player::PULSE);
100 #endif
101 #ifdef HAS_OBOE
102 result.emplace_back(player::OBOE);
103 #endif
104 #ifdef HAS_OPENSL
105 result.emplace_back(player::OPENSL);
106 #endif
107 #ifdef HAS_COREAUDIO
108 result.emplace_back(player::COREAUDIO);
109 #endif
110 #ifdef HAS_WASAPI
111 result.emplace_back(player::WASAPI);
112 #endif
113 result.emplace_back(player::FILE);
114 return result;
115 }
116
117
getNextMessage()118 void Controller::getNextMessage()
119 {
120 clientConnection_->getNextMessage([this](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response) {
121 if (ec)
122 {
123 reconnect();
124 return;
125 }
126
127 if (response->type == message_type::kWireChunk)
128 {
129 if (stream_ && decoder_)
130 {
131 // execute on the io_context to do the (costly) decoding on another thread (if more than one thread is used)
132 // boost::asio::post(io_context_, [this, response = std::move(response)]() mutable {
133 auto pcmChunk = msg::message_cast<msg::PcmChunk>(std::move(response));
134 pcmChunk->format = sampleFormat_;
135 // LOG(TRACE, LOG_TAG) << "chunk: " << pcmChunk->payloadSize << ", sampleFormat: " << sampleFormat_.toString() << "\n";
136 if (decoder_->decode(pcmChunk.get()))
137 {
138 // LOG(TRACE, LOG_TAG) << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->durationMs() << ", sec: " <<
139 // pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec / 1000 << ", type: " << pcmChunk->type << "\n";
140 stream_->addChunk(std::move(pcmChunk));
141 }
142 // });
143 }
144 }
145 else if (response->type == message_type::kServerSettings)
146 {
147 serverSettings_ = msg::message_cast<msg::ServerSettings>(std::move(response));
148 LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
149 << ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
150 if (stream_ && player_)
151 {
152 player_->setVolume(serverSettings_->getVolume() / 100., serverSettings_->isMuted());
153 stream_->setBufferLen(std::max(0, serverSettings_->getBufferMs() - serverSettings_->getLatency() - settings_.player.latency));
154 }
155 }
156 else if (response->type == message_type::kCodecHeader)
157 {
158 headerChunk_ = msg::message_cast<msg::CodecHeader>(std::move(response));
159 decoder_.reset(nullptr);
160 stream_ = nullptr;
161 player_.reset(nullptr);
162
163 if (headerChunk_->codec == "pcm")
164 decoder_ = make_unique<decoder::PcmDecoder>();
165 #if defined(HAS_OGG) && (defined(HAS_TREMOR) || defined(HAS_VORBIS))
166 else if (headerChunk_->codec == "ogg")
167 decoder_ = make_unique<decoder::OggDecoder>();
168 #endif
169 #if defined(HAS_FLAC)
170 else if (headerChunk_->codec == "flac")
171 decoder_ = make_unique<decoder::FlacDecoder>();
172 #endif
173 #if defined(HAS_OPUS)
174 else if (headerChunk_->codec == "opus")
175 decoder_ = make_unique<decoder::OpusDecoder>();
176 #endif
177 else
178 throw SnapException("codec not supported: \"" + headerChunk_->codec + "\"");
179
180 sampleFormat_ = decoder_->setHeader(headerChunk_.get());
181 LOG(INFO, LOG_TAG) << "Codec: " << headerChunk_->codec << ", sampleformat: " << sampleFormat_.toString() << "\n";
182
183 stream_ = make_shared<Stream>(sampleFormat_, settings_.player.sample_format);
184 stream_->setBufferLen(std::max(0, serverSettings_->getBufferMs() - serverSettings_->getLatency() - settings_.player.latency));
185
186 #ifdef HAS_ALSA
187 if (!player_)
188 player_ = createPlayer<AlsaPlayer>(settings_.player, player::ALSA);
189 #endif
190 #ifdef HAS_PULSE
191 if (!player_)
192 player_ = createPlayer<PulsePlayer>(settings_.player, player::PULSE);
193 #endif
194 #ifdef HAS_OBOE
195 if (!player_)
196 player_ = createPlayer<OboePlayer>(settings_.player, player::OBOE);
197 #endif
198 #ifdef HAS_OPENSL
199 if (!player_)
200 player_ = createPlayer<OpenslPlayer>(settings_.player, player::OPENSL);
201 #endif
202 #ifdef HAS_COREAUDIO
203 if (!player_)
204 player_ = createPlayer<CoreAudioPlayer>(settings_.player, player::COREAUDIO);
205 #endif
206 #ifdef HAS_WASAPI
207 if (!player_)
208 player_ = createPlayer<WASAPIPlayer>(settings_.player, player::WASAPI);
209 #endif
210 if (!player_ && (settings_.player.player_name == player::FILE))
211 player_ = createPlayer<FilePlayer>(settings_.player, player::FILE);
212
213 if (!player_)
214 throw SnapException("No audio player support" + (settings_.player.player_name.empty() ? "" : " for: " + settings_.player.player_name));
215
216 player_->setVolumeCallback([this](double volume, bool muted) {
217 static double last_volume(-1);
218 static bool last_muted(true);
219 if ((volume != last_volume) || (last_muted != muted))
220 {
221 last_volume = volume;
222 last_muted = muted;
223 auto info = std::make_shared<msg::ClientInfo>();
224 info->setVolume(static_cast<uint16_t>(volume * 100.));
225 info->setMuted(muted);
226 clientConnection_->send(info, [this](const boost::system::error_code& ec) {
227 if (ec)
228 {
229 LOG(ERROR, LOG_TAG) << "Failed to send client info, error: " << ec.message() << "\n";
230 reconnect();
231 return;
232 }
233 });
234 }
235 });
236 player_->start();
237 // Don't change the initial hardware mixer volume on the user's device.
238 // The player class will send the device's volume to the server instead
239 // if (settings_.player.mixer.mode != ClientSettings::Mixer::Mode::hardware)
240 // {
241 player_->setVolume(serverSettings_->getVolume() / 100., serverSettings_->isMuted());
242 // }
243 }
244 else if (response->type == message_type::kStreamTags)
245 {
246 if (meta_)
247 {
248 auto stream_tags = msg::message_cast<msg::StreamTags>(std::move(response));
249 meta_->push(stream_tags->msg);
250 }
251 }
252 else
253 {
254 LOG(WARNING, LOG_TAG) << "Unexpected message received, type: " << response->type << "\n";
255 }
256 getNextMessage();
257 });
258 }
259
260
sendTimeSyncMessage(int quick_syncs)261 void Controller::sendTimeSyncMessage(int quick_syncs)
262 {
263 auto timeReq = std::make_shared<msg::Time>();
264 clientConnection_->sendRequest<msg::Time>(
265 timeReq, 2s, [this, quick_syncs](const boost::system::error_code& ec, const std::unique_ptr<msg::Time>& response) mutable {
266 if (ec)
267 {
268 LOG(ERROR, LOG_TAG) << "Time sync request failed: " << ec.message() << "\n";
269 reconnect();
270 return;
271 }
272 else
273 {
274 TimeProvider::getInstance().setDiff(response->latency, response->received - response->sent);
275 }
276
277 std::chrono::microseconds next = TIME_SYNC_INTERVAL;
278 if (quick_syncs > 0)
279 {
280 if (--quick_syncs == 0)
281 LOG(INFO, LOG_TAG) << "diff to server [ms]: "
282 << static_cast<float>(TimeProvider::getInstance().getDiffToServer<chronos::usec>().count()) / 1000.f << "\n";
283 next = 100us;
284 }
285 timer_.expires_after(next);
286 timer_.async_wait([this, quick_syncs](const boost::system::error_code& ec) {
287 if (!ec)
288 {
289 sendTimeSyncMessage(quick_syncs);
290 }
291 });
292 });
293 }
294
browseMdns(const MdnsHandler & handler)295 void Controller::browseMdns(const MdnsHandler& handler)
296 {
297 #if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
298 try
299 {
300 BrowseZeroConf browser;
301 mDNSResult avahiResult;
302 if (browser.browse("_snapcast._tcp", avahiResult, 1000))
303 {
304 string host = avahiResult.ip;
305 uint16_t port = avahiResult.port;
306 if (avahiResult.ip_version == IPVersion::IPv6)
307 host += "%" + cpt::to_string(avahiResult.iface_idx);
308 handler({}, host, port);
309 return;
310 }
311 }
312 catch (const std::exception& e)
313 {
314 LOG(ERROR, LOG_TAG) << "Exception: " << e.what() << std::endl;
315 }
316
317 timer_.expires_after(500ms);
318 timer_.async_wait([this, handler](const boost::system::error_code& ec) {
319 if (!ec)
320 {
321 browseMdns(handler);
322 }
323 else
324 {
325 handler(ec, "", 0);
326 }
327 });
328 #else
329 handler(boost::asio::error::operation_not_supported, "", 0);
330 #endif
331 }
332
start()333 void Controller::start()
334 {
335 if (settings_.server.host.empty())
336 {
337 browseMdns([this](const boost::system::error_code& ec, const std::string& host, uint16_t port) {
338 if (ec)
339 {
340 LOG(ERROR, LOG_TAG) << "Failed to browse MDNS, error: " << ec.message() << "\n";
341 }
342 else
343 {
344 settings_.server.host = host;
345 settings_.server.port = port;
346 LOG(INFO, LOG_TAG) << "Found server " << settings_.server.host << ":" << settings_.server.port << "\n";
347 clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
348 worker();
349 }
350 });
351 }
352 else
353 {
354 clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
355 worker();
356 }
357 }
358
359
360 // void Controller::stop()
361 // {
362 // LOG(DEBUG, LOG_TAG) << "Stopping\n";
363 // timer_.cancel();
364 // }
365
reconnect()366 void Controller::reconnect()
367 {
368 timer_.cancel();
369 clientConnection_->disconnect();
370 player_.reset();
371 stream_.reset();
372 decoder_.reset();
373 timer_.expires_after(1s);
374 timer_.async_wait([this](const boost::system::error_code& ec) {
375 if (!ec)
376 {
377 worker();
378 }
379 });
380 }
381
worker()382 void Controller::worker()
383 {
384 clientConnection_->connect([this](const boost::system::error_code& ec) {
385 if (!ec)
386 {
387 // LOG(INFO, LOG_TAG) << "Connected!\n";
388 string macAddress = clientConnection_->getMacAddress();
389 if (settings_.host_id.empty())
390 settings_.host_id = ::getHostId(macAddress);
391
392 // Say hello to the server
393 auto hello = std::make_shared<msg::Hello>(macAddress, settings_.host_id, settings_.instance);
394 clientConnection_->sendRequest<msg::ServerSettings>(
395 hello, 2s, [this](const boost::system::error_code& ec, std::unique_ptr<msg::ServerSettings> response) mutable {
396 if (ec)
397 {
398 LOG(ERROR, LOG_TAG) << "Failed to send hello request, error: " << ec.message() << "\n";
399 reconnect();
400 return;
401 }
402 else
403 {
404 serverSettings_ = std::move(response);
405 LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
406 << ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
407 }
408 });
409
410 // Do initial time sync with the server
411 sendTimeSyncMessage(50);
412 // Start receiver loop
413 getNextMessage();
414 }
415 else
416 {
417 LOG(ERROR, LOG_TAG) << "Error: " << ec.message() << "\n";
418 reconnect();
419 }
420 });
421 }
422