1 // MSVS likes to complain about lots of standard C functions being unsafe.
2 #ifdef _MSC_VER
3 #define _CRT_SECURE_NO_WARNINGS 1
4 #include <io.h>
5 #endif
6 
7 #include "platform_sys.h"
8 
9 #define REQUIRE_CXX11 1
10 
#include <cctype>
#include <cerrno>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <map>
#include <set>
#include <list>
#include <vector>
#include <deque>
#include <memory>
#include <algorithm>
#include <iterator>
#include <stdexcept>
#include <cstring>
#include <csignal>
#include <chrono>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
29 
30 #include "apputil.hpp"  // CreateAddr
31 #include "uriparser.hpp"  // UriParser
32 #include "socketoptions.hpp"
33 #include "logsupport.hpp"
34 #include "transmitbase.hpp" // bytevector typedef to avoid collisions
35 #include "verbose.hpp"
36 
37 // NOTE: This is without "haisrt/" because it uses an internal path
38 // to the library. Application using the "installed" library should
39 // use <srt/srt.h>
40 #include <srt.h>
41 #include <udt.h> // This TEMPORARILY contains extra C++-only SRT API.
42 #include <logging.h>
43 #include <api.h>
44 #include <utilities.h>
45 
46 /*
47 # MAF contents for this file. Note that not every file from the support
48 # library is used, but to simplify the build definition it links against
49 # the whole srtsupport library.
50 
51 SOURCES
52 srt-test-tunnel.cpp
53 testmedia.cpp
54 ../apps/verbose.cpp
55 ../apps/socketoptions.cpp
56 ../apps/uriparser.cpp
57 ../apps/logsupport.cpp
58 
59 */
60 
61 using namespace std;
62 
// Functional-area (FA) identifier under which this application logs.
const srt_logging::LogFA SRT_LOGFA_APP = 10;
namespace srt_logging
{
// Application logger: constructed for SRT_LOGFA_APP with the shared
// srt_logger_config and the label "TUNNELAPP".
Logger applog(SRT_LOGFA_APP, srt_logger_config, "TUNNELAPP");
}

