1 // MSVS likes to complain about lots of standard C functions being unsafe.
2 #ifdef _MSC_VER
3 #define _CRT_SECURE_NO_WARNINGS 1
4 #include <io.h>
5 #endif
6
7 #include "platform_sys.h"
8
9 #define REQUIRE_CXX11 1
10
#include <cctype>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <map>
#include <set>
#include <list>
#include <vector>
#include <deque>
#include <memory>
#include <algorithm>
#include <iterator>
#include <stdexcept>
#include <cstring>
#include <csignal>
#include <chrono>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
29
30 #include "apputil.hpp" // CreateAddr
31 #include "uriparser.hpp" // UriParser
32 #include "socketoptions.hpp"
33 #include "logsupport.hpp"
34 #include "transmitbase.hpp" // bytevector typedef to avoid collisions
35 #include "verbose.hpp"
36
37 // NOTE: This is without "haisrt/" because it uses an internal path
38 // to the library. Application using the "installed" library should
39 // use <srt/srt.h>
40 #include <srt.h>
41 #include <udt.h> // This TEMPORARILY contains extra C++-only SRT API.
42 #include <logging.h>
43 #include <api.h>
44 #include <utilities.h>
45
46 /*
47 # MAF contents for this file. Note that not every file from the support
48 # library is used, but to simplify the build definition it links against
49 # the whole srtsupport library.
50
51 SOURCES
52 srt-test-tunnel.cpp
53 testmedia.cpp
54 ../apps/verbose.cpp
55 ../apps/socketoptions.cpp
56 ../apps/uriparser.cpp
57 ../apps/logsupport.cpp
58
59 */
60
61 using namespace std;
62
// Functional area number under which this application logs.
const srt_logging::LogFA SRT_LOGFA_APP = 10;
namespace srt_logging
{
// Application logger: bound to SRT_LOGFA_APP, using the library's
// srt_logger_config and the "TUNNELAPP" tag.
Logger applog(SRT_LOGFA_APP, srt_logger_config, "TUNNELAPP");
}

