1 /* Copyright 2016, Ableton AG, Berlin. All rights reserved. 2 * 3 * This program is free software: you can redistribute it and/or modify 4 * it under the terms of the GNU General Public License as published by 5 * the Free Software Foundation, either version 2 of the License, or 6 * (at your option) any later version. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public License 14 * along with this program. If not, see <http://www.gnu.org/licenses/>. 15 * 16 * If you would like to incorporate Link into a proprietary software application, 17 * please contact <link-devs@ableton.com>. 18 */ 19 20 #pragma once 21 22 #include <ableton/discovery/Payload.hpp> 23 #include <ableton/discovery/Socket.hpp> 24 #include <ableton/link/PayloadEntries.hpp> 25 #include <ableton/link/PeerState.hpp> 26 #include <ableton/link/SessionId.hpp> 27 #include <ableton/link/v1/Messages.hpp> 28 #include <ableton/platforms/asio/AsioService.hpp> 29 #include <ableton/util/Injected.hpp> 30 #include <chrono> 31 #include <memory> 32 33 namespace ableton 34 { 35 namespace link 36 { 37 38 template <typename IoService, typename Clock, typename Socket, typename Log> 39 struct Measurement 40 { 41 using Point = std::pair<double, double>; 42 using Callback = std::function<void(std::vector<Point>)>; 43 using Micros = std::chrono::microseconds; 44 using Timer = typename IoService::Timer; 45 46 static const std::size_t kNumberDataPoints = 100; 47 static const std::size_t kNumberMeasurements = 5; 48 49 Measurement() = default; 50 Measurementableton::link::Measurement51 Measurement(const PeerState& state, 52 Callback callback, 53 asio::ip::address_v4 address, 54 Clock clock, 55 util::Injected<Log> log) 56 : mpIo(new IoService{}) 57 , mpImpl(std::make_shared<Impl>(*mpIo, 58 std::move(state), 59 std::move(callback), 60 std::move(address), 61 std::move(clock), 62 std::move(log))) 63 { 64 mpImpl->listen(); 65 } 66 Measurementableton::link::Measurement67 Measurement(Measurement&& rhs) 68 : mpIo(std::move(rhs.mpIo)) 69 , mpImpl(std::move(rhs.mpImpl)) 70 { 71 } 72 ~Measurementableton::link::Measurement73 ~Measurement() 74 { 75 postImplDestruction(); 76 } 77 operator =ableton::link::Measurement78 Measurement& operator=(Measurement&& rhs) 79 { 80 postImplDestruction(); 81 mpIo = std::move(rhs.mpIo); 82 mpImpl = std::move(rhs.mpImpl); 83 return *this; 84 } 85 postImplDestructionableton::link::Measurement86 void postImplDestruction() 87 { 88 // Post destruction of the impl object into the io thread if valid 89 if (mpIo) 90 { 91 mpIo->post(ImplDeleter{*this}); 92 } 93 } 94 95 struct Impl : std::enable_shared_from_this<Impl> 96 { Implableton::link::Measurement::Impl97 Impl(IoService& io, 98 const PeerState& state, 99 Callback callback, 100 asio::ip::address_v4 address, 101 Clock clock, 102 util::Injected<Log> log) 103 : mpSocket(std::make_shared<Socket>(io)) 104 , mSessionId(state.nodeState.sessionId) 105 , mEndpoint(state.endpoint) 106 , mCallback(std::move(callback)) 107 , mClock(std::move(clock)) 108 , mTimer(util::injectVal(io.makeTimer())) 109 , mMeasurementsStarted(0) 110 , mLog(std::move(log)) 111 , mSuccess(false) 112 { 113 configureUnicastSocket(*mpSocket, address); 114 115 const auto ht = HostTime{mClock.micros()}; 116 sendPing(mEndpoint, discovery::makePayload(ht)); 117 resetTimer(); 118 } 119 resetTimerableton::link::Measurement::Impl120 void resetTimer() 121 { 122 mTimer->cancel(); 123 mTimer->expires_from_now(std::chrono::milliseconds(50)); 124 mTimer->async_wait([this](const typename Timer::ErrorCode e) { 125 if (!e) 126 { 127 if (mMeasurementsStarted < kNumberMeasurements) 128 { 129 const auto ht = HostTime{mClock.micros()}; 130 sendPing(mEndpoint, discovery::makePayload(ht)); 131 ++mMeasurementsStarted; 132 resetTimer(); 133 } 134 else 135 { 136 fail(); 137 } 138 } 139 }); 140 } 141 listenableton::link::Measurement::Impl142 void listen() 143 { 144 mpSocket->receive(util::makeAsyncSafe(this->shared_from_this())); 145 } 146 147 // Operator to handle incoming messages on the interface 148 template <typename It> operator ()ableton::link::Measurement::Impl149 void operator()( 150 const asio::ip::udp::endpoint& from, const It messageBegin, const It messageEnd) 151 { 152 using namespace std; 153 const auto result = v1::parseMessageHeader(messageBegin, messageEnd); 154 const auto& header = result.first; 155 const auto payloadBegin = result.second; 156 157 if (header.messageType == v1::kPong) 158 { 159 debug(*mLog) << "Received Pong message from " << from; 160 161 // parse for all entries 162 SessionId sessionId{}; 163 std::chrono::microseconds ghostTime{0}; 164 std::chrono::microseconds prevGHostTime{0}; 165 std::chrono::microseconds prevHostTime{0}; 166 167 try 168 { 169 discovery::parsePayload<SessionMembership, GHostTime, PrevGHostTime, HostTime>( 170 payloadBegin, messageEnd, 171 [&sessionId](const SessionMembership& sms) { sessionId = sms.sessionId; }, 172 [&ghostTime](GHostTime gt) { ghostTime = std::move(gt.time); }, 173 [&prevGHostTime](PrevGHostTime gt) { prevGHostTime = std::move(gt.time); }, 174 [&prevHostTime](HostTime ht) { prevHostTime = std::move(ht.time); }); 175 } 176 catch (const std::runtime_error& err) 177 { 178 warning(*mLog) << "Failed parsing payload, caught exception: " << err.what(); 179 listen(); 180 return; 181 } 182 183 if (mSessionId == sessionId) 184 { 185 const auto hostTime = mClock.micros(); 186 187 const auto payload = 188 discovery::makePayload(HostTime{hostTime}, PrevGHostTime{ghostTime}); 189 190 sendPing(from, payload); 191 listen(); 192 193 if (prevGHostTime != Micros{0}) 194 { 195 mData.push_back( 196 std::make_pair(static_cast<double>((hostTime + prevHostTime).count()) * 0.5, 197 static_cast<double>(ghostTime.count()))); 198 mData.push_back(std::make_pair(static_cast<double>(prevHostTime.count()), 199 static_cast<double>((ghostTime + prevGHostTime).count()) * 0.5)); 200 } 201 202 if (mData.size() > kNumberDataPoints) 203 { 204 finish(); 205 } 206 else 207 { 208 resetTimer(); 209 } 210 } 211 else 212 { 213 fail(); 214 } 215 } 216 else 217 { 218 debug(*mLog) << "Received invalid message from " << from; 219 listen(); 220 } 221 } 222 223 template <typename Payload> sendPingableton::link::Measurement::Impl224 void sendPing(asio::ip::udp::endpoint to, const Payload& payload) 225 { 226 v1::MessageBuffer buffer; 227 const auto msgBegin = std::begin(buffer); 228 const auto msgEnd = v1::pingMessage(payload, msgBegin); 229 const auto numBytes = static_cast<size_t>(std::distance(msgBegin, msgEnd)); 230 231 try 232 { 233 mpSocket->send(buffer.data(), numBytes, to); 234 } 235 catch (const std::runtime_error& err) 236 { 237 info(*mLog) << "Failed to send Ping to " << to.address().to_string() << ": " 238 << err.what(); 239 } 240 } 241 finishableton::link::Measurement::Impl242 void finish() 243 { 244 mTimer->cancel(); 245 mCallback(std::move(mData)); 246 mData = {}; 247 mSuccess = true; 248 debug(*mLog) << "Measuring " << mEndpoint << " done."; 249 } 250 failableton::link::Measurement::Impl251 void fail() 252 { 253 mCallback(std::vector<Point>{}); 254 mData = {}; 255 debug(*mLog) << "Measuring " << mEndpoint << " failed."; 256 } 257 258 std::shared_ptr<Socket> mpSocket; 259 SessionId mSessionId; 260 asio::ip::udp::endpoint mEndpoint; 261 std::vector<std::pair<double, double>> mData; 262 Callback mCallback; 263 Clock mClock; 264 util::Injected<typename IoService::Timer> mTimer; 265 std::size_t mMeasurementsStarted; 266 util::Injected<Log> mLog; 267 bool mSuccess; 268 }; 269 270 struct ImplDeleter 271 { ImplDeleterableton::link::Measurement::ImplDeleter272 ImplDeleter(Measurement& measurement) 273 : mpImpl(std::move(measurement.mpImpl)) 274 { 275 } 276 operator ()ableton::link::Measurement::ImplDeleter277 void operator()() 278 { 279 // Notify callback that the measurement has failed if it did 280 // not succeed before destruction 281 if (!mpImpl->mSuccess) 282 { 283 mpImpl->fail(); 284 } 285 mpImpl.reset(); 286 } 287 288 std::shared_ptr<Impl> mpImpl; 289 }; 290 291 std::unique_ptr<IoService> mpIo; 292 std::shared_ptr<Impl> mpImpl; 293 }; 294 295 } // namespace link 296 } // namespace ableton 297