1 //*****************************************************************
2 /*
3 JackTrip: A System for High-Quality Audio Network Performance
4 over the Internet
5
6 Copyright (c) 2008 Juan-Pablo Caceres, Chris Chafe.
7 SoundWIRE group at CCRMA, Stanford University.
8
9 Permission is hereby granted, free of charge, to any person
10 obtaining a copy of this software and associated documentation
11 files (the "Software"), to deal in the Software without
12 restriction, including without limitation the rights to use,
13 copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the
15 Software is furnished to do so, subject to the following
16 conditions:
17
18 The above copyright notice and this permission notice shall be
19 included in all copies or substantial portions of the Software.
20
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
23 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
25 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
26 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
27 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
28 OTHER DEALINGS IN THE SOFTWARE.
29 */
30 //*****************************************************************
31
32 /**
33 * \file JackTrip.cpp
34 * \author Juan-Pablo Caceres
35 * \date July 2008
36 */
37
38 #include "JackTrip.h"
39 #include "UdpDataProtocol.h"
40 #include "RingBufferWavetable.h"
41 #include "JitterBuffer.h"
42 #include "jacktrip_globals.h"
43 #include "JackAudioInterface.h"
44 #ifdef __RT_AUDIO__
45 #include "RtAudioInterface.h"
46 #endif
47
48 #include <iostream>
49 #include <cstdlib>
50 #include <stdexcept>
51
52 #include <QHostAddress>
53 #include <QHostInfo>
54 #include <QThread>
55 #include <QTimer>
56 #include <QDateTime>
57
58 using std::cout; using std::endl;
59
60 //the following function has to remain outside the Jacktrip class definition
61 //its purpose is to close the app when control c is hit by the user in rtaudio/asio4all mode
62 /*if defined __WIN_32__
63 void sigint_handler(int sig)
64 {
65 exit(0);
66 }
67 #endif*/
68
69 bool JackTrip::sSigInt = false;
70 bool JackTrip::sJackStopped = false;
71
72 //*******************************************************************************
JackTrip(jacktripModeT JacktripMode,dataProtocolT DataProtocolType,int NumChans,int NumNetRevChans,int BufferQueueLength,unsigned int redundancy,AudioInterface::audioBitResolutionT AudioBitResolution,DataProtocol::packetHeaderTypeT PacketHeaderType,underrunModeT UnderRunMode,int receiver_bind_port,int sender_bind_port,int receiver_peer_port,int sender_peer_port,int tcp_peer_port)73 JackTrip::JackTrip(jacktripModeT JacktripMode,
74 dataProtocolT DataProtocolType,
75 int NumChans,
76 #ifdef WAIR // WAIR
77 int NumNetRevChans,
78 #endif // endwhere
79 int BufferQueueLength,
80 unsigned int redundancy,
81 AudioInterface::audioBitResolutionT AudioBitResolution,
82 DataProtocol::packetHeaderTypeT PacketHeaderType,
83 underrunModeT UnderRunMode,
84 int receiver_bind_port, int sender_bind_port,
85 int receiver_peer_port, int sender_peer_port, int tcp_peer_port) :
86 mJackTripMode(JacktripMode),
87 mDataProtocol(DataProtocolType),
88 mPacketHeaderType(PacketHeaderType),
89 mAudiointerfaceMode(JackTrip::JACK),
90 mNumChans(NumChans),
91 #ifdef WAIR // WAIR
92 mNumNetRevChans(NumNetRevChans),
93 #endif // endwhere
94 mBufferQueueLength(BufferQueueLength),
95 mBufferStrategy(1),
96 mBroadcastQueueLength(0),
97 mSampleRate(gDefaultSampleRate),
98 mDeviceID(gDefaultDeviceID),
99 mAudioBufferSize(gDefaultBufferSizeInSamples),
100 mAudioBitResolution(AudioBitResolution),
101 mLoopBack(false),
102 mDataProtocolSender(NULL),
103 mDataProtocolReceiver(NULL),
104 mAudioInterface(NULL),
105 mPacketHeader(NULL),
106 mUnderRunMode(UnderRunMode),
107 mStopOnTimeout(false),
108 mSendRingBuffer(NULL),
109 mReceiveRingBuffer(NULL),
110 mReceiverBindPort(receiver_bind_port),
111 mSenderPeerPort(sender_peer_port),
112 mSenderBindPort(sender_bind_port),
113 mReceiverPeerPort(receiver_peer_port),
114 mTcpServerPort(tcp_peer_port),
115 mRedundancy(redundancy),
116 mJackClientName(gJackDefaultClientName),
117 mConnectionMode(JackTrip::NORMAL),
118 mTimeoutTimer(this),
119 mSleepTime(100),
120 mElapsedTime(0),
121 mEndTime(0),
122 mTcpClient(this),
123 mUdpSockTemp(this),
124 mReceivedConnection(false),
125 mTcpConnectionError(false),
126 mStopped(false),
127 mHasShutdown(false),
128 mConnectDefaultAudioPorts(true),
129 mIOStatTimeout(0),
130 mIOStatLogStream(std::cout.rdbuf()),
131 mSimulatedLossRate(0.0),
132 mSimulatedJitterRate(0.0),
133 mSimulatedDelayRel(0.0),
134 mUseRtUdpPriority(false),
135 mAudioTesterP(nullptr)
136 {
137 createHeader(mPacketHeaderType);
138 sJackStopped = false;
139 }
140
141
142 //*******************************************************************************
~JackTrip()143 JackTrip::~JackTrip()
144 {
145 //wait();
146 delete mDataProtocolSender;
147 delete mDataProtocolReceiver;
148 delete mAudioInterface;
149 delete mPacketHeader;
150 delete mSendRingBuffer;
151 delete mReceiveRingBuffer;
152 }
153
154
155 //*******************************************************************************
setupAudio(int ID)156 void JackTrip::setupAudio(
157 #ifdef WAIRTOHUB // WAIR
158 __attribute__((unused)) int ID
159 #endif // endwhere
160 )
161 {
162 // Check if mAudioInterface has already been created or not
163 if (mAudioInterface != NULL) { // if it has been created, disconnet it from JACK and delete it
164 cout << "WARINING: JackAudio interface was setup already:" << endl;
165 cout << "It will be erased and setup again." << endl;
166 cout << gPrintSeparator << endl;
167 closeAudio();
168 }
169
170 // Create AudioInterface Client Object
171 if ( mAudiointerfaceMode == JackTrip::JACK ) {
172 #ifndef __NO_JACK__
173 if (gVerboseFlag) std::cout << " JackTrip:setupAudio before new JackAudioInterface" << std::endl;
174 mAudioInterface = new JackAudioInterface(this, mNumChans, mNumChans,
175 #ifdef WAIR // wair
176 mNumNetRevChans,
177 #endif // endwhere
178 mAudioBitResolution);
179
180 #ifdef WAIRTOHUB // WAIR
181 QString VARIABLE_AUDIO_NAME = WAIR_AUDIO_NAME; // legacy for WAIR
182 //Set our Jack client name if we're a hub server or a custom name hasn't been set
183 if (!mPeerAddress.isEmpty() && (mJackClientName.constData() == gJackDefaultClientName.constData())) {
184 mJackClientName = QString(mPeerAddress).replace(":", "_");
185 }
186 //std::cout << "WAIR ID " << ID << " jacktrip client name set to=" <<
187 // mJackClientName.toStdString() << std::endl;
188
189 #endif // endwhere
190 mAudioInterface->setClientName(mJackClientName);
191 if (0 < mBroadcastQueueLength) {
192 mAudioInterface->enableBroadcastOutput();
193 }
194
195 if (gVerboseFlag) std::cout << " JackTrip:setupAudio before mAudioInterface->setup" << std::endl;
196 mAudioInterface->setup();
197 if (gVerboseFlag) std::cout << " JackTrip:setupAudio before mAudioInterface->getSampleRate" << std::endl;
198 mSampleRate = mAudioInterface->getSampleRate();
199 if (gVerboseFlag) std::cout << " JackTrip:setupAudio before mAudioInterface->getDeviceID" << std::endl;
200 mDeviceID = mAudioInterface->getDeviceID();
201 if (gVerboseFlag) std::cout << " JackTrip:setupAudio before mAudioInterface->getBufferSizeInSamples" << std::endl;
202 mAudioBufferSize = mAudioInterface->getBufferSizeInSamples();
203 #endif //__NON_JACK__
204 #ifdef __NO_JACK__ /// \todo FIX THIS REPETITION OF CODE
205 #ifdef __RT_AUDIO__
206 cout << "Warning: using non jack version, RtAudio will be used instead" << endl;
207 mAudioInterface = new RtAudioInterface(this, mNumChans, mNumChans, mAudioBitResolution);
208 mAudioInterface->setSampleRate(mSampleRate);
209 mAudioInterface->setDeviceID(mDeviceID);
210 mAudioInterface->setBufferSizeInSamples(mAudioBufferSize);
211 mAudioInterface->setup();
212 #endif
213 #endif
214 }
215 else if ( mAudiointerfaceMode == JackTrip::RTAUDIO ) {
216 #ifdef __RT_AUDIO__
217 mAudioInterface = new RtAudioInterface(this, mNumChans, mNumChans, mAudioBitResolution);
218 mAudioInterface->setSampleRate(mSampleRate);
219 mAudioInterface->setDeviceID(mDeviceID);
220 mAudioInterface->setBufferSizeInSamples(mAudioBufferSize);
221 mAudioInterface->setup();
222 #endif
223 }
224
225 mAudioInterface->setLoopBack(mLoopBack);
226 if (mAudioTesterP) { // if we're a hub server, this will be a nullptr - MAJOR REFACTOR NEEDED, in my opinion
227 mAudioTesterP->setSampleRate(mSampleRate);
228 }
229 mAudioInterface->setAudioTesterP(mAudioTesterP);
230
231 std::cout << "The Sampling Rate is: " << mSampleRate << std::endl;
232 std::cout << gPrintSeparator << std::endl;
233 int AudioBufferSizeInBytes = mAudioBufferSize*sizeof(sample_t);
234 std::cout << "The Audio Buffer Size is: " << mAudioBufferSize << " samples" << std::endl;
235 std::cout << " or: " << AudioBufferSizeInBytes
236 << " bytes" << std::endl;
237 if (0 < mBroadcastQueueLength) {
238 std::cout << gPrintSeparator << std::endl;
239 cout << "Broadcast Output is enabled, delay = "
240 << mBroadcastQueueLength * mAudioBufferSize * 1000 / mSampleRate << " ms"
241 << " (" << mBroadcastQueueLength * mAudioBufferSize << " samples)" << endl;
242 }
243 std::cout << gPrintSeparator << std::endl;
244 cout << "The Number of Channels is: " << mAudioInterface->getNumInputChannels() << endl;
245 std::cout << gPrintSeparator << std::endl;
246 QThread::usleep(100);
247 }
248
249
250 //*******************************************************************************
closeAudio()251 void JackTrip::closeAudio()
252 {
253 //mAudioInterface->close();
254 if ( mAudioInterface != NULL ) {
255 mAudioInterface->stopProcess();
256 delete mAudioInterface;
257 mAudioInterface = NULL;
258 }
259 }
260
261
262 //*******************************************************************************
setupDataProtocol()263 void JackTrip::setupDataProtocol()
264 {
265 double simulated_max_delay = mSimulatedDelayRel * getBufferSizeInSamples() / getSampleRate();
266 // Create DataProtocol Objects
267 switch (mDataProtocol) {
268 case UDP:
269 std::cout << "Using UDP Protocol" << std::endl;
270 QThread::usleep(100);
271 mDataProtocolSender = new UdpDataProtocol(this, DataProtocol::SENDER,
272 //mSenderPeerPort, mSenderBindPort,
273 mSenderBindPort, mSenderPeerPort,
274 mRedundancy);
275 mDataProtocolReceiver = new UdpDataProtocol(this, DataProtocol::RECEIVER,
276 mReceiverBindPort, mReceiverPeerPort,
277 mRedundancy);
278 if (0.0 < mSimulatedLossRate || 0.0 < mSimulatedJitterRate || 0.0 < simulated_max_delay) {
279 mDataProtocolReceiver->setIssueSimulation(mSimulatedLossRate, mSimulatedJitterRate, simulated_max_delay);
280 }
281 mDataProtocolSender->setUseRtPriority(mUseRtUdpPriority);
282 mDataProtocolReceiver->setUseRtPriority(mUseRtUdpPriority);
283 if (mUseRtUdpPriority) {
284 cout << "Using RT thread priority for UDP data" << endl;
285 }
286 std::cout << gPrintSeparator << std::endl;
287 break;
288 case TCP:
289 throw std::invalid_argument("TCP Protocol is not implemented");
290 break;
291 case SCTP:
292 throw std::invalid_argument("SCTP Protocol is not implemented");
293 break;
294 default:
295 throw std::invalid_argument("Protocol not defined or unimplemented");
296 break;
297 }
298
299 // Set Audio Packet Size
300 //mDataProtocolSender->setAudioPacketSize
301 // (mAudioInterface->getSizeInBytesPerChannel() * mNumChans);
302 //mDataProtocolReceiver->setAudioPacketSize
303 // (mAudioInterface->getSizeInBytesPerChannel() * mNumChans);
304 mDataProtocolSender->setAudioPacketSize(getTotalAudioPacketSizeInBytes());
305 mDataProtocolReceiver->setAudioPacketSize(getTotalAudioPacketSizeInBytes());
306 }
307
308
309 //*******************************************************************************
setupRingBuffers()310 void JackTrip::setupRingBuffers()
311 {
312 // Create RingBuffers with the apprioprate size
313 /// \todo Make all this operations cleaner
314 //int total_audio_packet_size = getTotalAudioPacketSizeInBytes();
315 int slot_size = getRingBuffersSlotSize();
316 if (0 <= mBufferStrategy) {
317 mUnderRunMode = ZEROS;
318 }
319 else if (0 > mBufferQueueLength) {
320 throw std::invalid_argument("Auto queue is not supported by RingBuffer");
321 }
322
323 switch (mUnderRunMode) {
324 case WAVETABLE:
325 mSendRingBuffer = new RingBufferWavetable(slot_size,
326 gDefaultOutputQueueLength);
327 mReceiveRingBuffer = new RingBufferWavetable(slot_size,
328 mBufferQueueLength);
329 /*
330 mSendRingBuffer = new RingBufferWavetable(mAudioInterface->getSizeInBytesPerChannel() * mNumChans,
331 gDefaultOutputQueueLength);
332 mReceiveRingBuffer = new RingBufferWavetable(mAudioInterface->getSizeInBytesPerChannel() * mNumChans,
333 mBufferQueueLength);
334 */
335 break;
336 case ZEROS:
337 mSendRingBuffer = new RingBuffer(slot_size,
338 gDefaultOutputQueueLength);
339 if (0 > mBufferStrategy) {
340 mReceiveRingBuffer = new RingBuffer(slot_size,
341 mBufferQueueLength);
342 }
343 else {
344 cout << "Using JitterBuffer strategy " << mBufferStrategy << endl;
345 if (0 > mBufferQueueLength) {
346 cout << "Using AutoQueue 1/" << -mBufferQueueLength << endl;
347 }
348 mReceiveRingBuffer = new JitterBuffer(mAudioBufferSize, mBufferQueueLength,
349 mSampleRate, mBufferStrategy,
350 mBroadcastQueueLength, mNumChans, mAudioBitResolution);
351 }
352 /*
353 mSendRingBuffer = new RingBuffer(mAudioInterface->getSizeInBytesPerChannel() * mNumChans,
354 gDefaultOutputQueueLength);
355 mReceiveRingBuffer = new RingBuffer(mAudioInterface->getSizeInBytesPerChannel() * mNumChans,
356 mBufferQueueLength);
357 */
358 break;
359 default:
360 throw std::invalid_argument("Underrun Mode undefined");
361 break;
362 }
363 }
364
365
366 //*******************************************************************************
setPeerAddress(QString PeerHostOrIP)367 void JackTrip::setPeerAddress(QString PeerHostOrIP)
368 {
369 mPeerAddress = PeerHostOrIP;
370 }
371
372
373 //*******************************************************************************
appendProcessPluginToNetwork(ProcessPlugin * plugin)374 void JackTrip::appendProcessPluginToNetwork(ProcessPlugin* plugin)
375 {
376 if (plugin) {
377 mProcessPluginsToNetwork.append(plugin); // ownership transferred
378 //mAudioInterface->appendProcessPluginToNetwork(plugin);
379 }
380 }
381
382 //*******************************************************************************
appendProcessPluginFromNetwork(ProcessPlugin * plugin)383 void JackTrip::appendProcessPluginFromNetwork(ProcessPlugin* plugin)
384 {
385 if (plugin) {
386 mProcessPluginsFromNetwork.append(plugin); // ownership transferred
387 //mAudioInterface->appendProcessPluginFromNetwork(plugin);
388 }
389 }
390
391
392 //*******************************************************************************
startProcess(int ID)393 void JackTrip::startProcess(
394 #ifdef WAIRTOHUB // WAIR
395 int ID
396 #endif // endwhere
397 )
398 { //signal that catches ctrl c in rtaudio-asio mode
399 /*#if defined (__WIN_32__)
400 if (signal(SIGINT, sigint_handler) == SIG_ERR) {
401 perror("signal");
402 exit(1);
403 }
404 #endif*/
405 // Check if ports are already binded by another process on this machine
406 // ------------------------------------------------------------------
407 if (gVerboseFlag) std::cout << "step 1" << std::endl;
408
409 if (gVerboseFlag) std::cout << " JackTrip:startProcess before checkIfPortIsBinded(mReceiverBindPort)" << std::endl;
410 #if defined __WIN_32__
411 //cc fixed windows crash with this print statement!
412 //qDebug() << "before mJackTrip->startProcess" << mReceiverBindPort<< mSenderBindPort;
413 #endif
414 checkIfPortIsBinded(mReceiverBindPort);
415 if (gVerboseFlag) std::cout << " JackTrip:startProcess before checkIfPortIsBinded(mSenderBindPort)" << std::endl;
416 checkIfPortIsBinded(mSenderBindPort);
417 // Set all classes and parameters
418 // ------------------------------
419 if (gVerboseFlag) std::cout << " JackTrip:startProcess before setupAudio" << std::endl;
420 setupAudio(
421 #ifdef WAIRTOHUB // wair
422 ID
423 #endif // endwhere
424 );
425 //cc redundant with instance creator createHeader(mPacketHeaderType); next line fixme
426 createHeader(mPacketHeaderType);
427 setupDataProtocol();
428 setupRingBuffers();
429 // Connect Signals and Slots
430 // -------------------------
431 QObject::connect(mPacketHeader, &PacketHeader::signalError,
432 this, &JackTrip::slotStopProcessesDueToError, Qt::QueuedConnection);
433 QObject::connect(mDataProtocolReceiver, SIGNAL(signalReceivedConnectionFromPeer()),
434 this, SLOT(slotReceivedConnectionFromPeer()),
435 Qt::QueuedConnection);
436 //QObject::connect(this, SIGNAL(signalUdpTimeOut()),
437 // this, SLOT(slotStopProcesses()), Qt::QueuedConnection);
438 QObject::connect((UdpDataProtocol *)mDataProtocolReceiver, &UdpDataProtocol::signalUdpWaitingTooLong, this,
439 &JackTrip::slotUdpWaitingTooLong, Qt::QueuedConnection);
440 QObject::connect(mDataProtocolSender, &DataProtocol::signalCeaseTransmission,
441 this, &JackTrip::slotStopProcessesDueToError, Qt::QueuedConnection);
442 QObject::connect(mDataProtocolReceiver, &DataProtocol::signalCeaseTransmission,
443 this, &JackTrip::slotStopProcessesDueToError, Qt::QueuedConnection);
444
445 //QObject::connect(mDataProtocolSender, SIGNAL(signalError(const char*)),
446 // this, SLOT(slotStopProcesses()), Qt::QueuedConnection);
447 QObject::connect(mDataProtocolReceiver, SIGNAL(signalError(const char*)),
448 this, SLOT(slotStopProcesses()), Qt::QueuedConnection);
449
450 // Start the threads for the specific mode
451 // ---------------------------------------
452 switch ( mJackTripMode )
453 {
454 case CLIENT :
455 if (gVerboseFlag) std::cout << "step 2c client only" << std::endl;
456 if (gVerboseFlag) std::cout << " JackTrip:startProcess case CLIENT before clientStart" << std::endl;
457 clientStart();
458 break;
459 case SERVER :
460 if (gVerboseFlag) std::cout << "step 2s server only" << std::endl;
461 if (gVerboseFlag) std::cout << " JackTrip:startProcess case SERVER before serverStart" << std::endl;
462 serverStart();
463 break;
464 case CLIENTTOPINGSERVER :
465 if (gVerboseFlag) std::cout << "step 2C client only" << std::endl;
466 if (gVerboseFlag) std::cout << " JackTrip:startProcess case CLIENTTOPINGSERVER before clientPingToServerStart" << std::endl;
467 if ( clientPingToServerStart() == -1 ) { // if error on server start (-1) we return inmediatly
468 stop("Peer Address has to be set if you run in CLIENTTOPINGSERVER mode");
469 return;
470 }
471 break;
472 case SERVERPINGSERVER :
473 if (gVerboseFlag) std::cout << "step 2S server only (same as 2s)" << std::endl;
474 if (gVerboseFlag) std::cout << " JackTrip:startProcess case SERVERPINGSERVER before serverStart" << std::endl;
475 if ( serverStart(true) == -1 ) { // if error on server start (-1) we return inmediatly
476 stop();
477 return;
478 }
479 break;
480 default:
481 throw std::invalid_argument("Jacktrip Mode undefined");
482 break;
483 }
484 }
485
completeConnection()486 void JackTrip::completeConnection()
487 {
488 // Have the threads share a single socket that operates at full duplex.
489 #if defined (__WIN_32__)
490 SOCKET sock_fd = INVALID_SOCKET;
491 #else
492 int sock_fd = -1;
493 #endif
494 mDataProtocolReceiver->setSocket(sock_fd);
495 mDataProtocolSender->setSocket(sock_fd);
496
497 // Start Threads
498 if (gVerboseFlag) std::cout << " JackTrip:startProcess before mDataProtocolReceiver->start" << std::endl;
499 mDataProtocolReceiver->start();
500 QThread::msleep(1);
501 if (gVerboseFlag) std::cout << " JackTrip:startProcess before mDataProtocolSender->start" << std::endl;
502 mDataProtocolSender->start();
503 /*
504 * changed order so that audio starts after receiver and sender
505 * because UdpDataProtocol:run0 before setRealtimeProcessPriority()
506 * causes an audio hiccup from jack JackPosixSemaphore::TimedWait err = Interrupted system call
507 * new QThread::msleep(1);
508 * to allow sender to start
509 */
510 QThread::msleep(1);
511 if (gVerboseFlag) std::cout << "step 5" << std::endl;
512 if (gVerboseFlag) std::cout << " JackTrip:startProcess before mAudioInterface->startProcess" << std::endl;
513 for (int i = 0; i < mProcessPluginsFromNetwork.size(); ++i) {
514 mAudioInterface->appendProcessPluginFromNetwork(mProcessPluginsFromNetwork[i]);
515 }
516 for (int i = 0; i < mProcessPluginsToNetwork.size(); ++i) {
517 mAudioInterface->appendProcessPluginToNetwork(mProcessPluginsToNetwork[i]);
518 }
519 mAudioInterface->initPlugins(); // mSampleRate known now, which plugins require
520 mAudioInterface->startProcess(); // Tell JACK server we are ready for audio flow now
521
522 if (mConnectDefaultAudioPorts) { mAudioInterface->connectDefaultPorts(); }
523
524 //Start our IO stat timer
525 if (mIOStatTimeout > 0) {
526 cout << "STATS" << mIOStatTimeout << endl;
527 if (!mIOStatStream.isNull()) {
528 mIOStatLogStream.rdbuf(((std::ostream *)mIOStatStream.data())->rdbuf());
529 }
530 QTimer *timer = new QTimer(this);
531 connect(timer, SIGNAL(timeout()), this, SLOT(onStatTimer()));
532 timer->start(mIOStatTimeout*1000);
533 }
534 }
535
536 //*******************************************************************************
onStatTimer()537 void JackTrip::onStatTimer()
538 {
539 DataProtocol::PktStat pkt_stat;
540 if (!mDataProtocolReceiver->getStats(&pkt_stat)) {
541 return;
542 }
543 bool reset = (0 == pkt_stat.statCount);
544 RingBuffer::IOStat recv_io_stat;
545 if (!mReceiveRingBuffer->getStats(&recv_io_stat, reset)) {
546 return;
547 }
548 RingBuffer::IOStat send_io_stat;
549 if (!mSendRingBuffer->getStats(&send_io_stat, reset)) {
550 return;
551 }
552 QString now = QDateTime::currentDateTime().toString(Qt::ISODate);
553
554 static QMutex mutex;
555 QMutexLocker locker(&mutex);
556 if (mAudioTesterP && mAudioTesterP->getEnabled()) {
557 mIOStatLogStream << "\n";
558 }
559 mIOStatLogStream << now.toLocal8Bit().constData()
560 << " " << getPeerAddress().toLocal8Bit().constData()
561 << " send: "
562 << send_io_stat.underruns
563 << "/" << send_io_stat.overflows
564 << " recv: "
565 << recv_io_stat.underruns
566 << "/" << recv_io_stat.overflows
567 << " prot: "
568 << pkt_stat.lost
569 << "/" << pkt_stat.outOfOrder
570 << "/" << pkt_stat.revived
571 << " tot: "
572 << pkt_stat.tot
573 << " sync: "
574 << recv_io_stat.level
575 << "/" << recv_io_stat.buf_inc_underrun
576 << "/" << recv_io_stat.buf_inc_compensate
577 << "/" << recv_io_stat.buf_dec_overflows
578 << "/" << recv_io_stat.buf_dec_pktloss
579 << " skew: " << recv_io_stat.skew
580 << "/" << recv_io_stat.skew_raw
581 << " bcast: " << recv_io_stat.broadcast_skew
582 << "/" << recv_io_stat.broadcast_delta
583 << " autoq: " << 0.1*recv_io_stat.autoq_corr
584 << "/" << 0.1*recv_io_stat.autoq_rate
585 << endl;
586 }
587
receivedConnectionTCP()588 void JackTrip::receivedConnectionTCP()
589 {
590 mTimeoutTimer.stop();
591 if (gVerboseFlag) cout << "TCP Socket Connected to Server!" << endl;
592 emit signalTcpClientConnected();
593
594 // Send Client Port Number to Server
595 // ---------------------------------
596 char port_buf[sizeof(mReceiverBindPort) + gMaxRemoteNameLength];
597 std::memcpy(port_buf, &mReceiverBindPort, sizeof(mReceiverBindPort));
598 std::memset(port_buf + sizeof(mReceiverBindPort), 0, gMaxRemoteNameLength);
599 if (!mRemoteClientName.isEmpty()) {
600 //If our remote client name is set, send it too.
601 QByteArray name = mRemoteClientName.toUtf8();
602 // Find a clean place to truncate if we're over length.
603 // (Make sure we're not in the middle of a multi-byte characetr.)
604 int length = name.length();
605 //Need to take the final null terminator into account here.
606 if (length > gMaxRemoteNameLength - 1) {
607 length = gMaxRemoteNameLength - 1;
608 while ((length > 0) && ((name.at(length) & 0xc0) == 0x80)) {
609 //We're in the middle of a multi-byte character. Work back.
610 length--;
611 }
612 }
613 name.truncate(length);
614 std::memcpy(port_buf + sizeof(mReceiverBindPort), name.data(), length + 1);
615 }
616
617 mTcpClient.write(port_buf, sizeof(port_buf));
618 /*while ( mTcpClient.bytesToWrite() > 0 ) {
619 mTcpClient.waitForBytesWritten(-1);
620 }*/
621 if (gVerboseFlag) cout << "Port " << mReceiverBindPort << " sent to Server" << endl;
622 //Continued in receivedDataTCP slot
623 }
624
receivedDataTCP()625 void JackTrip::receivedDataTCP()
626 {
627 if (mTcpClient.bytesAvailable() < (int)sizeof(uint16_t)) {
628 return;
629 }
630
631 // Read the size of the package
632 // ----------------------------
633 if (gVerboseFlag) cout << "Reading UDP port from Server..." << endl;
634 if (gVerboseFlag) cout << "Ready To Read From Socket!" << endl;
635
636 // Read UDP Port Number from Server
637 // --------------------------------
638 uint32_t udp_port;
639 int size = sizeof(udp_port);
640 char port_buf[sizeof(mReceiverBindPort)];
641 //char port_buf[size];
642 mTcpClient.read(port_buf, size);
643 std::memcpy(&udp_port, port_buf, size);
644 //cout << "Received UDP Port Number: " << udp_port << endl;
645
646 // Close the TCP Socket
647 // --------------------
648 mTcpClient.close(); // Close the socket
649 //cout << "TCP Socket Closed!" << endl;
650 if (gVerboseFlag) cout << "Connection Succesfull!" << endl;
651
652 // Set with the received UDP port
653 // ------------------------------
654 setPeerPorts(udp_port);
655 mDataProtocolReceiver->setPeerAddress( mPeerAddress.toLatin1().data() );
656 mDataProtocolSender->setPeerAddress( mPeerAddress.toLatin1().data() );
657 mDataProtocolSender->setPeerPort(udp_port);
658 mDataProtocolReceiver->setPeerPort(udp_port);
659 cout << "Server Address set to: " << mPeerAddress.toStdString() << " Port: " << udp_port << std::endl;
660 cout << gPrintSeparator << endl;
661 completeConnection();
662 }
663
receivedDataUDP()664 void JackTrip::receivedDataUDP()
665 {
666 //Stop our timer.
667 mTimeoutTimer.stop();
668
669 QHostAddress peerHostAddress;
670 uint16_t peer_port;
671
672 // IPv6 addition from fyfe
673 // Get the datagram size to avoid problems with IPv6
674 qint64 datagramSize = mUdpSockTemp.pendingDatagramSize();
675 char buf[datagramSize];
676 // set client address
677 mUdpSockTemp.readDatagram(buf, datagramSize, &peerHostAddress, &peer_port);
678 mUdpSockTemp.close(); // close the socket
679
680 // Check for mapped IPv4->IPv6 addresses that look like ::ffff:x.x.x.x
681 if (peerHostAddress.protocol() == QAbstractSocket::IPv6Protocol) {
682 bool mappedIPv4;
683 uint32_t address = peerHostAddress.toIPv4Address(&mappedIPv4);
684 // If the IPv4 address is mapped to IPv6, convert it to IPv4
685 if (mappedIPv4) {
686 QHostAddress ipv4Address = QHostAddress(address);
687 mPeerAddress = ipv4Address.toString();
688 } else {
689 mPeerAddress = peerHostAddress.toString();
690 }
691 }
692 else {
693 mPeerAddress = peerHostAddress.toString();
694 }
695
696 // Set the peer address to send packets (in the protocol sender)
697 if (gVerboseFlag) std::cout << "JackTrip:serverStart before mDataProtocolSender->setPeerAddress()" << std::endl;
698 mDataProtocolSender->setPeerAddress( mPeerAddress.toLatin1().constData() );
699 if (gVerboseFlag) std::cout << "JackTrip:serverStart before mDataProtocolReceiver->setPeerAddress()" << std::endl;
700 mDataProtocolReceiver->setPeerAddress( mPeerAddress.toLatin1().constData() );
701 // We reply to the same port the peer sent the packets from
702 // This way we can go through NAT
703 // Because of the NAT traversal scheme, the portn need to be
704 // "symetric", e.g.:
705 // from Client to Server : src = 4474, dest = 4464
706 // from Server to Client : src = 4464, dest = 4474
707 // no -- all are the same -- 4464
708 if (gVerboseFlag) std::cout << "JackTrip:serverStart before setting all peer_port instances to " << peer_port << std::endl;
709 mDataProtocolSender->setPeerPort(peer_port);
710 mDataProtocolReceiver->setPeerPort(peer_port);
711 setPeerPorts(peer_port);
712 completeConnection();
713 }
714
udpTimerTick()715 void JackTrip::udpTimerTick()
716 {
717 if (mStopped || sSigInt || sJackStopped) {
718 //Stop everything.
719 mUdpSockTemp.close();
720 mTimeoutTimer.stop();
721 stop();
722 }
723
724 if (gVerboseFlag) std::cout << mSleepTime << "ms " << std::flush;
725 mElapsedTime += mSleepTime;
726 if (mEndTime > 0 && mElapsedTime >= mEndTime) {
727 mUdpSockTemp.close();
728 mTimeoutTimer.stop();
729 cout << "JackTrip Server Timed Out!" << endl;
730 stop("JackTrip Server Timed Out");
731 }
732 }
733
tcpTimerTick()734 void JackTrip::tcpTimerTick()
735 {
736 if (mStopped || sSigInt || sJackStopped) {
737 //Stop everything.
738 mTcpClient.close();
739 mTimeoutTimer.stop();
740 stop();
741 }
742
743 mElapsedTime += mSleepTime;
744 if (mEndTime > 0 && mElapsedTime >= mEndTime) {
745 mTcpClient.close();
746 mTimeoutTimer.stop();
747 cout << "JackTrip Server Timed Out!" << endl;
748 stop("Initial TCP Connection Timed Out");
749 }
750
751 }
752
753 //*******************************************************************************
stop(QString errorMessage)754 void JackTrip::stop(QString errorMessage)
755 {
756 mStopped = true;
757 //Make sure we're only run once
758 if (mHasShutdown) {
759 return;
760 }
761 mHasShutdown = true;
762 std::cout << "Stopping JackTrip..." << std::endl;
763
764 // Stop The Sender
765 mDataProtocolSender->stop();
766 mDataProtocolSender->wait();
767
768 // Stop The Receiver
769 mDataProtocolReceiver->stop();
770 mDataProtocolReceiver->wait();
771
772 // Stop the audio processes
773 //mAudioInterface->stopProcess();
774 closeAudio();
775
776 cout << "JackTrip Processes STOPPED!" << endl;
777 cout << gPrintSeparator << endl;
778
779 // Emit the jack stopped signal
780 if (sJackStopped) {
781 emit signalError("The Jack Server was shut down!");
782 } else if (errorMessage.isEmpty()) {
783 emit signalProcessesStopped();
784 } else {
785 emit signalError(errorMessage);
786 }
787 }
788
789
790 //*******************************************************************************
waitThreads()791 void JackTrip::waitThreads()
792 {
793 mDataProtocolSender->wait();
794 mDataProtocolReceiver->wait();
795 }
796
797
798 //*******************************************************************************
clientStart()799 void JackTrip::clientStart()
800 {
801 // For the Client mode, the peer (or server) address has to be specified by the user
802 if ( mPeerAddress.isEmpty() ) {
803 throw std::invalid_argument("Peer Address has to be set if you run in CLIENT mode");
804 }
805 else {
806 mDataProtocolSender->setPeerAddress( mPeerAddress.toLatin1().data() );
807 mDataProtocolReceiver->setPeerAddress( mPeerAddress.toLatin1().data() );
808 cout << "Peer Address set to: " << mPeerAddress.toStdString() << std::endl;
809 cout << gPrintSeparator << endl;
810 completeConnection();
811 }
812 }
813
814
815 //*******************************************************************************
serverStart(bool timeout,int udpTimeout)816 int JackTrip::serverStart(bool timeout, int udpTimeout) // udpTimeout unused
817 {
818 // Set the peer address
819 if ( !mPeerAddress.isEmpty() ) {
820 if (gVerboseFlag) std::cout << "WARNING: SERVER mode: Peer Address was set but will be deleted." << endl;
821 //throw std::invalid_argument("Peer Address has to be set if you run in CLIENT mode");
822 mPeerAddress.clear();
823 //return;
824 }
825
826 // Get the client address when it connects
827 if (gVerboseFlag) std::cout << "JackTrip:serverStart before mUdpSockTemp.bind(Any)" << std::endl;
828 // Bind the socket
829 if ( !mUdpSockTemp.bind(QHostAddress::Any, mReceiverBindPort,
830 QUdpSocket::DefaultForPlatform) )
831 {
832 std::cerr << "in JackTrip: Could not bind UDP socket. It may be already binded." << endl;
833 throw std::runtime_error("Could not bind UDP socket. It may be already binded.");
834 }
835 connect(&mUdpSockTemp, &QUdpSocket::readyRead, this, &JackTrip::receivedDataUDP);
836
837 // Start timer and then wait for a signal to read datagrams.
838 mElapsedTime = 0;
839 if (timeout) {
840 mEndTime = udpTimeout;
841 }
842 mTimeoutTimer.setInterval(mSleepTime);
843 connect(&mTimeoutTimer, &QTimer::timeout, this, &JackTrip::udpTimerTick);
844 mTimeoutTimer.start();
845
846 if (gVerboseFlag) std::cout << "JackTrip:serverStart before !UdpSockTemp.hasPendingDatagrams()" << std::endl;
847 cout << "Waiting for Connection From a Client..." << endl;
848 return 0;
849 // Continued in the receivedDataUDP slot.
850
851 // char buf[1];
852 // // set client address
853 // UdpSockTemp.readDatagram(buf, 1, &peerHostAddress, &peer_port);
854 // UdpSockTemp.close(); // close the socket
855 }
856
857
858 //*******************************************************************************
clientPingToServerStart()859 int JackTrip::clientPingToServerStart()
860 {
861 //mConnectionMode = JackTrip::KSTRONG;
862 //mConnectionMode = JackTrip::JAMTEST;
863
864 // Set Peer (server in this case) address
865 // --------------------------------------
866 // For the Client mode, the peer (or server) address has to be specified by the user
867 if ( mPeerAddress.isEmpty() ) {
868 throw std::invalid_argument("Peer Address has to be set if you run in CLIENTTOPINGSERVER mode");
869 return -1;
870 }
871
872 // Create Socket Objects
873 // --------------------
874 QHostAddress serverHostAddress;
875 if (!serverHostAddress.setAddress(mPeerAddress)) {
876 QHostInfo info = QHostInfo::fromName(mPeerAddress);
877 if (!info.addresses().isEmpty()) {
878 // use the first IP address
879 serverHostAddress = info.addresses().first();
880 }
881 }
882
883 // Connect Socket to Server and wait for response
884 // ----------------------------------------------
885 connect(&mTcpClient, &QTcpSocket::readyRead, this, &JackTrip::receivedDataTCP);
886 connect(&mTcpClient, &QTcpSocket::connected, this, &JackTrip::receivedConnectionTCP);
887 mElapsedTime = 0;
888 mEndTime = 5000; //Timeout after 5 seconds.
889 mTimeoutTimer.setInterval(mSleepTime);
890 connect(&mTimeoutTimer, &QTimer::timeout, this, &JackTrip::tcpTimerTick);
891 mTimeoutTimer.start();
892 mTcpClient.connectToHost(serverHostAddress, mTcpServerPort);
893
894 if (gVerboseFlag) cout << "Connecting to TCP Server at " << serverHostAddress.toString().toLatin1().constData() << " port " << mTcpServerPort << "..." << endl;
895 return 0;
896 // Continued in the receivedConnectionTCP slot.
897
898 /*
899 else {
900 // Set the peer address
901 mDataProtocolSender->setPeerAddress( mPeerAddress.toLatin1().data() );
902 }
903
904 // Start the Sender Threads
905 // ------------------------
906 mAudioInterface->startProcess();
907 mDataProtocolSender->start();
908 // block until mDataProtocolSender thread starts
909 while ( !mDataProtocolSender->isRunning() ) { QThread::msleep(100); }
910
911 // Create a Socket to listen to Server's answer
912 // --------------------------------------------
913 QHostAddress serverHostAddress;
914 QUdpSocket UdpSockTemp;// Create socket to wait for server answer
915 uint16_t server_port;
916
917 // Bind the socket
918 //bindReceiveSocket(UdpSockTemp, mReceiverBindPort,
919 // mPeerAddress, peer_port);
920 if ( !UdpSockTemp.bind(QHostAddress::Any,
921 mReceiverBindPort,
922 QUdpSocket::ShareAddress) ) {
923 //throw std::runtime_error("Could not bind PingToServer UDP socket. It may be already binded.");
924 }
925
926 // Listen to server response
927 cout << "Waiting for server response..." << endl;
928 while ( !UdpSockTemp.hasPendingDatagrams() ) { QThread::msleep(100); }
929 cout << "Received response from server!" << endl;
930 char buf[1];
931 // set client address
932 UdpSockTemp.readDatagram(buf, 1, &serverHostAddress, &server_port);
933 UdpSockTemp.close(); // close the socket
934
935 // Stop the sender thread to change server port
936 mDataProtocolSender->stop();
937 mDataProtocolSender->wait(); // Wait for the thread to terminate
938
939 cout << "Server port now set to: " << server_port << endl;
940 cout << gPrintSeparator << endl;
941 mDataProtocolSender->setPeerPort(server_port);
942
943 // Start Threads
944 //mAudioInterface->connectDefaultPorts();
945 mDataProtocolSender->start();
946 mDataProtocolReceiver->start();
947 */
948 }
949
950
951 //*******************************************************************************
952 /*
953 void JackTrip::bindReceiveSocket(QUdpSocket& UdpSocket, int bind_port,
954 QHostAddress PeerHostAddress, int peer_port)
955 throw(std::runtime_error)
956 {
957 // Creat socket descriptor
958 int sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
959
960 // Set local IPv4 Address
961 struct sockaddr_in local_addr;
962 ::bzero(&local_addr, sizeof(local_addr));
963 local_addr.sin_family = AF_INET; //AF_INET: IPv4 Protocol
964 local_addr.sin_addr.s_addr = htonl(INADDR_ANY); //INADDR_ANY: let the kernel decide the active address
965 local_addr.sin_port = htons(bind_port); //set bind port
966
967 // Set socket to be reusable, this is platform dependent
968 int one = 1;
969 #if defined ( __LINUX__ )
970 ::setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
971 #endif
972 #if defined ( __MAC_OSX__ )
973 // This option is not avialable on Linux, and without it MAC OS X
974 // has problems rebinding a socket
975 ::setsockopt(sock_fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
976 #endif
977
978 // Bind the Socket
979 if ( (::bind(sock_fd, (struct sockaddr *) &local_addr, sizeof(local_addr))) < 0 )
980 { throw std::runtime_error("ERROR: UDP Socket Bind Error"); }
981
982 // To be able to use the two UDP sockets bound to the same port number,
983 // we connect the receiver and issue a SHUT_WR.
984 // Set peer IPv4 Address
985 struct sockaddr_in peer_addr;
986 bzero(&peer_addr, sizeof(peer_addr));
987 peer_addr.sin_family = AF_INET; //AF_INET: IPv4 Protocol
988 peer_addr.sin_addr.s_addr = htonl(INADDR_ANY); //INADDR_ANY: let the kernel decide the active address
989 peer_addr.sin_port = htons(peer_port); //set local port
990 // Connect the socket and issue a Write shutdown (to make it a
991 // reader socket only)
992 if ( (::inet_pton(AF_INET, PeerHostAddress.toString().toLatin1().constData(),
993 &peer_addr.sin_addr)) < 1 )
994 { throw std::runtime_error("ERROR: Invalid address presentation format"); }
995 if ( (::connect(sock_fd, (struct sockaddr *) &peer_addr, sizeof(peer_addr))) < 0)
996 { throw std::runtime_error("ERROR: Could not connect UDP socket"); }
997 if ( (::shutdown(sock_fd,SHUT_WR)) < 0)
998 { throw std::runtime_error("ERROR: Could suntdown SHUT_WR UDP socket"); }
999
1000 UdpSocket.setSocketDescriptor(sock_fd, QUdpSocket::ConnectedState,
1001 QUdpSocket::ReadOnly);
1002 cout << "UDP Socket Receiving in Port: " << bind_port << endl;
1003 cout << gPrintSeparator << endl;
1004 }
1005 */
1006
1007
1008 //*******************************************************************************
createHeader(const DataProtocol::packetHeaderTypeT headertype)1009 void JackTrip::createHeader(const DataProtocol::packetHeaderTypeT headertype)
1010 {
1011 delete mPacketHeader; //Just in case it has already been allocated
1012 switch (headertype) {
1013 case DataProtocol::DEFAULT :
1014 mPacketHeader = new DefaultHeader(this);
1015 break;
1016 case DataProtocol::JAMLINK :
1017 mPacketHeader = new JamLinkHeader(this);
1018 break;
1019 case DataProtocol::EMPTY :
1020 mPacketHeader = new EmptyHeader(this);
1021 break;
1022 default :
1023 throw std::invalid_argument("Undefined Header Type");
1024 break;
1025 }
1026 }
1027
1028
1029 //*******************************************************************************
putHeaderInPacket(int8_t * full_packet,int8_t * audio_packet)1030 void JackTrip::putHeaderInPacket(int8_t* full_packet, int8_t* audio_packet)
1031 {
1032 mPacketHeader->fillHeaderCommonFromAudio();
1033 mPacketHeader->putHeaderInPacket(full_packet);
1034
1035 int8_t* audio_part;
1036 audio_part = full_packet + mPacketHeader->getHeaderSizeInBytes();
1037 //std::memcpy(audio_part, audio_packet, mAudioInterface->getBufferSizeInBytes());
1038 //std::memcpy(audio_part, audio_packet, mAudioInterface->getSizeInBytesPerChannel() * mNumChans);
1039 std::memcpy(audio_part, audio_packet, getTotalAudioPacketSizeInBytes());
1040 }
1041
1042
1043 //*******************************************************************************
getPacketSizeInBytes()1044 int JackTrip::getPacketSizeInBytes()
1045 {
1046 //return (mAudioInterface->getBufferSizeInBytes() + mPacketHeader->getHeaderSizeInBytes());
1047 //return (mAudioInterface->getSizeInBytesPerChannel() * mNumChans +
1048 //mPacketHeader->getHeaderSizeInBytes());
1049 return (getTotalAudioPacketSizeInBytes() +
1050 mPacketHeader->getHeaderSizeInBytes());
1051 }
1052
1053
1054 //*******************************************************************************
parseAudioPacket(int8_t * full_packet,int8_t * audio_packet)1055 void JackTrip::parseAudioPacket(int8_t* full_packet, int8_t* audio_packet)
1056 {
1057 int8_t* audio_part;
1058 audio_part = full_packet + mPacketHeader->getHeaderSizeInBytes();
1059 //std::memcpy(audio_packet, audio_part, mAudioInterface->getBufferSizeInBytes());
1060 //std::memcpy(audio_packet, audio_part, mAudioInterface->getSizeInBytesPerChannel() * mNumChans);
1061 std::memcpy(audio_packet, audio_part, getTotalAudioPacketSizeInBytes());
1062 }
1063
1064 //*******************************************************************************
checkPeerSettings(int8_t * full_packet)1065 void JackTrip::checkPeerSettings(int8_t* full_packet)
1066 {
1067 mPacketHeader->checkPeerSettings(full_packet);
1068 }
1069
1070
1071 //*******************************************************************************
checkIfPortIsBinded(int port)1072 void JackTrip::checkIfPortIsBinded(int port)
1073 {
1074 QUdpSocket UdpSockTemp;// Create socket to wait for client
1075 // Bind the socket
1076 //cc if ( !UdpSockTemp.bind(QHostAddress::AnyIPv4, port, QUdpSocket::DontShareAddress) )
1077 if ( !UdpSockTemp.bind(QHostAddress::Any, port,
1078 QUdpSocket::DontShareAddress) )
1079 {
1080 UdpSockTemp.close(); // close the socket
1081 throw std::runtime_error(
1082 "Could not bind UDP socket. It may already be binded by another process on your machine. Try using a different port number");
1083 }
1084 UdpSockTemp.close(); // close the socket
1085 }
1086