// Lets the LOGP(applog.Debug, ...) calls below name the logger directly.
using srt_logging::applog;
70
71 class Medium
72 {
73 static int s_counter;
74 int m_counter;
75 public:
76 enum ReadStatus
77 {
78 RD_DATA, RD_AGAIN, RD_EOF, RD_ERROR
79 };
80
81 enum Mode
82 {
83 LISTENER, CALLER
84 };
85
86 protected:
87 UriParser m_uri;
88 size_t m_chunk = 0;
89 map<string, string> m_options;
90 Mode m_mode;
91
92 bool m_listener = false;
93 bool m_open = false;
94 bool m_eof = false;
95 bool m_broken = false;
96
97 std::mutex access; // For closing
98
99 template <class DerivedMedium, class SocketType>
CreateAcceptor(DerivedMedium * self,const sockaddr_any & sa,SocketType sock,size_t chunk)100 static Medium* CreateAcceptor(DerivedMedium* self, const sockaddr_any& sa, SocketType sock, size_t chunk)
101 {
102 string addr = sockaddr_any(sa.get(), sizeof sa).str();
103 DerivedMedium* m = new DerivedMedium(UriParser(self->type() + string("://") + addr), chunk);
104 m->m_socket = sock;
105 return m;
106 }
107
108 public:
109
uri()110 string uri() { return m_uri.uri(); }
id()111 string id()
112 {
113 std::ostringstream os;
114 os << type() << m_counter;
115 return os.str();
116 }
117
Medium(const UriParser & u,size_t ch)118 Medium(const UriParser& u, size_t ch): m_counter(s_counter++), m_uri(u), m_chunk(ch) {}
Medium()119 Medium(): m_counter(s_counter++) {}
120
121 virtual const char* type() = 0;
122 virtual bool IsOpen() = 0;
123 virtual void CloseInternal() = 0;
124
CloseState()125 void CloseState()
126 {
127 m_open = false;
128 m_broken = true;
129 }
130
131 // External API for this class that allows to close
132 // the entity on request. The CloseInternal should
133 // redirect to a type-specific function, the same that
134 // should be also called in destructor.
Close()135 void Close()
136 {
137 CloseState();
138 CloseInternal();
139 }
140 virtual bool End() = 0;
141
142 virtual int ReadInternal(char* output, int size) = 0;
143 virtual bool IsErrorAgain() = 0;
144
145 ReadStatus Read(bytevector& output);
146 virtual void Write(bytevector& portion) = 0;
147
148 virtual void CreateListener() = 0;
149 virtual void CreateCaller() = 0;
150 virtual unique_ptr<Medium> Accept() = 0;
151 virtual void Connect() = 0;
152
153 static std::unique_ptr<Medium> Create(const std::string& url, size_t chunk, Mode);
154
155 virtual bool Broken() = 0;
Still()156 virtual size_t Still() { return 0; }
157
158 class ReadEOF: public std::runtime_error
159 {
160 public:
ReadEOF(const std::string & fn)161 ReadEOF(const std::string& fn): std::runtime_error( "EOF while reading file: " + fn )
162 {
163 }
164 };
165
166 class TransmissionError: public std::runtime_error
167 {
168 public:
TransmissionError(const std::string & fn)169 TransmissionError(const std::string& fn): std::runtime_error( fn )
170 {
171 }
172 };
173
Error(const string & text)174 static void Error(const string& text)
175 {
176 throw TransmissionError("ERROR (internal): " + text);
177 }
178
~Medium()179 virtual ~Medium()
180 {
181 CloseState();
182 }
183
184 protected:
InitMode(Mode m)185 void InitMode(Mode m)
186 {
187 m_mode = m;
188 Init();
189
190 if (m_mode == LISTENER)
191 {
192 CreateListener();
193 m_listener = true;
194 }
195 else
196 {
197 CreateCaller();
198 }
199
200 m_open = true;
201 }
202
Init()203 virtual void Init() {}
204
205 };
206
207 class Engine
208 {
209 Medium* media[2];
210 std::thread thr;
211 class Tunnel* parent_tunnel;
212 std::string nameid;
213
214 int status = 0;
215 Medium::ReadStatus rdst = Medium::RD_ERROR;
216 UDT::ERRORINFO srtx;
217
218 public:
219 enum Dir { DIR_IN, DIR_OUT };
220
stat()221 int stat() { return status; }
222
Engine(Tunnel * p,Medium * m1,Medium * m2,const std::string & nid)223 Engine(Tunnel* p, Medium* m1, Medium* m2, const std::string& nid)
224 :
225 #ifdef HAVE_FULL_CXX11
226 media {m1, m2},
227 #endif
228 parent_tunnel(p), nameid(nid)
229 {
230 #ifndef HAVE_FULL_CXX11
231 // MSVC is not exactly C++11 compliant and complains around
232 // initialization of an array.
233 // Leaving this method of initialization for clarity and
234 // possibly more preferred performance.
235 media[0] = m1;
236 media[1] = m2;
237 #endif
238 }
239
Start()240 void Start()
241 {
242 Verb() << "START: " << media[DIR_IN]->uri() << " --> " << media[DIR_OUT]->uri();
243 const std::string thrn = media[DIR_IN]->id() + ">" + media[DIR_OUT]->id();
244 srt::ThreadName tn(thrn);
245
246 thr = thread([this]() { Worker(); });
247 }
248
Stop()249 void Stop()
250 {
251 // If this thread is already stopped, don't stop.
252 if (thr.joinable())
253 {
254 LOGP(applog.Debug, "Engine::Stop: Closing media:");
255 // Close both media as a hanged up reading thread
256 // will block joining.
257 media[0]->Close();
258 media[1]->Close();
259
260 LOGP(applog.Debug, "Engine::Stop: media closed, joining engine thread:");
261 if (thr.get_id() == std::this_thread::get_id())
262 {
263 // If this is this thread which called this, no need
264 // to stop because this thread will exit by itself afterwards.
265 // You must, however, detach yourself, or otherwise the thr's
266 // destructor would kill the program.
267 thr.detach();
268 LOGP(applog.Debug, "DETACHED.");
269 }
270 else
271 {
272 thr.join();
273 LOGP(applog.Debug, "Joined.");
274 }
275 }
276 }
277
278 void Worker();
279 };
280
281
282 struct Tunnelbox;
283
284 class Tunnel
285 {
286 Tunnelbox* parent_box;
287 std::unique_ptr<Medium> med_acp, med_clr;
288 Engine acp_to_clr, clr_to_acp;
289 volatile bool running = true;
290 std::mutex access;
291
292 public:
293
show()294 string show()
295 {
296 return med_acp->uri() + " <-> " + med_clr->uri();
297 }
298
Tunnel(Tunnelbox * m,std::unique_ptr<Medium> && acp,std::unique_ptr<Medium> && clr)299 Tunnel(Tunnelbox* m, std::unique_ptr<Medium>&& acp, std::unique_ptr<Medium>&& clr):
300 parent_box(m),
301 med_acp(move(acp)), med_clr(move(clr)),
302 acp_to_clr(this, med_acp.get(), med_clr.get(), med_acp->id() + ">" + med_clr->id()),
303 clr_to_acp(this, med_clr.get(), med_acp.get(), med_clr->id() + ">" + med_acp->id())
304 {
305 }
306
Start()307 void Start()
308 {
309 acp_to_clr.Start();
310 clr_to_acp.Start();
311 }
312
313 // This is to be called by an Engine from Engine::Worker
314 // thread.
315 // [[affinity = acp_to_clr.thr || clr_to_acp.thr]];
decommission_engine(Medium * which_medium)316 void decommission_engine(Medium* which_medium)
317 {
318 // which_medium is the medium that failed.
319 // Upon breaking of one medium from the pair,
320 // the other needs to be closed as well.
321 Verb() << "Medium broken: " << which_medium->uri();
322
323 bool stop = true;
324
325 /*
326 {
327 lock_guard<std::mutex> lk(access);
328 if (acp_to_clr.stat() == -1 && clr_to_acp.stat() == -1)
329 {
330 Verb() << "Tunnel: Both engine decommissioned, will stop the tunnel.";
331 // Both engines are down, decommission the tunnel.
332 // Note that the status -1 means that particular engine
333 // is not currently running and you can safely
334 // join its thread.
335 stop = true;
336 }
337 else
338 {
339 Verb() << "Tunnel: Decommissioned one engine, waiting for the other one to report";
340 }
341 }
342 */
343
344 if (stop)
345 {
346 // First, stop all media.
347 med_acp->Close();
348 med_clr->Close();
349
350 // Then stop the tunnel (this is only a signal
351 // to a cleanup thread to delete it).
352 Stop();
353 }
354 }
355
356 void Stop();
357
358 bool decommission_if_dead(bool forced); // [[affinity = g_tunnels.thr]]
359 };
360
Worker()361 void Engine::Worker()
362 {
363 bytevector outbuf;
364
365 Medium* which_medium = media[DIR_IN];
366
367 for (;;)
368 {
369 try
370 {
371 which_medium = media[DIR_IN];
372 rdst = media[DIR_IN]->Read((outbuf));
373 switch (rdst)
374 {
375 case Medium::RD_DATA:
376 {
377 which_medium = media[DIR_OUT];
378 // We get the data, write them to the output
379 media[DIR_OUT]->Write((outbuf));
380 }
381 break;
382
383 case Medium::RD_EOF:
384 status = -1;
385 throw Medium::ReadEOF("");
386
387 case Medium::RD_AGAIN:
388 // Theoreticall RD_AGAIN should not be reported
389 // because it should be taken care of internally by
390 // repeated sending - unless we get m_broken set.
391 // If it is, however, it should be handled just like error.
392 case Medium::RD_ERROR:
393 status = -1;
394 Medium::Error("Error while reading");
395 }
396 }
397 catch (Medium::ReadEOF&)
398 {
399 Verb() << "EOF. Exiting engine.";
400 break;
401 }
402 catch (Medium::TransmissionError& er)
403 {
404 Verb() << er.what() << " - interrupting engine: " << nameid;
405 break;
406 }
407 }
408
409 // This is an engine thread and it should simply
410 // tell the parent_box Tunnel that it is no longer
411 // operative. It's not necessary to inform it which
412 // of two engines is decommissioned - it should only
413 // know that one of them got down. It will then check
414 // if both are down here and decommission the whole
415 // tunnel if so.
416 parent_tunnel->decommission_engine(which_medium);
417 }
418
419 class SrtMedium: public Medium
420 {
421 SRTSOCKET m_socket = SRT_ERROR;
422 friend class Medium;
423 public:
424
425 #ifdef HAVE_FULL_CXX11
426 using Medium::Medium;
427
428 #else // MSVC and gcc 4.7 not exactly support C++11
429
430 SrtMedium(UriParser u, size_t ch): Medium(u, ch) {}
431
432 #endif
433
IsOpen()434 bool IsOpen() override { return m_open; }
End()435 bool End() override { return m_eof; }
Broken()436 bool Broken() override { return m_broken; }
437
CloseSrt()438 void CloseSrt()
439 {
440 Verb() << "Closing SRT socket for " << uri();
441 lock_guard<std::mutex> lk(access);
442 if (m_socket == SRT_ERROR)
443 return;
444 srt_close(m_socket);
445 m_socket = SRT_ERROR;
446 }
447
448 // Forwarded in order to separate the implementation from
449 // the virtual function so that virtual function is not
450 // being called in destructor.
CloseInternal()451 void CloseInternal() override { return CloseSrt(); }
452
type()453 const char* type() override { return "srt"; }
454 int ReadInternal(char* output, int size) override;
455 bool IsErrorAgain() override;
456
457 void Write(bytevector& portion) override;
458 void CreateListener() override;
459 void CreateCaller() override;
460 unique_ptr<Medium> Accept() override;
461 void Connect() override;
462
463 protected:
464 void Init() override;
465
466 void ConfigurePre();
467 void ConfigurePost(SRTSOCKET socket);
468
469 using Medium::Error;
470
Error(UDT::ERRORINFO & ri,const string & text)471 static void Error(UDT::ERRORINFO& ri, const string& text)
472 {
473 throw TransmissionError("ERROR: " + text + ": " + ri.getErrorMessage());
474 }
475
~SrtMedium()476 ~SrtMedium() override
477 {
478 CloseState();
479 CloseSrt();
480 }
481 };
482
483 class TcpMedium: public Medium
484 {
485 int m_socket = -1;
486 friend class Medium;
487 public:
488
489 #ifdef HAVE_FULL_CXX11
490 using Medium::Medium;
491
492 #else // MSVC not exactly supports C++11
493
494 TcpMedium(UriParser u, size_t ch): Medium(u, ch) {}
495
496 #endif
497
498 #ifdef _WIN32
tcp_close(int socket)499 static int tcp_close(int socket)
500 {
501 return ::closesocket(socket);
502 }
503
504 enum { DEF_SEND_FLAG = 0 };
505
506 #elif defined(LINUX) || defined(GNU) || defined(CYGWIN)
tcp_close(int socket)507 static int tcp_close(int socket)
508 {
509 return ::close(socket);
510 }
511
512 enum { DEF_SEND_FLAG = MSG_NOSIGNAL };
513
514 #else
tcp_close(int socket)515 static int tcp_close(int socket)
516 {
517 return ::close(socket);
518 }
519
520 enum { DEF_SEND_FLAG = 0 };
521
522 #endif
523
IsOpen()524 bool IsOpen() override { return m_open; }
End()525 bool End() override { return m_eof; }
Broken()526 bool Broken() override { return m_broken; }
527
CloseTcp()528 void CloseTcp()
529 {
530 Verb() << "Closing TCP socket for " << uri();
531 lock_guard<std::mutex> lk(access);
532 if (m_socket == -1)
533 return;
534 tcp_close(m_socket);
535 m_socket = -1;
536 }
CloseInternal()537 void CloseInternal() override { return CloseTcp(); }
538
type()539 const char* type() override { return "tcp"; }
540 int ReadInternal(char* output, int size) override;
541 bool IsErrorAgain() override;
542 void Write(bytevector& portion) override;
543 void CreateListener() override;
544 void CreateCaller() override;
545 unique_ptr<Medium> Accept() override;
546 void Connect() override;
547
548 protected:
549
ConfigurePre()550 void ConfigurePre()
551 {
552 #if defined(__APPLE__)
553 int optval = 1;
554 setsockopt(m_socket, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval));
555 #endif
556 }
557
ConfigurePost(int)558 void ConfigurePost(int)
559 {
560 }
561
562 using Medium::Error;
563
Error(int verrno,const string & text)564 static void Error(int verrno, const string& text)
565 {
566 char rbuf[1024];
567 throw TransmissionError("ERROR: " + text + ": " + SysStrError(verrno, rbuf, 1024));
568 }
569
~TcpMedium()570 virtual ~TcpMedium()
571 {
572 CloseState();
573 CloseTcp();
574 }
575 };
576
Init()577 void SrtMedium::Init()
578 {
579 // This function is required due to extra option
580 // check need
581
582 if (m_options.count("mode"))
583 Error("No option 'mode' is required, it defaults to position of the argument");
584
585 if (m_options.count("blocking"))
586 Error("Blocking is not configurable here.");
587
588 // XXX
589 // Look also for other options that should not be here.
590
591 // Enforce the transtype = file
592 m_options["transtype"] = "file";
593 }
594
ConfigurePre()595 void SrtMedium::ConfigurePre()
596 {
597 vector<string> fails;
598 m_options["mode"] = "caller";
599 SrtConfigurePre(m_socket, "", m_options, &fails);
600 if (!fails.empty())
601 {
602 cerr << "Failed options: " << Printable(fails) << endl;
603 }
604 }
605
ConfigurePost(SRTSOCKET so)606 void SrtMedium::ConfigurePost(SRTSOCKET so)
607 {
608 vector<string> fails;
609 SrtConfigurePost(so, m_options, &fails);
610 if (!fails.empty())
611 {
612 cerr << "Failed options: " << Printable(fails) << endl;
613 }
614 }
615
CreateListener()616 void SrtMedium::CreateListener()
617 {
618 int backlog = 5; // hardcoded!
619
620 m_socket = srt_create_socket();
621
622 ConfigurePre();
623
624 sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
625
626 int stat = srt_bind(m_socket, sa.get(), sizeof sa);
627
628 if ( stat == SRT_ERROR )
629 {
630 srt_close(m_socket);
631 Error(UDT::getlasterror(), "srt_bind");
632 }
633
634 stat = srt_listen(m_socket, backlog);
635 if ( stat == SRT_ERROR )
636 {
637 srt_close(m_socket);
638 Error(UDT::getlasterror(), "srt_listen");
639 }
640
641 m_listener = true;
642 };
643
CreateListener()644 void TcpMedium::CreateListener()
645 {
646 int backlog = 5; // hardcoded!
647
648
649 sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
650
651 m_socket = socket(sa.get()->sa_family, SOCK_STREAM, IPPROTO_TCP);
652 ConfigurePre();
653
654 int stat = ::bind(m_socket, sa.get(), sa.size());
655
656 if (stat == -1)
657 {
658 tcp_close(m_socket);
659 Error(errno, "bind");
660 }
661
662 stat = listen(m_socket, backlog);
663 if ( stat == -1 )
664 {
665 tcp_close(m_socket);
666 Error(errno, "listen");
667 }
668
669 m_listener = true;
670 }
671
Accept()672 unique_ptr<Medium> SrtMedium::Accept()
673 {
674 sockaddr_any sa;
675 SRTSOCKET s = srt_accept(m_socket, (sa.get()), (&sa.len));
676 if (s == SRT_ERROR)
677 {
678 Error(UDT::getlasterror(), "srt_accept");
679 }
680
681 ConfigurePost(s);
682
683 // Configure 1s timeout
684 int timeout_1s = 1000;
685 srt_setsockflag(m_socket, SRTO_RCVTIMEO, &timeout_1s, sizeof timeout_1s);
686
687 unique_ptr<Medium> med(CreateAcceptor(this, sa, s, m_chunk));
688 Verb() << "accepted a connection from " << med->uri();
689
690 return med;
691 }
692
Accept()693 unique_ptr<Medium> TcpMedium::Accept()
694 {
695 sockaddr_any sa;
696 int s = ::accept(m_socket, (sa.get()), (&sa.syslen()));
697 if (s == -1)
698 {
699 Error(errno, "accept");
700 }
701
702 // Configure 1s timeout
703 timeval timeout_1s { 1, 0 };
704 int st SRT_ATR_UNUSED = setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_1s, sizeof timeout_1s);
705 timeval re;
706 socklen_t size = sizeof re;
707 int st2 SRT_ATR_UNUSED = getsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (char*)&re, &size);
708
709 LOGP(applog.Debug, "Setting SO_RCVTIMEO to @", m_socket, ": ", st == -1 ? "FAILED" : "SUCCEEDED",
710 ", read-back value: ", st2 == -1 ? int64_t(-1) : (int64_t(re.tv_sec)*1000000 + re.tv_usec)/1000, "ms");
711
712 unique_ptr<Medium> med(CreateAcceptor(this, sa, s, m_chunk));
713 Verb() << "accepted a connection from " << med->uri();
714
715 return med;
716 }
717
CreateCaller()718 void SrtMedium::CreateCaller()
719 {
720 m_socket = srt_create_socket();
721 ConfigurePre();
722
723 // XXX setting up outgoing port not supported
724 }
725
CreateCaller()726 void TcpMedium::CreateCaller()
727 {
728 m_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
729 ConfigurePre();
730 }
731
Connect()732 void SrtMedium::Connect()
733 {
734 sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
735
736 int st = srt_connect(m_socket, sa.get(), sizeof sa);
737 if (st == SRT_ERROR)
738 Error(UDT::getlasterror(), "srt_connect");
739
740 ConfigurePost(m_socket);
741
742 // Configure 1s timeout
743 int timeout_1s = 1000;
744 srt_setsockflag(m_socket, SRTO_RCVTIMEO, &timeout_1s, sizeof timeout_1s);
745 }
746
Connect()747 void TcpMedium::Connect()
748 {
749 sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
750
751 int st = ::connect(m_socket, sa.get(), sa.size());
752 if (st == -1)
753 Error(errno, "connect");
754
755 ConfigurePost(m_socket);
756
757 // Configure 1s timeout
758 timeval timeout_1s { 1, 0 };
759 setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_1s, sizeof timeout_1s);
760 }
761
ReadInternal(char * w_buffer,int size)762 int SrtMedium::ReadInternal(char* w_buffer, int size)
763 {
764 int st = -1;
765 do
766 {
767 st = srt_recv(m_socket, (w_buffer), size);
768 if (st == SRT_ERROR)
769 {
770 int syserr;
771 if (srt_getlasterror(&syserr) == SRT_EASYNCRCV && !m_broken)
772 continue;
773 }
774 break;
775
776 } while (true);
777
778 return st;
779 }
780
ReadInternal(char * w_buffer,int size)781 int TcpMedium::ReadInternal(char* w_buffer, int size)
782 {
783 int st = -1;
784 LOGP(applog.Debug, "TcpMedium:recv @", m_socket, " - begin");
785 do
786 {
787 st = ::recv(m_socket, (w_buffer), size, 0);
788 if (st == -1)
789 {
790 if ((errno == EAGAIN || errno == EWOULDBLOCK))
791 {
792 if (!m_broken)
793 {
794 LOGP(applog.Debug, "TcpMedium: read:AGAIN, repeating");
795 continue;
796 }
797 LOGP(applog.Debug, "TcpMedium: read:AGAIN, not repeating - already broken");
798 }
799 else
800 {
801 LOGP(applog.Debug, "TcpMedium: read:ERROR: ", errno);
802 }
803 }
804 break;
805 } while (true);
806 LOGP(applog.Debug, "TcpMedium:recv @", m_socket, " - result: ", st);
807 return st;
808 }
809
IsErrorAgain()810 bool SrtMedium::IsErrorAgain()
811 {
812 return srt_getlasterror(NULL) == SRT_EASYNCRCV;
813 }
814
IsErrorAgain()815 bool TcpMedium::IsErrorAgain()
816 {
817 return errno == EAGAIN;
818 }
819
820 // The idea of Read function is to get the buffer that
821 // possibly contains some data not written to the output yet,
822 // but the time has come to read. We can't let the buffer expand
823 // more than the size of the chunk, so if the buffer size already
824 // exceeds it, don't return any data, but behave as if they were read.
825 // This will cause the worker loop to redirect to Write immediately
826 // thereafter and possibly will flush out the remains of the buffer.
827 // It's still possible that the buffer won't be completely purged
Read(bytevector & w_output)828 Medium::ReadStatus Medium::Read(bytevector& w_output)
829 {
830 // Don't read, but fake that you read
831 if (w_output.size() > m_chunk)
832 {
833 Verb() << "BUFFER EXCEEDED";
834 return RD_DATA;
835 }
836
837 // Resize to maximum first
838 size_t shift = w_output.size();
839 if (shift && m_eof)
840 {
841 // You have nonempty buffer, but eof was already
842 // encountered. Report as if something was read.
843 //
844 // Don't read anything because this will surely
845 // result in error since now.
846 return RD_DATA;
847 }
848
849 size_t pred_size = shift + m_chunk;
850
851 w_output.resize(pred_size);
852 int st = ReadInternal((w_output.data() + shift), m_chunk);
853 if (st == -1)
854 {
855 if (IsErrorAgain())
856 return RD_AGAIN;
857
858 return RD_ERROR;
859 }
860
861 if (st == 0)
862 {
863 m_eof = true;
864 if (shift)
865 {
866 // If there's 0 (eof), but you still have data
867 // in the buffer, fake that they were read. Only
868 // when the buffer was empty at entrance should this
869 // result with EOF.
870 //
871 // Set back the size this buffer had before we attempted
872 // to read into it.
873 w_output.resize(shift);
874 return RD_DATA;
875 }
876 w_output.clear();
877 return RD_EOF;
878 }
879
880 w_output.resize(shift+st);
881 return RD_DATA;
882 }
883
Write(bytevector & w_buffer)884 void SrtMedium::Write(bytevector& w_buffer)
885 {
886 int st = srt_send(m_socket, w_buffer.data(), w_buffer.size());
887 if (st == SRT_ERROR)
888 {
889 Error(UDT::getlasterror(), "srt_send");
890 }
891
892 // This should be ==, whereas > is not possible, but
893 // this should simply embrace this case as a sanity check.
894 if (st >= int(w_buffer.size()))
895 w_buffer.clear();
896 else if (st == 0)
897 {
898 Error("Unexpected EOF on Write");
899 }
900 else
901 {
902 // Remove only those bytes that were sent
903 w_buffer.erase(w_buffer.begin(), w_buffer.begin()+st);
904 }
905 }
906
Write(bytevector & w_buffer)907 void TcpMedium::Write(bytevector& w_buffer)
908 {
909 int st = ::send(m_socket, w_buffer.data(), w_buffer.size(), DEF_SEND_FLAG);
910 if (st == -1)
911 {
912 Error(errno, "send");
913 }
914
915 // This should be ==, whereas > is not possible, but
916 // this should simply embrace this case as a sanity check.
917 if (st >= int(w_buffer.size()))
918 w_buffer.clear();
919 else if (st == 0)
920 {
921 Error("Unexpected EOF on Write");
922 }
923 else
924 {
925 // Remove only those bytes that were sent
926 w_buffer.erase(w_buffer.begin(), w_buffer.begin()+st);
927 }
928 }
929
Create(const std::string & url,size_t chunk,Medium::Mode mode)930 std::unique_ptr<Medium> Medium::Create(const std::string& url, size_t chunk, Medium::Mode mode)
931 {
932 UriParser uri(url);
933 std::unique_ptr<Medium> out;
934
935 // Might be something smarter, but there are only 2 types.
936 if (uri.scheme() == "srt")
937 {
938 out.reset(new SrtMedium(uri, chunk));
939 }
940 else if (uri.scheme() == "tcp")
941 {
942 out.reset(new TcpMedium(uri, chunk));
943 }
944 else
945 {
946 Error("Medium not supported");
947 }
948
949 out->InitMode(mode);
950
951 return out;
952 }
953
954 struct Tunnelbox
955 {
956 list<unique_ptr<Tunnel>> tunnels;
957 std::mutex access;
958 condition_variable decom_ready;
959 bool main_running = true;
960 thread thr;
961
signal_decommissionTunnelbox962 void signal_decommission()
963 {
964 lock_guard<std::mutex> lk(access);
965 decom_ready.notify_one();
966 }
967
installTunnelbox968 void install(std::unique_ptr<Medium>&& acp, std::unique_ptr<Medium>&& clr)
969 {
970 lock_guard<std::mutex> lk(access);
971 Verb() << "Tunnelbox: Starting tunnel: " << acp->uri() << " <-> " << clr->uri();
972
973 tunnels.emplace_back(new Tunnel(this, move(acp), move(clr)));
974 // Note: after this instruction, acp and clr are no longer valid!
975 auto& it = tunnels.back();
976
977 it->Start();
978 }
979
start_cleanerTunnelbox980 void start_cleaner()
981 {
982 thr = thread( [this]() { CleanupWorker(); } );
983 }
984
stop_cleanerTunnelbox985 void stop_cleaner()
986 {
987 if (thr.joinable())
988 thr.join();
989 }
990
991 private:
992
CleanupWorkerTunnelbox993 void CleanupWorker()
994 {
995 unique_lock<std::mutex> lk(access);
996
997 while (main_running)
998 {
999 decom_ready.wait(lk);
1000
1001 // Got a signal, find a tunnel ready to cleanup.
1002 // We just get the signal, but we don't know which
1003 // tunnel has generated it.
1004 for (auto i = tunnels.begin(), i_next = i; i != tunnels.end(); i = i_next)
1005 {
1006 ++i_next;
1007 // Bound in one call the check if the tunnel is dead
1008 // and decommissioning because this must be done in
1009 // the one critical section - make sure no other thread
1010 // is accessing it at the same time and also make join all
1011 // threads that might have been accessing it. After
1012 // exiting as true (meaning that it was decommissioned
1013 // as expected) it can be safely deleted.
1014 if ((*i)->decommission_if_dead(main_running))
1015 {
1016 tunnels.erase(i);
1017 }
1018 }
1019 }
1020 }
1021 };
1022
Stop()1023 void Tunnel::Stop()
1024 {
1025 // Check for running must be done without locking
1026 // because if the tunnel isn't running
1027 if (!running)
1028 return; // already stopped
1029
1030 lock_guard<std::mutex> lk(access);
1031
1032 // Ok, you are the first to make the tunnel
1033 // not running and inform the tunnelbox.
1034 running = false;
1035 parent_box->signal_decommission();
1036 }
1037
decommission_if_dead(bool forced)1038 bool Tunnel::decommission_if_dead(bool forced)
1039 {
1040 lock_guard<std::mutex> lk(access);
1041 if (running && !forced)
1042 return false; // working, not to be decommissioned
1043
1044 // Join the engine threads, make sure nothing
1045 // is running that could use the data.
1046 acp_to_clr.Stop();
1047 clr_to_acp.Stop();
1048
1049
1050 // Done. The tunnelbox after calling this can
1051 // safely delete the decommissioned tunnel.
1052 return true;
1053 }
1054
// First instance number handed out to a Medium (see Medium::id()).
int Medium::s_counter = 1;

