1 //  osc handler for supercollider-style communication
2 //  Copyright (C) 2009, 2010 Tim Blechmann
3 //
4 //  This program is free software; you can redistribute it and/or modify
5 //  it under the terms of the GNU General Public License as published by
6 //  the Free Software Foundation; either version 2 of the License, or
7 //  (at your option) any later version.
8 //
9 //  This program is distributed in the hope that it will be useful,
10 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
11 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 //  GNU General Public License for more details.
13 //
14 //  You should have received a copy of the GNU General Public License
15 //  along with this program; see the file COPYING.  If not, write to
16 //  the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 //  Boston, MA 02111-1307, USA.
18 
19 #pragma once
20 
21 #include <algorithm>
22 #include <mutex>
23 #include <vector>
24 
25 #ifndef BOOST_ASIO_HAS_STD_ARRAY
26 #    ifdef __clang__ // clang workaround
27 #        define BOOST_ASIO_HAS_STD_ARRAY
28 #    endif
29 #endif
30 
31 // AppleClang workaround
32 #if defined(__apple_build_version__) && __apple_build_version__ > 11000000
33 #    define BOOST_ASIO_HAS_STD_STRING_VIEW 1
34 #endif
35 
36 #include <boost/asio/ip/tcp.hpp>
37 
38 #include <boost/enable_shared_from_this.hpp>
39 #include <boost/date_time/microsec_time_clock.hpp>
40 #include <boost/intrusive/treap_set.hpp>
41 
42 #include <boost/endian/arithmetic.hpp>
43 
44 #include "osc/OscReceivedElements.h"
45 
46 #include "../server/memory_pool.hpp"
47 #include "../server/server_args.hpp"
48 #include "../server/server_scheduler.hpp"
49 #include "../utilities/osc_server.hpp"
50 #include "../utilities/sized_array.hpp"
51 #include "../utilities/static_pool.hpp"
52 #include "../utilities/time_tag.hpp"
53 
54 struct FifoMsg;
55 
56 namespace nova {
57 
58 typedef bool (*AsyncStageFn)(World* inWorld, void* cmdData);
59 typedef void (*AsyncFreeFn)(World* inWorld, void* cmdData);
60 
61 namespace detail {
62 
63 using namespace boost::asio;
64 using namespace boost::asio::ip;
65 
66 struct nova_endpoint : public std::enable_shared_from_this<nova_endpoint> {
67     virtual void send(const char* data, size_t length) = 0;
68 };
69 
70 class udp_endpoint : public nova_endpoint {
71 public:
udp_endpoint(udp::endpoint const & ep)72     udp_endpoint(udp::endpoint const& ep): endpoint_(ep) {}
73 
operator ==(udp_endpoint const & rhs) const74     bool operator==(udp_endpoint const& rhs) const { return endpoint_ == rhs.endpoint_; }
75 
76 private:
77     void send(const char* data, size_t length) override final;
78 
79     udp::endpoint endpoint_;
80 };
81 
82 typedef std::shared_ptr<nova_endpoint> endpoint_ptr;
83 
84 /**
85  * observer to receive osc notifications
86  * */
87 class sc_notify_observers {
88     typedef std::vector<endpoint_ptr> observer_vector;
89 
90 public:
91     typedef enum { no_error = 0, already_registered = -1, not_registered = -2 } error_code;
92 
sc_notify_observers(boost::asio::io_service & io_service)93     sc_notify_observers(boost::asio::io_service& io_service): udp_socket(io_service) {}
94 
95     int add_observer(endpoint_ptr const& ep);
96     int remove_observer(endpoint_ptr const& ep);
97 
98     /* @{ */
99     /** notifications, should be called from the real-time thread */
notification_node_started(const server_node * node)100     void notification_node_started(const server_node* node) { notify("/n_go", node); }
101 
notification_node_ended(const server_node * node)102     void notification_node_ended(const server_node* node) { notify("/n_end", node); }
103 
notification_node_turned_off(const server_node * node)104     void notification_node_turned_off(const server_node* node) { notify("/n_off", node); }
105 
notification_node_turned_on(const server_node * node)106     void notification_node_turned_on(const server_node* node) { notify("/n_on", node); }
107 
notification_node_moved(const server_node * node)108     void notification_node_moved(const server_node* node) { notify("/n_move", node); }
109 
110     void send_trigger(int32_t node_id, int32_t trigger_id, float value);
111 
112     void send_node_reply(int32_t node_id, int reply_id, const char* command_name, int argument_count,
113                          const float* values);
114     /* @} */
115 
116     /** send notifications, should not be called from the real-time thread */
117     void send_notification(const char* data, size_t length);
118 
119     void send_udp(const char* data, size_t size, udp::endpoint const& receiver);
120 
121     static const char* error_string(error_code);
122 
123 private:
124     observer_vector::iterator find(endpoint_ptr const& ep);
125 
126     void notify(const char* address_pattern, const server_node* node) const;
127     void send_notification(const char* data, size_t length, nova_endpoint* endpoint);
128 
129     observer_vector observers;
130 
131 protected:
132     udp::socket udp_socket;
133     std::mutex udp_mutex;
134 };
135 
136 class sc_scheduled_bundles {
137 public:
138     struct bundle_node : public boost::intrusive::bs_set_base_hook<> {
bundle_nodenova::detail::sc_scheduled_bundles::bundle_node139         bundle_node(time_tag const& timeout, const char* data, endpoint_ptr const& endpoint):
140             timeout_(timeout),
141             data_(data),
142             endpoint_(endpoint) {}
143 
144         void run(void);
145 
146         const time_tag timeout_;
147         const char* const data_;
148         endpoint_ptr endpoint_;
149 
operator <(const bundle_node & lhs,const bundle_node & rhs)150         friend bool operator<(const bundle_node& lhs, const bundle_node& rhs) { return priority_order(lhs, rhs); }
151 
priority_order(const bundle_node & lhs,const bundle_node & rhs)152         friend bool priority_order(const bundle_node& lhs, const bundle_node& rhs) {
153             return lhs.timeout_ < rhs.timeout_; // lower value, higher priority
154         }
155     };
156 
157     typedef boost::intrusive::treap_multiset<bundle_node> bundle_queue_t;
158 
159     void insert_bundle(time_tag const& timeout, const char* data, size_t length, endpoint_ptr const& endpoint);
160 
161     void execute_bundles(time_tag const& last, time_tag const& now);
162 
clear_bundles(void)163     void clear_bundles(void) { bundle_q.clear_and_dispose(dispose_bundle); }
164 
dispose_bundle(bundle_node * node)165     static void dispose_bundle(bundle_node* node) {
166         node->~bundle_node();
167         rt_pool.free(node);
168     }
169 
170 private:
171     bundle_queue_t bundle_q;
172 };
173 
174 class sc_osc_handler : private detail::network_thread, public sc_notify_observers {
175     /* @{ */
176     /** constructor helpers */
177     void open_tcp_acceptor(tcp const& protocol, unsigned int port);
178     void open_udp_socket(udp const& protocol, unsigned int port);
179     bool open_socket(int family, int type, int protocol, unsigned int port);
180     /* @} */
181 
182 public:
sc_osc_handler(server_arguments const & args)183     sc_osc_handler(server_arguments const& args):
184         sc_notify_observers(detail::network_thread::io_service_),
185         tcp_acceptor_(detail::network_thread::io_service_),
186         tcp_password_(args.server_password.size() ? args.server_password.c_str() : nullptr) {
187         if (!args.non_rt) {
188             if (args.tcp_port && !open_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, args.tcp_port))
189                 throw std::runtime_error("cannot open socket");
190             if (args.udp_port && !open_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, args.udp_port))
191                 throw std::runtime_error("cannot open socket");
192         }
193     }
194 
start_receive_thread(void)195     void start_receive_thread(void) { detail::network_thread::start_receive(); }
196 
197     typedef osc::ReceivedPacket ReceivedPacket;
198     typedef osc::ReceivedBundle ReceivedBundle;
199     typedef osc::ReceivedMessage ReceivedMessage;
200 
201     class received_packet : public audio_sync_callback {
received_packet(const char * dat,size_t length,endpoint_ptr const & endpoint)202         received_packet(const char* dat, size_t length, endpoint_ptr const& endpoint):
203             data(dat),
204             length(length),
205             endpoint_(endpoint) {}
206 
operator new(std::size_t size,void * ptr)207         void* operator new(std::size_t size, void* ptr) { return ::operator new(size, ptr); }
208 
209     public:
210         static received_packet* alloc_packet(const char* data, size_t length, endpoint_ptr const& remote_endpoint);
211 
212         void run(void) override final;
213 
214         const char* const data;
215         const size_t length;
216         endpoint_ptr endpoint_;
217     };
218 
219 private:
220     /* @{ */
221     /** udp socket handling */
222     void start_receive_udp();
223     void handle_receive_udp(const boost::system::error_code& error, std::size_t bytes_transferred);
224     /* @} */
225 
226     /* @{ */
227     /** tcp connection handling */
228 public:
229     class tcp_connection : public nova_endpoint {
230     public:
231         using pointer = std::shared_ptr<tcp_connection>;
232 #if BOOST_VERSION >= 107000
233         using executor = boost::asio::executor;
234 #else
235         using executor = boost::asio::io_context::executor_type;
236 #endif
237 
create(const executor & executor)238         static pointer create(const executor& executor) { return pointer(new tcp_connection(executor)); }
239 
socket()240         tcp::socket& socket() { return socket_; }
241 
242         void start(sc_osc_handler* self);
243 
operator ==(tcp_connection const & rhs) const244         bool operator==(tcp_connection const& rhs) const { return &rhs == this; }
245 
246     private:
247 #if BOOST_VERSION >= 107000
tcp_connection(const executor & executor)248         tcp_connection(const executor& executor): socket_(executor) {}
249 #else
250         tcp_connection(const executor& executor): socket_(executor.context()) {}
251 #endif
252 
253         void send(const char* data, size_t length) override final;
254 
255         void async_read_msg_size();
256         void handle_message_size();
257         void handle_message();
258 
259         tcp::socket socket_;
260         sc_osc_handler* osc_handler;
261         boost::endian::big_int32_t msg_size_;
262         std::vector<char> msg_buffer_;
263         std::mutex socket_mutex_;
264     };
265 
266 private:
267     void start_tcp_accept(void);
268     void handle_tcp_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error);
269     /* @} */
270 
271 public:
dumpOSC(int i)272     void dumpOSC(int i) { dump_osc_packets = i; }
273 
274 private:
275     int dump_osc_packets = 0;
276 
277     /* @{ */
278 public:
279     /** \todo how to handle temporary message error suppression? */
280 
set_error_posting(int val)281     void set_error_posting(int val) { error_posting = val; }
282 
283 private:
284     int error_posting = 1;
285     /* @} */
286 
287     /* @{ */
288     /** packet handling */
289 public:
290     void handle_packet_async(const char* data, size_t length, endpoint_ptr const& endpoint);
291     void handle_packet(const char* data, size_t length, endpoint_ptr const& endpoint);
292     time_tag handle_bundle_nrt(const char* data_, std::size_t length);
293 
294 private:
295     template <bool realtime> void handle_bundle(ReceivedBundle const& bundle, endpoint_ptr const& endpoint);
296     template <bool realtime>
297     void handle_message(ReceivedMessage const& message, size_t msg_size, endpoint_ptr const& endpoint);
298     template <bool realtime>
299     void handle_message_int_address(ReceivedMessage const& message, size_t msg_size, endpoint_ptr const& endpoint);
300     template <bool realtime>
301     void handle_message_sym_address(ReceivedMessage const& message, size_t msg_size, endpoint_ptr const& endpoint);
302 
303     friend struct sc_scheduled_bundles::bundle_node;
304     /* @} */
305 
306     /* @{ */
307     /** bundle scheduling */
308 public:
clear_scheduled_bundles(void)309     void clear_scheduled_bundles(void) { scheduled_bundles.clear_bundles(); }
310 
execute_scheduled_bundles(void)311     void execute_scheduled_bundles(void) { scheduled_bundles.execute_bundles(last, now); }
312 
increment_logical_time(time_tag const & diff)313     void increment_logical_time(time_tag const& diff) {
314         last = now;
315         now += diff;
316     }
317 
set_last_now(time_tag const & lasts,time_tag const & nows)318     void set_last_now(time_tag const& lasts, time_tag const& nows) {
319         now = nows;
320         last = lasts;
321     }
322 
add_last_now(time_tag const & add)323     void add_last_now(time_tag const& add) {
324         now += add;
325         last += add;
326     }
327 
update_time_from_system(void)328     void update_time_from_system(void) {
329         now = time_tag::from_ptime(boost::date_time::microsec_clock<boost::posix_time::ptime>::universal_time());
330         last = now - time_per_tick;
331     }
332 
current_time(void) const333     time_tag const& current_time(void) const { return now; }
334 
335     sc_scheduled_bundles scheduled_bundles;
336     time_tag now, last;
337     time_tag time_per_tick;
338     /* @} */
339 
340     void do_asynchronous_command(World* world, void* replyAddr, const char* cmdName, void* cmdData, AsyncStageFn stage2,
341                                  AsyncStageFn stage3, AsyncStageFn stage4, AsyncFreeFn cleanup, int completionMsgSize,
342                                  void* completionMsgData) const;
343 
344     void send_message_from_RT(const World* world, FifoMsg& msg) const;
345 
346     void send_message_to_RT(const World* world, FifoMsg& msg) const;
347 
348     bool quit_received = false;
349 
350 private:
351     /* @{ */
352     udp::endpoint udp_remote_endpoint_;
353 
354     tcp::acceptor tcp_acceptor_;
355     const char* tcp_password_; /* we are not owning this! */
356 
357     std::array<char, 1 << 15> recv_buffer_;
358     /* @} */
359 };
360 
361 } /* namespace detail */
362 
363 using detail::sc_osc_handler;
364 
365 } /* namespace nova */
366