1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 written by
13    Haivision Systems Inc.
14  *****************************************************************************/
15 
16 #include "platform_sys.h"
17 
18 #include <atomic>
19 #include <iostream>
20 #include <iterator>
21 #include <vector>
22 #include <list>
23 #include <map>
24 #include <stdexcept>
25 #include <string>
26 #include <thread>
27 #include <chrono>
28 #include <deque>
29 #include <mutex>
30 #include <condition_variable>
31 #include <csignal>
32 #include <sys/stat.h>
33 #include <srt.h>
34 #include <udt.h>
35 
36 #include "testactivemedia.hpp"
37 
38 #include "apputil.hpp"
39 #include "uriparser.hpp"
40 #include "logsupport.hpp"
41 #include "logging.h"
42 #include "socketoptions.hpp"
43 #include "verbose.hpp"
44 #include "testmedia.hpp"
45 #include "threadname.h"
46 
47 
48 
49 
// Transfer helpers declared here. No definition or call site is visible in
// this file (NOTE(review): possibly leftovers from another tool - confirm
// before removing).
bool Upload(UriParser& srt, UriParser& file);
bool Download(UriParser& srt, UriParser& file);

// Application-level logger, registered under the APP functional area with
// the "srt-relay" prefix. Used for the debug traces of the relay loops.
srt_logging::Logger applog(SRT_LOGFA_APP, srt_logger_config, "srt-relay");

// Set to true by the SrtMainLoop constructor right after the SRT connection
// has been established. It is only written in this file.
std::atomic<bool> g_program_established {false};

// Points to the SrtModel that is currently inside Establish(); it is set
// just before that call and reset to nullptr right after it. The SIGINT
// handler reads it to decide whether a pending connect/accept has to be
// interrupted.
SrtModel* g_pending_model = nullptr;

// Id of the thread that performed this initialization. The SIGINT handler
// compares the id of the thread it runs on against this value.
thread::id g_root_thread = std::this_thread::get_id();
60 
OnINT_SetInterrupted(int)61 static void OnINT_SetInterrupted(int)
62 {
63     Verb() << VerbLock << "SIGINT: Setting interrupt state.";
64     ::transmit_int_state = true;
65 
66     // Just for a case, forcefully close all active SRT sockets.
67     SrtModel* pm = ::g_pending_model;
68     if (pm)
69     {
70         // The program is hanged on accepting a new SRT connection.
71         // We need to check which thread we've fallen into.
72         if (this_thread::get_id() == g_root_thread)
73         {
74             // Throw an exception, it will be caught in a predicted place.
75             throw std::runtime_error("Interrupted on request");
76         }
77         else
78         {
79             // This is some other thread, so close the listener socket.
80             // This will cause the accept block to be interrupted.
81             for (SRTSOCKET i: { pm->Socket(), pm->Listener() })
82                 if (i != SRT_INVALID_SOCK)
83                     srt_close(i);
84         }
85     }
86 
87 }
88 
using namespace std;