// Allow the rest of this file to refer to the logger simply as `applog`.
using srt_logging::applog;
70 
71 class Medium
72 {
73     static int s_counter;
74     int m_counter;
75 public:
76     enum ReadStatus
77     {
78         RD_DATA, RD_AGAIN, RD_EOF, RD_ERROR
79     };
80 
81     enum Mode
82     {
83         LISTENER, CALLER
84     };
85 
86 protected:
87     UriParser m_uri;
88     size_t m_chunk = 0;
89     map<string, string> m_options;
90     Mode m_mode;
91 
92     bool m_listener = false;
93     bool m_open = false;
94     bool m_eof = false;
95     bool m_broken = false;
96 
97     std::mutex access; // For closing
98 
99     template <class DerivedMedium, class SocketType>
CreateAcceptor(DerivedMedium * self,const sockaddr_any & sa,SocketType sock,size_t chunk)100     static Medium* CreateAcceptor(DerivedMedium* self, const sockaddr_any& sa, SocketType sock, size_t chunk)
101     {
102         string addr = sockaddr_any(sa.get(), sizeof sa).str();
103         DerivedMedium* m = new DerivedMedium(UriParser(self->type() + string("://") + addr), chunk);
104         m->m_socket = sock;
105         return m;
106     }
107 
108 public:
109 
uri()110     string uri() { return m_uri.uri(); }
id()111     string id()
112     {
113         std::ostringstream os;
114         os << type() << m_counter;
115         return os.str();
116     }
117 
Medium(const UriParser & u,size_t ch)118     Medium(const UriParser& u, size_t ch): m_counter(s_counter++), m_uri(u), m_chunk(ch) {}
Medium()119     Medium(): m_counter(s_counter++) {}
120 
121     virtual const char* type() = 0;
122     virtual bool IsOpen() = 0;
123     virtual void CloseInternal() = 0;
124 
CloseState()125     void CloseState()
126     {
127         m_open = false;
128         m_broken = true;
129     }
130 
131     // External API for this class that allows to close
132     // the entity on request. The CloseInternal should
133     // redirect to a type-specific function, the same that
134     // should be also called in destructor.
Close()135     void Close()
136     {
137         CloseState();
138         CloseInternal();
139     }
140     virtual bool End() = 0;
141 
142     virtual int ReadInternal(char* output, int size) = 0;
143     virtual bool IsErrorAgain() = 0;
144 
145     ReadStatus Read(bytevector& output);
146     virtual void Write(bytevector& portion) = 0;
147 
148     virtual void CreateListener() = 0;
149     virtual void CreateCaller() = 0;
150     virtual unique_ptr<Medium> Accept() = 0;
151     virtual void Connect() = 0;
152 
153     static std::unique_ptr<Medium> Create(const std::string& url, size_t chunk, Mode);
154 
155     virtual bool Broken() = 0;
Still()156     virtual size_t Still() { return 0; }
157 
158     class ReadEOF: public std::runtime_error
159     {
160     public:
ReadEOF(const std::string & fn)161         ReadEOF(const std::string& fn): std::runtime_error( "EOF while reading file: " + fn )
162         {
163         }
164     };
165 
166     class TransmissionError: public std::runtime_error
167     {
168     public:
TransmissionError(const std::string & fn)169         TransmissionError(const std::string& fn): std::runtime_error( fn )
170         {
171         }
172     };
173 
Error(const string & text)174     static void Error(const string& text)
175     {
176         throw TransmissionError("ERROR (internal): " + text);
177     }
178 
~Medium()179     virtual ~Medium()
180     {
181         CloseState();
182     }
183 
184 protected:
InitMode(Mode m)185     void InitMode(Mode m)
186     {
187         m_mode = m;
188         Init();
189 
190         if (m_mode == LISTENER)
191         {
192             CreateListener();
193             m_listener = true;
194         }
195         else
196         {
197             CreateCaller();
198         }
199 
200         m_open = true;
201     }
202 
Init()203     virtual void Init() {}
204 
205 };
206 
207 class Engine
208 {
209     Medium* media[2];
210     std::thread thr;
211     class Tunnel* parent_tunnel;
212     std::string nameid;
213 
214     int status = 0;
215     Medium::ReadStatus rdst = Medium::RD_ERROR;
216     UDT::ERRORINFO srtx;
217 
218 public:
219     enum Dir { DIR_IN, DIR_OUT };
220 
stat()221     int stat() { return status; }
222 
Engine(Tunnel * p,Medium * m1,Medium * m2,const std::string & nid)223     Engine(Tunnel* p, Medium* m1, Medium* m2, const std::string& nid)
224         :
225 #ifdef HAVE_FULL_CXX11
226         media {m1, m2},
227 #endif
228         parent_tunnel(p), nameid(nid)
229     {
230 #ifndef HAVE_FULL_CXX11
231         // MSVC is not exactly C++11 compliant and complains around
232         // initialization of an array.
233         // Leaving this method of initialization for clarity and
234         // possibly more preferred performance.
235         media[0] = m1;
236         media[1] = m2;
237 #endif
238     }
239 
Start()240     void Start()
241     {
242         Verb() << "START: " << media[DIR_IN]->uri() << " --> " << media[DIR_OUT]->uri();
243         const std::string thrn = media[DIR_IN]->id() + ">" + media[DIR_OUT]->id();
244         srt::ThreadName tn(thrn);
245 
246         thr = thread([this]() { Worker(); });
247     }
248 
Stop()249     void Stop()
250     {
251         // If this thread is already stopped, don't stop.
252         if (thr.joinable())
253         {
254             LOGP(applog.Debug, "Engine::Stop: Closing media:");
255             // Close both media as a hanged up reading thread
256             // will block joining.
257             media[0]->Close();
258             media[1]->Close();
259 
260             LOGP(applog.Debug, "Engine::Stop: media closed, joining engine thread:");
261             if (thr.get_id() == std::this_thread::get_id())
262             {
263                 // If this is this thread which called this, no need
264                 // to stop because this thread will exit by itself afterwards.
265                 // You must, however, detach yourself, or otherwise the thr's
266                 // destructor would kill the program.
267                 thr.detach();
268                 LOGP(applog.Debug, "DETACHED.");
269             }
270             else
271             {
272                 thr.join();
273                 LOGP(applog.Debug, "Joined.");
274             }
275         }
276     }
277 
278     void Worker();
279 };
280 
281 
282 struct Tunnelbox;
283 
284 class Tunnel
285 {
286     Tunnelbox* parent_box;
287     std::unique_ptr<Medium> med_acp, med_clr;
288     Engine acp_to_clr, clr_to_acp;
289     volatile bool running = true;
290     std::mutex access;
291 
292 public:
293 
show()294     string show()
295     {
296         return med_acp->uri() + " <-> " + med_clr->uri();
297     }
298 
Tunnel(Tunnelbox * m,std::unique_ptr<Medium> && acp,std::unique_ptr<Medium> && clr)299     Tunnel(Tunnelbox* m, std::unique_ptr<Medium>&& acp, std::unique_ptr<Medium>&& clr):
300         parent_box(m),
301         med_acp(move(acp)), med_clr(move(clr)),
302         acp_to_clr(this, med_acp.get(), med_clr.get(), med_acp->id() + ">" + med_clr->id()),
303         clr_to_acp(this, med_clr.get(), med_acp.get(), med_clr->id() + ">" + med_acp->id())
304     {
305     }
306 
Start()307     void Start()
308     {
309         acp_to_clr.Start();
310         clr_to_acp.Start();
311     }
312 
313     // This is to be called by an Engine from Engine::Worker
314     // thread.
315     // [[affinity = acp_to_clr.thr || clr_to_acp.thr]];
decommission_engine(Medium * which_medium)316     void decommission_engine(Medium* which_medium)
317     {
318         // which_medium is the medium that failed.
319         // Upon breaking of one medium from the pair,
320         // the other needs to be closed as well.
321         Verb() << "Medium broken: " << which_medium->uri();
322 
323         bool stop = true;
324 
325         /*
326         {
327             lock_guard<std::mutex> lk(access);
328             if (acp_to_clr.stat() == -1 && clr_to_acp.stat() == -1)
329             {
330                 Verb() << "Tunnel: Both engine decommissioned, will stop the tunnel.";
331                 // Both engines are down, decommission the tunnel.
332                 // Note that the status -1 means that particular engine
333                 // is not currently running and you can safely
334                 // join its thread.
335                 stop = true;
336             }
337             else
338             {
339                 Verb() << "Tunnel: Decommissioned one engine, waiting for the other one to report";
340             }
341         }
342         */
343 
344         if (stop)
345         {
346             // First, stop all media.
347             med_acp->Close();
348             med_clr->Close();
349 
350             // Then stop the tunnel (this is only a signal
351             // to a cleanup thread to delete it).
352             Stop();
353         }
354     }
355 
356     void Stop();
357 
358     bool decommission_if_dead(bool forced); // [[affinity = g_tunnels.thr]]
359 };
360 
Worker()361 void Engine::Worker()
362 {
363     bytevector outbuf;
364 
365     Medium* which_medium = media[DIR_IN];
366 
367     for (;;)
368     {
369         try
370         {
371             which_medium = media[DIR_IN];
372             rdst = media[DIR_IN]->Read((outbuf));
373             switch (rdst)
374             {
375             case Medium::RD_DATA:
376                 {
377                     which_medium = media[DIR_OUT];
378                     // We get the data, write them to the output
379                     media[DIR_OUT]->Write((outbuf));
380                 }
381                 break;
382 
383             case Medium::RD_EOF:
384                 status = -1;
385                 throw Medium::ReadEOF("");
386 
387             case Medium::RD_AGAIN:
388                 // Theoreticall RD_AGAIN should not be reported
389                 // because it should be taken care of internally by
390                 // repeated sending - unless we get m_broken set.
391                 // If it is, however, it should be handled just like error.
392             case Medium::RD_ERROR:
393                 status = -1;
394                 Medium::Error("Error while reading");
395             }
396         }
397         catch (Medium::ReadEOF&)
398         {
399             Verb() << "EOF. Exiting engine.";
400             break;
401         }
402         catch (Medium::TransmissionError& er)
403         {
404             Verb() << er.what() << " - interrupting engine: " << nameid;
405             break;
406         }
407     }
408 
409     // This is an engine thread and it should simply
410     // tell the parent_box Tunnel that it is no longer
411     // operative. It's not necessary to inform it which
412     // of two engines is decommissioned - it should only
413     // know that one of them got down. It will then check
414     // if both are down here and decommission the whole
415     // tunnel if so.
416     parent_tunnel->decommission_engine(which_medium);
417 }
418 
419 class SrtMedium: public Medium
420 {
421     SRTSOCKET m_socket = SRT_ERROR;
422     friend class Medium;
423 public:
424 
425 #ifdef HAVE_FULL_CXX11
426     using Medium::Medium;
427 
428 #else // MSVC and gcc 4.7 not exactly support C++11
429 
430     SrtMedium(UriParser u, size_t ch): Medium(u, ch) {}
431 
432 #endif
433 
IsOpen()434     bool IsOpen() override { return m_open; }
End()435     bool End() override { return m_eof; }
Broken()436     bool Broken() override { return m_broken; }
437 
CloseSrt()438     void CloseSrt()
439     {
440         Verb() << "Closing SRT socket for " << uri();
441         lock_guard<std::mutex> lk(access);
442         if (m_socket == SRT_ERROR)
443             return;
444         srt_close(m_socket);
445         m_socket = SRT_ERROR;
446     }
447 
448     // Forwarded in order to separate the implementation from
449     // the virtual function so that virtual function is not
450     // being called in destructor.
CloseInternal()451     void CloseInternal() override { return CloseSrt(); }
452 
type()453     const char* type() override { return "srt"; }
454     int ReadInternal(char* output, int size) override;
455     bool IsErrorAgain() override;
456 
457     void Write(bytevector& portion) override;
458     void CreateListener() override;
459     void CreateCaller() override;
460     unique_ptr<Medium> Accept() override;
461     void Connect() override;
462 
463 protected:
464     void Init() override;
465 
466     void ConfigurePre();
467     void ConfigurePost(SRTSOCKET socket);
468 
469     using Medium::Error;
470 
Error(UDT::ERRORINFO & ri,const string & text)471     static void Error(UDT::ERRORINFO& ri, const string& text)
472     {
473         throw TransmissionError("ERROR: " + text + ": " + ri.getErrorMessage());
474     }
475 
~SrtMedium()476     ~SrtMedium() override
477     {
478         CloseState();
479         CloseSrt();
480     }
481 };
482 
483 class TcpMedium: public Medium
484 {
485     int m_socket = -1;
486     friend class Medium;
487 public:
488 
489 #ifdef HAVE_FULL_CXX11
490     using Medium::Medium;
491 
492 #else // MSVC not exactly supports C++11
493 
494     TcpMedium(UriParser u, size_t ch): Medium(u, ch) {}
495 
496 #endif
497 
498 #ifdef _WIN32
tcp_close(int socket)499     static int tcp_close(int socket)
500     {
501         return ::closesocket(socket);
502     }
503 
504     enum { DEF_SEND_FLAG = 0 };
505 
506 #elif defined(LINUX) || defined(GNU) || defined(CYGWIN)
tcp_close(int socket)507     static int tcp_close(int socket)
508     {
509         return ::close(socket);
510     }
511 
512     enum { DEF_SEND_FLAG = MSG_NOSIGNAL };
513 
514 #else
tcp_close(int socket)515     static int tcp_close(int socket)
516     {
517         return ::close(socket);
518     }
519 
520     enum { DEF_SEND_FLAG = 0 };
521 
522 #endif
523 
IsOpen()524     bool IsOpen() override { return m_open; }
End()525     bool End() override { return m_eof; }
Broken()526     bool Broken() override { return m_broken; }
527 
CloseTcp()528     void CloseTcp()
529     {
530         Verb() << "Closing TCP socket for " << uri();
531         lock_guard<std::mutex> lk(access);
532         if (m_socket == -1)
533             return;
534         tcp_close(m_socket);
535         m_socket = -1;
536     }
CloseInternal()537     void CloseInternal() override { return CloseTcp(); }
538 
type()539     const char* type() override { return "tcp"; }
540     int ReadInternal(char* output, int size) override;
541     bool IsErrorAgain() override;
542     void Write(bytevector& portion) override;
543     void CreateListener() override;
544     void CreateCaller() override;
545     unique_ptr<Medium> Accept() override;
546     void Connect() override;
547 
548 protected:
549 
ConfigurePre()550     void ConfigurePre()
551     {
552 #if defined(__APPLE__)
553         int optval = 1;
554         setsockopt(m_socket, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval));
555 #endif
556     }
557 
ConfigurePost(int)558     void ConfigurePost(int)
559     {
560     }
561 
562     using Medium::Error;
563 
Error(int verrno,const string & text)564     static void Error(int verrno, const string& text)
565     {
566         char rbuf[1024];
567         throw TransmissionError("ERROR: " + text + ": " + SysStrError(verrno, rbuf, 1024));
568     }
569 
~TcpMedium()570     virtual ~TcpMedium()
571     {
572         CloseState();
573         CloseTcp();
574     }
575 };
576 
Init()577 void SrtMedium::Init()
578 {
579     // This function is required due to extra option
580     // check need
581 
582     if (m_options.count("mode"))
583         Error("No option 'mode' is required, it defaults to position of the argument");
584 
585     if (m_options.count("blocking"))
586         Error("Blocking is not configurable here.");
587 
588     // XXX
589     // Look also for other options that should not be here.
590 
591     // Enforce the transtype = file
592     m_options["transtype"] = "file";
593 }
594 
ConfigurePre()595 void SrtMedium::ConfigurePre()
596 {
597     vector<string> fails;
598     m_options["mode"] = "caller";
599     SrtConfigurePre(m_socket, "", m_options, &fails);
600     if (!fails.empty())
601     {
602         cerr << "Failed options: " << Printable(fails) << endl;
603     }
604 }
605 
ConfigurePost(SRTSOCKET so)606 void SrtMedium::ConfigurePost(SRTSOCKET so)
607 {
608     vector<string> fails;
609     SrtConfigurePost(so, m_options, &fails);
610     if (!fails.empty())
611     {
612         cerr << "Failed options: " << Printable(fails) << endl;
613     }
614 }
615 
CreateListener()616 void SrtMedium::CreateListener()
617 {
618     int backlog = 5; // hardcoded!
619 
620     m_socket = srt_create_socket();
621 
622     ConfigurePre();
623 
624     sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
625 
626     int stat = srt_bind(m_socket, sa.get(), sizeof sa);
627 
628     if ( stat == SRT_ERROR )
629     {
630         srt_close(m_socket);
631         Error(UDT::getlasterror(), "srt_bind");
632     }
633 
634     stat = srt_listen(m_socket, backlog);
635     if ( stat == SRT_ERROR )
636     {
637         srt_close(m_socket);
638         Error(UDT::getlasterror(), "srt_listen");
639     }
640 
641     m_listener = true;
642 };
643 
CreateListener()644 void TcpMedium::CreateListener()
645 {
646     int backlog = 5; // hardcoded!
647 
648 
649     sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
650 
651     m_socket = socket(sa.get()->sa_family, SOCK_STREAM, IPPROTO_TCP);
652     ConfigurePre();
653 
654     int stat = ::bind(m_socket, sa.get(), sa.size());
655 
656     if (stat == -1)
657     {
658         tcp_close(m_socket);
659         Error(errno, "bind");
660     }
661 
662     stat = listen(m_socket, backlog);
663     if ( stat == -1 )
664     {
665         tcp_close(m_socket);
666         Error(errno, "listen");
667     }
668 
669     m_listener = true;
670 }
671 
Accept()672 unique_ptr<Medium> SrtMedium::Accept()
673 {
674     sockaddr_any sa;
675     SRTSOCKET s = srt_accept(m_socket, (sa.get()), (&sa.len));
676     if (s == SRT_ERROR)
677     {
678         Error(UDT::getlasterror(), "srt_accept");
679     }
680 
681     ConfigurePost(s);
682 
683     // Configure 1s timeout
684     int timeout_1s = 1000;
685     srt_setsockflag(m_socket, SRTO_RCVTIMEO, &timeout_1s, sizeof timeout_1s);
686 
687     unique_ptr<Medium> med(CreateAcceptor(this, sa, s, m_chunk));
688     Verb() << "accepted a connection from " << med->uri();
689 
690     return med;
691 }
692 
Accept()693 unique_ptr<Medium> TcpMedium::Accept()
694 {
695     sockaddr_any sa;
696     int s = ::accept(m_socket, (sa.get()), (&sa.syslen()));
697     if (s == -1)
698     {
699         Error(errno, "accept");
700     }
701 
702     // Configure 1s timeout
703     timeval timeout_1s { 1, 0 };
704     int st SRT_ATR_UNUSED = setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_1s, sizeof timeout_1s);
705     timeval re;
706     socklen_t size = sizeof re;
707     int st2 SRT_ATR_UNUSED = getsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (char*)&re, &size);
708 
709     LOGP(applog.Debug, "Setting SO_RCVTIMEO to @", m_socket, ": ", st == -1 ? "FAILED" : "SUCCEEDED",
710             ", read-back value: ", st2 == -1 ? int64_t(-1) : (int64_t(re.tv_sec)*1000000 + re.tv_usec)/1000, "ms");
711 
712     unique_ptr<Medium> med(CreateAcceptor(this, sa, s, m_chunk));
713     Verb() << "accepted a connection from " << med->uri();
714 
715     return med;
716 }
717 
CreateCaller()718 void SrtMedium::CreateCaller()
719 {
720     m_socket = srt_create_socket();
721     ConfigurePre();
722 
723     // XXX setting up outgoing port not supported
724 }
725 
CreateCaller()726 void TcpMedium::CreateCaller()
727 {
728     m_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
729     ConfigurePre();
730 }
731 
Connect()732 void SrtMedium::Connect()
733 {
734     sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
735 
736     int st = srt_connect(m_socket, sa.get(), sizeof sa);
737     if (st == SRT_ERROR)
738         Error(UDT::getlasterror(), "srt_connect");
739 
740     ConfigurePost(m_socket);
741 
742     // Configure 1s timeout
743     int timeout_1s = 1000;
744     srt_setsockflag(m_socket, SRTO_RCVTIMEO, &timeout_1s, sizeof timeout_1s);
745 }
746 
Connect()747 void TcpMedium::Connect()
748 {
749     sockaddr_any sa = CreateAddr(m_uri.host(), m_uri.portno());
750 
751     int st = ::connect(m_socket, sa.get(), sa.size());
752     if (st == -1)
753         Error(errno, "connect");
754 
755     ConfigurePost(m_socket);
756 
757     // Configure 1s timeout
758     timeval timeout_1s { 1, 0 };
759     setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_1s, sizeof timeout_1s);
760 }
761 
ReadInternal(char * w_buffer,int size)762 int SrtMedium::ReadInternal(char* w_buffer, int size)
763 {
764     int st = -1;
765     do
766     {
767         st = srt_recv(m_socket, (w_buffer), size);
768         if (st == SRT_ERROR)
769         {
770             int syserr;
771             if (srt_getlasterror(&syserr) == SRT_EASYNCRCV && !m_broken)
772                 continue;
773         }
774         break;
775 
776     } while (true);
777 
778     return st;
779 }
780 
ReadInternal(char * w_buffer,int size)781 int TcpMedium::ReadInternal(char* w_buffer, int size)
782 {
783     int st = -1;
784     LOGP(applog.Debug, "TcpMedium:recv @", m_socket, " - begin");
785     do
786     {
787         st = ::recv(m_socket, (w_buffer), size, 0);
788         if (st == -1)
789         {
790             if ((errno == EAGAIN || errno == EWOULDBLOCK))
791             {
792                 if (!m_broken)
793                 {
794                     LOGP(applog.Debug, "TcpMedium: read:AGAIN, repeating");
795                     continue;
796                 }
797                 LOGP(applog.Debug, "TcpMedium: read:AGAIN, not repeating - already broken");
798             }
799             else
800             {
801                 LOGP(applog.Debug, "TcpMedium: read:ERROR: ", errno);
802             }
803         }
804         break;
805     } while (true);
806     LOGP(applog.Debug, "TcpMedium:recv @", m_socket, " - result: ", st);
807     return st;
808 }
809 
IsErrorAgain()810 bool SrtMedium::IsErrorAgain()
811 {
812     return srt_getlasterror(NULL) == SRT_EASYNCRCV;
813 }
814 
IsErrorAgain()815 bool TcpMedium::IsErrorAgain()
816 {
817     return errno == EAGAIN;
818 }
819 
820 // The idea of Read function is to get the buffer that
821 // possibly contains some data not written to the output yet,
822 // but the time has come to read. We can't let the buffer expand
823 // more than the size of the chunk, so if the buffer size already
824 // exceeds it, don't return any data, but behave as if they were read.
825 // This will cause the worker loop to redirect to Write immediately
826 // thereafter and possibly will flush out the remains of the buffer.
827 // It's still possible that the buffer won't be completely purged
Read(bytevector & w_output)828 Medium::ReadStatus Medium::Read(bytevector& w_output)
829 {
830     // Don't read, but fake that you read
831     if (w_output.size() > m_chunk)
832     {
833         Verb() << "BUFFER EXCEEDED";
834         return RD_DATA;
835     }
836 
837     // Resize to maximum first
838     size_t shift = w_output.size();
839     if (shift && m_eof)
840     {
841         // You have nonempty buffer, but eof was already
842         // encountered. Report as if something was read.
843         //
844         // Don't read anything because this will surely
845         // result in error since now.
846         return RD_DATA;
847     }
848 
849     size_t pred_size = shift + m_chunk;
850 
851     w_output.resize(pred_size);
852     int st = ReadInternal((w_output.data() + shift), m_chunk);
853     if (st == -1)
854     {
855         if (IsErrorAgain())
856             return RD_AGAIN;
857 
858         return RD_ERROR;
859     }
860 
861     if (st == 0)
862     {
863         m_eof = true;
864         if (shift)
865         {
866             // If there's 0 (eof), but you still have data
867             // in the buffer, fake that they were read. Only
868             // when the buffer was empty at entrance should this
869             // result with EOF.
870             //
871             // Set back the size this buffer had before we attempted
872             // to read into it.
873             w_output.resize(shift);
874             return RD_DATA;
875         }
876         w_output.clear();
877         return RD_EOF;
878     }
879 
880     w_output.resize(shift+st);
881     return RD_DATA;
882 }
883 
Write(bytevector & w_buffer)884 void SrtMedium::Write(bytevector& w_buffer)
885 {
886     int st = srt_send(m_socket, w_buffer.data(), w_buffer.size());
887     if (st == SRT_ERROR)
888     {
889         Error(UDT::getlasterror(), "srt_send");
890     }
891 
892     // This should be ==, whereas > is not possible, but
893     // this should simply embrace this case as a sanity check.
894     if (st >= int(w_buffer.size()))
895         w_buffer.clear();
896     else if (st == 0)
897     {
898         Error("Unexpected EOF on Write");
899     }
900     else
901     {
902         // Remove only those bytes that were sent
903         w_buffer.erase(w_buffer.begin(), w_buffer.begin()+st);
904     }
905 }
906 
Write(bytevector & w_buffer)907 void TcpMedium::Write(bytevector& w_buffer)
908 {
909     int st = ::send(m_socket, w_buffer.data(), w_buffer.size(), DEF_SEND_FLAG);
910     if (st == -1)
911     {
912         Error(errno, "send");
913     }
914 
915     // This should be ==, whereas > is not possible, but
916     // this should simply embrace this case as a sanity check.
917     if (st >= int(w_buffer.size()))
918         w_buffer.clear();
919     else if (st == 0)
920     {
921         Error("Unexpected EOF on Write");
922     }
923     else
924     {
925         // Remove only those bytes that were sent
926         w_buffer.erase(w_buffer.begin(), w_buffer.begin()+st);
927     }
928 }
929 
Create(const std::string & url,size_t chunk,Medium::Mode mode)930 std::unique_ptr<Medium> Medium::Create(const std::string& url, size_t chunk, Medium::Mode mode)
931 {
932     UriParser uri(url);
933     std::unique_ptr<Medium> out;
934 
935     // Might be something smarter, but there are only 2 types.
936     if (uri.scheme() == "srt")
937     {
938         out.reset(new SrtMedium(uri, chunk));
939     }
940     else if (uri.scheme() == "tcp")
941     {
942         out.reset(new TcpMedium(uri, chunk));
943     }
944     else
945     {
946         Error("Medium not supported");
947     }
948 
949     out->InitMode(mode);
950 
951     return out;
952 }
953 
954 struct Tunnelbox
955 {
956     list<unique_ptr<Tunnel>> tunnels;
957     std::mutex access;
958     condition_variable decom_ready;
959     bool main_running = true;
960     thread thr;
961 
signal_decommissionTunnelbox962     void signal_decommission()
963     {
964         lock_guard<std::mutex> lk(access);
965         decom_ready.notify_one();
966     }
967 
installTunnelbox968     void install(std::unique_ptr<Medium>&& acp, std::unique_ptr<Medium>&& clr)
969     {
970         lock_guard<std::mutex> lk(access);
971         Verb() << "Tunnelbox: Starting tunnel: " << acp->uri() << " <-> " << clr->uri();
972 
973         tunnels.emplace_back(new Tunnel(this, move(acp), move(clr)));
974         // Note: after this instruction, acp and clr are no longer valid!
975         auto& it = tunnels.back();
976 
977         it->Start();
978     }
979 
start_cleanerTunnelbox980     void start_cleaner()
981     {
982         thr = thread( [this]() { CleanupWorker(); } );
983     }
984 
stop_cleanerTunnelbox985     void stop_cleaner()
986     {
987         if (thr.joinable())
988             thr.join();
989     }
990 
991 private:
992 
CleanupWorkerTunnelbox993     void CleanupWorker()
994     {
995         unique_lock<std::mutex> lk(access);
996 
997         while (main_running)
998         {
999             decom_ready.wait(lk);
1000 
1001             // Got a signal, find a tunnel ready to cleanup.
1002             // We just get the signal, but we don't know which
1003             // tunnel has generated it.
1004             for (auto i = tunnels.begin(), i_next = i; i != tunnels.end(); i = i_next)
1005             {
1006                 ++i_next;
1007                 // Bound in one call the check if the tunnel is dead
1008                 // and decommissioning because this must be done in
1009                 // the one critical section - make sure no other thread
1010                 // is accessing it at the same time and also make join all
1011                 // threads that might have been accessing it. After
1012                 // exiting as true (meaning that it was decommissioned
1013                 // as expected) it can be safely deleted.
1014                 if ((*i)->decommission_if_dead(main_running))
1015                 {
1016                     tunnels.erase(i);
1017                 }
1018             }
1019         }
1020     }
1021 };
1022 
Stop()1023 void Tunnel::Stop()
1024 {
1025     // Check for running must be done without locking
1026     // because if the tunnel isn't running
1027     if (!running)
1028         return; // already stopped
1029 
1030     lock_guard<std::mutex> lk(access);
1031 
1032     // Ok, you are the first to make the tunnel
1033     // not running and inform the tunnelbox.
1034     running = false;
1035     parent_box->signal_decommission();
1036 }
1037 
decommission_if_dead(bool forced)1038 bool Tunnel::decommission_if_dead(bool forced)
1039 {
1040     lock_guard<std::mutex> lk(access);
1041     if (running && !forced)
1042         return false; // working, not to be decommissioned
1043 
1044     // Join the engine threads, make sure nothing
1045     // is running that could use the data.
1046     acp_to_clr.Stop();
1047     clr_to_acp.Stop();
1048 
1049 
1050     // Done. The tunnelbox after calling this can
1051     // safely delete the decommissioned tunnel.
1052     return true;
1053 }
1054 
// Instance numbering for Medium::id() starts at 1.
int Medium::s_counter = 1;

