1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 // Just for formality. This file should be used
12 #include <iostream>
13 #include <iomanip>
14 #include <fstream>
15 #include <sstream>
16 #include <memory>
17 #include <string>
18 #include <stdexcept>
19 #include <iterator>
20 #include <map>
21 #include <srt.h>
22 #if !defined(_WIN32)
23 #include <sys/ioctl.h>
24 #else
25 #include <fcntl.h>
26 #include <io.h>
27 #endif
28 #if defined(SUNOS)
29 #include <sys/filio.h>
30 #endif
31 
32 #include "netinet_any.h"
33 #include "apputil.hpp"
34 #include "socketoptions.hpp"
35 #include "uriparser.hpp"
36 #include "transmitmedia.hpp"
37 #include "srt_compat.h"
38 #include "verbose.hpp"
39 
40 using namespace std;
41 
42 bool g_stats_are_printed_to_stdout = false;
43 bool transmit_total_stats = false;
44 unsigned long transmit_bw_report = 0;
45 unsigned long transmit_stats_report = 0;
46 unsigned long transmit_chunk_size = SRT_LIVE_MAX_PLSIZE;
47 
48 class FileSource: public Source
49 {
50     ifstream ifile;
51     string filename_copy;
52 public:
53 
FileSource(const string & path)54     FileSource(const string& path): ifile(path, ios::in | ios::binary), filename_copy(path)
55     {
56         if ( !ifile )
57             throw std::runtime_error(path + ": Can't open file for reading");
58     }
59 
Read(size_t chunk,MediaPacket & pkt,ostream & ignored SRT_ATR_UNUSED=cout)60     int Read(size_t chunk, MediaPacket& pkt, ostream & ignored SRT_ATR_UNUSED = cout) override
61     {
62         if (pkt.payload.size() < chunk)
63             pkt.payload.resize(chunk);
64 
65         pkt.time = 0;
66         ifile.read(pkt.payload.data(), chunk);
67         size_t nread = ifile.gcount();
68         if (nread < pkt.payload.size())
69             pkt.payload.resize(nread);
70 
71         if (pkt.payload.empty())
72         {
73             return 0;
74         }
75 
76         return (int) nread;
77     }
78 
IsOpen()79     bool IsOpen() override { return bool(ifile); }
End()80     bool End() override { return ifile.eof(); }
81 };
82 
83 class FileTarget: public Target
84 {
85     ofstream ofile;
86 public:
87 
FileTarget(const string & path)88     FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}
89 
Write(const char * data,size_t size,int64_t time SRT_ATR_UNUSED,ostream & ignored SRT_ATR_UNUSED=cout)90     int Write(const char* data, size_t size, int64_t time SRT_ATR_UNUSED, ostream & ignored SRT_ATR_UNUSED = cout) override
91     {
92         ofile.write(data, size);
93         return !(ofile.bad()) ? (int) size : 0;
94     }
95 
IsOpen()96     bool IsOpen() override { return !!ofile; }
Broken()97     bool Broken() override { return !ofile.good(); }
98     //~FileTarget() { ofile.close(); }
Close()99     void Close() override { ofile.close(); }
100 };
101 
102 template <class Iface> struct File;
103 template <> struct File<Source> { typedef FileSource type; };
104 template <> struct File<Target> { typedef FileTarget type; };
105 
106 template <class Iface>
CreateFile(const string & name)107 Iface* CreateFile(const string& name) { return new typename File<Iface>::type (name); }
108 
109 shared_ptr<SrtStatsWriter> transmit_stats_writer;
110 
InitParameters(string host,map<string,string> par)111 void SrtCommon::InitParameters(string host, map<string,string> par)
112 {
113     // Application-specific options: mode, blocking, timeout, adapter
114     if (Verbose::on && !par.empty())
115     {
116         Verb() << "SRT parameters specified:\n";
117         for (map<string,string>::iterator i = par.begin(); i != par.end(); ++i)
118         {
119             cerr << "\t" << i->first << " = '" << i->second << "'\n";
120         }
121     }
122 
123     string adapter;
124     if (par.count("adapter"))
125     {
126         adapter = par.at("adapter");
127     }
128 
129     m_mode = "default";
130     if (par.count("mode"))
131     {
132         m_mode = par.at("mode");
133     }
134     SocketOption::Mode mode = SrtInterpretMode(m_mode, host, adapter);
135     if (mode == SocketOption::FAILURE)
136     {
137         Error("Invalid mode");
138     }
139 
140     // Fix the mode name after successful interpretation
141     m_mode = SocketOption::mode_names[mode];
142 
143     par.erase("mode");
144 
145     if (par.count("timeout"))
146     {
147         m_timeout = stoi(par.at("timeout"), 0, 0);
148         par.erase("timeout");
149     }
150 
151     if (par.count("adapter"))
152     {
153         m_adapter = par.at("adapter");
154         par.erase("adapter");
155     }
156     else if (m_mode == "listener")
157     {
158         // For listener mode, adapter is taken from host,
159         // if 'adapter' parameter is not given
160         m_adapter = host;
161     }
162 
163     if (par.count("tsbpd") && false_names.count(par.at("tsbpd")))
164     {
165         m_tsbpdmode = false;
166     }
167 
168     if (par.count("port"))
169     {
170         m_outgoing_port = stoi(par.at("port"), 0, 0);
171         par.erase("port");
172     }
173 
174     // That's kinda clumsy, but it must rely on the defaults.
175     // Default mode is live, so check if the file mode was enforced
176     if ((par.count("transtype") == 0 || par["transtype"] != "file")
177         && transmit_chunk_size > SRT_LIVE_DEF_PLSIZE)
178     {
179         if (transmit_chunk_size > SRT_LIVE_MAX_PLSIZE)
180             throw std::runtime_error("Chunk size in live mode exceeds 1456 bytes; this is not supported");
181 
182         par["payloadsize"] = Sprint(transmit_chunk_size);
183     }
184 
185     // Assign the others here.
186     m_options = par;
187 }
188 
PrepareListener(string host,int port,int backlog)189 void SrtCommon::PrepareListener(string host, int port, int backlog)
190 {
191     m_bindsock = srt_create_socket();
192     if ( m_bindsock == SRT_ERROR )
193         Error("srt_create_socket");
194 
195     int stat = ConfigurePre(m_bindsock);
196     if ( stat == SRT_ERROR )
197         Error("ConfigurePre");
198 
199     sockaddr_any sa = CreateAddr(host, port);
200     sockaddr* psa = sa.get();
201     Verb() << "Binding a server on " << host << ":" << port << " ...";
202 
203     stat = srt_bind(m_bindsock, psa, sizeof sa);
204     if ( stat == SRT_ERROR )
205     {
206         srt_close(m_bindsock);
207         Error("srt_bind");
208     }
209 
210     Verb() << " listen...";
211 
212     stat = srt_listen(m_bindsock, backlog);
213     if ( stat == SRT_ERROR )
214     {
215         srt_close(m_bindsock);
216         Error("srt_listen");
217     }
218 }
219 
StealFrom(SrtCommon & src)220 void SrtCommon::StealFrom(SrtCommon& src)
221 {
222     // This is used when SrtCommon class designates a listener
223     // object that is doing Accept in appropriate direction class.
224     // The new object should get the accepted socket.
225     m_output_direction = src.m_output_direction;
226     m_timeout = src.m_timeout;
227     m_tsbpdmode = src.m_tsbpdmode;
228     m_options = src.m_options;
229     m_bindsock = SRT_INVALID_SOCK; // no listener
230     m_sock = src.m_sock;
231     src.m_sock = SRT_INVALID_SOCK; // STEALING
232 }
233 
AcceptNewClient()234 bool SrtCommon::AcceptNewClient()
235 {
236     sockaddr_any scl;
237     Verb() << " accept... ";
238 
239     m_sock = srt_accept(m_bindsock, scl.get(), &scl.len);
240     if ( m_sock == SRT_INVALID_SOCK )
241     {
242         srt_close(m_bindsock);
243         m_bindsock = SRT_INVALID_SOCK;
244         Error("srt_accept");
245     }
246 
247     // we do one client connection at a time,
248     // so close the listener.
249     srt_close(m_bindsock);
250     m_bindsock = SRT_INVALID_SOCK;
251 
252     Verb() << " connected.";
253 
254     // ConfigurePre is done on bindsock, so any possible Pre flags
255     // are DERIVED by sock. ConfigurePost is done exclusively on sock.
256     int stat = ConfigurePost(m_sock);
257     if ( stat == SRT_ERROR )
258         Error("ConfigurePost");
259 
260     return true;
261 }
262 
Init(string host,int port,map<string,string> par,bool dir_output)263 void SrtCommon::Init(string host, int port, map<string,string> par, bool dir_output)
264 {
265     m_output_direction = dir_output;
266     InitParameters(host, par);
267 
268     Verb() << "Opening SRT " << (dir_output ? "target" : "source") << " " << m_mode
269         << " on " << host << ":" << port;
270 
271     if ( m_mode == "caller" )
272         OpenClient(host, port);
273     else if ( m_mode == "listener" )
274         OpenServer(m_adapter, port);
275     else if ( m_mode == "rendezvous" )
276         OpenRendezvous(m_adapter, host, port);
277     else
278     {
279         throw std::invalid_argument("Invalid 'mode'. Use 'client' or 'server'");
280     }
281 }
282 
ConfigurePost(SRTSOCKET sock)283 int SrtCommon::ConfigurePost(SRTSOCKET sock)
284 {
285     bool no = false;
286     int result = 0;
287     if ( m_output_direction )
288     {
289         result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &no, sizeof no);
290         if ( result == -1 )
291             return result;
292 
293         if ( m_timeout )
294             return srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &m_timeout, sizeof m_timeout);
295     }
296     else
297     {
298         result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no);
299         if ( result == -1 )
300             return result;
301 
302         if ( m_timeout )
303             return srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &m_timeout, sizeof m_timeout);
304     }
305 
306     SrtConfigurePost(sock, m_options);
307 
308     for (const auto &o: srt_options)
309     {
310         if ( o.binding == SocketOption::POST && m_options.count(o.name) )
311         {
312             string value = m_options.at(o.name);
313             bool ok = o.apply<SocketOption::SRT>(sock, value);
314             if ( !ok )
315                 Verb() << "WARNING: failed to set '" << o.name << "' (post, "
316                     << (m_output_direction? "target":"source") << ") to "
317                     << value;
318             else
319                 Verb() << "NOTE: SRT/post::" << o.name << "=" << value;
320         }
321     }
322 
323     return 0;
324 }
325 
ConfigurePre(SRTSOCKET sock)326 int SrtCommon::ConfigurePre(SRTSOCKET sock)
327 {
328     int result = 0;
329 
330     bool no = false;
331     if ( !m_tsbpdmode )
332     {
333         result = srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no);
334         if ( result == -1 )
335             return result;
336     }
337 
338     result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no);
339     if ( result == -1 )
340         return result;
341 
342 
343     // host is only checked for emptiness and depending on that the connection mode is selected.
344     // Here we are not exactly interested with that information.
345     vector<string> failures;
346 
347     // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always,
348     // but it doesn't matter here. We don't use 'connmode' for anything else than
349     // checking for failures.
350     SocketOption::Mode conmode = SrtConfigurePre(sock, "",  m_options, &failures);
351 
352     if ( conmode == SocketOption::FAILURE )
353     {
354         if ( Verbose::on )
355         {
356             cerr << "WARNING: failed to set options: ";
357             copy(failures.begin(), failures.end(), ostream_iterator<string>(cerr, ", "));
358             cerr << endl;
359         }
360 
361         return SRT_ERROR;
362     }
363 
364     return 0;
365 }
366 
SetupAdapter(const string & host,int port)367 void SrtCommon::SetupAdapter(const string& host, int port)
368 {
369     sockaddr_any localsa = CreateAddr(host, port);
370     sockaddr* psa = localsa.get();
371     int stat = srt_bind(m_sock, psa, sizeof localsa);
372     if ( stat == SRT_ERROR )
373         Error("srt_bind");
374 }
375 
OpenClient(string host,int port)376 void SrtCommon::OpenClient(string host, int port)
377 {
378     PrepareClient();
379 
380     if ( m_outgoing_port )
381     {
382         SetupAdapter("", m_outgoing_port);
383     }
384 
385     ConnectClient(host, port);
386 }
387 
PrepareClient()388 void SrtCommon::PrepareClient()
389 {
390     m_sock = srt_create_socket();
391     if ( m_sock == SRT_ERROR )
392         Error("srt_create_socket");
393 
394     int stat = ConfigurePre(m_sock);
395     if ( stat == SRT_ERROR )
396         Error("ConfigurePre");
397 }
398 
399 
ConnectClient(string host,int port)400 void SrtCommon::ConnectClient(string host, int port)
401 {
402 
403     sockaddr_any sa = CreateAddr(host, port);
404 	sockaddr* psa = sa.get();
405 
406     Verb() << "Connecting to " << host << ":" << port;
407 
408     int stat = srt_connect(m_sock, psa, sizeof sa);
409     if ( stat == SRT_ERROR )
410     {
411         srt_close(m_sock);
412         Error("srt_connect");
413     }
414 
415     stat = ConfigurePost(m_sock);
416     if ( stat == SRT_ERROR )
417         Error("ConfigurePost");
418 }
419 
Error(string src)420 void SrtCommon::Error(string src)
421 {
422     int errnov = 0;
423     int result = srt_getlasterror(&errnov);
424     string message = srt_getlasterror_str();
425     Verb() << "\nERROR #" << result << "." << errnov << ": " << message;
426 
427     throw TransmissionError("error: " + src + ": " + message);
428 }
429 
OpenRendezvous(string adapter,string host,int port)430 void SrtCommon::OpenRendezvous(string adapter, string host, int port)
431 {
432     m_sock = srt_create_socket();
433     if ( m_sock == SRT_ERROR )
434         Error("srt_create_socket");
435 
436     bool yes = true;
437     srt_setsockopt(m_sock, 0, SRTO_RENDEZVOUS, &yes, sizeof yes);
438 
439     int stat = ConfigurePre(m_sock);
440     if ( stat == SRT_ERROR )
441         Error("ConfigurePre");
442 
443     sockaddr_any sa = CreateAddr(host, port);
444     if (sa.family() == AF_UNSPEC)
445     {
446         Error("OpenRendezvous: invalid target host specification: " + host);
447     }
448 
449     const int outport = m_outgoing_port ? m_outgoing_port : port;
450 
451     sockaddr_any localsa = CreateAddr(adapter, outport, sa.family());
452 
453     Verb() << "Binding a server on " << adapter << ":" << outport;
454 
455     stat = srt_bind(m_sock, localsa.get(), sizeof localsa);
456     if ( stat == SRT_ERROR )
457     {
458         srt_close(m_sock);
459         Error("srt_bind");
460     }
461 
462     Verb() << "Connecting to " << host << ":" << port;
463 
464     stat = srt_connect(m_sock, sa.get(), sizeof sa);
465     if ( stat == SRT_ERROR )
466     {
467         srt_close(m_sock);
468         Error("srt_connect");
469     }
470 
471     stat = ConfigurePost(m_sock);
472     if ( stat == SRT_ERROR )
473         Error("ConfigurePost");
474 }
475 
Close()476 void SrtCommon::Close()
477 {
478     Verb() << "SrtCommon: DESTROYING CONNECTION, closing sockets (rt%" << m_sock << " ls%" << m_bindsock << ")...";
479 
480     if ( m_sock != SRT_INVALID_SOCK )
481     {
482         srt_close(m_sock);
483         m_sock = SRT_INVALID_SOCK;
484     }
485 
486     if ( m_bindsock != SRT_INVALID_SOCK )
487     {
488         srt_close(m_bindsock);
489         m_bindsock = SRT_INVALID_SOCK ;
490     }
491 
492     Verb() << "SrtCommon: ... done.";
493 }
494 
~SrtCommon()495 SrtCommon::~SrtCommon()
496 {
497     Close();
498 }
499 
SrtSource(string host,int port,const map<string,string> & par)500 SrtSource::SrtSource(string host, int port, const map<string,string>& par)
501 {
502     Init(host, port, par, false);
503 
504     ostringstream os;
505     os << host << ":" << port;
506     hostport_copy = os.str();
507 }
508 
Read(size_t chunk,MediaPacket & pkt,ostream & out_stats)509 int SrtSource::Read(size_t chunk, MediaPacket& pkt, ostream &out_stats)
510 {
511     static unsigned long counter = 1;
512 
513     if (pkt.payload.size() < chunk)
514         pkt.payload.resize(chunk);
515 
516     SRT_MSGCTRL ctrl;
517     const int stat = srt_recvmsg2(m_sock, pkt.payload.data(), (int) chunk, &ctrl);
518     if (stat <= 0)
519     {
520         pkt.payload.clear();
521         return stat;
522     }
523 
524     pkt.time = ctrl.srctime;
525 
526     chunk = size_t(stat);
527     if (chunk < pkt.payload.size())
528         pkt.payload.resize(chunk);
529 
530     const bool need_bw_report = transmit_bw_report && (counter % transmit_bw_report) == transmit_bw_report - 1;
531     const bool need_stats_report = transmit_stats_report && (counter % transmit_stats_report) == transmit_stats_report - 1;
532 
533     if (need_bw_report || need_stats_report)
534     {
535         CBytePerfMon perf;
536         srt_bstats(m_sock, &perf, need_stats_report && !transmit_total_stats);
537         if (transmit_stats_writer != nullptr)
538         {
539             if (need_bw_report)
540                 cerr << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth) << std::flush;
541             if (need_stats_report)
542                 out_stats << transmit_stats_writer->WriteStats(m_sock, perf) << std::flush;
543         }
544     }
545     ++counter;
546     return stat;
547 }
548 
ConfigurePre(SRTSOCKET sock)549 int SrtTarget::ConfigurePre(SRTSOCKET sock)
550 {
551     int result = SrtCommon::ConfigurePre(sock);
552     if ( result == -1 )
553         return result;
554 
555     int yes = 1;
556     // This is for the HSv4 compatibility; if both parties are HSv5
557     // (min. version 1.2.1), then this setting simply does nothing.
558     // In HSv4 this setting is obligatory; otherwise the SRT handshake
559     // extension will not be done at all.
560     result = srt_setsockopt(sock, 0, SRTO_SENDER, &yes, sizeof yes);
561     if ( result == -1 )
562         return result;
563 
564     return 0;
565 }
566 
Write(const char * data,size_t size,int64_t src_time,ostream & out_stats)567 int SrtTarget::Write(const char* data, size_t size, int64_t src_time, ostream &out_stats)
568 {
569     static unsigned long counter = 1;
570 
571     SRT_MSGCTRL ctrl = srt_msgctrl_default;
572     ctrl.srctime = src_time;
573     int stat = srt_sendmsg2(m_sock, data, (int) size, &ctrl);
574     if (stat == SRT_ERROR)
575     {
576         return stat;
577     }
578 
579     const bool need_bw_report = transmit_bw_report && (counter % transmit_bw_report) == transmit_bw_report - 1;
580     const bool need_stats_report = transmit_stats_report && (counter % transmit_stats_report) == transmit_stats_report - 1;
581 
582     if (need_bw_report || need_stats_report)
583     {
584         CBytePerfMon perf;
585         srt_bstats(m_sock, &perf, need_stats_report && !transmit_total_stats);
586         if (transmit_stats_writer != nullptr)
587         {
588             if (need_bw_report)
589                 cerr << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth) << std::flush;
590             if (need_stats_report)
591                 out_stats << transmit_stats_writer->WriteStats(m_sock, perf) << std::flush;
592         }
593     }
594     ++counter;
595     return stat;
596 }
597 
598 
SrtModel(string host,int port,map<string,string> par)599 SrtModel::SrtModel(string host, int port, map<string,string> par)
600 {
601     InitParameters(host, par);
602     if (m_mode == "caller")
603         is_caller = true;
604     else if (m_mode != "listener")
605         throw std::invalid_argument("Only caller and listener modes supported");
606 
607     m_host = host;
608     m_port = port;
609 }
610 
Establish(std::string & w_name)611 void SrtModel::Establish(std::string& w_name)
612 {
613     // This does connect or accept.
614     // When this returned true, the caller should create
615     // a new SrtSource or SrtTaget then call StealFrom(*this) on it.
616 
617     // If this is a connector and the peer doesn't have a corresponding
618     // medium, it should send back a single byte with value 0. This means
619     // that agent should stop connecting.
620 
621     if (is_caller)
622     {
623         // Establish a connection
624 
625         PrepareClient();
626 
627         if (w_name != "")
628         {
629             Verb() << "Connect with requesting stream [" << w_name << "]";
630             srt::setstreamid(m_sock, w_name);
631         }
632         else
633         {
634             Verb() << "NO STREAM ID for SRT connection";
635         }
636 
637         if (m_outgoing_port)
638         {
639             Verb() << "Setting outgoing port: " << m_outgoing_port;
640             SetupAdapter("", m_outgoing_port);
641         }
642 
643         ConnectClient(m_host, m_port);
644 
645         if (m_outgoing_port == 0)
646         {
647             // Must rely on a randomly selected one. Extract the port
648             // so that it will be reused next time.
649             sockaddr_any s(AF_INET);
650             int namelen = s.size();
651             if ( srt_getsockname(Socket(), s.get(), &namelen) == SRT_ERROR )
652             {
653                 Error("srt_getsockname");
654             }
655 
656             m_outgoing_port = s.hport();
657             Verb() << "Extracted outgoing port: " << m_outgoing_port;
658         }
659     }
660     else
661     {
662         // Listener - get a socket by accepting.
663         // Check if the listener is already created first
664         if (Listener() == SRT_INVALID_SOCK)
665         {
666             Verb() << "Setting up listener: port=" << m_port << " backlog=5";
667             PrepareListener(m_adapter, m_port, 5);
668         }
669 
670         Verb() << "Accepting a client...";
671         AcceptNewClient();
672         // This rewrites m_sock with a new SRT socket ("accepted" socket)
673         w_name = srt::getstreamid(m_sock);
674         Verb() << "... GOT CLIENT for stream [" << w_name << "]";
675     }
676 }
677 
678 
679 template <class Iface> struct Srt;
680 template <> struct Srt<Source> { typedef SrtSource type; };
681 template <> struct Srt<Target> { typedef SrtTarget type; };
682 
683 template <class Iface>
CreateSrt(const string & host,int port,const map<string,string> & par)684 Iface* CreateSrt(const string& host, int port, const map<string,string>& par) { return new typename Srt<Iface>::type (host, port, par); }
685 
686 class ConsoleSource: public Source
687 {
688 public:
689 
ConsoleSource()690     ConsoleSource()
691     {
692 #ifdef _WIN32
693         // The default stdin mode on windows is text.
694         // We have to set it to the binary mode
695         _setmode(_fileno(stdin), _O_BINARY);
696 #endif
697     }
698 
Read(size_t chunk,MediaPacket & pkt,ostream & ignored SRT_ATR_UNUSED=cout)699     int Read(size_t chunk, MediaPacket& pkt, ostream & ignored SRT_ATR_UNUSED = cout) override
700     {
701         if (pkt.payload.size() < chunk)
702             pkt.payload.resize(chunk);
703 
704         bool st = cin.read(pkt.payload.data(), chunk).good();
705         chunk = cin.gcount();
706         if (chunk == 0 || !st)
707         {
708             pkt.payload.clear();
709             return 0;
710         }
711 
712         // Save this time to potentially use it for SRT target.
713         pkt.time = srt_time_now();
714         if (chunk < pkt.payload.size())
715             pkt.payload.resize(chunk);
716 
717         return (int) chunk;
718     }
719 
IsOpen()720     bool IsOpen() override { return cin.good(); }
End()721     bool End() override { return cin.eof(); }
GetSysSocket() const722     int GetSysSocket() const override { return 0; };
723 };
724 
725 class ConsoleTarget: public Target
726 {
727 public:
728 
ConsoleTarget()729     ConsoleTarget()
730     {
731 #ifdef _WIN32
732         // The default stdout mode on windows is text.
733         // We have to set it to the binary mode
734         _setmode(_fileno(stdout), _O_BINARY);
735 #endif
736     }
737 
~ConsoleTarget()738     virtual ~ConsoleTarget()
739     {
740         cout.flush();
741     }
742 
Write(const char * data,size_t len,int64_t src_time SRT_ATR_UNUSED,ostream & ignored SRT_ATR_UNUSED=cout)743     int Write(const char* data, size_t len, int64_t src_time SRT_ATR_UNUSED, ostream & ignored SRT_ATR_UNUSED = cout) override
744     {
745         cout.write(data, len);
746         return (int) len;
747     }
748 
IsOpen()749     bool IsOpen() override { return cout.good(); }
Broken()750     bool Broken() override { return cout.eof(); }
GetSysSocket() const751     int GetSysSocket() const override { return 0; };
752 };
753 
754 template <class Iface> struct Console;
755 template <> struct Console<Source> { typedef ConsoleSource type; };
756 template <> struct Console<Target> { typedef ConsoleTarget type; };
757 
758 template <class Iface>
CreateConsole()759 Iface* CreateConsole() { return new typename Console<Iface>::type (); }
760 
761 
762 // More options can be added in future.
763 SocketOption udp_options [] {
764     { "iptos", IPPROTO_IP, IP_TOS, SocketOption::PRE, SocketOption::INT, nullptr },
765     // IP_TTL and IP_MULTICAST_TTL are handled separately by a common option, "ttl".
766     { "mcloop", IPPROTO_IP, IP_MULTICAST_LOOP, SocketOption::PRE, SocketOption::INT, nullptr },
767     { "sndbuf", SOL_SOCKET, SO_SNDBUF, SocketOption::PRE, SocketOption::INT, nullptr},
768     { "rcvbuf", SOL_SOCKET, SO_RCVBUF, SocketOption::PRE, SocketOption::INT, nullptr}
769 };
770 
// Tells whether the IPv4 address belongs to the multicast range, that is,
// whether its first octet is within 224..239.
static inline bool IsMulticast(in_addr adr)
{
    // s_addr is kept in network byte order, so after the conversion to
    // host order the first octet is the most significant byte.
    const unsigned long first_octet = (ntohl(adr.s_addr) >> 24) & 0xFFu;
    return first_octet >= 224 && first_octet <= 239;
}
777 
778 
779 class UdpCommon
780 {
781 protected:
782     int m_sock = -1;
783     sockaddr_any sadr;
784     string adapter;
785     map<string, string> m_options;
786 
Setup(string host,int port,map<string,string> attr)787     void Setup(string host, int port, map<string,string> attr)
788     {
789         m_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
790         if (m_sock == -1)
791             Error(SysError(), "UdpCommon::Setup: socket");
792 
793         int yes = 1;
794         ::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof yes);
795 
796         // set non-blocking mode
797 #if defined(_WIN32)
798         unsigned long ulyes = 1;
799         if (ioctlsocket(m_sock, FIONBIO, &ulyes) == SOCKET_ERROR)
800 #else
801         if (ioctl(m_sock, FIONBIO, (const char *)&yes) < 0)
802 #endif
803         {
804             Error(SysError(), "UdpCommon::Setup: ioctl FIONBIO");
805         }
806 
807         sadr = CreateAddr(host, port);
808 
809         bool is_multicast = false;
810 
811         if (attr.count("multicast"))
812         {
813             // XXX: Here provide support for IPv6 multicast #1479
814             if (sadr.family() != AF_INET)
815             {
816                 throw std::runtime_error("UdpCommon: Multicast on IPv6 is not yet supported");
817             }
818 
819             if (!IsMulticast(sadr.sin.sin_addr))
820             {
821                 throw std::runtime_error("UdpCommon: requested multicast for a non-multicast-type IP address");
822             }
823             is_multicast = true;
824         }
825         else if (sadr.family() == AF_INET && IsMulticast(sadr.sin.sin_addr))
826         {
827             is_multicast = true;
828         }
829 
830         if (is_multicast)
831         {
832 #ifdef IP_ADD_SOURCE_MEMBERSHIP
833             ip_mreq_source mreq_ssm;
834 #endif
835             ip_mreq mreq;
836             sockaddr_any maddr (AF_INET);
837             int opt_name;
838             void* mreq_arg_ptr;
839             socklen_t mreq_arg_size;
840 
841             adapter = attr.count("adapter") ? attr.at("adapter") : string();
842             if ( adapter == "" )
843             {
844                 Verb() << "Multicast: home address: INADDR_ANY:" << port;
845                 maddr.sin.sin_family = AF_INET;
846                 maddr.sin.sin_addr.s_addr = htonl(INADDR_ANY);
847                 maddr.sin.sin_port = htons(port); // necessary for temporary use
848             }
849             else
850             {
851                 Verb() << "Multicast: home address: " << adapter << ":" << port;
852                 maddr = CreateAddr(adapter, port);
853             }
854 
855             if (attr.count("source"))
856             {
857 #ifdef IP_ADD_SOURCE_MEMBERSHIP
858                 /* this is an ssm.  we need to use the right struct and opt */
859                 opt_name = IP_ADD_SOURCE_MEMBERSHIP;
860                 mreq_ssm.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
861                 mreq_ssm.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
862                 inet_pton(AF_INET, attr.at("source").c_str(), &mreq_ssm.imr_sourceaddr);
863                 mreq_arg_size = sizeof(mreq_ssm);
864                 mreq_arg_ptr = &mreq_ssm;
865 #else
866                 throw std::runtime_error("UdpCommon: source-filter multicast not supported by OS");
867 #endif
868             }
869             else
870             {
871                 opt_name = IP_ADD_MEMBERSHIP;
872                 mreq.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
873                 mreq.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
874                 mreq_arg_size = sizeof(mreq);
875                 mreq_arg_ptr = &mreq;
876             }
877 
878 #ifdef _WIN32
879             const char* mreq_arg = (const char*)mreq_arg_ptr;
880             const auto status_error = SOCKET_ERROR;
881 #else
882             const void* mreq_arg = mreq_arg_ptr;
883             const auto status_error = -1;
884 #endif
885 
886 #if defined(_WIN32) || defined(__CYGWIN__)
887             // On Windows it somehow doesn't work when bind()
888             // is called with multicast address. Write the address
889             // that designates the network device here.
890             // Also, sets port sharing when working with multicast
891             sadr = maddr;
892             int reuse = 1;
893             int shareAddrRes = setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse), sizeof(reuse));
894             if (shareAddrRes == status_error)
895             {
896                 throw runtime_error("marking socket for shared use failed");
897             }
898             Verb() << "Multicast(Windows): will bind to home address";
899 #else
900             Verb() << "Multicast(POSIX): will bind to IGMP address: " << host;
901 #endif
902             int res = setsockopt(m_sock, IPPROTO_IP, opt_name, mreq_arg, mreq_arg_size);
903 
904             if ( res == status_error )
905             {
906                 Error(errno, "adding to multicast membership failed");
907             }
908 
909             attr.erase("multicast");
910             attr.erase("adapter");
911         }
912 
913         // The "ttl" options is handled separately, it maps to both IP_TTL
914         // and IP_MULTICAST_TTL so that TTL setting works for both uni- and multicast.
915         if (attr.count("ttl"))
916         {
917             int ttl = stoi(attr.at("ttl"));
918             int res = setsockopt(m_sock, IPPROTO_IP, IP_TTL, (const char*)&ttl, sizeof ttl);
919             if (res == -1)
920                 Verb() << "WARNING: failed to set 'ttl' (IP_TTL) to " << ttl;
921             res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&ttl, sizeof ttl);
922             if (res == -1)
923                 Verb() << "WARNING: failed to set 'ttl' (IP_MULTICAST_TTL) to " << ttl;
924 
925             attr.erase("ttl");
926         }
927 
928         m_options = attr;
929 
930         for (auto o: udp_options)
931         {
932             // Ignore "binding" - for UDP there are no post options.
933             if ( m_options.count(o.name) )
934             {
935                 string value = m_options.at(o.name);
936                 bool ok = o.apply<SocketOption::SYSTEM>(m_sock, value);
937                 if ( !ok )
938                     Verb() << "WARNING: failed to set '" << o.name << "' to " << value;
939             }
940         }
941     }
942 
Error(int err,string src)943     void Error(int err, string src)
944     {
945         char buf[512];
946         string message = SysStrError(err, buf, 512u);
947 
948         cerr << "\nERROR #" << err << ": " << message << endl;
949 
950         throw TransmissionError("error: " + src + ": " + message);
951     }
952 
~UdpCommon()953     ~UdpCommon()
954     {
955 #ifdef _WIN32
956         if (m_sock != -1)
957         {
958            shutdown(m_sock, SD_BOTH);
959            closesocket(m_sock);
960            m_sock = -1;
961         }
962 #else
963         close(m_sock);
964 #endif
965     }
966 };
967 
968 
969 class UdpSource: public Source, public UdpCommon
970 {
971     bool eof = true;
972 public:
973 
UdpSource(string host,int port,const map<string,string> & attr)974     UdpSource(string host, int port, const map<string,string>& attr)
975     {
976         Setup(host, port, attr);
977         int stat = ::bind(m_sock, sadr.get(), sadr.size());
978         if ( stat == -1 )
979             Error(SysError(), "Binding address for UDP");
980         eof = false;
981     }
982 
Read(size_t chunk,MediaPacket & pkt,ostream & ignored SRT_ATR_UNUSED=cout)983     int Read(size_t chunk, MediaPacket& pkt, ostream & ignored SRT_ATR_UNUSED = cout) override
984     {
985         if (pkt.payload.size() < chunk)
986             pkt.payload.resize(chunk);
987 
988         sockaddr_any sa(sadr.family());
989         socklen_t si = sa.size();
990         int stat = recvfrom(m_sock, pkt.payload.data(), (int) chunk, 0, sa.get(), &si);
991         if (stat < 1)
992         {
993             if (SysError() != EWOULDBLOCK)
994                 eof = true;
995             pkt.payload.clear();
996             return stat;
997         }
998         sa.len = si;
999 
1000         // Save this time to potentially use it for SRT target.
1001         pkt.time = srt_time_now();
1002         chunk = size_t(stat);
1003         if (chunk < pkt.payload.size())
1004             pkt.payload.resize(chunk);
1005 
1006         return stat;
1007     }
1008 
IsOpen()1009     bool IsOpen() override { return m_sock != -1; }
End()1010     bool End() override { return eof; }
1011 
GetSysSocket() const1012     int GetSysSocket() const override { return m_sock; };
1013 };
1014 
1015 class UdpTarget: public Target, public UdpCommon
1016 {
1017 public:
UdpTarget(string host,int port,const map<string,string> & attr)1018     UdpTarget(string host, int port, const map<string,string>& attr )
1019     {
1020         if (host.empty())
1021             cerr << "\nWARN Host for UDP target is not provided. Will send to localhost:" << port << ".\n";
1022 
1023         Setup(host, port, attr);
1024         if (adapter != "")
1025         {
1026             sockaddr_any maddr = CreateAddr(adapter, 0);
1027             if (maddr.family() != AF_INET)
1028             {
1029                 Error(0, "UDP/target: IPv6 multicast not supported in the application");
1030             }
1031 
1032             in_addr addr = maddr.sin.sin_addr;
1033 
1034             int res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
1035             if (res == -1)
1036             {
1037                 Error(SysError(), "setsockopt/IP_MULTICAST_IF: " + adapter);
1038             }
1039         }
1040 
1041     }
1042 
Write(const char * data,size_t len,int64_t src_time SRT_ATR_UNUSED,ostream & ignored SRT_ATR_UNUSED=cout)1043     int Write(const char* data, size_t len, int64_t src_time SRT_ATR_UNUSED,  ostream & ignored SRT_ATR_UNUSED = cout) override
1044     {
1045         int stat = sendto(m_sock, data, (int) len, 0, sadr.get(), sadr.size());
1046         if ( stat == -1 )
1047         {
1048             if ((false))
1049                 Error(SysError(), "UDP Write/sendto");
1050             return stat;
1051         }
1052         return stat;
1053     }
1054 
IsOpen()1055     bool IsOpen() override { return m_sock != -1; }
Broken()1056     bool Broken() override { return false; }
1057 
GetSysSocket() const1058     int GetSysSocket() const override { return m_sock; };
1059 };
1060 
1061 template <class Iface> struct Udp;
1062 template <> struct Udp<Source> { typedef UdpSource type; };
1063 template <> struct Udp<Target> { typedef UdpTarget type; };
1064 
1065 template <class Iface>
CreateUdp(const string & host,int port,const map<string,string> & par)1066 Iface* CreateUdp(const string& host, int port, const map<string,string>& par) { return new typename Udp<Iface>::type (host, port, par); }
1067 
1068 template<class Base>
IsOutput()1069 inline bool IsOutput() { return false; }
1070 
1071 template<>
IsOutput()1072 inline bool IsOutput<Target>() { return true; }
1073 
1074 template <class Base>
CreateMedium(const string & uri)1075 extern unique_ptr<Base> CreateMedium(const string& uri)
1076 {
1077     unique_ptr<Base> ptr;
1078 
1079     UriParser u(uri);
1080 
1081     int iport = 0;
1082     switch ( u.type() )
1083     {
1084     default:
1085         break; // do nothing, return nullptr
1086     case UriParser::FILE:
1087         if (u.host() == "con" || u.host() == "console")
1088         {
1089             if (IsOutput<Base>() && (
1090                 (Verbose::on && Verbose::cverb == &cout)
1091                 || g_stats_are_printed_to_stdout))
1092             {
1093                 cerr << "ERROR: file://con with -v or -r or -s would result in mixing the data and text info.\n";
1094                 cerr << "ERROR: HINT: you can stream through a FIFO (named pipe)\n";
1095                 throw invalid_argument("incorrect parameter combination");
1096             }
1097             ptr.reset(CreateConsole<Base>());
1098         }
1099 // Disable regular file support for the moment
1100 #if 0
1101         else
1102             ptr.reset( CreateFile<Base>(u.path()));
1103 #endif
1104         break;
1105 
1106     case UriParser::SRT:
1107         iport = atoi(u.port().c_str());
1108         if ( iport < 1024 )
1109         {
1110             cerr << "Port value invalid: " << iport << " - must be >=1024\n";
1111             throw invalid_argument("Invalid port number");
1112         }
1113         ptr.reset( CreateSrt<Base>(u.host(), iport, u.parameters()) );
1114         break;
1115 
1116 
1117     case UriParser::UDP:
1118         iport = atoi(u.port().c_str());
1119         if ( iport < 1024 )
1120         {
1121             cerr << "Port value invalid: " << iport << " - must be >=1024\n";
1122             throw invalid_argument("Invalid port number");
1123         }
1124         ptr.reset( CreateUdp<Base>(u.host(), iport, u.parameters()) );
1125         break;
1126 
1127     }
1128 
1129     if (ptr.get())
1130         ptr->uri = move(u);
1131 
1132     return ptr;
1133 }
1134 
1135 
Create(const std::string & url)1136 std::unique_ptr<Source> Source::Create(const std::string& url)
1137 {
1138     return CreateMedium<Source>(url);
1139 }
1140 
Create(const std::string & url)1141 std::unique_ptr<Target> Target::Create(const std::string& url)
1142 {
1143     return CreateMedium<Target>(url);
1144 }
1145