1 /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
2 *
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation, either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * If you would like to incorporate Link into a proprietary software application,
17 * please contact <link-devs@ableton.com>.
18 */
19
20 #pragma once
21
22 #include <ableton/discovery/IpV4Interface.hpp>
23 #include <ableton/discovery/MessageTypes.hpp>
24 #include <ableton/discovery/v1/Messages.hpp>
25 #include <ableton/platforms/asio/AsioWrapper.hpp>
26 #include <ableton/util/Injected.hpp>
27 #include <ableton/util/SafeAsyncHandler.hpp>
28 #include <algorithm>
29 #include <memory>
30
31 namespace ableton
32 {
33 namespace discovery
34 {
35
36 // An exception thrown when sending a udp message fails. Stores the
37 // interface through which the sending failed.
38 struct UdpSendException : std::runtime_error
39 {
UdpSendExceptionableton::discovery::UdpSendException40 UdpSendException(const std::runtime_error& e, asio::ip::address ifAddr)
41 : std::runtime_error(e.what())
42 , interfaceAddr(std::move(ifAddr))
43 {
44 }
45
46 asio::ip::address interfaceAddr;
47 };
48
49 // Throws UdpSendException
50 template <typename Interface, typename NodeId, typename Payload>
sendUdpMessage(Interface & iface,NodeId from,const uint8_t ttl,const v1::MessageType messageType,const Payload & payload,const asio::ip::udp::endpoint & to)51 void sendUdpMessage(Interface& iface,
52 NodeId from,
53 const uint8_t ttl,
54 const v1::MessageType messageType,
55 const Payload& payload,
56 const asio::ip::udp::endpoint& to)
57 {
58 using namespace std;
59 v1::MessageBuffer buffer;
60 const auto messageBegin = begin(buffer);
61 const auto messageEnd =
62 v1::detail::encodeMessage(move(from), ttl, messageType, payload, messageBegin);
63 const auto numBytes = static_cast<size_t>(distance(messageBegin, messageEnd));
64 try
65 {
66 iface.send(buffer.data(), numBytes, to);
67 }
68 catch (const std::runtime_error& err)
69 {
70 throw UdpSendException{err, iface.endpoint().address()};
71 }
72 }
73
74 // UdpMessenger uses a "shared_ptr pImpl" pattern to make it movable
75 // and to support safe async handler callbacks when receiving messages
76 // on the given interface.
77 template <typename Interface, typename NodeStateT, typename IoContext>
78 class UdpMessenger
79 {
80 public:
81 using NodeState = NodeStateT;
82 using NodeId = typename NodeState::IdType;
83 using Timer = typename util::Injected<IoContext>::type::Timer;
84 using TimerError = typename Timer::ErrorCode;
85 using TimePoint = typename Timer::TimePoint;
86
UdpMessenger(util::Injected<Interface> iface,NodeState state,util::Injected<IoContext> io,const uint8_t ttl,const uint8_t ttlRatio)87 UdpMessenger(util::Injected<Interface> iface,
88 NodeState state,
89 util::Injected<IoContext> io,
90 const uint8_t ttl,
91 const uint8_t ttlRatio)
92 : mpImpl(std::make_shared<Impl>(
93 std::move(iface), std::move(state), std::move(io), ttl, ttlRatio))
94 {
95 // We need to always listen for incoming traffic in order to
96 // respond to peer state broadcasts
97 mpImpl->listen(MulticastTag{});
98 mpImpl->listen(UnicastTag{});
99 mpImpl->broadcastState();
100 }
101
102 UdpMessenger(const UdpMessenger&) = delete;
103 UdpMessenger& operator=(const UdpMessenger&) = delete;
104
UdpMessenger(UdpMessenger && rhs)105 UdpMessenger(UdpMessenger&& rhs)
106 : mpImpl(std::move(rhs.mpImpl))
107 {
108 }
109
~UdpMessenger()110 ~UdpMessenger()
111 {
112 if (mpImpl != nullptr)
113 {
114 try
115 {
116 mpImpl->sendByeBye();
117 }
118 catch (const UdpSendException& err)
119 {
120 debug(mpImpl->mIo->log()) << "Failed to send bye bye message: " << err.what();
121 }
122 }
123 }
124
updateState(NodeState state)125 void updateState(NodeState state)
126 {
127 mpImpl->updateState(std::move(state));
128 }
129
130 // Broadcast the current state of the system to all peers. May throw
131 // std::runtime_error if assembling a broadcast message fails or if
132 // there is an error at the transport layer. Throws on failure.
broadcastState()133 void broadcastState()
134 {
135 mpImpl->broadcastState();
136 }
137
138 // Asynchronous receive function for incoming messages from peers. Will
139 // return immediately and the handler will be invoked when a message
140 // is received. Handler must have operator() overloads for PeerState and
141 // ByeBye messages.
142 template <typename Handler>
receive(Handler handler)143 void receive(Handler handler)
144 {
145 mpImpl->setReceiveHandler(std::move(handler));
146 }
147
148 private:
149 struct Impl : std::enable_shared_from_this<Impl>
150 {
Implableton::discovery::UdpMessenger::Impl151 Impl(util::Injected<Interface> iface,
152 NodeState state,
153 util::Injected<IoContext> io,
154 const uint8_t ttl,
155 const uint8_t ttlRatio)
156 : mIo(std::move(io))
157 , mInterface(std::move(iface))
158 , mState(std::move(state))
159 , mTimer(mIo->makeTimer())
160 , mLastBroadcastTime{}
161 , mTtl(ttl)
162 , mTtlRatio(ttlRatio)
__anon1d1f03d00102ableton::discovery::UdpMessenger::Impl163 , mPeerStateHandler([](PeerState<NodeState>) {})
__anon1d1f03d00202ableton::discovery::UdpMessenger::Impl164 , mByeByeHandler([](ByeBye<NodeId>) {})
165 {
166 }
167
168 template <typename Handler>
setReceiveHandlerableton::discovery::UdpMessenger::Impl169 void setReceiveHandler(Handler handler)
170 {
171 mPeerStateHandler = [handler](
172 PeerState<NodeState> state) { handler(std::move(state)); };
173
174 mByeByeHandler = [handler](ByeBye<NodeId> byeBye) { handler(std::move(byeBye)); };
175 }
176
sendByeByeableton::discovery::UdpMessenger::Impl177 void sendByeBye()
178 {
179 sendUdpMessage(
180 *mInterface, mState.ident(), 0, v1::kByeBye, makePayload(), multicastEndpoint());
181 }
182
updateStateableton::discovery::UdpMessenger::Impl183 void updateState(NodeState state)
184 {
185 mState = std::move(state);
186 }
187
broadcastStateableton::discovery::UdpMessenger::Impl188 void broadcastState()
189 {
190 using namespace std::chrono;
191
192 const auto minBroadcastPeriod = milliseconds{50};
193 const auto nominalBroadcastPeriod = milliseconds(mTtl * 1000 / mTtlRatio);
194 const auto timeSinceLastBroadcast =
195 duration_cast<milliseconds>(mTimer.now() - mLastBroadcastTime);
196
197 // The rate is limited to maxBroadcastRate to prevent flooding the network.
198 const auto delay = minBroadcastPeriod - timeSinceLastBroadcast;
199
200 // Schedule the next broadcast before we actually send the
201 // message so that if sending throws an exception we are still
202 // scheduled to try again. We want to keep trying at our
203 // interval as long as this instance is alive.
204 mTimer.expires_from_now(delay > milliseconds{0} ? delay : nominalBroadcastPeriod);
205 mTimer.async_wait([this](const TimerError e) {
206 if (!e)
207 {
208 broadcastState();
209 }
210 });
211
212 // If we're not delaying, broadcast now
213 if (delay < milliseconds{1})
214 {
215 debug(mIo->log()) << "Broadcasting state";
216 sendPeerState(v1::kAlive, multicastEndpoint());
217 }
218 }
219
sendPeerStateableton::discovery::UdpMessenger::Impl220 void sendPeerState(
221 const v1::MessageType messageType, const asio::ip::udp::endpoint& to)
222 {
223 sendUdpMessage(
224 *mInterface, mState.ident(), mTtl, messageType, toPayload(mState), to);
225 mLastBroadcastTime = mTimer.now();
226 }
227
sendResponseableton::discovery::UdpMessenger::Impl228 void sendResponse(const asio::ip::udp::endpoint& to)
229 {
230 sendPeerState(v1::kResponse, to);
231 }
232
233 template <typename Tag>
listenableton::discovery::UdpMessenger::Impl234 void listen(Tag tag)
235 {
236 mInterface->receive(util::makeAsyncSafe(this->shared_from_this()), tag);
237 }
238
239 template <typename Tag, typename It>
operator ()ableton::discovery::UdpMessenger::Impl240 void operator()(Tag tag,
241 const asio::ip::udp::endpoint& from,
242 const It messageBegin,
243 const It messageEnd)
244 {
245 auto result = v1::parseMessageHeader<NodeId>(messageBegin, messageEnd);
246
247 const auto& header = result.first;
248 // Ignore messages from self and other groups
249 if (header.ident != mState.ident() && header.groupId == 0)
250 {
251 debug(mIo->log()) << "Received message type "
252 << static_cast<int>(header.messageType) << " from peer "
253 << header.ident;
254
255 switch (header.messageType)
256 {
257 case v1::kAlive:
258 sendResponse(from);
259 receivePeerState(std::move(result.first), result.second, messageEnd);
260 break;
261 case v1::kResponse:
262 receivePeerState(std::move(result.first), result.second, messageEnd);
263 break;
264 case v1::kByeBye:
265 receiveByeBye(std::move(result.first.ident));
266 break;
267 default:
268 info(mIo->log()) << "Unknown message received of type: " << header.messageType;
269 }
270 }
271 listen(tag);
272 }
273
274 template <typename It>
receivePeerStateableton::discovery::UdpMessenger::Impl275 void receivePeerState(
276 v1::MessageHeader<NodeId> header, It payloadBegin, It payloadEnd)
277 {
278 try
279 {
280 auto state = NodeState::fromPayload(
281 std::move(header.ident), std::move(payloadBegin), std::move(payloadEnd));
282
283 // Handlers must only be called once
284 auto handler = std::move(mPeerStateHandler);
285 mPeerStateHandler = [](PeerState<NodeState>) {};
286 handler(PeerState<NodeState>{std::move(state), header.ttl});
287 }
288 catch (const std::runtime_error& err)
289 {
290 info(mIo->log()) << "Ignoring peer state message: " << err.what();
291 }
292 }
293
receiveByeByeableton::discovery::UdpMessenger::Impl294 void receiveByeBye(NodeId nodeId)
295 {
296 // Handlers must only be called once
297 auto byeByeHandler = std::move(mByeByeHandler);
298 mByeByeHandler = [](ByeBye<NodeId>) {};
299 byeByeHandler(ByeBye<NodeId>{std::move(nodeId)});
300 }
301
302 util::Injected<IoContext> mIo;
303 util::Injected<Interface> mInterface;
304 NodeState mState;
305 Timer mTimer;
306 TimePoint mLastBroadcastTime;
307 uint8_t mTtl;
308 uint8_t mTtlRatio;
309 std::function<void(PeerState<NodeState>)> mPeerStateHandler;
310 std::function<void(ByeBye<NodeId>)> mByeByeHandler;
311 };
312
313 std::shared_ptr<Impl> mpImpl;
314 };
315
316 // Factory function
317 template <typename Interface, typename NodeState, typename IoContext>
makeUdpMessenger(util::Injected<Interface> iface,NodeState state,util::Injected<IoContext> io,const uint8_t ttl,const uint8_t ttlRatio)318 UdpMessenger<Interface, NodeState, IoContext> makeUdpMessenger(
319 util::Injected<Interface> iface,
320 NodeState state,
321 util::Injected<IoContext> io,
322 const uint8_t ttl,
323 const uint8_t ttlRatio)
324 {
325 return UdpMessenger<Interface, NodeState, IoContext>{
326 std::move(iface), std::move(state), std::move(io), ttl, ttlRatio};
327 }
328
329 } // namespace discovery
330 } // namespace ableton
331