1 /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
2  *
3  *  This program is free software: you can redistribute it and/or modify
4  *  it under the terms of the GNU General Public License as published by
5  *  the Free Software Foundation, either version 2 of the License, or
6  *  (at your option) any later version.
7  *
8  *  This program is distributed in the hope that it will be useful,
9  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  *  GNU General Public License for more details.
12  *
13  *  You should have received a copy of the GNU General Public License
14  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  *
16  *  If you would like to incorporate Link into a proprietary software application,
17  *  please contact <link-devs@ableton.com>.
18  */
19 
20 #pragma once
21 
22 #include <ableton/discovery/Service.hpp>
23 #include <ableton/link/CircularFifo.hpp>
24 #include <ableton/link/ClientSessionTimelines.hpp>
25 #include <ableton/link/Gateway.hpp>
26 #include <ableton/link/GhostXForm.hpp>
27 #include <ableton/link/NodeState.hpp>
28 #include <ableton/link/Peers.hpp>
29 #include <ableton/link/SessionState.hpp>
30 #include <ableton/link/Sessions.hpp>
31 #include <ableton/link/StartStopState.hpp>
32 #include <mutex>
33 
34 namespace ableton
35 {
36 namespace link
37 {
38 namespace detail
39 {
40 
41 template <typename Clock>
initXForm(const Clock & clock)42 GhostXForm initXForm(const Clock& clock)
43 {
44   // Make the current time map to a ghost time of 0 with ghost time
45   // increasing at the same rate as clock time
46   return {1.0, -clock.micros()};
47 }
48 
49 template <typename Clock>
initSessionState(const Tempo tempo,const Clock & clock)50 inline SessionState initSessionState(const Tempo tempo, const Clock& clock)
51 {
52   using namespace std::chrono;
53   return {clampTempo(Timeline{tempo, Beats{0.}, microseconds{0}}),
54     StartStopState{false, Beats{0.}, microseconds{0}}, initXForm(clock)};
55 }
56 
initClientState(const SessionState & sessionState)57 inline ClientState initClientState(const SessionState& sessionState)
58 {
59   const auto hostTime = sessionState.ghostXForm.ghostToHost(std::chrono::microseconds{0});
60   return {
61     Timeline{sessionState.timeline.tempo, sessionState.timeline.beatOrigin, hostTime},
62     StartStopState{sessionState.startStopState.isPlaying,
63       sessionState.startStopState.beats, hostTime}};
64 }
65 
initRtClientState(const ClientState & clientState)66 inline RtClientState initRtClientState(const ClientState& clientState)
67 {
68   using namespace std::chrono;
69   return {
70     clientState.timeline, clientState.startStopState, microseconds{0}, microseconds{0}};
71 }
72 
73 // The timespan in which local modifications to the timeline will be
74 // preferred over any modifications coming from the network.
75 const auto kLocalModGracePeriod = std::chrono::milliseconds(1000);
76 const auto kRtHandlerFallbackPeriod = kLocalModGracePeriod / 2;
77 
selectPreferredStartStopState(const StartStopState currentStartStopState,const StartStopState startStopState)78 inline StartStopState selectPreferredStartStopState(
79   const StartStopState currentStartStopState, const StartStopState startStopState)
80 {
81   return startStopState.timestamp > currentStartStopState.timestamp
82            ? startStopState
83            : currentStartStopState;
84 }
85 
mapStartStopStateFromSessionToClient(const StartStopState & sessionStartStopState,const Timeline & clientTimeline,const Timeline & sessionTimeline,const GhostXForm & xForm)86 inline StartStopState mapStartStopStateFromSessionToClient(
87   const StartStopState& sessionStartStopState,
88   const Timeline& clientTimeline,
89   const Timeline& sessionTimeline,
90   const GhostXForm& xForm)
91 {
92   const auto clientBeats = clientTimeline.toBeats(
93     xForm.ghostToHost(sessionTimeline.fromBeats(sessionStartStopState.beats)));
94   const auto clientTime = xForm.ghostToHost(sessionStartStopState.timestamp);
95   return StartStopState{sessionStartStopState.isPlaying, clientBeats, clientTime};
96 }
97 
mapStartStopStateFromClientToSession(const StartStopState & clientStartStopState,const Timeline & clientTimeline,const Timeline & sessionTimeline,const GhostXForm & xForm)98 inline StartStopState mapStartStopStateFromClientToSession(
99   const StartStopState& clientStartStopState,
100   const Timeline& clientTimeline,
101   const Timeline& sessionTimeline,
102   const GhostXForm& xForm)
103 {
104   const auto sessionBeats = sessionTimeline.toBeats(
105     xForm.hostToGhost(clientTimeline.fromBeats(clientStartStopState.beats)));
106   const auto sessionTime = xForm.hostToGhost(clientStartStopState.timestamp);
107   return StartStopState{clientStartStopState.isPlaying, sessionBeats, sessionTime};
108 }
109 
110 } // namespace detail
111 
112 // function types corresponding to the Controller callback type params
113 using PeerCountCallback = std::function<void(std::size_t)>;
114 using TempoCallback = std::function<void(ableton::link::Tempo)>;
115 using StartStopStateCallback = std::function<void(bool)>;
116 
117 
118 // The main Link controller
119 template <typename PeerCountCallback,
120   typename TempoCallback,
121   typename StartStopStateCallback,
122   typename Clock,
123   typename IoContext>
124 class Controller
125 {
126 public:
Controller(Tempo tempo,PeerCountCallback peerCallback,TempoCallback tempoCallback,StartStopStateCallback startStopStateCallback,Clock clock,util::Injected<IoContext> io)127   Controller(Tempo tempo,
128     PeerCountCallback peerCallback,
129     TempoCallback tempoCallback,
130     StartStopStateCallback startStopStateCallback,
131     Clock clock,
132     util::Injected<IoContext> io)
133     : mTempoCallback(std::move(tempoCallback))
134     , mStartStopStateCallback(std::move(startStopStateCallback))
135     , mClock(std::move(clock))
136     , mNodeId(NodeId::random())
137     , mSessionId(mNodeId)
138     , mSessionState(detail::initSessionState(tempo, mClock))
139     , mClientState(detail::initClientState(mSessionState))
140     , mLastIsPlayingForStartStopStateCallback(false)
141     , mRtClientState(detail::initRtClientState(mClientState))
142     , mHasPendingRtClientStates(false)
143     , mSessionPeerCounter(*this, std::move(peerCallback))
144     , mEnabled(false)
145     , mStartStopSyncEnabled(false)
146     , mIo(std::move(io))
147     , mRtClientStateSetter(*this)
148     , mPeers(util::injectRef(*mIo),
149         std::ref(mSessionPeerCounter),
150         SessionTimelineCallback{*this},
151         SessionStartStopStateCallback{*this})
152     , mSessions(
153         {mSessionId, mSessionState.timeline, {mSessionState.ghostXForm, mClock.micros()}},
154         util::injectRef(mPeers),
155         MeasurePeer{*this},
156         JoinSessionCallback{*this},
157         util::injectRef(*mIo),
158         mClock)
159     , mDiscovery(std::make_pair(NodeState{mNodeId, mSessionId, mSessionState.timeline,
160                                   mSessionState.startStopState},
161                    mSessionState.ghostXForm),
162         GatewayFactory{*this},
163         util::injectVal(mIo->clone(UdpSendExceptionHandler{*this})))
164   {
165   }
166 
167   Controller(const Controller&) = delete;
168   Controller(Controller&&) = delete;
169 
170   Controller& operator=(const Controller&) = delete;
171   Controller& operator=(Controller&&) = delete;
172 
enable(const bool bEnable)173   void enable(const bool bEnable)
174   {
175     const bool bWasEnabled = mEnabled.exchange(bEnable);
176     if (bWasEnabled != bEnable)
177     {
178       mIo->async([this, bEnable] {
179         if (bEnable)
180         {
181           // Always reset when first enabling to avoid hijacking
182           // tempo in existing sessions
183           resetState();
184         }
185         mDiscovery.enable(bEnable);
186       });
187     }
188   }
189 
isEnabled() const190   bool isEnabled() const
191   {
192     return mEnabled;
193   }
194 
enableStartStopSync(const bool bEnable)195   void enableStartStopSync(const bool bEnable)
196   {
197     mStartStopSyncEnabled = bEnable;
198   }
199 
isStartStopSyncEnabled() const200   bool isStartStopSyncEnabled() const
201   {
202     return mStartStopSyncEnabled;
203   }
204 
numPeers() const205   std::size_t numPeers() const
206   {
207     return mSessionPeerCounter.mSessionPeerCount;
208   }
209 
210   // Get the current Link client state. Thread-safe but may block, so
211   // it cannot be used from audio thread.
clientState() const212   ClientState clientState() const
213   {
214     std::lock_guard<std::mutex> lock(mClientStateGuard);
215     return mClientState;
216   }
217 
218   // Set the client state to be used, starting at the given time.
219   // Thread-safe but may block, so it cannot be used from audio thread.
setClientState(IncomingClientState newClientState)220   void setClientState(IncomingClientState newClientState)
221   {
222     {
223       std::lock_guard<std::mutex> lock(mClientStateGuard);
224       if (newClientState.timeline)
225       {
226         *newClientState.timeline = clampTempo(*newClientState.timeline);
227         mClientState.timeline = *newClientState.timeline;
228       }
229       if (newClientState.startStopState)
230       {
231         // Prevent updating client start stop state with an outdated start stop state
232         *newClientState.startStopState = detail::selectPreferredStartStopState(
233           mClientState.startStopState, *newClientState.startStopState);
234         mClientState.startStopState = *newClientState.startStopState;
235       }
236     }
237 
238     mIo->async([this, newClientState] { handleClientState(newClientState); });
239   }
240 
241   // Non-blocking client state access for a realtime context. NOT
242   // thread-safe. Must not be called from multiple threads
243   // concurrently and must not be called concurrently with setClientStateRtSafe.
clientStateRtSafe() const244   ClientState clientStateRtSafe() const
245   {
246     // Respect the session state guard and the client state guard but don't
247     // block on them. If we can't access one or both because of concurrent modification
248     // we fall back to our cached version of the timeline and/or start stop state.
249 
250     if (!mHasPendingRtClientStates)
251     {
252       const auto now = mClock.micros();
253       const auto timelineGracePeriodOver =
254         now - mRtClientState.timelineTimestamp > detail::kLocalModGracePeriod;
255       const auto startStopStateGracePeriodOver =
256         now - mRtClientState.startStopStateTimestamp > detail::kLocalModGracePeriod;
257 
258       if ((timelineGracePeriodOver || startStopStateGracePeriodOver)
259           && mClientStateGuard.try_lock())
260       {
261         const auto clientState = mClientState;
262         mClientStateGuard.unlock();
263 
264         if (timelineGracePeriodOver && clientState.timeline != mRtClientState.timeline)
265         {
266           mRtClientState.timeline = clientState.timeline;
267         }
268 
269         if (startStopStateGracePeriodOver
270             && clientState.startStopState != mRtClientState.startStopState)
271         {
272           mRtClientState.startStopState = clientState.startStopState;
273         }
274       }
275     }
276 
277     return {mRtClientState.timeline, mRtClientState.startStopState};
278   }
279 
280   // should only be called from the audio thread
setClientStateRtSafe(IncomingClientState newClientState)281   void setClientStateRtSafe(IncomingClientState newClientState)
282   {
283     if (!newClientState.timeline && !newClientState.startStopState)
284     {
285       return;
286     }
287 
288     if (newClientState.timeline)
289     {
290       *newClientState.timeline = clampTempo(*newClientState.timeline);
291     }
292 
293     if (newClientState.startStopState)
294     {
295       // Prevent updating client start stop state with an outdated start stop state
296       *newClientState.startStopState = detail::selectPreferredStartStopState(
297         mRtClientState.startStopState, *newClientState.startStopState);
298     }
299 
300     // This flag ensures that mRtClientState is only updated after all incoming
301     // client states were processed
302     mHasPendingRtClientStates = true;
303     // This will fail in case the Fifo in the RtClientStateSetter is full. This indicates
304     // a very high rate of calls to the setter. In this case we ignore one value because
305     // we expect the setter to be called again soon.
306     if (mRtClientStateSetter.tryPush(newClientState))
307     {
308       const auto now = mClock.micros();
309       // Cache the new timeline and StartStopState for serving back to the client
310       if (newClientState.timeline)
311       {
312         // Cache the new timeline and StartStopState for serving back to the client
313         mRtClientState.timeline = *newClientState.timeline;
314         mRtClientState.timelineTimestamp = makeRtTimestamp(now);
315       }
316       if (newClientState.startStopState)
317       {
318         mRtClientState.startStopState = *newClientState.startStopState;
319         mRtClientState.startStopStateTimestamp = makeRtTimestamp(now);
320       }
321     }
322   }
323 
324 private:
makeRtTimestamp(const std::chrono::microseconds now) const325   std::chrono::microseconds makeRtTimestamp(const std::chrono::microseconds now) const
326   {
327     return isEnabled() ? now : std::chrono::microseconds(0);
328   }
329 
invokeStartStopStateCallbackIfChanged()330   void invokeStartStopStateCallbackIfChanged()
331   {
332     bool shouldInvokeCallback = false;
333     {
334       std::lock_guard<std::mutex> lock(mClientStateGuard);
335       shouldInvokeCallback =
336         mLastIsPlayingForStartStopStateCallback != mClientState.startStopState.isPlaying;
337       mLastIsPlayingForStartStopStateCallback = mClientState.startStopState.isPlaying;
338     }
339 
340     if (shouldInvokeCallback)
341     {
342       mStartStopStateCallback(mLastIsPlayingForStartStopStateCallback);
343     }
344   }
345 
updateDiscovery()346   void updateDiscovery()
347   {
348     // Push the change to the discovery service
349     mDiscovery.updateNodeState(
350       std::make_pair(NodeState{mNodeId, mSessionId, mSessionState.timeline,
351                        mSessionState.startStopState},
352         mSessionState.ghostXForm));
353   }
354 
updateSessionTiming(const Timeline newTimeline,const GhostXForm newXForm)355   void updateSessionTiming(const Timeline newTimeline, const GhostXForm newXForm)
356   {
357     const auto oldTimeline = mSessionState.timeline;
358     const auto oldXForm = mSessionState.ghostXForm;
359 
360     if (oldTimeline != newTimeline || oldXForm != newXForm)
361     {
362       {
363         std::lock_guard<std::mutex> lock(mSessionStateGuard);
364         mSessionState.timeline = newTimeline;
365         mSessionState.ghostXForm = newXForm;
366       }
367 
368       // Update the client timeline based on the new session timing data
369       {
370         std::lock_guard<std::mutex> lock(mClientStateGuard);
371         mClientState.timeline = updateClientTimelineFromSession(mClientState.timeline,
372           mSessionState.timeline, mClock.micros(), mSessionState.ghostXForm);
373       }
374 
375       if (oldTimeline.tempo != newTimeline.tempo)
376       {
377         mTempoCallback(newTimeline.tempo);
378       }
379     }
380   }
381 
handleTimelineFromSession(SessionId id,Timeline timeline)382   void handleTimelineFromSession(SessionId id, Timeline timeline)
383   {
384     debug(mIo->log()) << "Received timeline with tempo: " << timeline.tempo.bpm()
385                       << " for session: " << id;
386     updateSessionTiming(mSessions.sawSessionTimeline(std::move(id), std::move(timeline)),
387       mSessionState.ghostXForm);
388     updateDiscovery();
389   }
390 
resetSessionStartStopState()391   void resetSessionStartStopState()
392   {
393     mSessionState.startStopState = StartStopState{};
394   }
395 
handleStartStopStateFromSession(SessionId sessionId,StartStopState startStopState)396   void handleStartStopStateFromSession(SessionId sessionId, StartStopState startStopState)
397   {
398     debug(mIo->log()) << "Received start stop state. isPlaying: "
399                       << startStopState.isPlaying
400                       << ", beats: " << startStopState.beats.floating()
401                       << ", time: " << startStopState.timestamp.count()
402                       << " for session: " << sessionId;
403     if (sessionId == mSessionId
404         && startStopState.timestamp > mSessionState.startStopState.timestamp)
405     {
406       mSessionState.startStopState = startStopState;
407 
408       // Always propagate the session start stop state so even a client that doesn't have
409       // the feature enabled can function as a relay.
410       updateDiscovery();
411 
412       if (mStartStopSyncEnabled)
413       {
414         {
415           std::lock_guard<std::mutex> lock(mClientStateGuard);
416           mClientState.startStopState =
417             detail::mapStartStopStateFromSessionToClient(startStopState,
418               mClientState.timeline, mSessionState.timeline, mSessionState.ghostXForm);
419         }
420         invokeStartStopStateCallbackIfChanged();
421       }
422     }
423   }
424 
handleClientState(const IncomingClientState clientState)425   void handleClientState(const IncomingClientState clientState)
426   {
427     auto mustUpdateDiscovery = false;
428 
429     if (clientState.timeline)
430     {
431       auto sessionTimeline = updateSessionTimelineFromClient(mSessionState.timeline,
432         *clientState.timeline, clientState.timelineTimestamp, mSessionState.ghostXForm);
433 
434       mSessions.resetTimeline(sessionTimeline);
435       mPeers.setSessionTimeline(mSessionId, sessionTimeline);
436       updateSessionTiming(std::move(sessionTimeline), mSessionState.ghostXForm);
437 
438       mustUpdateDiscovery = true;
439     }
440 
441     if (mStartStopSyncEnabled && clientState.startStopState)
442     {
443       // Prevent updating with an outdated start stop state
444       const auto newGhostTime =
445         mSessionState.ghostXForm.hostToGhost(clientState.startStopState->timestamp);
446       if (newGhostTime > mSessionState.startStopState.timestamp)
447       {
448         {
449           std::lock_guard<std::mutex> lock(mClientStateGuard);
450           mSessionState.startStopState =
451             detail::mapStartStopStateFromClientToSession(*clientState.startStopState,
452               mClientState.timeline, mSessionState.timeline, mSessionState.ghostXForm);
453           mClientState.startStopState = *clientState.startStopState;
454         }
455 
456         mustUpdateDiscovery = true;
457       }
458     }
459 
460     if (mustUpdateDiscovery)
461     {
462       updateDiscovery();
463     }
464 
465     invokeStartStopStateCallbackIfChanged();
466   }
467 
handleRtClientState(IncomingClientState clientState)468   void handleRtClientState(IncomingClientState clientState)
469   {
470     {
471       std::lock_guard<std::mutex> lock(mClientStateGuard);
472       if (clientState.timeline)
473       {
474         mClientState.timeline = *clientState.timeline;
475       }
476       if (clientState.startStopState)
477       {
478         // Prevent updating client start stop state with an outdated start stop state
479         *clientState.startStopState = detail::selectPreferredStartStopState(
480           mClientState.startStopState, *clientState.startStopState);
481         mClientState.startStopState = *clientState.startStopState;
482       }
483     }
484 
485     handleClientState(clientState);
486     mHasPendingRtClientStates = false;
487   }
488 
joinSession(const Session & session)489   void joinSession(const Session& session)
490   {
491     const bool sessionIdChanged = mSessionId != session.sessionId;
492     mSessionId = session.sessionId;
493 
494     // Prevent passing the start stop state of the previous session to the new one.
495     if (sessionIdChanged)
496     {
497       resetSessionStartStopState();
498     }
499 
500     updateSessionTiming(session.timeline, session.measurement.xform);
501     updateDiscovery();
502 
503     if (sessionIdChanged)
504     {
505       debug(mIo->log()) << "Joining session " << session.sessionId << " with tempo "
506                         << session.timeline.tempo.bpm();
507       mSessionPeerCounter();
508     }
509   }
510 
resetState()511   void resetState()
512   {
513     mNodeId = NodeId::random();
514     mSessionId = mNodeId;
515 
516     const auto xform = detail::initXForm(mClock);
517     const auto hostTime = -xform.intercept;
518     // When creating the new timeline, make it continuous by finding
519     // the beat on the old session timeline corresponding to the
520     // current host time and mapping it to the new ghost time
521     // representation of the current host time.
522     const auto newTl = Timeline{mSessionState.timeline.tempo,
523       mSessionState.timeline.toBeats(mSessionState.ghostXForm.hostToGhost(hostTime)),
524       xform.hostToGhost(hostTime)};
525 
526     resetSessionStartStopState();
527 
528     updateSessionTiming(newTl, xform);
529     updateDiscovery();
530 
531     mSessions.resetSession({mNodeId, newTl, {xform, hostTime}});
532     mPeers.resetPeers();
533   }
534 
535   struct SessionTimelineCallback
536   {
operator ()ableton::link::Controller::SessionTimelineCallback537     void operator()(SessionId id, Timeline timeline)
538     {
539       mController.handleTimelineFromSession(std::move(id), std::move(timeline));
540     }
541 
542     Controller& mController;
543   };
544 
545   struct RtClientStateSetter
546   {
547     using CallbackDispatcher =
548       typename IoContext::template LockFreeCallbackDispatcher<std::function<void()>,
549         std::chrono::milliseconds>;
550 
RtClientStateSetterableton::link::Controller::RtClientStateSetter551     RtClientStateSetter(Controller& controller)
552       : mController(controller)
553       , mCallbackDispatcher(
554           [this] { processPendingClientStates(); }, detail::kRtHandlerFallbackPeriod)
555     {
556     }
557 
tryPushableton::link::Controller::RtClientStateSetter558     bool tryPush(const IncomingClientState clientState)
559     {
560       const auto success = mClientStateFifo.push(clientState);
561       if (success)
562       {
563         mCallbackDispatcher.invoke();
564       }
565       return success;
566     }
567 
568   private:
buildMergedPendingClientStateableton::link::Controller::RtClientStateSetter569     IncomingClientState buildMergedPendingClientState()
570     {
571       auto clientState = IncomingClientState{};
572       while (const auto result = mClientStateFifo.pop())
573       {
574         if (result->timeline)
575         {
576           clientState.timeline = result->timeline;
577           clientState.timelineTimestamp = result->timelineTimestamp;
578         }
579         if (result->startStopState)
580         {
581           clientState.startStopState = result->startStopState;
582         }
583       }
584       return clientState;
585     }
586 
processPendingClientStatesableton::link::Controller::RtClientStateSetter587     void processPendingClientStates()
588     {
589       const auto clientState = buildMergedPendingClientState();
590       mController.mIo->async(
591         [this, clientState]() { mController.handleRtClientState(clientState); });
592     }
593 
594     Controller& mController;
595     // Assuming a wake up time of one ms for the threads owned by the CallbackDispatcher
596     // and the ioService, buffering 16 client states allows to set eight client states
597     // per ms.
598     static const std::size_t kBufferSize = 16;
599     CircularFifo<IncomingClientState, kBufferSize> mClientStateFifo;
600     CallbackDispatcher mCallbackDispatcher;
601   };
602 
603   struct SessionStartStopStateCallback
604   {
operator ()ableton::link::Controller::SessionStartStopStateCallback605     void operator()(SessionId sessionId, StartStopState startStopState)
606     {
607       mController.handleStartStopStateFromSession(sessionId, startStopState);
608     }
609 
610     Controller& mController;
611   };
612 
613   struct SessionPeerCounter
614   {
SessionPeerCounterableton::link::Controller::SessionPeerCounter615     SessionPeerCounter(Controller& controller, PeerCountCallback callback)
616       : mController(controller)
617       , mCallback(std::move(callback))
618       , mSessionPeerCount(0)
619     {
620     }
621 
operator ()ableton::link::Controller::SessionPeerCounter622     void operator()()
623     {
624       const auto count =
625         mController.mPeers.uniqueSessionPeerCount(mController.mSessionId);
626       const auto oldCount = mSessionPeerCount.exchange(count);
627       if (oldCount != count)
628       {
629         if (count == 0)
630         {
631           // When the count goes down to zero, completely reset the
632           // state, effectively founding a new session
633           mController.resetState();
634         }
635         mCallback(count);
636       }
637     }
638 
639     Controller& mController;
640     PeerCountCallback mCallback;
641     std::atomic<std::size_t> mSessionPeerCount;
642   };
643 
644   struct MeasurePeer
645   {
646     template <typename Peer, typename Handler>
operator ()ableton::link::Controller::MeasurePeer647     void operator()(Peer peer, Handler handler)
648     {
649       using It = typename Discovery::ServicePeerGateways::GatewayMap::iterator;
650       using ValueType = typename Discovery::ServicePeerGateways::GatewayMap::value_type;
651       mController.mDiscovery.withGatewaysAsync([peer, handler](It begin, const It end) {
652         const auto addr = peer.second;
653         const auto it = std::find_if(
654           begin, end, [&addr](const ValueType& vt) { return vt.first == addr; });
655         if (it != end)
656         {
657           it->second->measurePeer(std::move(peer.first), std::move(handler));
658         }
659         else
660         {
661           // invoke the handler with an empty result if we couldn't
662           // find the peer's gateway
663           handler(GhostXForm{});
664         }
665       });
666     }
667 
668     Controller& mController;
669   };
670 
671   struct JoinSessionCallback
672   {
operator ()ableton::link::Controller::JoinSessionCallback673     void operator()(Session session)
674     {
675       mController.joinSession(std::move(session));
676     }
677 
678     Controller& mController;
679   };
680 
681   using IoType = typename util::Injected<IoContext>::type;
682 
683   using ControllerPeers = Peers<IoType&,
684     std::reference_wrapper<SessionPeerCounter>,
685     SessionTimelineCallback,
686     SessionStartStopStateCallback>;
687 
688   using ControllerGateway =
689     Gateway<typename ControllerPeers::GatewayObserver, Clock, IoType&>;
690   using GatewayPtr = std::shared_ptr<ControllerGateway>;
691 
692   struct GatewayFactory
693   {
operator ()ableton::link::Controller::GatewayFactory694     GatewayPtr operator()(std::pair<NodeState, GhostXForm> state,
695       util::Injected<IoType&> io,
696       const asio::ip::address& addr)
697     {
698       if (addr.is_v4())
699       {
700         return GatewayPtr{new ControllerGateway{std::move(io), addr.to_v4(),
701           util::injectVal(makeGatewayObserver(mController.mPeers, addr)),
702           std::move(state.first), std::move(state.second), mController.mClock}};
703       }
704       else
705       {
706         throw std::runtime_error("Could not create peer gateway on non-ipV4 address");
707       }
708     }
709 
710     Controller& mController;
711   };
712 
713   struct UdpSendExceptionHandler
714   {
715     using Exception = discovery::UdpSendException;
716 
operator ()ableton::link::Controller::UdpSendExceptionHandler717     void operator()(const Exception& exception)
718     {
719       mController.mDiscovery.repairGateway(exception.interfaceAddr);
720     }
721 
722     Controller& mController;
723   };
724 
725   TempoCallback mTempoCallback;
726   StartStopStateCallback mStartStopStateCallback;
727   Clock mClock;
728   NodeId mNodeId;
729   SessionId mSessionId;
730 
731   mutable std::mutex mSessionStateGuard;
732   SessionState mSessionState;
733 
734   mutable std::mutex mClientStateGuard;
735   ClientState mClientState;
736   bool mLastIsPlayingForStartStopStateCallback;
737 
738   mutable RtClientState mRtClientState;
739   std::atomic<bool> mHasPendingRtClientStates;
740 
741   SessionPeerCounter mSessionPeerCounter;
742 
743   std::atomic<bool> mEnabled;
744 
745   std::atomic<bool> mStartStopSyncEnabled;
746 
747   util::Injected<IoContext> mIo;
748 
749   RtClientStateSetter mRtClientStateSetter;
750 
751   ControllerPeers mPeers;
752 
753   using ControllerSessions = Sessions<ControllerPeers&,
754     MeasurePeer,
755     JoinSessionCallback,
756     typename util::Injected<IoContext>::type&,
757     Clock>;
758   ControllerSessions mSessions;
759 
760   using Discovery =
761     discovery::Service<std::pair<NodeState, GhostXForm>, GatewayFactory, IoContext>;
762   Discovery mDiscovery;
763 };
764 
765 } // namespace link
766 } // namespace ableton
767