// All active tunnels together with their cleanup thread.
Tunnelbox g_tunnels;
// The listening medium; created in main() and closed by OnINT_StopService().
std::unique_ptr<Medium> main_listener;

// Chunk size used by main() unless the chunk option overrides it.
size_t default_chunk = 4096;
1061
OnINT_StopService(int)1062 int OnINT_StopService(int)
1063 {
1064 g_tunnels.main_running = false;
1065 g_tunnels.signal_decommission();
1066
1067 // Will cause the Accept() block to exit.
1068 main_listener->Close();
1069
1070 return 0;
1071 }
1072
main(int argc,char ** argv)1073 int main( int argc, char** argv )
1074 {
1075 if (!SysInitializeNetwork())
1076 {
1077 cerr << "Fail to initialize network module.";
1078 return 1;
1079 }
1080
1081 size_t chunk = default_chunk;
1082
1083 OptionName
1084 o_loglevel = { "ll", "loglevel" },
1085 o_logfa = { "lf", "logfa" },
1086 o_chunk = {"c", "chunk" },
1087 o_verbose = {"v", "verbose" },
1088 o_noflush = {"s", "skipflush" };
1089
1090 // Options that expect no arguments (ARG_NONE) need not be mentioned.
1091 vector<OptionScheme> optargs = {
1092 { o_loglevel, OptionScheme::ARG_ONE },
1093 { o_logfa, OptionScheme::ARG_ONE },
1094 { o_chunk, OptionScheme::ARG_ONE }
1095 };
1096 options_t params = ProcessOptions(argv, argc, optargs);
1097
1098 /*
1099 cerr << "OPTIONS (DEBUG)\n";
1100 for (auto o: params)
1101 {
1102 cerr << "[" << o.first << "] ";
1103 copy(o.second.begin(), o.second.end(), ostream_iterator<string>(cerr, " "));
1104 cerr << endl;
1105 }
1106 */
1107
1108 vector<string> args = params[""];
1109 if ( args.size() < 2 )
1110 {
1111 cerr << "Usage: " << argv[0] << " <listen-uri> <call-uri>\n";
1112 return 1;
1113 }
1114
1115 string loglevel = Option<OutString>(params, "error", o_loglevel);
1116 string logfa = Option<OutString>(params, "", o_logfa);
1117 srt_logging::LogLevel::type lev = SrtParseLogLevel(loglevel);
1118 srt::setloglevel(lev);
1119 if (logfa == "")
1120 {
1121 srt::addlogfa(SRT_LOGFA_APP);
1122 }
1123 else
1124 {
1125 // Add only selected FAs
1126 set<string> unknown_fas;
1127 set<srt_logging::LogFA> fas = SrtParseLogFA(logfa, &unknown_fas);
1128 srt::resetlogfa(fas);
1129
1130 // The general parser doesn't recognize the "app" FA, we check it here.
1131 if (unknown_fas.count("app"))
1132 srt::addlogfa(SRT_LOGFA_APP);
1133 }
1134
1135 string verbo = Option<OutString>(params, "no", o_verbose);
1136 if ( verbo == "" || !false_names.count(verbo) )
1137 {
1138 Verbose::on = true;
1139 Verbose::cverb = &std::cout;
1140 }
1141
1142 string chunks = Option<OutString>(params, "", o_chunk);
1143 if ( chunks!= "" )
1144 {
1145 chunk = stoi(chunks);
1146 }
1147
1148 string listen_node = args[0];
1149 string call_node = args[1];
1150
1151 UriParser ul(listen_node), uc(call_node);
1152
1153 // It is allowed to use both media of the same type,
1154 // but only srt and tcp are allowed.
1155
1156 set<string> allowed = {"srt", "tcp"};
1157 if (!allowed.count(ul.scheme())|| !allowed.count(uc.scheme()))
1158 {
1159 cerr << "ERROR: only tcp and srt schemes supported";
1160 return -1;
1161 }
1162
1163 Verb() << "LISTEN type=" << ul.scheme() << ", CALL type=" << uc.scheme();
1164
1165 g_tunnels.start_cleaner();
1166
1167 main_listener = Medium::Create(listen_node, chunk, Medium::LISTENER);
1168
1169 // The main program loop is only to catch
1170 // new connections and manage them. Also takes care
1171 // of the broken connections.
1172
1173 for (;;)
1174 {
1175 try
1176 {
1177 Verb() << "Waiting for connection...";
1178 std::unique_ptr<Medium> accepted = main_listener->Accept();
1179 if (!g_tunnels.main_running)
1180 {
1181 Verb() << "Service stopped. Exiting.";
1182 break;
1183 }
1184 Verb() << "Connection accepted. Connecting to the relay...";
1185
1186 // Now call the target address.
1187 std::unique_ptr<Medium> caller = Medium::Create(call_node, chunk, Medium::CALLER);
1188 caller->Connect();
1189
1190 Verb() << "Connected. Establishing pipe.";
1191
1192 // No exception, we are free to pass :)
1193 g_tunnels.install(move(accepted), move(caller));
1194 }
1195 catch (...)
1196 {
1197 Verb() << "Connection reported, but failed";
1198 }
1199 }
1200
1201 g_tunnels.stop_cleaner();
1202
1203 return 0;
1204 }
1205
1206
1207