1 /***
2     This file is part of snapcast
3     Copyright (C) 2014-2021  Johannes Pohl
4 
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9 
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 ***/
18 
19 #ifndef NOMINMAX
20 #define NOMINMAX
21 #endif // NOMINMAX
22 
23 #include "controller.hpp"
24 #include "decoder/pcm_decoder.hpp"
25 #if defined(HAS_OGG) && (defined(HAS_TREMOR) || defined(HAS_VORBIS))
26 #include "decoder/ogg_decoder.hpp"
27 #endif
28 #if defined(HAS_FLAC)
29 #include "decoder/flac_decoder.hpp"
30 #endif
31 #if defined(HAS_OPUS)
32 #include "decoder/opus_decoder.hpp"
33 #endif
34 
35 #ifdef HAS_ALSA
36 #include "player/alsa_player.hpp"
37 #endif
38 #ifdef HAS_PULSE
39 #include "player/pulse_player.hpp"
40 #endif
41 #ifdef HAS_OPENSL
42 #include "player/opensl_player.hpp"
43 #endif
44 #ifdef HAS_OBOE
45 #include "player/oboe_player.hpp"
46 #endif
47 #ifdef HAS_COREAUDIO
48 #include "player/coreaudio_player.hpp"
49 #endif
50 #ifdef HAS_WASAPI
51 #include "player/wasapi_player.hpp"
52 #endif
53 #include "player/file_player.hpp"
54 
55 #include "browseZeroConf/browse_mdns.hpp"
56 #include "common/aixlog.hpp"
57 #include "common/snap_exception.hpp"
58 #include "message/client_info.hpp"
59 #include "message/hello.hpp"
60 #include "message/time.hpp"
61 #include "time_provider.hpp"
62 
63 #include <algorithm>
64 #include <iostream>
65 #include <memory>
66 #include <string>
67 
68 using namespace std;
69 using namespace player;
70 
71 static constexpr auto LOG_TAG = "Controller";
72 static constexpr auto TIME_SYNC_INTERVAL = 1s;
73 
Controller(boost::asio::io_context & io_context,const ClientSettings & settings,std::unique_ptr<MetadataAdapter> meta)74 Controller::Controller(boost::asio::io_context& io_context, const ClientSettings& settings, std::unique_ptr<MetadataAdapter> meta)
75     : io_context_(io_context), timer_(io_context), settings_(settings), stream_(nullptr), decoder_(nullptr), player_(nullptr), meta_(std::move(meta)),
76       serverSettings_(nullptr)
77 {
78 }
79 
80 
81 template <typename PlayerType>
createPlayer(ClientSettings::Player & settings,const std::string & player_name)82 std::unique_ptr<Player> Controller::createPlayer(ClientSettings::Player& settings, const std::string& player_name)
83 {
84     if (settings.player_name.empty() || settings.player_name == player_name)
85     {
86         settings.player_name = player_name;
87         return make_unique<PlayerType>(io_context_, settings, stream_);
88     }
89     return nullptr;
90 }
91 
getSupportedPlayerNames()92 std::vector<std::string> Controller::getSupportedPlayerNames()
93 {
94     std::vector<std::string> result;
95 #ifdef HAS_ALSA
96     result.emplace_back(player::ALSA);
97 #endif
98 #ifdef HAS_PULSE
99     result.emplace_back(player::PULSE);
100 #endif
101 #ifdef HAS_OBOE
102     result.emplace_back(player::OBOE);
103 #endif
104 #ifdef HAS_OPENSL
105     result.emplace_back(player::OPENSL);
106 #endif
107 #ifdef HAS_COREAUDIO
108     result.emplace_back(player::COREAUDIO);
109 #endif
110 #ifdef HAS_WASAPI
111     result.emplace_back(player::WASAPI);
112 #endif
113     result.emplace_back(player::FILE);
114     return result;
115 }
116 
117 
getNextMessage()118 void Controller::getNextMessage()
119 {
120     clientConnection_->getNextMessage([this](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response) {
121         if (ec)
122         {
123             reconnect();
124             return;
125         }
126 
127         if (response->type == message_type::kWireChunk)
128         {
129             if (stream_ && decoder_)
130             {
131                 // execute on the io_context to do the (costly) decoding on another thread (if more than one thread is used)
132                 // boost::asio::post(io_context_, [this, response = std::move(response)]() mutable {
133                 auto pcmChunk = msg::message_cast<msg::PcmChunk>(std::move(response));
134                 pcmChunk->format = sampleFormat_;
135                 // LOG(TRACE, LOG_TAG) << "chunk: " << pcmChunk->payloadSize << ", sampleFormat: " << sampleFormat_.toString() << "\n";
136                 if (decoder_->decode(pcmChunk.get()))
137                 {
138                     // LOG(TRACE, LOG_TAG) << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->durationMs() << ", sec: " <<
139                     // pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec / 1000 << ", type: " << pcmChunk->type << "\n";
140                     stream_->addChunk(std::move(pcmChunk));
141                 }
142                 // });
143             }
144         }
145         else if (response->type == message_type::kServerSettings)
146         {
147             serverSettings_ = msg::message_cast<msg::ServerSettings>(std::move(response));
148             LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
149                                << ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
150             if (stream_ && player_)
151             {
152                 player_->setVolume(serverSettings_->getVolume() / 100., serverSettings_->isMuted());
153                 stream_->setBufferLen(std::max(0, serverSettings_->getBufferMs() - serverSettings_->getLatency() - settings_.player.latency));
154             }
155         }
156         else if (response->type == message_type::kCodecHeader)
157         {
158             headerChunk_ = msg::message_cast<msg::CodecHeader>(std::move(response));
159             decoder_.reset(nullptr);
160             stream_ = nullptr;
161             player_.reset(nullptr);
162 
163             if (headerChunk_->codec == "pcm")
164                 decoder_ = make_unique<decoder::PcmDecoder>();
165 #if defined(HAS_OGG) && (defined(HAS_TREMOR) || defined(HAS_VORBIS))
166             else if (headerChunk_->codec == "ogg")
167                 decoder_ = make_unique<decoder::OggDecoder>();
168 #endif
169 #if defined(HAS_FLAC)
170             else if (headerChunk_->codec == "flac")
171                 decoder_ = make_unique<decoder::FlacDecoder>();
172 #endif
173 #if defined(HAS_OPUS)
174             else if (headerChunk_->codec == "opus")
175                 decoder_ = make_unique<decoder::OpusDecoder>();
176 #endif
177             else
178                 throw SnapException("codec not supported: \"" + headerChunk_->codec + "\"");
179 
180             sampleFormat_ = decoder_->setHeader(headerChunk_.get());
181             LOG(INFO, LOG_TAG) << "Codec: " << headerChunk_->codec << ", sampleformat: " << sampleFormat_.toString() << "\n";
182 
183             stream_ = make_shared<Stream>(sampleFormat_, settings_.player.sample_format);
184             stream_->setBufferLen(std::max(0, serverSettings_->getBufferMs() - serverSettings_->getLatency() - settings_.player.latency));
185 
186 #ifdef HAS_ALSA
187             if (!player_)
188                 player_ = createPlayer<AlsaPlayer>(settings_.player, player::ALSA);
189 #endif
190 #ifdef HAS_PULSE
191             if (!player_)
192                 player_ = createPlayer<PulsePlayer>(settings_.player, player::PULSE);
193 #endif
194 #ifdef HAS_OBOE
195             if (!player_)
196                 player_ = createPlayer<OboePlayer>(settings_.player, player::OBOE);
197 #endif
198 #ifdef HAS_OPENSL
199             if (!player_)
200                 player_ = createPlayer<OpenslPlayer>(settings_.player, player::OPENSL);
201 #endif
202 #ifdef HAS_COREAUDIO
203             if (!player_)
204                 player_ = createPlayer<CoreAudioPlayer>(settings_.player, player::COREAUDIO);
205 #endif
206 #ifdef HAS_WASAPI
207             if (!player_)
208                 player_ = createPlayer<WASAPIPlayer>(settings_.player, player::WASAPI);
209 #endif
210             if (!player_ && (settings_.player.player_name == player::FILE))
211                 player_ = createPlayer<FilePlayer>(settings_.player, player::FILE);
212 
213             if (!player_)
214                 throw SnapException("No audio player support" + (settings_.player.player_name.empty() ? "" : " for: " + settings_.player.player_name));
215 
216             player_->setVolumeCallback([this](double volume, bool muted) {
217                 static double last_volume(-1);
218                 static bool last_muted(true);
219                 if ((volume != last_volume) || (last_muted != muted))
220                 {
221                     last_volume = volume;
222                     last_muted = muted;
223                     auto info = std::make_shared<msg::ClientInfo>();
224                     info->setVolume(static_cast<uint16_t>(volume * 100.));
225                     info->setMuted(muted);
226                     clientConnection_->send(info, [this](const boost::system::error_code& ec) {
227                         if (ec)
228                         {
229                             LOG(ERROR, LOG_TAG) << "Failed to send client info, error: " << ec.message() << "\n";
230                             reconnect();
231                             return;
232                         }
233                     });
234                 }
235             });
236             player_->start();
237             // Don't change the initial hardware mixer volume on the user's device.
238             // The player class will send the device's volume to the server instead
239             // if (settings_.player.mixer.mode != ClientSettings::Mixer::Mode::hardware)
240             // {
241             player_->setVolume(serverSettings_->getVolume() / 100., serverSettings_->isMuted());
242             // }
243         }
244         else if (response->type == message_type::kStreamTags)
245         {
246             if (meta_)
247             {
248                 auto stream_tags = msg::message_cast<msg::StreamTags>(std::move(response));
249                 meta_->push(stream_tags->msg);
250             }
251         }
252         else
253         {
254             LOG(WARNING, LOG_TAG) << "Unexpected message received, type: " << response->type << "\n";
255         }
256         getNextMessage();
257     });
258 }
259 
260 
sendTimeSyncMessage(int quick_syncs)261 void Controller::sendTimeSyncMessage(int quick_syncs)
262 {
263     auto timeReq = std::make_shared<msg::Time>();
264     clientConnection_->sendRequest<msg::Time>(
265         timeReq, 2s, [this, quick_syncs](const boost::system::error_code& ec, const std::unique_ptr<msg::Time>& response) mutable {
266             if (ec)
267             {
268                 LOG(ERROR, LOG_TAG) << "Time sync request failed: " << ec.message() << "\n";
269                 reconnect();
270                 return;
271             }
272             else
273             {
274                 TimeProvider::getInstance().setDiff(response->latency, response->received - response->sent);
275             }
276 
277             std::chrono::microseconds next = TIME_SYNC_INTERVAL;
278             if (quick_syncs > 0)
279             {
280                 if (--quick_syncs == 0)
281                     LOG(INFO, LOG_TAG) << "diff to server [ms]: "
282                                        << static_cast<float>(TimeProvider::getInstance().getDiffToServer<chronos::usec>().count()) / 1000.f << "\n";
283                 next = 100us;
284             }
285             timer_.expires_after(next);
286             timer_.async_wait([this, quick_syncs](const boost::system::error_code& ec) {
287                 if (!ec)
288                 {
289                     sendTimeSyncMessage(quick_syncs);
290                 }
291             });
292         });
293 }
294 
browseMdns(const MdnsHandler & handler)295 void Controller::browseMdns(const MdnsHandler& handler)
296 {
297 #if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
298     try
299     {
300         BrowseZeroConf browser;
301         mDNSResult avahiResult;
302         if (browser.browse("_snapcast._tcp", avahiResult, 1000))
303         {
304             string host = avahiResult.ip;
305             uint16_t port = avahiResult.port;
306             if (avahiResult.ip_version == IPVersion::IPv6)
307                 host += "%" + cpt::to_string(avahiResult.iface_idx);
308             handler({}, host, port);
309             return;
310         }
311     }
312     catch (const std::exception& e)
313     {
314         LOG(ERROR, LOG_TAG) << "Exception: " << e.what() << std::endl;
315     }
316 
317     timer_.expires_after(500ms);
318     timer_.async_wait([this, handler](const boost::system::error_code& ec) {
319         if (!ec)
320         {
321             browseMdns(handler);
322         }
323         else
324         {
325             handler(ec, "", 0);
326         }
327     });
328 #else
329     handler(boost::asio::error::operation_not_supported, "", 0);
330 #endif
331 }
332 
start()333 void Controller::start()
334 {
335     if (settings_.server.host.empty())
336     {
337         browseMdns([this](const boost::system::error_code& ec, const std::string& host, uint16_t port) {
338             if (ec)
339             {
340                 LOG(ERROR, LOG_TAG) << "Failed to browse MDNS, error: " << ec.message() << "\n";
341             }
342             else
343             {
344                 settings_.server.host = host;
345                 settings_.server.port = port;
346                 LOG(INFO, LOG_TAG) << "Found server " << settings_.server.host << ":" << settings_.server.port << "\n";
347                 clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
348                 worker();
349             }
350         });
351     }
352     else
353     {
354         clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
355         worker();
356     }
357 }
358 
359 
360 // void Controller::stop()
361 // {
362 //     LOG(DEBUG, LOG_TAG) << "Stopping\n";
363 //     timer_.cancel();
364 // }
365 
reconnect()366 void Controller::reconnect()
367 {
368     timer_.cancel();
369     clientConnection_->disconnect();
370     player_.reset();
371     stream_.reset();
372     decoder_.reset();
373     timer_.expires_after(1s);
374     timer_.async_wait([this](const boost::system::error_code& ec) {
375         if (!ec)
376         {
377             worker();
378         }
379     });
380 }
381 
worker()382 void Controller::worker()
383 {
384     clientConnection_->connect([this](const boost::system::error_code& ec) {
385         if (!ec)
386         {
387             // LOG(INFO, LOG_TAG) << "Connected!\n";
388             string macAddress = clientConnection_->getMacAddress();
389             if (settings_.host_id.empty())
390                 settings_.host_id = ::getHostId(macAddress);
391 
392             // Say hello to the server
393             auto hello = std::make_shared<msg::Hello>(macAddress, settings_.host_id, settings_.instance);
394             clientConnection_->sendRequest<msg::ServerSettings>(
395                 hello, 2s, [this](const boost::system::error_code& ec, std::unique_ptr<msg::ServerSettings> response) mutable {
396                     if (ec)
397                     {
398                         LOG(ERROR, LOG_TAG) << "Failed to send hello request, error: " << ec.message() << "\n";
399                         reconnect();
400                         return;
401                     }
402                     else
403                     {
404                         serverSettings_ = std::move(response);
405                         LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
406                                            << ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
407                     }
408                 });
409 
410             // Do initial time sync with the server
411             sendTimeSyncMessage(50);
412             // Start receiver loop
413             getNextMessage();
414         }
415         else
416         {
417             LOG(ERROR, LOG_TAG) << "Error: " << ec.message() << "\n";
418             reconnect();
419         }
420     });
421 }
422