// Size of a single read from a source medium. The value 0 means "not set";
// it can be overridden with the -c option in main(), otherwise the
// SrtMainLoop constructor replaces 0 with one of the defaults below.
size_t g_chunksize = 0;
// Default chunk size used when transtype is anything other than "file".
size_t g_default_live_chunksize = 1316;
// Default chunk size used when transtype=file.
size_t g_default_file_chunksize = 1456;
94 
95 class SrtMainLoop
96 {
97     UriParser m_srtspec;
98 
99     // Media used
100     unique_ptr<SrtRelay> m_srt_relay;
101     SourceMedium m_srt_source;
102     SourceMedium m_input_medium;
103     list<unique_ptr<TargetMedium>> m_output_media;
104     thread m_input_thr;
105     std::exception_ptr m_input_xp;
106 
107     void InputRunner();
108     volatile bool m_input_running = false;
109 
110 public:
111     SrtMainLoop(const string& srt_uri, bool input_echoback, const string& input_spec, const vector<string>& output_spec);
112 
113     void run();
114 
MakeStop()115     void MakeStop() { m_input_running = false; }
IsRunning()116     bool IsRunning() { return m_input_running; }
117 
~SrtMainLoop()118     ~SrtMainLoop()
119     {
120         if (m_input_thr.joinable())
121             m_input_thr.join();
122     }
123 };
124 
main(int argc,char ** argv)125 int main( int argc, char** argv )
126 {
127     OptionName
128         o_loglevel = { "ll", "loglevel" },
129         o_logfa = { "lf", "logfa" },
130         o_verbose = {"v", "verbose" },
131         o_input = {"i", "input"},
132         o_output = {"o", "output"},
133         o_echo = {"e", "io", "input-echoback"},
134         o_chunksize = {"c", "chunk"}
135     ;
136 
137     // Options that expect no arguments (ARG_NONE) need not be mentioned.
138     vector<OptionScheme> optargs = {
139         { o_loglevel, OptionScheme::ARG_ONE },
140         { o_logfa, OptionScheme::ARG_ONE },
141         { o_input, OptionScheme::ARG_ONE },
142         { o_output, OptionScheme::ARG_VAR },
143         { o_chunksize, OptionScheme::ARG_ONE }
144     };
145     options_t params = ProcessOptions(argv, argc, optargs);
146 
147     /*
148     cerr << "OPTIONS (DEBUG)\n";
149     for (auto o: params)
150     {
151         cerr << "[" << o.first << "] ";
152         copy(o.second.begin(), o.second.end(), ostream_iterator<string>(cerr, " "));
153         cerr << endl;
154     }
155     */
156 
157     vector<string> args = params[""];
158     if ( args.size() != 1 )
159     {
160         cerr << "Usage: " << argv[0] << " <srt-endpoint> [ -i <input> | -e ] [ -o <output> ]\n";
161         cerr << "Options:\n";
162         cerr << "\t-v  .  .  .  .  .  .  .  .  .  .  Verbose mode\n";
163         cerr << "\t-ll <level=error>  .  .  .  .  .  Log level for SRT\n";
164 		cerr << "\t-lf <logfa=all>    .  .  .  .  .  Log Functional Areas enabled\n";
165         cerr << "\t-c  <size=1316[live]|1456[file]>  Single reading buffer size\n";
166 		cerr << "\t-i  <URI> .  .  .  .  .  .  .  .  Input medium spec\n";
167 		cerr << "\t-o  <URI> .  .  .  .  .  .  .  .  Output medium spec\n";
168 		cerr << "\t-e  .  .  .  (conflicts with -i)  Feed SRT output back to SRT input\n";
169 		cerr << "\nNote: specify `transtype=file` for using TCP-like stream mode\n";
170         return 1;
171     }
172 
173     string loglevel = Option<OutString>(params, "error", o_loglevel);
174     string logfa = Option<OutString>(params, "", o_logfa);
175     srt_logging::LogLevel::type lev = SrtParseLogLevel(loglevel);
176     UDT::setloglevel(lev);
177     if (logfa == "")
178     {
179         UDT::addlogfa(SRT_LOGFA_APP);
180     }
181     else
182     {
183         // Add only selected FAs
184         set<string> unknown_fas;
185         set<srt_logging::LogFA> fas = SrtParseLogFA(logfa, &unknown_fas);
186         UDT::resetlogfa(fas);
187 
188         // The general parser doesn't recognize the "app" FA, we check it here.
189         if (unknown_fas.count("app"))
190             UDT::addlogfa(SRT_LOGFA_APP);
191     }
192 
193     string verbo = Option<OutString>(params, "no", o_verbose);
194     if ( verbo == "" || !false_names.count(verbo) )
195     {
196         Verbose::on = true;
197         int verboch = atoi(verbo.c_str());
198         if (verboch <= 0)
199         {
200             verboch = 1;
201         }
202         else if (verboch > 2)
203         {
204             cerr << "ERROR: -v option accepts value 1 (stdout, default) or 2 (stderr)\n";
205             return 1;
206         }
207 
208         if (verboch == 1)
209         {
210             Verbose::cverb = &std::cout;
211         }
212         else
213         {
214             Verbose::cverb = &std::cerr;
215         }
216     }
217 
218     string chunk = Option<OutString>(params, "", o_chunksize);
219     if (chunk != "")
220     {
221         ::g_chunksize = stoi(chunk);
222     }
223 
224     string srt_endpoint = args[0];
225 
226     UriParser usrt(srt_endpoint);
227 
228     if (usrt.scheme() != "srt")
229     {
230         cerr << "ERROR: the only one freestanding parameter should be an SRT uri.\n";
231         cerr << "Usage: " << argv[0] << " <srt-endpoint> [ -i <input> ] [ -o <output> ] [ -e ]\n";
232         return 1;
233     }
234 
235     // Allowed are only one input and multiple outputs.
236     // Input-echoback is treated as a single input.
237     bool input_echoback = Option<OutString>(params, "no", o_echo) != "no";
238     string input_spec = Option<OutString>(params, "", o_input);
239 
240     if (input_spec != "" && input_echoback)
241     {
242         cerr << "ERROR: input-echoback is treated as input specifcation, -i can't be specified together.\n";
243         return 1;
244     }
245 
246     vector<string> output_spec = Option<OutList>(params, vector<string>{}, o_output);
247 
248     if (!input_echoback)
249     {
250         if (input_spec == "" || output_spec.empty())
251         {
252             cerr << "ERROR: at least one input and one output must be specified (-io specifies both)\n";
253             return 1;
254         }
255     }
256 
257     Verb() << "SETTINGS:";
258     Verb() << "SRT connection: " << srt_endpoint;
259     if (input_echoback)
260     {
261         Verb() << "INPUT: (from SRT connection)";
262     }
263     else
264     {
265         Verb() << "INPUT: " << input_spec;
266     }
267 
268     Verb() << "OUTPUT LIST:";
269     if (input_echoback)
270     {
271         Verb() << "\t(back to SRT connection)";
272     }
273     for (auto& s: output_spec)
274         Verb() << "\t" << s;
275 
276 #ifdef _MSC_VER
277 	// Replacement for sigaction, just use 'signal'
278 	// This may make this working kinda impaired and unexpected,
279 	// but still better that not compiling at all.
280 	signal(SIGINT, OnINT_SetInterrupted);
281 #else
282     struct sigaction sigIntHandler;
283 
284     sigIntHandler.sa_handler = OnINT_SetInterrupted;
285     sigemptyset(&sigIntHandler.sa_mask);
286     sigIntHandler.sa_flags = 0;
287 
288     sigaction(SIGINT, &sigIntHandler, NULL);
289 #endif
290 
291     try
292     {
293         SrtMainLoop loop(srt_endpoint, input_echoback, input_spec, output_spec);
294         loop.run();
295     }
296     catch (std::exception& x)
297     {
298         cerr << "ERROR: " << x.what() << endl;
299         return 1;
300     }
301 
302 
303     return 0;
304 }
305 
SrtMainLoop(const string & srt_uri,bool input_echoback,const string & input_spec,const vector<string> & output_spec)306 SrtMainLoop::SrtMainLoop(const string& srt_uri, bool input_echoback, const string& input_spec, const vector<string>& output_spec)
307 {
308     // Now prepare all media
309     // They use pointers instead of real variables
310     // so that the creation time can be delayed
311     // up to this moment, and the parameters prepared
312     // before passing to the constructors.
313 
314     // Start with output media so that they are ready when
315     // the data come in.
316 
317     for (string spec: output_spec)
318     {
319         Verb() << "Setting up output: " << spec;
320         unique_ptr<TargetMedium> m { new TargetMedium };
321         m->Setup(Target::Create(spec));
322         m_output_media.push_back(move(m));
323     }
324 
325 
326     // Start with SRT.
327 
328     UriParser srtspec(srt_uri);
329     string transtype = srtspec["transtype"].deflt("live");
330 
331     SrtModel m(srtspec.host(), srtspec.portno(), srtspec.parameters());
332 
333     // Just to keep it unchanged.
334     string id = m_srtspec["streamid"];
335 
336     Verb() << "Establishing SRT connection: " << srt_uri;
337 
338     ::g_pending_model = &m;
339     m.Establish((id));
340 
341     ::g_program_established = true;
342     ::g_pending_model = nullptr;
343 
344     Verb() << "... Established. configuring other pipes:";
345 
346     // Once it's ready, use it to initialize the medium.
347     bool file_mode = (transtype == "file");
348     if (g_chunksize == 0)
349     {
350         if (file_mode)
351             g_chunksize = g_default_file_chunksize;
352         else
353             g_chunksize = g_default_live_chunksize;
354 
355         Verb() << "DEFAULT CHUNKSIZE used: " << g_chunksize;
356     }
357 
358     m_srt_relay.reset(new SrtRelay);
359     m_srt_relay->StealFrom(m);
360 
361     m_srt_source.Setup(m_srt_relay.get(), g_chunksize);
362 
363     // Now check the input medium
364     if (input_echoback)
365     {
366         Verb() << "SRT set up as input source and the first output target";
367 
368         // Add SRT medium to output targets, and keep input medium empty.
369         unique_ptr<TargetMedium> m { new TargetMedium };
370         m->Setup(m_srt_relay.get());
371         m_output_media.push_back(move(m));
372     }
373     else
374     {
375         // Initialize input medium and do not add SRT medium
376         // to the output list, as this will be fed directly
377         // by the data from this input medium in a spearate engine.
378         Verb() << "Setting up input: " << input_spec;
379         m_input_medium.Setup(Source::Create(input_spec), g_chunksize);
380 
381         if (!file_mode)
382         {
383             // Also set writing to SRT non-blocking always.
384             bool no = false;
385             srt_setsockflag(m_srt_relay->Socket(), SRTO_SNDSYN, &no, sizeof no);
386         }
387     }
388 
389     // We're done here.
390     Verb() << "MEDIA SUCCESSFULLY CREATED.";
391 }
392 
InputRunner()393 void SrtMainLoop::InputRunner()
394 {
395     srt::ThreadName::set("InputRN");
396     // An extra thread with a loop that reads from the external input
397     // and writes into the SRT medium. When echoback mode is used,
398     // this thread isn't started at all and instead the SRT reading
399     // serves as both SRT reading and input reading.
400 
401     auto on_return_set = OnReturnSet(m_input_running, false);
402 
403     Verb() << VerbLock << "RUNNING INPUT LOOP";
404     for (;;)
405     {
406         applog.Debug() << "SrtMainLoop::InputRunner: extracting...";
407         auto data = m_input_medium.Extract();
408 
409         if (data.payload.empty())
410         {
411             Verb() << "INPUT READING INTERRUPTED.";
412             break;
413         }
414 
415         //Verb() << "INPUT [" << data.size() << "]  " << VerbNoEOL;
416         applog.Debug() << "SrtMainLoop::InputRunner: [" << data.payload.size() << "] CLIENT -> SRT-RELAY";
417         m_srt_relay->Write(data);
418     }
419 }
420 
run()421 void SrtMainLoop::run()
422 {
423     // Start the media runners.
424 
425     Verb() << VerbLock << "STARTING OUTPUT threads:";
426 
427     for (auto& o: m_output_media)
428         o->run();
429 
430     Verb() << VerbLock << "STARTING SRT INPUT LOOP";
431     m_srt_source.run();
432 
433     Verb() << VerbLock << "STARTING INPUT ";
434     if (m_input_medium.med)
435     {
436         m_input_medium.run();
437         m_input_running = true;
438 
439         std::ostringstream tns;
440         tns << "Input:" << this;
441         srt::ThreadName tn(tns.str());
442         m_input_thr = thread([this] {
443                 try {
444                     InputRunner();
445                 } catch (...) {
446                     m_input_xp = std::current_exception();
447                 }
448 
449                 Verb() << "INPUT: thread exit";
450         });
451     }
452 
453     Verb() << VerbLock << "RUNNING SRT MEDIA LOOP";
454     for (;;)
455     {
456         applog.Debug() << "SrtMainLoop::run: SRT-RELAY: extracting...";
457         auto data = m_srt_source.Extract();
458 
459         if (data.payload.empty())
460         {
461             Verb() << "SRT READING INTERRUPTED.";
462             break;
463         }
464 
465         vector<string> output_report;
466         bool any = false;
467         int no = 1;
468 
469         for (auto i = m_output_media.begin(), i_next = i; i != m_output_media.end(); i = i_next)
470         {
471             ++i_next;
472             auto& o = *i;
473             applog.Debug() << "SrtMainLoop::run: [" << data.payload.size() << "] SRT-RELAY: resending to output #" << no << "...";
474             if (!o->Schedule(data))
475             {
476                 if (Verbose::on)
477                 {
478                     ostringstream os;
479                     os << " --XXX-> <" << no << ">";
480                     output_report.push_back(os.str());
481                 }
482                 m_output_media.erase(i);
483                 continue;
484             }
485 
486             if (Verbose::on)
487             {
488                 ostringstream os;
489                 os << " --> <" << no << ">";
490                 output_report.push_back(os.str());
491             }
492             any = true;
493             ++no;
494         }
495         applog.Debug() << "SrtMainLoop::run: [" << data.payload.size() << "] SRT-RELAY -> OUTPUTS: " << Printable(output_report);
496 
497         if (Verbose::on)
498         {
499             string outputs;
500             for (auto& r: output_report)
501                 outputs += " " + r;
502             if (!any)
503                 outputs = " --> * (no output)";
504 
505             Verb() << VerbLock << "SRT [" << data.payload.size() << "]  " << outputs;
506         }
507     }
508 
509     Verb() << "MEDIA LOOP EXIT";
510     for (auto& m : m_output_media)
511     {
512         m->quit();
513     }
514     m_input_medium.quit();
515     m_srt_source.quit();
516 
517     if (m_input_xp)
518     {
519         try {
520             std::rethrow_exception(m_input_xp);
521         } catch (std::exception& x) {
522             cerr << "INPUT EXIT BY EXCEPTION: " << x.what() << endl;
523         } catch (...) {
524             cerr << "INPUT EXIT BY UNKNOWN EXCEPTION\n";
525         }
526     }
527 }
528