1 /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
2  *
3  *  This program is free software: you can redistribute it and/or modify
4  *  it under the terms of the GNU General Public License as published by
5  *  the Free Software Foundation, either version 2 of the License, or
6  *  (at your option) any later version.
7  *
8  *  This program is distributed in the hope that it will be useful,
9  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  *  GNU General Public License for more details.
12  *
13  *  You should have received a copy of the GNU General Public License
14  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  *
16  *  If you would like to incorporate Link into a proprietary software application,
17  *  please contact <link-devs@ableton.com>.
18  */
19 
20 #pragma once
21 
22 #include <ableton/discovery/Payload.hpp>
23 #include <ableton/discovery/Socket.hpp>
24 #include <ableton/link/PayloadEntries.hpp>
25 #include <ableton/link/PeerState.hpp>
26 #include <ableton/link/SessionId.hpp>
27 #include <ableton/link/v1/Messages.hpp>
28 #include <ableton/platforms/asio/AsioService.hpp>
29 #include <ableton/util/Injected.hpp>
30 #include <chrono>
31 #include <memory>
32 
33 namespace ableton
34 {
35 namespace link
36 {
37 
38 template <typename IoService, typename Clock, typename Socket, typename Log>
39 struct Measurement
40 {
41   using Point = std::pair<double, double>;
42   using Callback = std::function<void(std::vector<Point>)>;
43   using Micros = std::chrono::microseconds;
44   using Timer = typename IoService::Timer;
45 
46   static const std::size_t kNumberDataPoints = 100;
47   static const std::size_t kNumberMeasurements = 5;
48 
49   Measurement() = default;
50 
Measurementableton::link::Measurement51   Measurement(const PeerState& state,
52     Callback callback,
53     asio::ip::address_v4 address,
54     Clock clock,
55     util::Injected<Log> log)
56     : mpIo(new IoService{})
57     , mpImpl(std::make_shared<Impl>(*mpIo,
58         std::move(state),
59         std::move(callback),
60         std::move(address),
61         std::move(clock),
62         std::move(log)))
63   {
64     mpImpl->listen();
65   }
66 
Measurementableton::link::Measurement67   Measurement(Measurement&& rhs)
68     : mpIo(std::move(rhs.mpIo))
69     , mpImpl(std::move(rhs.mpImpl))
70   {
71   }
72 
~Measurementableton::link::Measurement73   ~Measurement()
74   {
75     postImplDestruction();
76   }
77 
operator =ableton::link::Measurement78   Measurement& operator=(Measurement&& rhs)
79   {
80     postImplDestruction();
81     mpIo = std::move(rhs.mpIo);
82     mpImpl = std::move(rhs.mpImpl);
83     return *this;
84   }
85 
postImplDestructionableton::link::Measurement86   void postImplDestruction()
87   {
88     // Post destruction of the impl object into the io thread if valid
89     if (mpIo)
90     {
91       mpIo->post(ImplDeleter{*this});
92     }
93   }
94 
95   struct Impl : std::enable_shared_from_this<Impl>
96   {
Implableton::link::Measurement::Impl97     Impl(IoService& io,
98       const PeerState& state,
99       Callback callback,
100       asio::ip::address_v4 address,
101       Clock clock,
102       util::Injected<Log> log)
103       : mpSocket(std::make_shared<Socket>(io))
104       , mSessionId(state.nodeState.sessionId)
105       , mEndpoint(state.endpoint)
106       , mCallback(std::move(callback))
107       , mClock(std::move(clock))
108       , mTimer(util::injectVal(io.makeTimer()))
109       , mMeasurementsStarted(0)
110       , mLog(std::move(log))
111       , mSuccess(false)
112     {
113       configureUnicastSocket(*mpSocket, address);
114 
115       const auto ht = HostTime{mClock.micros()};
116       sendPing(mEndpoint, discovery::makePayload(ht));
117       resetTimer();
118     }
119 
resetTimerableton::link::Measurement::Impl120     void resetTimer()
121     {
122       mTimer->cancel();
123       mTimer->expires_from_now(std::chrono::milliseconds(50));
124       mTimer->async_wait([this](const typename Timer::ErrorCode e) {
125         if (!e)
126         {
127           if (mMeasurementsStarted < kNumberMeasurements)
128           {
129             const auto ht = HostTime{mClock.micros()};
130             sendPing(mEndpoint, discovery::makePayload(ht));
131             ++mMeasurementsStarted;
132             resetTimer();
133           }
134           else
135           {
136             fail();
137           }
138         }
139       });
140     }
141 
listenableton::link::Measurement::Impl142     void listen()
143     {
144       mpSocket->receive(util::makeAsyncSafe(this->shared_from_this()));
145     }
146 
147     // Operator to handle incoming messages on the interface
148     template <typename It>
operator ()ableton::link::Measurement::Impl149     void operator()(
150       const asio::ip::udp::endpoint& from, const It messageBegin, const It messageEnd)
151     {
152       using namespace std;
153       const auto result = v1::parseMessageHeader(messageBegin, messageEnd);
154       const auto& header = result.first;
155       const auto payloadBegin = result.second;
156 
157       if (header.messageType == v1::kPong)
158       {
159         debug(*mLog) << "Received Pong message from " << from;
160 
161         // parse for all entries
162         SessionId sessionId{};
163         std::chrono::microseconds ghostTime{0};
164         std::chrono::microseconds prevGHostTime{0};
165         std::chrono::microseconds prevHostTime{0};
166 
167         try
168         {
169           discovery::parsePayload<SessionMembership, GHostTime, PrevGHostTime, HostTime>(
170             payloadBegin, messageEnd,
171             [&sessionId](const SessionMembership& sms) { sessionId = sms.sessionId; },
172             [&ghostTime](GHostTime gt) { ghostTime = std::move(gt.time); },
173             [&prevGHostTime](PrevGHostTime gt) { prevGHostTime = std::move(gt.time); },
174             [&prevHostTime](HostTime ht) { prevHostTime = std::move(ht.time); });
175         }
176         catch (const std::runtime_error& err)
177         {
178           warning(*mLog) << "Failed parsing payload, caught exception: " << err.what();
179           listen();
180           return;
181         }
182 
183         if (mSessionId == sessionId)
184         {
185           const auto hostTime = mClock.micros();
186 
187           const auto payload =
188             discovery::makePayload(HostTime{hostTime}, PrevGHostTime{ghostTime});
189 
190           sendPing(from, payload);
191           listen();
192 
193           if (prevGHostTime != Micros{0})
194           {
195             mData.push_back(
196               std::make_pair(static_cast<double>((hostTime + prevHostTime).count()) * 0.5,
197                 static_cast<double>(ghostTime.count())));
198             mData.push_back(std::make_pair(static_cast<double>(prevHostTime.count()),
199               static_cast<double>((ghostTime + prevGHostTime).count()) * 0.5));
200           }
201 
202           if (mData.size() > kNumberDataPoints)
203           {
204             finish();
205           }
206           else
207           {
208             resetTimer();
209           }
210         }
211         else
212         {
213           fail();
214         }
215       }
216       else
217       {
218         debug(*mLog) << "Received invalid message from " << from;
219         listen();
220       }
221     }
222 
223     template <typename Payload>
sendPingableton::link::Measurement::Impl224     void sendPing(asio::ip::udp::endpoint to, const Payload& payload)
225     {
226       v1::MessageBuffer buffer;
227       const auto msgBegin = std::begin(buffer);
228       const auto msgEnd = v1::pingMessage(payload, msgBegin);
229       const auto numBytes = static_cast<size_t>(std::distance(msgBegin, msgEnd));
230 
231       try
232       {
233         mpSocket->send(buffer.data(), numBytes, to);
234       }
235       catch (const std::runtime_error& err)
236       {
237         info(*mLog) << "Failed to send Ping to " << to.address().to_string() << ": "
238                     << err.what();
239       }
240     }
241 
finishableton::link::Measurement::Impl242     void finish()
243     {
244       mTimer->cancel();
245       mCallback(std::move(mData));
246       mData = {};
247       mSuccess = true;
248       debug(*mLog) << "Measuring " << mEndpoint << " done.";
249     }
250 
failableton::link::Measurement::Impl251     void fail()
252     {
253       mCallback(std::vector<Point>{});
254       mData = {};
255       debug(*mLog) << "Measuring " << mEndpoint << " failed.";
256     }
257 
258     std::shared_ptr<Socket> mpSocket;
259     SessionId mSessionId;
260     asio::ip::udp::endpoint mEndpoint;
261     std::vector<std::pair<double, double>> mData;
262     Callback mCallback;
263     Clock mClock;
264     util::Injected<typename IoService::Timer> mTimer;
265     std::size_t mMeasurementsStarted;
266     util::Injected<Log> mLog;
267     bool mSuccess;
268   };
269 
270   struct ImplDeleter
271   {
ImplDeleterableton::link::Measurement::ImplDeleter272     ImplDeleter(Measurement& measurement)
273       : mpImpl(std::move(measurement.mpImpl))
274     {
275     }
276 
operator ()ableton::link::Measurement::ImplDeleter277     void operator()()
278     {
279       // Notify callback that the measurement has failed if it did
280       // not succeed before destruction
281       if (!mpImpl->mSuccess)
282       {
283         mpImpl->fail();
284       }
285       mpImpl.reset();
286     }
287 
288     std::shared_ptr<Impl> mpImpl;
289   };
290 
291   std::unique_ptr<IoService> mpIo;
292   std::shared_ptr<Impl> mpImpl;
293 };
294 
295 } // namespace link
296 } // namespace ableton
297