1 // osc handler for supercollider-style communication 2 // Copyright (C) 2009, 2010 Tim Blechmann 3 // 4 // This program is free software; you can redistribute it and/or modify 5 // it under the terms of the GNU General Public License as published by 6 // the Free Software Foundation; either version 2 of the License, or 7 // (at your option) any later version. 8 // 9 // This program is distributed in the hope that it will be useful, 10 // but WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 // GNU General Public License for more details. 13 // 14 // You should have received a copy of the GNU General Public License 15 // along with this program; see the file COPYING. If not, write to 16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 // Boston, MA 02111-1307, USA. 18 19 #pragma once 20 21 #include <algorithm> 22 #include <mutex> 23 #include <vector> 24 25 #ifndef BOOST_ASIO_HAS_STD_ARRAY 26 # ifdef __clang__ // clang workaround 27 # define BOOST_ASIO_HAS_STD_ARRAY 28 # endif 29 #endif 30 31 // AppleClang workaround 32 #if defined(__apple_build_version__) && __apple_build_version__ > 11000000 33 # define BOOST_ASIO_HAS_STD_STRING_VIEW 1 34 #endif 35 36 #include <boost/asio/ip/tcp.hpp> 37 38 #include <boost/enable_shared_from_this.hpp> 39 #include <boost/date_time/microsec_time_clock.hpp> 40 #include <boost/intrusive/treap_set.hpp> 41 42 #include <boost/endian/arithmetic.hpp> 43 44 #include "osc/OscReceivedElements.h" 45 46 #include "../server/memory_pool.hpp" 47 #include "../server/server_args.hpp" 48 #include "../server/server_scheduler.hpp" 49 #include "../utilities/osc_server.hpp" 50 #include "../utilities/sized_array.hpp" 51 #include "../utilities/static_pool.hpp" 52 #include "../utilities/time_tag.hpp" 53 54 struct FifoMsg; 55 56 namespace nova { 57 58 typedef bool (*AsyncStageFn)(World* inWorld, void* cmdData); 59 typedef void (*AsyncFreeFn)(World* inWorld, void* cmdData); 60 61 namespace detail { 62 63 using namespace boost::asio; 64 using namespace boost::asio::ip; 65 66 struct nova_endpoint : public std::enable_shared_from_this<nova_endpoint> { 67 virtual void send(const char* data, size_t length) = 0; 68 }; 69 70 class udp_endpoint : public nova_endpoint { 71 public: udp_endpoint(udp::endpoint const & ep)72 udp_endpoint(udp::endpoint const& ep): endpoint_(ep) {} 73 operator ==(udp_endpoint const & rhs) const74 bool operator==(udp_endpoint const& rhs) const { return endpoint_ == rhs.endpoint_; } 75 76 private: 77 void send(const char* data, size_t length) override final; 78 79 udp::endpoint endpoint_; 80 }; 81 82 typedef std::shared_ptr<nova_endpoint> endpoint_ptr; 83 84 /** 85 * observer to receive osc notifications 86 * */ 87 class sc_notify_observers { 88 typedef std::vector<endpoint_ptr> observer_vector; 89 90 public: 91 typedef enum { no_error = 0, already_registered = -1, not_registered = -2 } error_code; 92 sc_notify_observers(boost::asio::io_service & io_service)93 sc_notify_observers(boost::asio::io_service& io_service): udp_socket(io_service) {} 94 95 int add_observer(endpoint_ptr const& ep); 96 int remove_observer(endpoint_ptr const& ep); 97 98 /* @{ */ 99 /** notifications, should be called from the real-time thread */ notification_node_started(const server_node * node)100 void notification_node_started(const server_node* node) { notify("/n_go", node); } 101 notification_node_ended(const server_node * node)102 void notification_node_ended(const server_node* node) { notify("/n_end", node); } 103 notification_node_turned_off(const server_node * node)104 void notification_node_turned_off(const server_node* node) { notify("/n_off", node); } 105 notification_node_turned_on(const server_node * node)106 void notification_node_turned_on(const server_node* node) { notify("/n_on", node); } 107 notification_node_moved(const server_node * node)108 void notification_node_moved(const server_node* node) { notify("/n_move", node); } 109 110 void send_trigger(int32_t node_id, int32_t trigger_id, float value); 111 112 void send_node_reply(int32_t node_id, int reply_id, const char* command_name, int argument_count, 113 const float* values); 114 /* @} */ 115 116 /** send notifications, should not be called from the real-time thread */ 117 void send_notification(const char* data, size_t length); 118 119 void send_udp(const char* data, size_t size, udp::endpoint const& receiver); 120 121 static const char* error_string(error_code); 122 123 private: 124 observer_vector::iterator find(endpoint_ptr const& ep); 125 126 void notify(const char* address_pattern, const server_node* node) const; 127 void send_notification(const char* data, size_t length, nova_endpoint* endpoint); 128 129 observer_vector observers; 130 131 protected: 132 udp::socket udp_socket; 133 std::mutex udp_mutex; 134 }; 135 136 class sc_scheduled_bundles { 137 public: 138 struct bundle_node : public boost::intrusive::bs_set_base_hook<> { bundle_nodenova::detail::sc_scheduled_bundles::bundle_node139 bundle_node(time_tag const& timeout, const char* data, endpoint_ptr const& endpoint): 140 timeout_(timeout), 141 data_(data), 142 endpoint_(endpoint) {} 143 144 void run(void); 145 146 const time_tag timeout_; 147 const char* const data_; 148 endpoint_ptr endpoint_; 149 operator <(const bundle_node & lhs,const bundle_node & rhs)150 friend bool operator<(const bundle_node& lhs, const bundle_node& rhs) { return priority_order(lhs, rhs); } 151 priority_order(const bundle_node & lhs,const bundle_node & rhs)152 friend bool priority_order(const bundle_node& lhs, const bundle_node& rhs) { 153 return lhs.timeout_ < rhs.timeout_; // lower value, higher priority 154 } 155 }; 156 157 typedef boost::intrusive::treap_multiset<bundle_node> bundle_queue_t; 158 159 void insert_bundle(time_tag const& timeout, const char* data, size_t length, endpoint_ptr const& endpoint); 160 161 void execute_bundles(time_tag const& last, time_tag const& now); 162 clear_bundles(void)163 void clear_bundles(void) { bundle_q.clear_and_dispose(dispose_bundle); } 164 dispose_bundle(bundle_node * node)165 static void dispose_bundle(bundle_node* node) { 166 node->~bundle_node(); 167 rt_pool.free(node); 168 } 169 170 private: 171 bundle_queue_t bundle_q; 172 }; 173 174 class sc_osc_handler : private detail::network_thread, public sc_notify_observers { 175 /* @{ */ 176 /** constructor helpers */ 177 void open_tcp_acceptor(tcp const& protocol, unsigned int port); 178 void open_udp_socket(udp const& protocol, unsigned int port); 179 bool open_socket(int family, int type, int protocol, unsigned int port); 180 /* @} */ 181 182 public: sc_osc_handler(server_arguments const & args)183 sc_osc_handler(server_arguments const& args): 184 sc_notify_observers(detail::network_thread::io_service_), 185 tcp_acceptor_(detail::network_thread::io_service_), 186 tcp_password_(args.server_password.size() ? args.server_password.c_str() : nullptr) { 187 if (!args.non_rt) { 188 if (args.tcp_port && !open_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, args.tcp_port)) 189 throw std::runtime_error("cannot open socket"); 190 if (args.udp_port && !open_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, args.udp_port)) 191 throw std::runtime_error("cannot open socket"); 192 } 193 } 194 start_receive_thread(void)195 void start_receive_thread(void) { detail::network_thread::start_receive(); } 196 197 typedef osc::ReceivedPacket ReceivedPacket; 198 typedef osc::ReceivedBundle ReceivedBundle; 199 typedef osc::ReceivedMessage ReceivedMessage; 200 201 class received_packet : public audio_sync_callback { received_packet(const char * dat,size_t length,endpoint_ptr const & endpoint)202 received_packet(const char* dat, size_t length, endpoint_ptr const& endpoint): 203 data(dat), 204 length(length), 205 endpoint_(endpoint) {} 206 operator new(std::size_t size,void * ptr)207 void* operator new(std::size_t size, void* ptr) { return ::operator new(size, ptr); } 208 209 public: 210 static received_packet* alloc_packet(const char* data, size_t length, endpoint_ptr const& remote_endpoint); 211 212 void run(void) override final; 213 214 const char* const data; 215 const size_t length; 216 endpoint_ptr endpoint_; 217 }; 218 219 private: 220 /* @{ */ 221 /** udp socket handling */ 222 void start_receive_udp(); 223 void handle_receive_udp(const boost::system::error_code& error, std::size_t bytes_transferred); 224 /* @} */ 225 226 /* @{ */ 227 /** tcp connection handling */ 228 public: 229 class tcp_connection : public nova_endpoint { 230 public: 231 using pointer = std::shared_ptr<tcp_connection>; 232 #if BOOST_VERSION >= 107000 233 using executor = boost::asio::executor; 234 #else 235 using executor = boost::asio::io_context::executor_type; 236 #endif 237 create(const executor & executor)238 static pointer create(const executor& executor) { return pointer(new tcp_connection(executor)); } 239 socket()240 tcp::socket& socket() { return socket_; } 241 242 void start(sc_osc_handler* self); 243 operator ==(tcp_connection const & rhs) const244 bool operator==(tcp_connection const& rhs) const { return &rhs == this; } 245 246 private: 247 #if BOOST_VERSION >= 107000 tcp_connection(const executor & executor)248 tcp_connection(const executor& executor): socket_(executor) {} 249 #else 250 tcp_connection(const executor& executor): socket_(executor.context()) {} 251 #endif 252 253 void send(const char* data, size_t length) override final; 254 255 void async_read_msg_size(); 256 void handle_message_size(); 257 void handle_message(); 258 259 tcp::socket socket_; 260 sc_osc_handler* osc_handler; 261 boost::endian::big_int32_t msg_size_; 262 std::vector<char> msg_buffer_; 263 std::mutex socket_mutex_; 264 }; 265 266 private: 267 void start_tcp_accept(void); 268 void handle_tcp_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error); 269 /* @} */ 270 271 public: dumpOSC(int i)272 void dumpOSC(int i) { dump_osc_packets = i; } 273 274 private: 275 int dump_osc_packets = 0; 276 277 /* @{ */ 278 public: 279 /** \todo how to handle temporary message error suppression? */ 280 set_error_posting(int val)281 void set_error_posting(int val) { error_posting = val; } 282 283 private: 284 int error_posting = 1; 285 /* @} */ 286 287 /* @{ */ 288 /** packet handling */ 289 public: 290 void handle_packet_async(const char* data, size_t length, endpoint_ptr const& endpoint); 291 void handle_packet(const char* data, size_t length, endpoint_ptr const& endpoint); 292 time_tag handle_bundle_nrt(const char* data_, std::size_t length); 293 294 private: 295 template <bool realtime> void handle_bundle(ReceivedBundle const& bundle, endpoint_ptr const& endpoint); 296 template <bool realtime> 297 void handle_message(ReceivedMessage const& message, size_t msg_size, endpoint_ptr const& endpoint); 298 template <bool realtime> 299 void handle_message_int_address(ReceivedMessage const& message, size_t msg_size, endpoint_ptr const& endpoint); 300 template <bool realtime> 301 void handle_message_sym_address(ReceivedMessage const& message, size_t msg_size, endpoint_ptr const& endpoint); 302 303 friend struct sc_scheduled_bundles::bundle_node; 304 /* @} */ 305 306 /* @{ */ 307 /** bundle scheduling */ 308 public: clear_scheduled_bundles(void)309 void clear_scheduled_bundles(void) { scheduled_bundles.clear_bundles(); } 310 execute_scheduled_bundles(void)311 void execute_scheduled_bundles(void) { scheduled_bundles.execute_bundles(last, now); } 312 increment_logical_time(time_tag const & diff)313 void increment_logical_time(time_tag const& diff) { 314 last = now; 315 now += diff; 316 } 317 set_last_now(time_tag const & lasts,time_tag const & nows)318 void set_last_now(time_tag const& lasts, time_tag const& nows) { 319 now = nows; 320 last = lasts; 321 } 322 add_last_now(time_tag const & add)323 void add_last_now(time_tag const& add) { 324 now += add; 325 last += add; 326 } 327 update_time_from_system(void)328 void update_time_from_system(void) { 329 now = time_tag::from_ptime(boost::date_time::microsec_clock<boost::posix_time::ptime>::universal_time()); 330 last = now - time_per_tick; 331 } 332 current_time(void) const333 time_tag const& current_time(void) const { return now; } 334 335 sc_scheduled_bundles scheduled_bundles; 336 time_tag now, last; 337 time_tag time_per_tick; 338 /* @} */ 339 340 void do_asynchronous_command(World* world, void* replyAddr, const char* cmdName, void* cmdData, AsyncStageFn stage2, 341 AsyncStageFn stage3, AsyncStageFn stage4, AsyncFreeFn cleanup, int completionMsgSize, 342 void* completionMsgData) const; 343 344 void send_message_from_RT(const World* world, FifoMsg& msg) const; 345 346 void send_message_to_RT(const World* world, FifoMsg& msg) const; 347 348 bool quit_received = false; 349 350 private: 351 /* @{ */ 352 udp::endpoint udp_remote_endpoint_; 353 354 tcp::acceptor tcp_acceptor_; 355 const char* tcp_password_; /* we are not owning this! */ 356 357 std::array<char, 1 << 15> recv_buffer_; 358 /* @} */ 359 }; 360 361 } /* namespace detail */ 362 363 using detail::sc_osc_handler; 364 365 } /* namespace nova */ 366