1 /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
2  *
3  *  This program is free software: you can redistribute it and/or modify
4  *  it under the terms of the GNU General Public License as published by
5  *  the Free Software Foundation, either version 2 of the License, or
6  *  (at your option) any later version.
7  *
8  *  This program is distributed in the hope that it will be useful,
9  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  *  GNU General Public License for more details.
12  *
13  *  You should have received a copy of the GNU General Public License
14  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  *
16  *  If you would like to incorporate Link into a proprietary software application,
17  *  please contact <link-devs@ableton.com>.
18  */
19 
20 #pragma once
21 
22 #include <ableton/discovery/IpV4Interface.hpp>
23 #include <ableton/discovery/MessageTypes.hpp>
24 #include <ableton/discovery/v1/Messages.hpp>
25 #include <ableton/platforms/asio/AsioWrapper.hpp>
26 #include <ableton/util/Injected.hpp>
27 #include <ableton/util/SafeAsyncHandler.hpp>
28 #include <algorithm>
29 #include <memory>
30 
31 namespace ableton
32 {
33 namespace discovery
34 {
35 
36 // An exception thrown when sending a udp message fails. Stores the
37 // interface through which the sending failed.
38 struct UdpSendException : std::runtime_error
39 {
UdpSendExceptionableton::discovery::UdpSendException40   UdpSendException(const std::runtime_error& e, asio::ip::address ifAddr)
41     : std::runtime_error(e.what())
42     , interfaceAddr(std::move(ifAddr))
43   {
44   }
45 
46   asio::ip::address interfaceAddr;
47 };
48 
49 // Throws UdpSendException
50 template <typename Interface, typename NodeId, typename Payload>
sendUdpMessage(Interface & iface,NodeId from,const uint8_t ttl,const v1::MessageType messageType,const Payload & payload,const asio::ip::udp::endpoint & to)51 void sendUdpMessage(Interface& iface,
52   NodeId from,
53   const uint8_t ttl,
54   const v1::MessageType messageType,
55   const Payload& payload,
56   const asio::ip::udp::endpoint& to)
57 {
58   using namespace std;
59   v1::MessageBuffer buffer;
60   const auto messageBegin = begin(buffer);
61   const auto messageEnd =
62     v1::detail::encodeMessage(move(from), ttl, messageType, payload, messageBegin);
63   const auto numBytes = static_cast<size_t>(distance(messageBegin, messageEnd));
64   try
65   {
66     iface.send(buffer.data(), numBytes, to);
67   }
68   catch (const std::runtime_error& err)
69   {
70     throw UdpSendException{err, iface.endpoint().address()};
71   }
72 }
73 
74 // UdpMessenger uses a "shared_ptr pImpl" pattern to make it movable
75 // and to support safe async handler callbacks when receiving messages
76 // on the given interface.
77 template <typename Interface, typename NodeStateT, typename IoContext>
78 class UdpMessenger
79 {
80 public:
81   using NodeState = NodeStateT;
82   using NodeId = typename NodeState::IdType;
83   using Timer = typename util::Injected<IoContext>::type::Timer;
84   using TimerError = typename Timer::ErrorCode;
85   using TimePoint = typename Timer::TimePoint;
86 
UdpMessenger(util::Injected<Interface> iface,NodeState state,util::Injected<IoContext> io,const uint8_t ttl,const uint8_t ttlRatio)87   UdpMessenger(util::Injected<Interface> iface,
88     NodeState state,
89     util::Injected<IoContext> io,
90     const uint8_t ttl,
91     const uint8_t ttlRatio)
92     : mpImpl(std::make_shared<Impl>(
93         std::move(iface), std::move(state), std::move(io), ttl, ttlRatio))
94   {
95     // We need to always listen for incoming traffic in order to
96     // respond to peer state broadcasts
97     mpImpl->listen(MulticastTag{});
98     mpImpl->listen(UnicastTag{});
99     mpImpl->broadcastState();
100   }
101 
102   UdpMessenger(const UdpMessenger&) = delete;
103   UdpMessenger& operator=(const UdpMessenger&) = delete;
104 
UdpMessenger(UdpMessenger && rhs)105   UdpMessenger(UdpMessenger&& rhs)
106     : mpImpl(std::move(rhs.mpImpl))
107   {
108   }
109 
~UdpMessenger()110   ~UdpMessenger()
111   {
112     if (mpImpl != nullptr)
113     {
114       try
115       {
116         mpImpl->sendByeBye();
117       }
118       catch (const UdpSendException& err)
119       {
120         debug(mpImpl->mIo->log()) << "Failed to send bye bye message: " << err.what();
121       }
122     }
123   }
124 
updateState(NodeState state)125   void updateState(NodeState state)
126   {
127     mpImpl->updateState(std::move(state));
128   }
129 
130   // Broadcast the current state of the system to all peers. May throw
131   // std::runtime_error if assembling a broadcast message fails or if
132   // there is an error at the transport layer. Throws on failure.
broadcastState()133   void broadcastState()
134   {
135     mpImpl->broadcastState();
136   }
137 
138   // Asynchronous receive function for incoming messages from peers. Will
139   // return immediately and the handler will be invoked when a message
140   // is received. Handler must have operator() overloads for PeerState and
141   // ByeBye messages.
142   template <typename Handler>
receive(Handler handler)143   void receive(Handler handler)
144   {
145     mpImpl->setReceiveHandler(std::move(handler));
146   }
147 
148 private:
149   struct Impl : std::enable_shared_from_this<Impl>
150   {
Implableton::discovery::UdpMessenger::Impl151     Impl(util::Injected<Interface> iface,
152       NodeState state,
153       util::Injected<IoContext> io,
154       const uint8_t ttl,
155       const uint8_t ttlRatio)
156       : mIo(std::move(io))
157       , mInterface(std::move(iface))
158       , mState(std::move(state))
159       , mTimer(mIo->makeTimer())
160       , mLastBroadcastTime{}
161       , mTtl(ttl)
162       , mTtlRatio(ttlRatio)
__anon1d1f03d00102ableton::discovery::UdpMessenger::Impl163       , mPeerStateHandler([](PeerState<NodeState>) {})
__anon1d1f03d00202ableton::discovery::UdpMessenger::Impl164       , mByeByeHandler([](ByeBye<NodeId>) {})
165     {
166     }
167 
168     template <typename Handler>
setReceiveHandlerableton::discovery::UdpMessenger::Impl169     void setReceiveHandler(Handler handler)
170     {
171       mPeerStateHandler = [handler](
172                             PeerState<NodeState> state) { handler(std::move(state)); };
173 
174       mByeByeHandler = [handler](ByeBye<NodeId> byeBye) { handler(std::move(byeBye)); };
175     }
176 
sendByeByeableton::discovery::UdpMessenger::Impl177     void sendByeBye()
178     {
179       sendUdpMessage(
180         *mInterface, mState.ident(), 0, v1::kByeBye, makePayload(), multicastEndpoint());
181     }
182 
updateStateableton::discovery::UdpMessenger::Impl183     void updateState(NodeState state)
184     {
185       mState = std::move(state);
186     }
187 
broadcastStateableton::discovery::UdpMessenger::Impl188     void broadcastState()
189     {
190       using namespace std::chrono;
191 
192       const auto minBroadcastPeriod = milliseconds{50};
193       const auto nominalBroadcastPeriod = milliseconds(mTtl * 1000 / mTtlRatio);
194       const auto timeSinceLastBroadcast =
195         duration_cast<milliseconds>(mTimer.now() - mLastBroadcastTime);
196 
197       // The rate is limited to maxBroadcastRate to prevent flooding the network.
198       const auto delay = minBroadcastPeriod - timeSinceLastBroadcast;
199 
200       // Schedule the next broadcast before we actually send the
201       // message so that if sending throws an exception we are still
202       // scheduled to try again. We want to keep trying at our
203       // interval as long as this instance is alive.
204       mTimer.expires_from_now(delay > milliseconds{0} ? delay : nominalBroadcastPeriod);
205       mTimer.async_wait([this](const TimerError e) {
206         if (!e)
207         {
208           broadcastState();
209         }
210       });
211 
212       // If we're not delaying, broadcast now
213       if (delay < milliseconds{1})
214       {
215         debug(mIo->log()) << "Broadcasting state";
216         sendPeerState(v1::kAlive, multicastEndpoint());
217       }
218     }
219 
sendPeerStateableton::discovery::UdpMessenger::Impl220     void sendPeerState(
221       const v1::MessageType messageType, const asio::ip::udp::endpoint& to)
222     {
223       sendUdpMessage(
224         *mInterface, mState.ident(), mTtl, messageType, toPayload(mState), to);
225       mLastBroadcastTime = mTimer.now();
226     }
227 
sendResponseableton::discovery::UdpMessenger::Impl228     void sendResponse(const asio::ip::udp::endpoint& to)
229     {
230       sendPeerState(v1::kResponse, to);
231     }
232 
233     template <typename Tag>
listenableton::discovery::UdpMessenger::Impl234     void listen(Tag tag)
235     {
236       mInterface->receive(util::makeAsyncSafe(this->shared_from_this()), tag);
237     }
238 
239     template <typename Tag, typename It>
operator ()ableton::discovery::UdpMessenger::Impl240     void operator()(Tag tag,
241       const asio::ip::udp::endpoint& from,
242       const It messageBegin,
243       const It messageEnd)
244     {
245       auto result = v1::parseMessageHeader<NodeId>(messageBegin, messageEnd);
246 
247       const auto& header = result.first;
248       // Ignore messages from self and other groups
249       if (header.ident != mState.ident() && header.groupId == 0)
250       {
251         debug(mIo->log()) << "Received message type "
252                           << static_cast<int>(header.messageType) << " from peer "
253                           << header.ident;
254 
255         switch (header.messageType)
256         {
257         case v1::kAlive:
258           sendResponse(from);
259           receivePeerState(std::move(result.first), result.second, messageEnd);
260           break;
261         case v1::kResponse:
262           receivePeerState(std::move(result.first), result.second, messageEnd);
263           break;
264         case v1::kByeBye:
265           receiveByeBye(std::move(result.first.ident));
266           break;
267         default:
268           info(mIo->log()) << "Unknown message received of type: " << header.messageType;
269         }
270       }
271       listen(tag);
272     }
273 
274     template <typename It>
receivePeerStateableton::discovery::UdpMessenger::Impl275     void receivePeerState(
276       v1::MessageHeader<NodeId> header, It payloadBegin, It payloadEnd)
277     {
278       try
279       {
280         auto state = NodeState::fromPayload(
281           std::move(header.ident), std::move(payloadBegin), std::move(payloadEnd));
282 
283         // Handlers must only be called once
284         auto handler = std::move(mPeerStateHandler);
285         mPeerStateHandler = [](PeerState<NodeState>) {};
286         handler(PeerState<NodeState>{std::move(state), header.ttl});
287       }
288       catch (const std::runtime_error& err)
289       {
290         info(mIo->log()) << "Ignoring peer state message: " << err.what();
291       }
292     }
293 
receiveByeByeableton::discovery::UdpMessenger::Impl294     void receiveByeBye(NodeId nodeId)
295     {
296       // Handlers must only be called once
297       auto byeByeHandler = std::move(mByeByeHandler);
298       mByeByeHandler = [](ByeBye<NodeId>) {};
299       byeByeHandler(ByeBye<NodeId>{std::move(nodeId)});
300     }
301 
302     util::Injected<IoContext> mIo;
303     util::Injected<Interface> mInterface;
304     NodeState mState;
305     Timer mTimer;
306     TimePoint mLastBroadcastTime;
307     uint8_t mTtl;
308     uint8_t mTtlRatio;
309     std::function<void(PeerState<NodeState>)> mPeerStateHandler;
310     std::function<void(ByeBye<NodeId>)> mByeByeHandler;
311   };
312 
313   std::shared_ptr<Impl> mpImpl;
314 };
315 
316 // Factory function
317 template <typename Interface, typename NodeState, typename IoContext>
makeUdpMessenger(util::Injected<Interface> iface,NodeState state,util::Injected<IoContext> io,const uint8_t ttl,const uint8_t ttlRatio)318 UdpMessenger<Interface, NodeState, IoContext> makeUdpMessenger(
319   util::Injected<Interface> iface,
320   NodeState state,
321   util::Injected<IoContext> io,
322   const uint8_t ttl,
323   const uint8_t ttlRatio)
324 {
325   return UdpMessenger<Interface, NodeState, IoContext>{
326     std::move(iface), std::move(state), std::move(io), ttl, ttlRatio};
327 }
328 
329 } // namespace discovery
330 } // namespace ableton
331