// The single, process-wide collection of tunnels.
Tunnelbox g_tunnels;
// Listening medium; OnINT_StopService() closes it to interrupt Accept().
std::unique_ptr<Medium> main_listener;

// Chunk size that main() starts from, in bytes.
size_t default_chunk = 4096;
1061 
OnINT_StopService(int)1062 int OnINT_StopService(int)
1063 {
1064     g_tunnels.main_running = false;
1065     g_tunnels.signal_decommission();
1066 
1067     // Will cause the Accept() block to exit.
1068     main_listener->Close();
1069 
1070     return 0;
1071 }
1072 
main(int argc,char ** argv)1073 int main( int argc, char** argv )
1074 {
1075     if (!SysInitializeNetwork())
1076     {
1077         cerr << "Fail to initialize network module.";
1078         return 1;
1079     }
1080 
1081     size_t chunk = default_chunk;
1082 
1083     OptionName
1084         o_loglevel = { "ll", "loglevel" },
1085         o_logfa = { "lf", "logfa" },
1086         o_chunk = {"c", "chunk" },
1087         o_verbose = {"v", "verbose" },
1088         o_noflush = {"s", "skipflush" };
1089 
1090     // Options that expect no arguments (ARG_NONE) need not be mentioned.
1091     vector<OptionScheme> optargs = {
1092         { o_loglevel, OptionScheme::ARG_ONE },
1093         { o_logfa, OptionScheme::ARG_ONE },
1094         { o_chunk, OptionScheme::ARG_ONE }
1095     };
1096     options_t params = ProcessOptions(argv, argc, optargs);
1097 
1098     /*
1099        cerr << "OPTIONS (DEBUG)\n";
1100        for (auto o: params)
1101        {
1102        cerr << "[" << o.first << "] ";
1103        copy(o.second.begin(), o.second.end(), ostream_iterator<string>(cerr, " "));
1104        cerr << endl;
1105        }
1106      */
1107 
1108     vector<string> args = params[""];
1109     if ( args.size() < 2 )
1110     {
1111         cerr << "Usage: " << argv[0] << " <listen-uri> <call-uri>\n";
1112         return 1;
1113     }
1114 
1115     string loglevel = Option<OutString>(params, "error", o_loglevel);
1116     string logfa = Option<OutString>(params, "", o_logfa);
1117     srt_logging::LogLevel::type lev = SrtParseLogLevel(loglevel);
1118     srt::setloglevel(lev);
1119     if (logfa == "")
1120     {
1121         srt::addlogfa(SRT_LOGFA_APP);
1122     }
1123     else
1124     {
1125         // Add only selected FAs
1126         set<string> unknown_fas;
1127         set<srt_logging::LogFA> fas = SrtParseLogFA(logfa, &unknown_fas);
1128         srt::resetlogfa(fas);
1129 
1130         // The general parser doesn't recognize the "app" FA, we check it here.
1131         if (unknown_fas.count("app"))
1132             srt::addlogfa(SRT_LOGFA_APP);
1133     }
1134 
1135     string verbo = Option<OutString>(params, "no", o_verbose);
1136     if ( verbo == "" || !false_names.count(verbo) )
1137     {
1138         Verbose::on = true;
1139         Verbose::cverb = &std::cout;
1140     }
1141 
1142     string chunks = Option<OutString>(params, "", o_chunk);
1143     if ( chunks!= "" )
1144     {
1145         chunk = stoi(chunks);
1146     }
1147 
1148     string listen_node = args[0];
1149     string call_node = args[1];
1150 
1151     UriParser ul(listen_node), uc(call_node);
1152 
1153     // It is allowed to use both media of the same type,
1154     // but only srt and tcp are allowed.
1155 
1156     set<string> allowed = {"srt", "tcp"};
1157     if (!allowed.count(ul.scheme())|| !allowed.count(uc.scheme()))
1158     {
1159         cerr << "ERROR: only tcp and srt schemes supported";
1160         return -1;
1161     }
1162 
1163     Verb() << "LISTEN type=" << ul.scheme() << ", CALL type=" << uc.scheme();
1164 
1165     g_tunnels.start_cleaner();
1166 
1167     main_listener = Medium::Create(listen_node, chunk, Medium::LISTENER);
1168 
1169     // The main program loop is only to catch
1170     // new connections and manage them. Also takes care
1171     // of the broken connections.
1172 
1173     for (;;)
1174     {
1175         try
1176         {
1177             Verb() << "Waiting for connection...";
1178             std::unique_ptr<Medium> accepted = main_listener->Accept();
1179             if (!g_tunnels.main_running)
1180             {
1181                 Verb() << "Service stopped. Exiting.";
1182                 break;
1183             }
1184             Verb() << "Connection accepted. Connecting to the relay...";
1185 
1186             // Now call the target address.
1187             std::unique_ptr<Medium> caller = Medium::Create(call_node, chunk, Medium::CALLER);
1188             caller->Connect();
1189 
1190             Verb() << "Connected. Establishing pipe.";
1191 
1192             // No exception, we are free to pass :)
1193             g_tunnels.install(move(accepted), move(caller));
1194         }
1195         catch (...)
1196         {
1197             Verb() << "Connection reported, but failed";
1198         }
1199     }
1200 
1201     g_tunnels.stop_cleaner();
1202 
1203     return 0;
1204 }
1205 
1206 
1207