1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 // Medium concretizations
12 
13 // Just for formality. This file should be used
14 #include <iostream>
15 #include <fstream>
16 #include <sstream>
17 #include <string>
18 #include <stdexcept>
19 #include <iterator>
20 #include <map>
21 #include <chrono>
22 #include <thread>
23 #include <atomic>
24 #include <srt.h>
25 #if !defined(_WIN32)
26 #include <sys/ioctl.h>
27 #endif
28 
29 // SRT protected includes
30 #include "netinet_any.h"
31 #include "common.h"
32 #include "api.h"
33 #include "udt.h"
34 #include "logging.h"
35 #include "utilities.h"
36 
37 #include "apputil.hpp"
38 #include "socketoptions.hpp"
39 #include "uriparser.hpp"
40 #include "testmedia.hpp"
41 #include "srt_compat.h"
42 #include "verbose.hpp"
43 
44 using namespace std;
45 
46 using srt_logging::KmStateStr;
47 using srt_logging::SockStatusStr;
48 #if ENABLE_EXPERIMENTAL_BONDING
49 using srt_logging::MemberStatusStr;
50 #endif
51 
52 std::atomic<bool> transmit_throw_on_interrupt {false};
53 std::atomic<bool> transmit_int_state {false};
54 int transmit_bw_report = 0;
55 unsigned transmit_stats_report = 0;
56 size_t transmit_chunk_size = SRT_LIVE_DEF_PLSIZE;
57 bool transmit_printformat_json = false;
58 srt_listen_callback_fn* transmit_accept_hook_fn = nullptr;
59 void* transmit_accept_hook_op = nullptr;
60 bool transmit_use_sourcetime = false;
61 int transmit_retry_connect = 0;
62 bool transmit_retry_always = false;
63 
64 // Do not unblock. Copy this to an app that uses applog and set appropriate name.
65 //srt_logging::Logger applog(SRT_LOGFA_APP, srt_logger_config, "srt-test");
66 
67 std::shared_ptr<SrtStatsWriter> transmit_stats_writer;
68 
DirectionName(SRT_EPOLL_T direction)69 string DirectionName(SRT_EPOLL_T direction)
70 {
71     string dir_name;
72     if (direction & ~SRT_EPOLL_ERR)
73     {
74         if (direction & SRT_EPOLL_IN)
75         {
76             dir_name = "source";
77         }
78 
79         if (direction & SRT_EPOLL_OUT)
80         {
81             if (!dir_name.empty())
82                 dir_name = "relay";
83             else
84                 dir_name = "target";
85         }
86 
87         if (direction & SRT_EPOLL_ERR)
88         {
89             dir_name += "+error";
90         }
91     }
92     else
93     {
94         // stupid name for a case of IPE
95         dir_name = "stone";
96     }
97 
98     return dir_name;
99 }
100 
101 template<class FileBase> inline
FileRead(FileBase & ifile,size_t chunk,const string & filename)102 bytevector FileRead(FileBase& ifile, size_t chunk, const string& filename)
103 {
104     bytevector data(chunk);
105     ifile.read(data.data(), chunk);
106     size_t nread = ifile.gcount();
107     if (nread < data.size())
108         data.resize(nread);
109 
110     if (data.empty())
111         throw Source::ReadEOF(filename);
112     return data;
113 }
114 
115 
116 class FileSource: public virtual Source
117 {
118     ifstream ifile;
119     string filename_copy;
120 public:
121 
FileSource(const string & path)122     FileSource(const string& path): ifile(path, ios::in | ios::binary), filename_copy(path)
123     {
124         if (!ifile)
125             throw std::runtime_error(path + ": Can't open file for reading");
126     }
127 
Read(size_t chunk)128     MediaPacket Read(size_t chunk) override { return FileRead(ifile, chunk, filename_copy); }
129 
IsOpen()130     bool IsOpen() override { return bool(ifile); }
End()131     bool End() override { return ifile.eof(); }
132     //~FileSource() { ifile.close(); }
133 };
134 
135 #ifdef PLEASE_LOG
136 #include "logging.h"
137 #endif
138 
139 class FileTarget: public virtual Target
140 {
141     ofstream ofile;
142 public:
143 
FileTarget(const string & path)144     FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}
145 
Write(const MediaPacket & data)146     void Write(const MediaPacket& data) override
147     {
148         ofile.write(data.payload.data(), data.payload.size());
149 #ifdef PLEASE_LOG
150         applog.Debug() << "FileTarget::Write: " << data.size() << " written to a file";
151 #endif
152     }
153 
IsOpen()154     bool IsOpen() override { return !!ofile; }
Broken()155     bool Broken() override { return !ofile.good(); }
156     //~FileTarget() { ofile.close(); }
Close()157     void Close() override
158     {
159 #ifdef PLEASE_LOG
160         applog.Debug() << "FileTarget::Close";
161 #endif
162         ofile.close();
163     }
164 };
165 
166 // Can't base this class on FileSource and FileTarget classes because they use two
167 // separate fields, which makes it unable to reliably define IsOpen(). This would
168 // require to use 'fstream' type field in some kind of FileCommon first. Not worth
169 // a shot.
170 class FileRelay: public Relay
171 {
172     fstream iofile;
173     string filename_copy;
174 public:
175 
FileRelay(const string & path)176     FileRelay(const string& path):
177         iofile(path, ios::in | ios::out | ios::binary), filename_copy(path)
178     {
179         if (!iofile)
180             throw std::runtime_error(path + ": Can't open file for reading");
181     }
Read(size_t chunk)182     MediaPacket Read(size_t chunk) override { return FileRead(iofile, chunk, filename_copy); }
183 
Write(const MediaPacket & data)184     void Write(const MediaPacket& data) override
185     {
186         iofile.write(data.payload.data(), data.payload.size());
187     }
188 
IsOpen()189     bool IsOpen() override { return !!iofile; }
End()190     bool End() override { return iofile.eof(); }
Broken()191     bool Broken() override { return !iofile.good(); }
Close()192     void Close() override { iofile.close(); }
193 };
194 
195 template <class Iface> struct File;
196 template <> struct File<Source> { typedef FileSource type; };
197 template <> struct File<Target> { typedef FileTarget type; };
198 template <> struct File<Relay> { typedef FileRelay type; };
199 
200 template <class Iface>
CreateFile(const string & name)201 Iface* CreateFile(const string& name) { return new typename File<Iface>::type (name); }
202 
InitParameters(string host,string path,map<string,string> par)203 void SrtCommon::InitParameters(string host, string path, map<string,string> par)
204 {
205     // Application-specific options: mode, blocking, timeout, adapter
206     if ( Verbose::on && !par.empty())
207     {
208         Verb() << "SRT parameters specified:\n";
209         for (map<string,string>::iterator i = par.begin(); i != par.end(); ++i)
210         {
211             Verb() << "\t" << i->first << " = '" << i->second << "'\n";
212         }
213     }
214 
215     if (path != "")
216     {
217         // Special case handling of an unusual specification.
218 
219         if (path.substr(0, 2) != "//")
220         {
221             Error("Path specification not supported for SRT (use // in front for special cases)");
222         }
223 
224         path = path.substr(2);
225 
226 #if ENABLE_EXPERIMENTAL_BONDING
227         if (path == "group")
228         {
229             // Group specified, check type.
230             m_group_type = par["type"];
231             if (m_group_type == "")
232             {
233                 Error("With //group, the group 'type' must be specified.");
234             }
235 
236             vector<string> parts;
237             Split(m_group_type, '/', back_inserter(parts));
238             if (parts.size() == 0 || parts.size() > 2)
239             {
240                 Error("Invalid specification for 'type' parameter");
241             }
242 
243             if (parts.size() == 2)
244             {
245                 m_group_type = parts[0];
246                 m_group_config = parts[1];
247             }
248 
249             vector<string> nodes;
250             Split(par["nodes"], ',', back_inserter(nodes));
251 
252             if (nodes.empty())
253             {
254                 Error("With //group, 'nodes' must specify comma-separated host:port specs.");
255             }
256 
257             int token = 1;
258 
259             // Check if correctly specified
260             for (string& hostport: nodes)
261             {
262                 if (hostport == "")
263                     continue;
264 
265                 // The attribute string, as it was embedded in another URI,
266                 // must have had replaced the & character with another ?, so
267                 // now all ? character, except the first one, must be now
268                 // restored so that UriParser interprets them correctly.
269 
270                 size_t atq = hostport.find('?');
271                 if (atq != string::npos)
272                 {
273                     while (atq+1 < hostport.size())
274                     {
275                         size_t next = hostport.find('?', atq+1);
276                         if (next == string::npos)
277                             break;
278                         hostport[next] = '&';
279                         atq = next;
280                     }
281                 }
282 
283                 UriParser check(hostport, UriParser::EXPECT_HOST);
284                 if (check.host() == "" || check.port() == "")
285                 {
286                     Error("With //group, 'nodes' must specify comma-separated host:port specs.");
287                 }
288 
289                 if (check.portno() <= 1024)
290                 {
291                     Error("With //group, every node in 'nodes' must have port >1024");
292                 }
293 
294                 Connection cc(check.host(), check.portno());
295                 if (check.parameters().count("weight"))
296                 {
297                     cc.weight = stoi(check.queryValue("weight"));
298                 }
299 
300                 if (check.parameters().count("source"))
301                 {
302                     UriParser sourcehp(check.queryValue("source"), UriParser::EXPECT_HOST);
303                     cc.source = CreateAddr(sourcehp.host(), sourcehp.portno());
304                 }
305 
306                 // Check if there's a key with 'srto.' prefix.
307 
308                 UriParser::query_it start = check.parameters().lower_bound("srto.");
309 
310                 SRT_SOCKOPT_CONFIG* config = nullptr;
311                 bool all_clear = true;
312                 vector<string> fails;
313                 map<string, string> options;
314 
315                 if (start != check.parameters().end())
316                 {
317                     for (; start != check.parameters().end(); ++start)
318                     {
319                         auto& y = *start;
320                         if (y.first.substr(0, 5) != "srto.")
321                             break;
322 
323                         options[y.first.substr(5)] = y.second;
324                     }
325                 }
326 
327                 if (!options.empty())
328                 {
329                     config = srt_create_config();
330 
331                     for (auto o: srt_options)
332                     {
333                         if (!options.count(o.name))
334                             continue;
335                         string value = options.at(o.name);
336                         bool ok = o.apply<SocketOption::SRT>(config, value);
337                         if ( !ok )
338                         {
339                             fails.push_back(o.name);
340                             all_clear = false;
341                         }
342                     }
343 
344                     if (!all_clear)
345                     {
346                         srt_delete_config(config);
347                         Error("With //group, failed to set options: " + Printable(fails));
348                     }
349 
350                     cc.options = config;
351                 }
352 
353                 cc.token = token++;
354                 m_group_nodes.push_back(move(cc));
355             }
356 
357             par.erase("type");
358             par.erase("nodes");
359 
360             // For a group-connect specification, it's
361             // always the caller mode.
362             // XXX change it here if maybe rendezvous is also
363             // possible in future.
364             par["mode"] = "caller";
365         }
366 #endif
367     }
368 
369     string adapter;
370     if (par.count("adapter"))
371     {
372         adapter = par.at("adapter");
373     }
374 
375     m_mode = "default";
376     if (par.count("mode"))
377     {
378         m_mode = par.at("mode");
379     }
380     SocketOption::Mode mode = SrtInterpretMode(m_mode, host, adapter);
381     if (mode == SocketOption::FAILURE)
382     {
383         Error("Invalid mode");
384     }
385 
386     if (!m_group_nodes.empty() && mode != SocketOption::CALLER)
387     {
388         Error("Group node specification is only available in caller mode");
389     }
390 
391     // Fix the mode name after successful interpretation
392     m_mode = SocketOption::mode_names[mode];
393 
394     par.erase("mode");
395 
396     if (par.count("blocking"))
397     {
398         m_blocking_mode = !false_names.count(par.at("blocking"));
399         par.erase("blocking");
400     }
401 
402     if (par.count("timeout"))
403     {
404         m_timeout = stoi(par.at("timeout"), 0, 0);
405         par.erase("timeout");
406     }
407 
408     if (par.count("adapter"))
409     {
410         m_adapter = adapter;
411         par.erase("adapter");
412     }
413     else if (m_mode == "listener")
414     {
415         // For listener mode, adapter is taken from host,
416         // if 'adapter' parameter is not given
417         m_adapter = host;
418     }
419 
420     if (par.count("tsbpd") && false_names.count(par.at("tsbpd")))
421     {
422         m_tsbpdmode = false;
423     }
424 
425     if (par.count("port"))
426     {
427         m_outgoing_port = stoi(par.at("port"), 0, 0);
428         par.erase("port");
429     }
430 
431     // That's kinda clumsy, but it must rely on the defaults.
432     // Default mode is live, so check if the file mode was enforced
433     if (par.count("transtype") == 0 || par["transtype"] != "file")
434     {
435         // If the Live chunk size was nondefault, enforce the size.
436         if (transmit_chunk_size != SRT_LIVE_DEF_PLSIZE)
437         {
438             if (transmit_chunk_size > SRT_LIVE_MAX_PLSIZE)
439                 throw std::runtime_error("Chunk size in live mode exceeds 1456 bytes; this is not supported");
440 
441             par["payloadsize"] = Sprint(transmit_chunk_size);
442         }
443     }
444 
445     // Assigning group configuration from a special "groupconfig" attribute.
446     // This is the only way how you can set up this configuration at the listener side.
447     if (par.count("groupconfig"))
448     {
449         m_group_config = par.at("groupconfig");
450         par.erase("groupconfig");
451     }
452 
453     // Fix Minversion, if specified as string
454     if (par.count("minversion"))
455     {
456         string v = par["minversion"];
457         if (v.find('.') != string::npos)
458         {
459             int version = SrtParseVersion(v.c_str());
460             if (version == 0)
461             {
462                 throw std::runtime_error(Sprint("Value for 'minversion' doesn't specify a valid version: ", v));
463             }
464             par["minversion"] = Sprint(version);
465             Verb() << "\tFIXED: minversion = 0x" << std::hex << std::setfill('0') << std::setw(8) << version << std::dec;
466         }
467     }
468 
469     // Assign the others here.
470     m_options = par;
471     m_options["mode"] = m_mode;
472 }
473 
PrepareListener(string host,int port,int backlog)474 void SrtCommon::PrepareListener(string host, int port, int backlog)
475 {
476     m_bindsock = srt_create_socket();
477     if (m_bindsock == SRT_ERROR)
478         Error("srt_create_socket");
479 
480     int stat = ConfigurePre(m_bindsock);
481     if (stat == SRT_ERROR)
482         Error("ConfigurePre");
483 
484     if (!m_blocking_mode)
485     {
486         srt_conn_epoll = AddPoller(m_bindsock, SRT_EPOLL_OUT);
487     }
488 
489     auto sa = CreateAddr(host, port);
490     Verb() << "Binding a server on " << host << ":" << port << " ...";
491     stat = srt_bind(m_bindsock, sa.get(), sizeof sa);
492     if (stat == SRT_ERROR)
493     {
494         srt_close(m_bindsock);
495         Error("srt_bind");
496     }
497 
498     Verb() << " listen... " << VerbNoEOL;
499     stat = srt_listen(m_bindsock, backlog);
500     if (stat == SRT_ERROR)
501     {
502         srt_close(m_bindsock);
503         Error("srt_listen");
504     }
505 
506 }
507 
StealFrom(SrtCommon & src)508 void SrtCommon::StealFrom(SrtCommon& src)
509 {
510     // This is used when SrtCommon class designates a listener
511     // object that is doing Accept in appropriate direction class.
512     // The new object should get the accepted socket.
513     m_direction = src.m_direction;
514     m_blocking_mode = src.m_blocking_mode;
515     m_timeout = src.m_timeout;
516     m_tsbpdmode = src.m_tsbpdmode;
517     m_options = src.m_options;
518     m_bindsock = SRT_INVALID_SOCK; // no listener
519     m_sock = src.m_sock;
520     src.m_sock = SRT_INVALID_SOCK; // STEALING
521 }
522 
AcceptNewClient()523 void SrtCommon::AcceptNewClient()
524 {
525     sockaddr_any scl;
526 
527     ::transmit_throw_on_interrupt = true;
528 
529     if (!m_blocking_mode)
530     {
531         Verb() << "[ASYNC] (conn=" << srt_conn_epoll << ")";
532 
533         int len = 2;
534         SRTSOCKET ready[2];
535         while (srt_epoll_wait(srt_conn_epoll, 0, 0, ready, &len, 1000, 0, 0, 0, 0) == -1)
536         {
537             if (::transmit_int_state)
538                 Error("srt_epoll_wait for srt_accept: interrupt");
539 
540             if (srt_getlasterror(NULL) == SRT_ETIMEOUT)
541                 continue;
542             Error("srt_epoll_wait(srt_conn_epoll)");
543         }
544 
545         Verb() << "[EPOLL: " << len << " sockets] " << VerbNoEOL;
546     }
547     Verb() << " accept..." << VerbNoEOL;
548 
549     m_sock = srt_accept(m_bindsock, (scl.get()), (&scl.len));
550     if (m_sock == SRT_INVALID_SOCK)
551     {
552         srt_close(m_bindsock);
553         m_bindsock = SRT_INVALID_SOCK;
554         Error("srt_accept");
555     }
556 
557 #if ENABLE_EXPERIMENTAL_BONDING
558     if (m_sock & SRTGROUP_MASK)
559     {
560         m_listener_group = true;
561         if (m_group_config != "")
562         {
563             int stat = srt_group_configure(m_sock, m_group_config.c_str());
564             if (stat == SRT_ERROR)
565             {
566                 // Don't break the connection basing on this, just ignore.
567                 Verb() << " (error setting config: '" << m_group_config << "') " << VerbNoEOL;
568             }
569         }
570         // There might be added a poller, remove it.
571         // We need it work different way.
572 
573 #ifndef SRT_OLD_APP_READER
574 
575         if (srt_epoll != -1)
576         {
577             Verb() << "(Group: erasing epoll " << srt_epoll << ") " << VerbNoEOL;
578             srt_epoll_release(srt_epoll);
579         }
580 
581         // Don't add any sockets, they will have to be added
582         // anew every time again.
583         srt_epoll = srt_epoll_create();
584 #endif
585 
586         // Group data must have a size of at least 1
587         // otherwise the srt_group_data() call will fail
588         if (m_group_data.empty())
589             m_group_data.resize(1);
590 
591         Verb() << " connected(group epoll " << srt_epoll <<").";
592     }
593     else
594 #endif
595     {
596         sockaddr_any peeraddr(AF_INET6);
597         string peer = "<?PEER?>";
598         if (-1 != srt_getpeername(m_sock, (peeraddr.get()), (&peeraddr.len)))
599         {
600             peer = peeraddr.str();
601         }
602 
603         sockaddr_any agentaddr(AF_INET6);
604         string agent = "<?AGENT?>";
605         if (-1 != srt_getsockname(m_sock, (agentaddr.get()), (&agentaddr.len)))
606         {
607             agent = agentaddr.str();
608         }
609 
610         Verb() << " connected [" << agent << "] <-- " << peer;
611     }
612     ::transmit_throw_on_interrupt = false;
613 
614     // ConfigurePre is done on bindsock, so any possible Pre flags
615     // are DERIVED by sock. ConfigurePost is done exclusively on sock.
616     int stat = ConfigurePost(m_sock);
617     if (stat == SRT_ERROR)
618         Error("ConfigurePost");
619 }
620 
PrintEpollEvent(int events,int et_events)621 static string PrintEpollEvent(int events, int et_events)
622 {
623     static pair<int, const char*> const namemap [] = {
624         make_pair(SRT_EPOLL_IN, "R"),
625         make_pair(SRT_EPOLL_OUT, "W"),
626         make_pair(SRT_EPOLL_ERR, "E"),
627         make_pair(SRT_EPOLL_UPDATE, "U")
628     };
629 
630     ostringstream os;
631     int N = Size(namemap);
632 
633     for (int i = 0; i < N; ++i)
634     {
635         if (events & namemap[i].first)
636         {
637             os << "[";
638             if (et_events & namemap[i].first)
639                 os << "^";
640             os << namemap[i].second << "]";
641         }
642     }
643 
644     return os.str();
645 }
646 
Init(string host,int port,string path,map<string,string> par,SRT_EPOLL_OPT dir)647 void SrtCommon::Init(string host, int port, string path, map<string,string> par, SRT_EPOLL_OPT dir)
648 {
649     m_direction = dir;
650     InitParameters(host, path, par);
651 
652     int backlog = 1;
653     if (m_mode == "listener" && par.count("groupconnect")
654             && true_names.count(par["groupconnect"]))
655     {
656         backlog = 10;
657     }
658 
659     Verb() << "Opening SRT " << DirectionName(dir) << " " << m_mode
660         << "(" << (m_blocking_mode ? "" : "non-") << "blocking,"
661         << " backlog=" << backlog << ") on "
662         << host << ":" << port;
663 
664     try
665     {
666         if (m_mode == "caller")
667         {
668             if (m_group_nodes.empty())
669             {
670                 OpenClient(host, port);
671             }
672 #if ENABLE_EXPERIMENTAL_BONDING
673             else
674             {
675                 OpenGroupClient(); // Source data are in the fields already.
676             }
677 #endif
678         }
679         else if (m_mode == "listener")
680             OpenServer(m_adapter, port, backlog);
681         else if (m_mode == "rendezvous")
682             OpenRendezvous(m_adapter, host, port);
683         else
684         {
685             throw std::invalid_argument("Invalid 'mode'. Use 'client' or 'server'");
686         }
687     }
688     catch (...)
689     {
690         // This is an in-constructor-called function, so
691         // when the exception is thrown, the destructor won't
692         // close the sockets. This intercepts the exception
693         // to close them.
694         Verb() << "Open FAILED - closing SRT sockets";
695         if (m_bindsock != SRT_INVALID_SOCK)
696             srt_close(m_bindsock);
697         if (m_sock != SRT_INVALID_SOCK)
698             srt_close(m_sock);
699         m_sock = m_bindsock = SRT_INVALID_SOCK;
700         throw;
701     }
702 
703     int pbkeylen = 0;
704     SRT_KM_STATE kmstate, snd_kmstate, rcv_kmstate;
705     int len = sizeof (int);
706     srt_getsockflag(m_sock, SRTO_PBKEYLEN, &pbkeylen, &len);
707     srt_getsockflag(m_sock, SRTO_KMSTATE, &kmstate, &len);
708     srt_getsockflag(m_sock, SRTO_SNDKMSTATE, &snd_kmstate, &len);
709     srt_getsockflag(m_sock, SRTO_RCVKMSTATE, &rcv_kmstate, &len);
710 
711     Verb() << "ENCRYPTION status: " << KmStateStr(kmstate)
712         << " (SND:" << KmStateStr(snd_kmstate) << " RCV:" << KmStateStr(rcv_kmstate)
713         << ") PBKEYLEN=" << pbkeylen;
714 
715     // Display some selected options on the socket.
716     if (Verbose::on)
717     {
718         int64_t bandwidth = 0;
719         int latency = 0;
720         bool blocking_snd = false, blocking_rcv = false;
721         int dropdelay = 0;
722         int size_int = sizeof (int), size_int64 = sizeof (int64_t), size_bool = sizeof (bool);
723         char packetfilter[100] = "";
724         int packetfilter_size = 100;
725 
726         srt_getsockflag(m_sock, SRTO_MAXBW, &bandwidth, &size_int64);
727         srt_getsockflag(m_sock, SRTO_RCVLATENCY, &latency, &size_int);
728         srt_getsockflag(m_sock, SRTO_RCVSYN, &blocking_rcv, &size_bool);
729         srt_getsockflag(m_sock, SRTO_SNDSYN, &blocking_snd, &size_bool);
730         srt_getsockflag(m_sock, SRTO_SNDDROPDELAY, &dropdelay, &size_int);
731         srt_getsockflag(m_sock, SRTO_PACKETFILTER, (packetfilter), (&packetfilter_size));
732 
733         Verb() << "OPTIONS: maxbw=" << bandwidth << " rcvlatency=" << latency << boolalpha
734             << " blocking{rcv=" << blocking_rcv << " snd=" << blocking_snd
735             << "} snddropdelay=" << dropdelay << " packetfilter=" << packetfilter;
736     }
737 
738     if (!m_blocking_mode)
739     {
740         // Don't add new epoll if already created as a part
741         // of group management: if (srt_epoll == -1)...
742 
743         if (m_mode == "caller")
744             dir = (dir | SRT_EPOLL_UPDATE);
745         Verb() << "NON-BLOCKING MODE - SUB FOR " << PrintEpollEvent(dir, 0);
746 
747         srt_epoll = AddPoller(m_sock, dir);
748     }
749 }
750 
AddPoller(SRTSOCKET socket,int modes)751 int SrtCommon::AddPoller(SRTSOCKET socket, int modes)
752 {
753     int pollid = srt_epoll_create();
754     if (pollid == -1)
755         throw std::runtime_error("Can't create epoll in nonblocking mode");
756     Verb() << "EPOLL: creating eid=" << pollid << " and adding @" << socket
757         << " in " << DirectionName(SRT_EPOLL_OPT(modes)) << " mode";
758     srt_epoll_add_usock(pollid, socket, &modes);
759     return pollid;
760 }
761 
ConfigurePost(SRTSOCKET sock)762 int SrtCommon::ConfigurePost(SRTSOCKET sock)
763 {
764     bool yes = m_blocking_mode;
765     int result = 0;
766     if (m_direction & SRT_EPOLL_OUT)
767     {
768         Verb() << "Setting SND blocking mode: " << boolalpha << yes << " timeout=" << m_timeout;
769         result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &yes, sizeof yes);
770         if (result == -1)
771         {
772 #ifdef PLEASE_LOG
773             extern srt_logging::Logger applog;
774             applog.Error() << "ERROR SETTING OPTION: SRTO_SNDSYN";
775 #endif
776             return result;
777         }
778 
779         if (m_timeout)
780             result = srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &m_timeout, sizeof m_timeout);
781         if (result == -1)
782         {
783 #ifdef PLEASE_LOG
784             extern srt_logging::Logger applog;
785             applog.Error() << "ERROR SETTING OPTION: SRTO_SNDTIMEO";
786 #endif
787             return result;
788         }
789     }
790 
791     if (m_direction & SRT_EPOLL_IN)
792     {
793         Verb() << "Setting RCV blocking mode: " << boolalpha << yes << " timeout=" << m_timeout;
794         result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &yes, sizeof yes);
795         if (result == -1)
796             return result;
797 
798         if (m_timeout)
799             result = srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &m_timeout, sizeof m_timeout);
800         else
801         {
802             int timeout = 1000;
803             result = srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &timeout, sizeof timeout);
804         }
805         if (result == -1)
806             return result;
807     }
808 
809     // host is only checked for emptiness and depending on that the connection mode is selected.
810     // Here we are not exactly interested with that information.
811     vector<string> failures;
812 
813     SrtConfigurePost(sock, m_options, &failures);
814 
815 
816     if (!failures.empty())
817     {
818         if (Verbose::on)
819         {
820             Verb() << "WARNING: failed to set options: ";
821             copy(failures.begin(), failures.end(), ostream_iterator<string>(*Verbose::cverb, ", "));
822             Verb();
823         }
824     }
825 
826     return 0;
827 }
828 
ConfigurePre(SRTSOCKET sock)829 int SrtCommon::ConfigurePre(SRTSOCKET sock)
830 {
831     int result = 0;
832 
833     int no = 0;
834     if (!m_tsbpdmode)
835     {
836         result = srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no);
837         if (result == -1)
838             return result;
839     }
840 
841     // Let's pretend async mode is set this way.
842     // This is for asynchronous connect.
843     int maybe = m_blocking_mode;
844     result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &maybe, sizeof maybe);
845     if (result == -1)
846         return result;
847 
848     // host is only checked for emptiness and depending on that the connection mode is selected.
849     // Here we are not exactly interested with that information.
850     vector<string> failures;
851 
852     // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always,
853     // but it doesn't matter here. We don't use 'connmode' for anything else than
854     // checking for failures.
855     SocketOption::Mode conmode = SrtConfigurePre(sock, "",  m_options, &failures);
856 
857     if (conmode == SocketOption::FAILURE)
858     {
859         if (Verbose::on )
860         {
861             Verb() << "WARNING: failed to set options: ";
862             copy(failures.begin(), failures.end(), ostream_iterator<string>(*Verbose::cverb, ", "));
863             Verb();
864         }
865 
866         return SRT_ERROR;
867     }
868 
869     return 0;
870 }
871 
SetupAdapter(const string & host,int port)872 void SrtCommon::SetupAdapter(const string& host, int port)
873 {
874     auto lsa = CreateAddr(host, port);
875     int stat = srt_bind(m_sock, lsa.get(), sizeof lsa);
876     if (stat == SRT_ERROR)
877         Error("srt_bind");
878 }
879 
OpenClient(string host,int port)880 void SrtCommon::OpenClient(string host, int port)
881 {
882     PrepareClient();
883 
884     if (m_outgoing_port)
885     {
886         SetupAdapter("", m_outgoing_port);
887     }
888 
889     ConnectClient(host, port);
890 }
891 
PrepareClient()892 void SrtCommon::PrepareClient()
893 {
894     m_sock = srt_create_socket();
895     if (m_sock == SRT_ERROR)
896         Error("srt_create_socket");
897 
898     int stat = ConfigurePre(m_sock);
899     if (stat == SRT_ERROR)
900         Error("ConfigurePre");
901 
902     if (!m_blocking_mode)
903     {
904         srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
905     }
906 
907 }
908 
909 #if ENABLE_EXPERIMENTAL_BONDING
TransmitGroupSocketConnect(void * srtcommon,SRTSOCKET sock,int error,const sockaddr *,int token)910 void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
911 {
912     SrtCommon* that = (SrtCommon*)srtcommon;
913 
914     if (error == SRT_SUCCESS)
915     {
916         return; // nothing to do for a successful socket
917     }
918 
919 #ifdef PLEASE_LOG
920     applog.Debug("connect callback: error on @", sock, " erc=", error, " token=", token);
921 #endif
922 
923     /* Example: identify by target address
924     sockaddr_any peersa = peer;
925     sockaddr_any agentsa;
926     bool haveso = (srt_getsockname(sock, agentsa.get(), &agentsa.len) != -1);
927     */
928 
929     for (auto& n: that->m_group_nodes)
930     {
931         if (n.token != -1 && n.token == token)
932         {
933             n.error = error;
934             n.reason = srt_getrejectreason(sock);
935             return;
936         }
937 
938         /*
939 
940         bool isso = haveso && !n.source.empty();
941         if (n.target == peersa && (!isso || n.source.equal_address(agentsa)))
942         {
943             Verb() << " (by target)" << VerbNoEOL;
944             n.error = error;
945             n.reason = srt_getrejectreason(sock);
946             return;
947         }
948         */
949     }
950 
951     Verb() << " IPE: LINK NOT FOUND???]";
952 }
953 
OpenGroupClient()954 void SrtCommon::OpenGroupClient()
955 {
956     SRT_GROUP_TYPE type = SRT_GTYPE_UNDEFINED;
957 
958     // Resolve group type.
959     if (m_group_type == "broadcast")
960         type = SRT_GTYPE_BROADCAST;
961     else if (m_group_type == "backup")
962         type = SRT_GTYPE_BACKUP;
963     else if (m_group_type == "balancing")
964         type = SRT_GTYPE_BALANCING;
965     else
966     {
967         Error("With //group, type='" + m_group_type + "' undefined");
968     }
969 
970     m_sock = srt_create_group(type);
971     if (m_sock == -1)
972         Error("srt_create_group");
973 
974     srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this);
975 
976     int stat = -1;
977     if (m_group_config != "")
978     {
979         stat = srt_group_configure(m_sock, m_group_config.c_str());
980         if (stat == SRT_ERROR)
981             Error("srt_group_configure");
982     }
983 
984     stat = ConfigurePre(m_sock);
985 
986     if ( stat == SRT_ERROR )
987         Error("ConfigurePre");
988 
989     if (!m_blocking_mode)
990     {
991         // Note: here the GROUP is added to the poller.
992         srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
993     }
994 
995     // Don't check this. Should this fail, the above would already.
996 
997     // XXX Now do it regardless whether it's blocking or non-blocking
998     // mode - reading from group is currently manually from every socket.
999     srt_epoll = srt_epoll_create();
1000 
1001     // ConnectClient can't be used here, the code must
1002     // be more-less repeated. In this case the situation
1003     // that not all connections can be established is tolerated,
1004     // the only case of error is when none of the connections
1005     // can be established.
1006 
1007     bool any_node = false;
1008 
1009     Verb() << "REDUNDANT connections with " << m_group_nodes.size() << " nodes:";
1010 
1011     if (m_group_data.empty())
1012         m_group_data.resize(1);
1013 
1014     vector<SRT_SOCKGROUPCONFIG> targets;
1015     int namelen = sizeof (sockaddr_any);
1016 
1017     Verb() << "Connecting to nodes:";
1018     int i = 1;
1019     for (Connection& c: m_group_nodes)
1020     {
1021         auto sa = CreateAddr(c.host, c.port);
1022         c.target = sa;
1023         Verb() << "\t[" << c.token << "] " << c.host << ":" << c.port << VerbNoEOL;
1024         vector<string> extras;
1025         if (c.weight)
1026             extras.push_back(Sprint("weight=", c.weight));
1027 
1028         if (!c.source.empty())
1029             extras.push_back("source=" + c.source.str());
1030 
1031         if (!extras.empty())
1032         {
1033             Verb() << "?" << extras[0] << VerbNoEOL;
1034             for (size_t i = 1; i < extras.size(); ++i)
1035                 Verb() << "&" << extras[i] << VerbNoEOL;
1036         }
1037 
1038         Verb();
1039         ++i;
1040         const sockaddr* source = c.source.empty() ? nullptr : c.source.get();
1041         SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, sa.get(), namelen);
1042         gd.weight = c.weight;
1043         gd.config = c.options;
1044 
1045         targets.push_back(gd);
1046     }
1047 
1048     ::transmit_throw_on_interrupt = true;
1049     for (;;) // REPEATABLE BLOCK
1050     {
1051 Connect_Again:
1052         Verb() << "Waiting for group connection... " << VerbNoEOL;
1053 
1054         int fisock = srt_connect_group(m_sock, targets.data(), targets.size());
1055 
1056         if (fisock == SRT_ERROR)
1057         {
1058             // Complete the error information for every member
1059             ostringstream out;
1060             set<int> reasons;
1061             for (Connection& c: m_group_nodes)
1062             {
1063                 if (c.error != SRT_SUCCESS)
1064                 {
1065                     out << "[" << c.token << "] " << c.host << ":" << c.port;
1066                     if (!c.source.empty())
1067                         out << "[[" << c.source.str() << "]]";
1068                     out << ": " << srt_strerror(c.error, 0) << ": " << srt_rejectreason_str(c.reason) << endl;
1069                 }
1070                 reasons.insert(c.reason);
1071             }
1072 
1073             if (transmit_retry_connect && (transmit_retry_always || (reasons.size() == 1 && *reasons.begin() == SRT_REJ_TIMEOUT)))
1074             {
1075                 if (transmit_retry_connect != -1)
1076                     --transmit_retry_connect;
1077 
1078                 Verb() << "...all links timeout, retrying (" << transmit_retry_connect << ")...";
1079                 continue;
1080             }
1081 
1082             Error("srt_connect_group, nodes:\n" + out.str());
1083         }
1084         else
1085         {
1086             Verb() << "[ASYNC] will wait..." << VerbNoEOL;
1087         }
1088 
1089         break;
1090     }
1091 
1092     if (m_blocking_mode)
1093     {
1094         Verb() << "SUCCESSFUL";
1095     }
1096     else
1097     {
1098         Verb() << "INITIATED [ASYNC]";
1099     }
1100 
1101     // Configuration change applied on a group should
1102     // spread the setting on all sockets.
1103     ConfigurePost(m_sock);
1104 
1105     for (size_t i = 0; i < targets.size(); ++i)
1106     {
1107         // As m_group_nodes is simply transformed into 'targets',
1108         // one index can be used to index them all. You don't
1109         // have to check if they have equal addresses because they
1110         // are equal by definition.
1111         if (targets[i].id != -1 && targets[i].errorcode == SRT_SUCCESS)
1112         {
1113             m_group_nodes[i].socket = targets[i].id;
1114         }
1115     }
1116 
1117     // Now check which sockets were successful, only those
1118     // should be added to epoll.
1119     size_t size = m_group_data.size();
1120     stat = srt_group_data(m_sock, m_group_data.data(), &size);
1121     if (stat == -1 && size > m_group_data.size())
1122     {
1123         // Just too small buffer. Resize and continue.
1124         m_group_data.resize(size);
1125         stat = srt_group_data(m_sock, m_group_data.data(), &size);
1126     }
1127 
1128     if (stat == -1)
1129     {
1130         Error("srt_group_data");
1131     }
1132     m_group_data.resize(size);
1133 
1134     for (size_t i = 0; i < m_group_nodes.size(); ++i)
1135     {
1136         SRTSOCKET insock = m_group_nodes[i].socket;
1137         if (insock == -1)
1138         {
1139             Verb() << "TARGET '" << sockaddr_any(targets[i].peeraddr).str() << "' connection failed.";
1140             continue;
1141         }
1142 
1143         // Have socket, store it into the group socket array.
1144         any_node = true;
1145     }
1146 
1147     if (!any_node)
1148         Error("All connections failed");
1149 
1150     // Wait for REAL connected state if nonblocking mode, for AT LEAST one node.
1151     if (!m_blocking_mode)
1152     {
1153         Verb() << "[ASYNC] " << VerbNoEOL;
1154 
1155         // SPIN-WAITING version. Don't use it unless you know what you're doing.
1156         // SpinWaitAsync();
1157 
1158         // Socket readiness for connection is checked by polling on WRITE allowed sockets.
1159         int len1 = 2, len2 = 2;
1160         SRTSOCKET ready_conn[2], ready_err[2];
1161         if (srt_epoll_wait(srt_conn_epoll,
1162                     ready_err, &len2,
1163                     ready_conn, &len1,
1164                     -1, // Wait infinitely
1165                     NULL, NULL,
1166                     NULL, NULL) != -1)
1167         {
1168             Verb() << "[C]" << VerbNoEOL;
1169             for (int i = 0; i < len1; ++i)
1170                 Verb() << " " << ready_conn[i] << VerbNoEOL;
1171             Verb() << "[E]" << VerbNoEOL;
1172             for (int i = 0; i < len2; ++i)
1173                 Verb() << " " << ready_err[i] << VerbNoEOL;
1174 
1175             Verb() << "";
1176 
1177             // We are waiting for one entity to be ready so it's either
1178             // in one or the other
1179             if (find(ready_err, ready_err+len2, m_sock) != ready_err+len2)
1180             {
1181                 Verb() << "[EPOLL: " << len2 << " entities FAILED]";
1182                 // Complete the error information for every member
1183                 ostringstream out;
1184                 set<int> reasons;
1185                 for (Connection& c: m_group_nodes)
1186                 {
1187                     if (c.error != SRT_SUCCESS)
1188                     {
1189                         out << "[" << c.token << "] " << c.host << ":" << c.port;
1190                         if (!c.source.empty())
1191                             out << "[[" << c.source.str() << "]]";
1192                         out << ": " << srt_strerror(c.error, 0) << ": " << srt_rejectreason_str(c.reason) << endl;
1193                     }
1194                     reasons.insert(c.reason);
1195                 }
1196 
1197                 if (transmit_retry_connect && (transmit_retry_always || (reasons.size() == 1 && *reasons.begin() == SRT_REJ_TIMEOUT)))
1198                 {
1199                     if (transmit_retry_connect != -1)
1200                         --transmit_retry_connect;
1201 
1202 
1203                     Verb() << "...all links timeout, retrying NOW (" << transmit_retry_connect << ")...";
1204                     goto Connect_Again;
1205                 }
1206 
1207                 Error("srt_connect_group, nodes:\n" + out.str());
1208             }
1209             else if (find(ready_conn, ready_conn+len1, m_sock) != ready_conn+len1)
1210             {
1211                 Verb() << "[EPOLL: " << len1 << " entities] " << VerbNoEOL;
1212             }
1213             else
1214             {
1215                 Error("Group: SPURIOUS epoll readiness");
1216             }
1217         }
1218         else
1219         {
1220             Error("srt_epoll_wait");
1221         }
1222     }
1223 
1224     stat = ConfigurePost(m_sock);
1225     if (stat == -1)
1226     {
1227         // This kind of error must reject the whole operation.
1228         // Usually you'll get this error on the first socket,
1229         // and doing this on the others would result in the same.
1230         Error("ConfigurePost");
1231     }
1232 
1233     ::transmit_throw_on_interrupt = false;
1234 
1235     Verb() << "Group connection report:";
1236     for (auto& d: m_group_data)
1237     {
1238         // id, status, result, peeraddr
1239         Verb() << "@" << d.id << " <" << SockStatusStr(d.sockstate) << "> (=" << d.result << ") PEER:"
1240             << sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
1241     }
1242 
1243     // Prepare group data for monitoring the group status.
1244     m_group_data.resize(m_group_nodes.size());
1245 }
1246 #endif
1247 
1248 /*
1249    This may be used sometimes for testing, but it's nonportable.
1250    void SrtCommon::SpinWaitAsync()
1251    {
1252    static string udt_status_names [] = {
1253    "INIT" , "OPENED", "LISTENING", "CONNECTING", "CONNECTED", "BROKEN", "CLOSING", "CLOSED", "NONEXIST"
1254    };
1255 
1256    for (;;)
1257    {
1258    SRT_SOCKSTATUS state = srt_getsockstate(m_sock);
1259    if (int(state) < SRTS_CONNECTED)
1260    {
1261    if (Verbose::on)
1262    Verb() << state;
1263    usleep(250000);
1264    continue;
1265    }
1266    else if (int(state) > SRTS_CONNECTED)
1267    {
1268    Error("UDT::connect status=" + udt_status_names[state]);
1269    }
1270 
1271    return;
1272    }
1273    }
1274  */
1275 
1276 struct TransmitErrorReason
1277 {
1278     int error;
1279     int reason;
1280 };
1281 
1282 static std::map<SRTSOCKET, TransmitErrorReason> transmit_error_storage;
1283 
TransmitConnectCallback(void *,SRTSOCKET socket,int errorcode,const sockaddr *,int)1284 static void TransmitConnectCallback(void*, SRTSOCKET socket, int errorcode, const sockaddr* /*peer*/, int /*token*/)
1285 {
1286     int reason = srt_getrejectreason(socket);
1287     transmit_error_storage[socket] = TransmitErrorReason { errorcode, reason };
1288     Verb() << "[Connection error reported on @" << socket << "]";
1289 }
1290 
ConnectClient(string host,int port)1291 void SrtCommon::ConnectClient(string host, int port)
1292 {
1293     auto sa = CreateAddr(host, port);
1294     Verb() << "Connecting to " << host << ":" << port << " ... " << VerbNoEOL;
1295 
1296     if (!m_blocking_mode)
1297     {
1298         srt_connect_callback(m_sock, &TransmitConnectCallback, 0);
1299     }
1300 
1301     int stat = -1;
1302     for (;;)
1303     {
1304         ::transmit_throw_on_interrupt = true;
1305         stat = srt_connect(m_sock, sa.get(), sizeof sa);
1306         ::transmit_throw_on_interrupt = false;
1307         if (stat == SRT_ERROR)
1308         {
1309             int reason = srt_getrejectreason(m_sock);
1310 #if PLEASE_LOG
1311             LOGP(applog.Error, "ERROR reported by srt_connect - closing socket @", m_sock);
1312 #endif
1313             if (transmit_retry_connect && (transmit_retry_always || reason == SRT_REJ_TIMEOUT))
1314             {
1315                 if (transmit_retry_connect != -1)
1316                     --transmit_retry_connect;
1317 
1318                 Verb() << "...timeout, retrying (" << transmit_retry_connect << ")...";
1319                 continue;
1320             }
1321 
1322             srt_close(m_sock);
1323             Error("srt_connect", reason);
1324         }
1325         break;
1326     }
1327 
1328     // Wait for REAL connected state if nonblocking mode
1329     if (!m_blocking_mode)
1330     {
1331         Verb() << "[ASYNC] " << VerbNoEOL;
1332 
1333         // SPIN-WAITING version. Don't use it unless you know what you're doing.
1334         // SpinWaitAsync();
1335 
1336         // Socket readiness for connection is checked by polling on WRITE allowed sockets.
1337         int lenc = 2, lene = 2;
1338         SRTSOCKET ready_connect[2], ready_error[2];
1339         if (srt_epoll_wait(srt_conn_epoll, ready_error, &lene, ready_connect, &lenc, -1, 0, 0, 0, 0) != -1)
1340         {
1341             // We should have just one socket, so check whatever socket
1342             // is in the transmit_error_storage.
1343             if (!transmit_error_storage.empty())
1344             {
1345                 Verb() << "[CALLBACK(error): " << VerbNoEOL;
1346                 int error, reason;
1347                 bool failed = false;
1348                 for (pair<const SRTSOCKET, TransmitErrorReason>& e: transmit_error_storage)
1349                 {
1350                     Verb() << "{@" << e.first << " error=" << e.second.error
1351                         << " reason=" << e.second.reason << "} " << VerbNoEOL;
1352                     error = e.second.error;
1353                     reason = e.second.reason;
1354                     if (error != SRT_SUCCESS)
1355                         failed = true;
1356                 }
1357                 Verb() << "]";
1358                 transmit_error_storage.clear();
1359                 if (failed)
1360                     Error("srt_connect(async/cb)", reason, error);
1361             }
1362 
1363             if (lene > 0)
1364             {
1365                 Verb() << "[EPOLL(error): " << lene << " sockets]";
1366                 int reason = srt_getrejectreason(ready_error[0]);
1367                 Error("srt_connect(async)", reason, SRT_ECONNREJ);
1368             }
1369             Verb() << "[EPOLL: " << lenc << " sockets] " << VerbNoEOL;
1370         }
1371         else
1372         {
1373             transmit_error_storage.clear();
1374             Error("srt_epoll_wait(srt_conn_epoll)");
1375         }
1376 
1377         transmit_error_storage.clear();
1378     }
1379 
1380     Verb() << " connected.";
1381     stat = ConfigurePost(m_sock);
1382     if (stat == SRT_ERROR)
1383         Error("ConfigurePost");
1384 }
1385 
Error(string src,int reason,int force_result)1386 void SrtCommon::Error(string src, int reason, int force_result)
1387 {
1388     int errnov = 0;
1389     const int result = force_result == 0 ? srt_getlasterror(&errnov) : force_result;
1390     if (result == SRT_SUCCESS)
1391     {
1392         cerr << "\nERROR (app): " << src << endl;
1393         throw std::runtime_error(src);
1394     }
1395     string message = srt_strerror(result, errnov);
1396     if (result == SRT_ECONNREJ)
1397     {
1398         if ( Verbose::on )
1399             Verb() << "FAILURE\n" << src << ": [" << result << "] "
1400                 << "Connection rejected: [" << int(reason) << "]: "
1401                 << srt_rejectreason_str(reason);
1402         else
1403             cerr << "\nERROR #" << result
1404                 << ": Connection rejected: [" << int(reason) << "]: "
1405                 << srt_rejectreason_str(reason);
1406     }
1407     else
1408     {
1409         if ( Verbose::on )
1410         Verb() << "FAILURE\n" << src << ": [" << result << "." << errnov << "] " << message;
1411         else
1412         cerr << "\nERROR #" << result << "." << errnov << ": " << message << endl;
1413     }
1414 
1415     throw TransmissionError("error: " + src + ": " + message);
1416 }
1417 
SetupRendezvous(string adapter,string host,int port)1418 void SrtCommon::SetupRendezvous(string adapter, string host, int port)
1419 {
1420     sockaddr_any target = CreateAddr(host, port);
1421     if (target.family() == AF_UNSPEC)
1422     {
1423         Error("Unable to resolve target host: " + host);
1424     }
1425 
1426     bool yes = true;
1427     srt_setsockopt(m_sock, 0, SRTO_RENDEZVOUS, &yes, sizeof yes);
1428 
1429     const int outport = m_outgoing_port ? m_outgoing_port : port;
1430 
1431     // Prefer the same IPv as target host
1432     auto localsa = CreateAddr(adapter, outport, target.family());
1433     string showhost = adapter;
1434     if (showhost == "")
1435         showhost = "ANY";
1436     if (target.family() == AF_INET6)
1437         showhost = "[" + showhost + "]";
1438     Verb() << "Binding rendezvous: " << showhost << ":" << outport << " ...";
1439     int stat = srt_bind(m_sock, localsa.get(), localsa.size());
1440     if (stat == SRT_ERROR)
1441     {
1442         srt_close(m_sock);
1443         Error("srt_bind");
1444     }
1445 }
1446 
Close()1447 void SrtCommon::Close()
1448 {
1449 #if PLEASE_LOG
1450         extern srt_logging::Logger applog;
1451         LOGP(applog.Error, "CLOSE requested - closing socket @", m_sock);
1452 #endif
1453     bool any = false;
1454     bool yes = true;
1455     if (m_sock != SRT_INVALID_SOCK)
1456     {
1457         Verb() << "SrtCommon: DESTROYING CONNECTION, closing socket (rt%" << m_sock << ")...";
1458         srt_setsockflag(m_sock, SRTO_SNDSYN, &yes, sizeof yes);
1459         srt_close(m_sock);
1460         any = true;
1461     }
1462 
1463     if (m_bindsock != SRT_INVALID_SOCK)
1464     {
1465         Verb() << "SrtCommon: DESTROYING SERVER, closing socket (ls%" << m_bindsock << ")...";
1466         // Set sndsynchro to the socket to synch-close it.
1467         srt_setsockflag(m_bindsock, SRTO_SNDSYN, &yes, sizeof yes);
1468         srt_close(m_bindsock);
1469         any = true;
1470     }
1471 
1472     if (any)
1473         Verb() << "SrtCommon: ... done.";
1474 }
1475 
~SrtCommon()1476 SrtCommon::~SrtCommon()
1477 {
1478     Close();
1479 }
1480 
1481 #if ENABLE_EXPERIMENTAL_BONDING
UpdateGroupStatus(const SRT_SOCKGROUPDATA * grpdata,size_t grpdata_size)1482 void SrtCommon::UpdateGroupStatus(const SRT_SOCKGROUPDATA* grpdata, size_t grpdata_size)
1483 {
1484     if (!grpdata)
1485     {
1486         // This happens when you passed too small array. Treat this as error and stop.
1487         cerr << "ERROR: broadcast group update reports " << grpdata_size
1488             << " existing sockets, but app registerred only " << m_group_nodes.size() << endl;
1489         Error("Too many unpredicted sockets in the group");
1490     }
1491 
1492     // Clear the active flag in all nodes so that they are reactivated
1493     // if they are in the group list, REGARDLESS OF THE STATUS. We need to
1494     // see all connections that are in the nodes, but not in the group,
1495     // and this one would have to be activated.
1496     const SRT_SOCKGROUPDATA* gend = grpdata + grpdata_size;
1497     for (auto& n: m_group_nodes)
1498     {
1499         bool active = (find_if(grpdata, gend,
1500                     [&n] (const SRT_SOCKGROUPDATA& sg) { return sg.id == n.socket; }) != gend);
1501         if (!active)
1502             n.socket = SRT_INVALID_SOCK;
1503     }
1504 
1505     // Note: sockets are not necessarily in the same order. Find
1506     // the socket by id.
1507     for (size_t i = 0; i < grpdata_size; ++i)
1508     {
1509         const SRT_SOCKGROUPDATA& d = grpdata[i];
1510         SRTSOCKET id = d.id;
1511 
1512         SRT_SOCKSTATUS status = d.sockstate;
1513         int result = d.result;
1514         SRT_MEMBERSTATUS mstatus = d.memberstate;
1515 
1516         if (result != -1 && status == SRTS_CONNECTED)
1517         {
1518             // Short report with the state.
1519             Verb() << "G@" << id << "<" << MemberStatusStr(mstatus) << "> " << VerbNoEOL;
1520             continue;
1521         }
1522         // id, status, result, peeraddr
1523         Verb() << "\n\tG@" << id << " <" << SockStatusStr(status) << "/" << MemberStatusStr(mstatus) << "> (=" << result << ") PEER:"
1524             << sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str() << VerbNoEOL;
1525 
1526         if (status >= SRTS_BROKEN)
1527         {
1528             Verb() << "NOTE: socket @" << id << " is pending for destruction, waiting for it.";
1529         }
1530     }
1531 
1532     // This was only informative. Now we check all nodes if they
1533     // are not active
1534 
1535     int i = 1;
1536     for (auto& n: m_group_nodes)
1537     {
1538         if (n.error != SRT_SUCCESS)
1539         {
1540             Verb() << "[" << i << "] CONNECTION FAILURE to '" << n.host << ":" << n.port << "': "
1541                 << srt_strerror(n.error, 0) << ":" << srt_rejectreason_str(n.reason);
1542         }
1543 
1544         // Check which nodes are no longer active and activate them.
1545         if (n.socket != SRT_INVALID_SOCK)
1546             continue;
1547 
1548         auto sa = CreateAddr(n.host, n.port);
1549         Verb() << "[" << i << "] RECONNECTING to node " << n.host << ":" << n.port << " ... " << VerbNoEOL;
1550         ++i;
1551 
1552         n.error = SRT_SUCCESS;
1553         n.reason = SRT_REJ_UNKNOWN;
1554 
1555         const sockaddr* source = n.source.empty() ? nullptr : n.source.get();
1556         SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, sa.get(), sa.size());
1557         gd.weight = n.weight;
1558         gd.config = n.options;
1559         gd.token = n.token;
1560 
1561         int fisock = srt_connect_group(m_sock, &gd, 1);
1562         if (fisock == SRT_ERROR)
1563         {
1564             // Whatever. Skip the node.
1565             Verb() << "FAILED: ";
1566         }
1567         else
1568         {
1569             // Have socket, store it into the group socket array.
1570             n.socket = gd.id;
1571         }
1572     }
1573 }
1574 #endif
1575 
SrtSource(string host,int port,std::string path,const map<string,string> & par)1576 SrtSource::SrtSource(string host, int port, std::string path, const map<string,string>& par)
1577 {
1578     Init(host, port, path, par, SRT_EPOLL_IN);
1579     ostringstream os;
1580     os << host << ":" << port;
1581     hostport_copy = os.str();
1582 }
1583 
PrintSrtStats(SRTSOCKET sock,bool clr,bool bw,bool stats)1584 static void PrintSrtStats(SRTSOCKET sock, bool clr, bool bw, bool stats)
1585 {
1586     CBytePerfMon perf;
1587     // clear only if stats report is to be read
1588     srt_bstats(sock, &perf, clr);
1589 
1590     if (bw)
1591         cout << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth);
1592     if (stats)
1593         cout << transmit_stats_writer->WriteStats(sock, perf);
1594 }
1595 
1596 
1597 #ifdef SRT_OLD_APP_READER
1598 
1599 // NOTE: 'output' is expected to be EMPTY here.
GroupCheckPacketAhead(bytevector & output)1600 bool SrtSource::GroupCheckPacketAhead(bytevector& output)
1601 {
1602     bool status = false;
1603     vector<SRTSOCKET> past_ahead;
1604 
1605     // This map no longer maps only ahead links.
1606     // Here are all links, and whether ahead, it's defined by the sequence.
1607     for (auto i = m_group_positions.begin(); i != m_group_positions.end(); ++i)
1608     {
1609         // i->first: socket ID
1610         // i->second: ReadPos { sequence, packet }
1611         // We are not interested with the socket ID because we
1612         // aren't going to read from it - we have the packet already.
1613         ReadPos& a = i->second;
1614 
1615         int seqdiff = CSeqNo::seqcmp(a.sequence, m_group_seqno);
1616         if ( seqdiff == 1)
1617         {
1618             // The very next packet. Return it.
1619             m_group_seqno = a.sequence;
1620             Verb() << " (SRT group: ahead delivery %" << a.sequence << " from @" << i->first << ")";
1621             swap(output, a.packet);
1622             status = true;
1623         }
1624         else if (seqdiff < 1 && !a.packet.empty())
1625         {
1626             Verb() << " (@" << i->first << " dropping collected ahead %" << a.sequence << ")";
1627             a.packet.clear();
1628         }
1629         // In case when it's >1, keep it in ahead
1630     }
1631 
1632     return status;
1633 }
1634 
DisplayEpollResults(const std::set<SRTSOCKET> & sockset,std::string prefix)1635 static string DisplayEpollResults(const std::set<SRTSOCKET>& sockset, std::string prefix)
1636 {
1637     typedef set<SRTSOCKET> fset_t;
1638     ostringstream os;
1639     os << prefix << " ";
1640     for (fset_t::const_iterator i = sockset.begin(); i != sockset.end(); ++i)
1641     {
1642         os << "@" << *i << " ";
1643     }
1644 
1645     return os.str();
1646 }
1647 
GroupRead(size_t chunk)1648 bytevector SrtSource::GroupRead(size_t chunk)
1649 {
1650     // Read the current group status. m_sock is here the group id.
1651     bytevector output;
1652 
1653     // Later iteration over it might be less efficient than
1654     // by vector, but we'll also often try to check a single id
1655     // if it was ever seen broken, so that it's skipped.
1656     set<SRTSOCKET> broken;
1657 
1658 RETRY_READING:
1659 
1660     size_t size = m_group_data.size();
1661     int stat = srt_group_data(m_sock, m_group_data.data(), &size);
1662     if (stat == -1 && size > m_group_data.size())
1663     {
1664         // Just too small buffer. Resize and continue.
1665         m_group_data.resize(size);
1666         stat = srt_group_data(m_sock, m_group_data.data(), &size);
1667     }
1668     else
1669     {
1670         // Downsize if needed.
1671         m_group_data.resize(size);
1672     }
1673 
1674     if (stat == -1) // Also after the above fix
1675     {
1676         Error(UDT::getlasterror(), "FAILURE when reading group data");
1677     }
1678 
1679     if (size == 0)
1680     {
1681         Error("No sockets in the group - disconnected");
1682     }
1683 
1684     bool connected = false;
1685     for (auto& d: m_group_data)
1686     {
1687         if (d.status == SRTS_CONNECTED)
1688         {
1689             connected = true;
1690             break;
1691         }
1692     }
1693     if (!connected)
1694     {
1695         Error("All sockets in the group disconnected");
1696     }
1697 
1698     if (Verbose::on)
1699     {
1700         for (auto& d: m_group_data)
1701         {
1702             if (d.status != SRTS_CONNECTED)
1703                 // id, status, result, peeraddr
1704                 Verb() << "@" << d.id << " <" << SockStatusStr(d.status) << "> (=" << d.result << ") PEER:"
1705                     << sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
1706         }
1707     }
1708 
1709     // Check first the ahead packets if you have any to deliver.
1710     if (m_group_seqno != -1 && !m_group_positions.empty())
1711     {
1712         bytevector ahead_packet;
1713 
1714         // This function also updates the group sequence pointer.
1715         if (GroupCheckPacketAhead(ahead_packet))
1716             return move(ahead_packet);
1717     }
1718 
1719     // LINK QUALIFICATION NAMES:
1720     //
1721     // HORSE: Correct link, which delivers the very next sequence.
1722     // Not necessarily this link is currently active.
1723     //
1724     // KANGAROO: Got some packets dropped and the sequence number
1725     // of the packet jumps over the very next sequence and delivers
1726     // an ahead packet.
1727     //
1728     // ELEPHANT: Is not ready to read, while others are, or reading
1729     // up to the current latest delivery sequence number does not
1730     // reach this sequence and the link becomes non-readable earlier.
1731 
1732     // The above condition has ruled out one kangaroo and turned it
1733     // into a horse.
1734 
1735     // Below there's a loop that will try to extract packets. Kangaroos
1736     // will be among the polled ones because skipping them risks that
1737     // the elephants will take over the reading. Links already known as
1738     // elephants will be also polled in an attempt to revitalize the
1739     // connection that experienced just a short living choking.
1740     //
1741     // After polling we attempt to read from every link that reported
1742     // read-readiness and read at most up to the sequence equal to the
1743     // current delivery sequence.
1744 
1745     // Links that deliver a packet below that sequence will be retried
1746     // until they deliver no more packets or deliver the packet of
1747     // expected sequence. Links that don't have a record in m_group_positions
1748     // and report readiness will be always read, at least to know what
1749     // sequence they currently stand on.
1750     //
1751     // Links that are already known as kangaroos will be polled, but
1752     // no reading attempt will be done. If after the reading series
1753     // it will turn out that we have no more horses, the slowest kangaroo
1754     // will be "advanced to a horse" (the ahead link with a sequence
1755     // closest to the current delivery sequence will get its sequence
1756     // set as current delivered and its recorded ahead packet returned
1757     // as the read packet).
1758 
1759     // If we find at least one horse, the packet read from that link
1760     // will be delivered. All other link will be just ensured update
1761     // up to this sequence number, or at worst all available packets
1762     // will be read. In this case all kangaroos remain kangaroos,
1763     // until the current delivery sequence m_group_seqno will be lifted
1764     // to the sequence recorded for these links in m_group_positions,
1765     // during the next time ahead check, after which they will become
1766     // horses.
1767 
1768     Verb() << "E(" << srt_epoll << ") " << VerbNoEOL;
1769 
1770     for (size_t i = 0; i < size; ++i)
1771     {
1772         SRT_SOCKGROUPDATA& d = m_group_data[i];
1773         if (d.status == SRTS_CONNECTING)
1774         {
1775             Verb() << "@" << d.id << "<pending> " << VerbNoEOL;
1776             int modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
1777             srt_epoll_add_usock(srt_epoll, d.id, &modes);
1778             continue; // don't read over a failed or pending socket
1779         }
1780 
1781         if (d.status >= SRTS_BROKEN)
1782         {
1783             broken.insert(d.id);
1784         }
1785 
1786         if (broken.count(d.id))
1787         {
1788             Verb() << "@" << d.id << "<broken> " << VerbNoEOL;
1789             continue;
1790         }
1791 
1792         if (d.status != SRTS_CONNECTED)
1793         {
1794             Verb() << "@" << d.id << "<idle:" << SockStatusStr(d.status) << "> " << VerbNoEOL;
1795             // Sockets in this state are ignored. We are waiting until it
1796             // achieves CONNECTING state, then it's added to write.
1797             continue;
1798         }
1799 
1800         // Don't skip packets that are ahead because if we have a situation
1801         // that all links are either "elephants" (do not report read readiness)
1802         // and "kangaroos" (have already delivered an ahead packet) then
1803         // omiting kangaroos will result in only elephants to be polled for
1804         // reading. Elephants, due to the strict timing requirements and
1805         // ensurance that TSBPD on every link will result in exactly the same
1806         // delivery time for a packet of given sequence, having an elephant
1807         // and kangaroo in one cage means that the elephant is simply a broken
1808         // or half-broken link (the data are not delivered, but it will get
1809         // repaired soon, enough for SRT to maintain the connection, but it
1810         // will still drop packets that didn't arrive in time), in both cases
1811         // it may potentially block the reading for an indefinite time, while
1812         // simultaneously a kangaroo might be a link that got some packets
1813         // dropped, but then it's still capable to deliver packets on time.
1814 
1815         // Note also that about the fact that some links turn out to be
1816         // elephants we'll learn only after we try to poll and read them.
1817 
1818         // Note that d.id might be a socket that was previously being polled
1819         // on write, when it's attempting to connect, but now it's connected.
1820         // This will update the socket with the new event set.
1821 
1822         int modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
1823         srt_epoll_add_usock(srt_epoll, d.id, &modes);
1824         Verb() << "@" << d.id << "[READ] " << VerbNoEOL;
1825     }
1826 
1827     Verb() << "";
1828 
1829     // Here we need to make an additional check.
1830     // There might be a possibility that all sockets that
1831     // were added to the reader group, are ahead. At least
1832     // surely we don't have a situation that any link contains
1833     // an ahead-read subsequent packet, because GroupCheckPacketAhead
1834     // already handled that case.
1835     //
1836     // What we can have is that every link has:
1837     // - no known seq position yet (is not registered in the position map yet)
1838     // - the position equal to the latest delivered sequence
1839     // - the ahead position
1840 
1841     // Now the situation is that we don't have any packets
1842     // waiting for delivery so we need to wait for any to report one.
1843     // XXX We support blocking mode only at the moment.
1844     // The non-blocking mode would need to simply check the readiness
1845     // with only immediate report, and read-readiness would have to
1846     // be done in background.
1847 
1848     SrtPollState sready;
1849 
1850     // Poll on this descriptor until reading is available, indefinitely.
1851     if (UDT::epoll_swait(srt_epoll, sready, -1) == SRT_ERROR)
1852     {
1853         Error(UDT::getlasterror(), "UDT::epoll_swait(srt_epoll, group)");
1854     }
1855     if (Verbose::on)
1856     {
1857         Verb() << "RDY: {"
1858             << DisplayEpollResults(sready.rd(), "[R]")
1859             << DisplayEpollResults(sready.wr(), "[W]")
1860             << DisplayEpollResults(sready.ex(), "[E]")
1861             << "} " << VerbNoEOL;
1862 
1863     }
1864 
1865     LOGC(applog.Debug, log << "epoll_swait: "
1866             << DisplayEpollResults(sready.rd(), "[R]")
1867             << DisplayEpollResults(sready.wr(), "[W]")
1868             << DisplayEpollResults(sready.ex(), "[E]"));
1869 
1870     typedef set<SRTSOCKET> fset_t;
1871 
1872     // Handle sockets of pending connection and with errors.
1873     broken = sready.ex();
1874 
1875     // We don't do anything about sockets that have been configured to
1876     // poll on writing (that is, pending for connection). What we need
1877     // is that the epoll_swait call exit on that fact. Probably if this
1878     // was the only socket reported, no broken and no read-ready, this
1879     // will later check on output if still empty, if so, repeat the whole
1880     // function. This write-ready socket will be there already in the
1881     // connected state and will be added to read-polling.
1882 
1883     // Ok, now we need to have some extra qualifications:
1884     // 1. If a socket has no registry yet, we read anyway, just
1885     // to notify the current position. We read ONLY ONE PACKET this time,
1886     // we'll worry later about adjusting it to the current group sequence
1887     // position.
1888     // 2. If a socket is already position ahead, DO NOT read from it, even
1889     // if it is ready.
1890 
1891     // The state of things whether we were able to extract the very next
1892     // sequence will be simply defined by the fact that `output` is nonempty.
1893 
1894     int32_t next_seq = m_group_seqno;
1895 
1896     // If this set is empty, it won't roll even once, therefore output
1897     // will be surely empty. This will be checked then same way as when
1898     // reading from every socket resulted in error.
1899     for (fset_t::const_iterator i = sready.rd().begin(); i != sready.rd().end(); ++i)
1900     {
1901         // Check if this socket is in aheads
1902         // If so, don't read from it, wait until the ahead is flushed.
1903 
1904         SRTSOCKET id = *i;
1905         ReadPos* p = nullptr;
1906         auto pe = m_group_positions.find(id);
1907         if (pe != m_group_positions.end())
1908         {
1909             p = &pe->second;
1910             // Possible results of comparison:
1911             // x < 0: the sequence is in the past, the socket should be adjusted FIRST
1912             // x = 0: the socket should be ready to get the exactly next packet
1913             // x = 1: the case is already handled by GroupCheckPacketAhead.
1914             // x > 1: AHEAD. DO NOT READ.
1915             int seqdiff = CSeqNo::seqcmp(p->sequence, m_group_seqno);
1916             if (seqdiff > 1)
1917             {
1918                 Verb() << "EPOLL: @" << id << " %" << p->sequence << " AHEAD, not reading.";
1919                 continue;
1920             }
1921         }
1922 
1923 
1924         // Read from this socket stubbornly, until:
1925         // - reading is no longer possible (AGAIN)
1926         // - the sequence difference is >= 1
1927 
1928         int fi = 1; // marker for Verb to display flushing
1929         for (;;)
1930         {
1931             bytevector data(chunk);
1932             SRT_MSGCTRL mctrl = srt_msgctrl_default;
1933             stat = srt_recvmsg2(id, data.data(), chunk, &mctrl);
1934             if (stat == SRT_ERROR)
1935             {
1936                 if (fi == 0)
1937                 {
1938                     if (Verbose::on)
1939                     {
1940                         if (p)
1941                         {
1942                             int32_t pktseq = p->sequence;
1943                             int seqdiff = CSeqNo::seqcmp(p->sequence, m_group_seqno);
1944                             Verb() << ". %" << pktseq << " " << seqdiff << ")";
1945                         }
1946                         else
1947                         {
1948                             Verb() << ".)";
1949                         }
1950                     }
1951                     fi = 1;
1952                 }
1953                 int err = srt_getlasterror(0);
1954                 if (err == SRT_EASYNCRCV)
1955                 {
1956                     // Do not treat this as spurious, just stop reading.
1957                     break;
1958                 }
1959                 Verb() << "Error @" << id << ": " << srt_getlasterror_str();
1960                 broken.insert(id);
1961                 break;
1962             }
1963 
1964             // NOTE: checks against m_group_seqno and decisions based on it
1965             // must NOT be done if m_group_seqno is -1, which means that we
1966             // are about to deliver the very first packet and we take its
1967             // sequence number as a good deal.
1968 
1969             // The order must be:
1970             // - check discrepancy
1971             // - record the sequence
1972             // - check ordering.
1973             // The second one must be done always, but failed discrepancy
1974             // check should exclude the socket from any further checks.
1975             // That's why the common check for m_group_seqno != -1 can't
1976             // embrace everything below.
1977 
1978             // We need to first qualify the sequence, just for a case
1979             if (m_group_seqno != -1 && abs(m_group_seqno - mctrl.pktseq) > CSeqNo::m_iSeqNoTH)
1980             {
1981                 // This error should be returned if the link turns out
1982                 // to be the only one, or set to the group data.
1983                 // err = SRT_ESECFAIL;
1984                 if (fi == 0)
1985                 {
1986                     Verb() << ".)";
1987                     fi = 1;
1988                 }
1989                 Verb() << "Error @" << id << ": SEQUENCE DISCREPANCY: base=%" << m_group_seqno << " vs pkt=%" << mctrl.pktseq << ", setting ESECFAIL";
1990                 broken.insert(id);
1991                 break;
1992             }
1993 
1994             // Rewrite it to the state for a case when next reading
1995             // would not succeed. Do not insert the buffer here because
1996             // this is only required when the sequence is ahead; for that
1997             // it will be fixed later.
1998             if (!p)
1999             {
2000                 p = &(m_group_positions[id] = ReadPos { mctrl.pktseq, {} });
2001             }
2002             else
2003             {
2004                 p->sequence = mctrl.pktseq;
2005             }
2006 
2007             if (m_group_seqno != -1)
2008             {
2009                 // Now we can safely check it.
2010                 int seqdiff = CSeqNo::seqcmp(mctrl.pktseq, m_group_seqno);
2011 
2012                 if (seqdiff <= 0)
2013                 {
2014                     if (fi == 1)
2015                     {
2016                         Verb() << "(@" << id << " FLUSH:" << VerbNoEOL;
2017                         fi = 0;
2018                     }
2019 
2020                     Verb() << "." << VerbNoEOL;
2021 
2022                     // The sequence is recorded, the packet has to be discarded.
2023                     // That's all.
2024                     continue;
2025                 }
2026 
2027                 // Finish flush reporting if fallen into here
2028                 if (fi == 0)
2029                 {
2030                     Verb() << ". %" << mctrl.pktseq << " " << (-seqdiff) << ")";
2031                     fi = 1;
2032                 }
2033 
2034                 // Now we have only two possibilities:
2035                 // seqdiff == 1: The very next sequence, we want to read and return the packet.
2036                 // seqdiff > 1: The packet is ahead - record the ahead packet, but continue with the others.
2037 
2038                 if (seqdiff > 1)
2039                 {
2040                     Verb() << "@" << id << " %" << mctrl.pktseq << " AHEAD";
2041                     p->packet = move(data);
2042                     break; // Don't read from that socket anymore.
2043                 }
2044             }
2045 
2046             // We have seqdiff = 1, or we simply have the very first packet
2047             // which's sequence is taken as a good deal. Update the sequence
2048             // and record output.
2049 
2050             if (!output.empty())
2051             {
2052                 Verb() << "@" << id << " %" << mctrl.pktseq << " REDUNDANT";
2053                 break;
2054             }
2055 
2056 
2057             Verb() << "@" << id << " %" << mctrl.pktseq << " DELIVERING";
2058             output = move(data);
2059 
2060             // Record, but do not update yet, until all sockets are handled.
2061             next_seq = mctrl.pktseq;
2062             break;
2063         }
2064     }
2065 
2066     // ready_len is only the length of currently reported
2067     // ready sockets, NOT NECESSARILY containing all sockets from the group.
2068     if (broken.size() == size)
2069     {
2070         // All broken
2071         Error("All sockets broken");
2072     }
2073 
2074     if (Verbose::on && !broken.empty())
2075     {
2076         Verb() << "BROKEN: " << Printable(broken) << " - removing";
2077     }
2078 
2079     // Now remove all broken sockets from aheads, if any.
2080     // Even if they have already delivered a packet.
2081     for (SRTSOCKET d: broken)
2082     {
2083         m_group_positions.erase(d);
2084         srt_close(d);
2085     }
2086 
2087     // May be required to be re-read.
2088     broken.clear();
2089 
2090     if (!output.empty())
2091     {
2092         // We have extracted something, meaning that we have the sequence shift.
2093         // Update it now and don't do anything else with the sockets.
2094 
2095         // Sanity check
2096         if (next_seq == -1)
2097         {
2098             Error("IPE: next_seq not set after output extracted!");
2099         }
2100         m_group_seqno = next_seq;
2101         return output;
2102     }
2103 
2104     // Check if we have any sockets left :D
2105 
2106     // Here we surely don't have any more HORSES,
2107     // only ELEPHANTS and KANGAROOS. Qualify them and
2108     // attempt to at least take advantage of KANGAROOS.
2109 
2110     // In this position all links are either:
2111     // - updated to the current position
2112     // - updated to the newest possible possition available
2113     // - not yet ready for extraction (not present in the group)
2114 
2115     // If we haven't extracted the very next sequence position,
2116     // it means that we might only have the ahead packets read,
2117     // that is, the next sequence has been dropped by all links.
2118 
2119     if (!m_group_positions.empty())
2120     {
2121         // This might notify both lingering links, which didn't
2122         // deliver the required sequence yet, and links that have
2123         // the sequence ahead. Review them, and if you find at
2124         // least one packet behind, just wait for it to be ready.
2125         // Use again the waiting function because we don't want
2126         // the general waiting procedure to skip others.
2127         set<SRTSOCKET> elephants;
2128 
2129         // const because it's `typename decltype(m_group_positions)::value_type`
2130         pair<const SRTSOCKET, ReadPos>* slowest_kangaroo = nullptr;
2131 
2132         for (auto& sock_rp: m_group_positions)
2133         {
2134             // NOTE that m_group_seqno in this place wasn't updated
2135             // because we haven't successfully extracted anything.
2136             int seqdiff = CSeqNo::seqcmp(sock_rp.second.sequence, m_group_seqno);
2137             if (seqdiff < 0)
2138             {
2139                 elephants.insert(sock_rp.first);
2140             }
2141             // If seqdiff == 0, we have a socket ON TRACK.
2142             else if (seqdiff > 0)
2143             {
2144                 if (!slowest_kangaroo)
2145                 {
2146                     slowest_kangaroo = &sock_rp;
2147                 }
2148                 else
2149                 {
2150                     // Update to find the slowest kangaroo.
2151                     int seqdiff = CSeqNo::seqcmp(slowest_kangaroo->second.sequence, sock_rp.second.sequence);
2152                     if (seqdiff > 0)
2153                     {
2154                         slowest_kangaroo = &sock_rp;
2155                     }
2156                 }
2157             }
2158         }
2159 
2160         // Note that if no "slowest_kangaroo" was found, it means
2161         // that we don't have kangaroos.
2162         if (slowest_kangaroo)
2163         {
2164             // We have a slowest kangaroo. Elephants must be ignored.
2165             // Best case, they will get revived, worst case they will be
2166             // soon broken.
2167             //
2168             // As we already have the packet delivered by the slowest
2169             // kangaroo, we can simply return it.
2170 
2171             m_group_seqno = slowest_kangaroo->second.sequence;
2172             Verb() << "@" << slowest_kangaroo->first << " %" << m_group_seqno << " KANGAROO->HORSE";
2173             swap(output, slowest_kangaroo->second.packet);
2174             return output;
2175         }
2176 
2177         // Here ALL LINKS ARE ELEPHANTS, stating that we still have any.
2178         if (Verbose::on)
2179         {
2180             if (!elephants.empty())
2181             {
2182                 // If we don't have kangaroos, then simply reattempt to
2183                 // poll all elephants again anyway (at worst they are all
2184                 // broken and we'll learn about it soon).
2185                 Verb() << "ALL LINKS ELEPHANTS. Re-polling.";
2186             }
2187             else
2188             {
2189                 Verb() << "ONLY BROKEN WERE REPORTED. Re-polling.";
2190             }
2191         }
2192         goto RETRY_READING;
2193     }
2194 
2195     // We have checked so far only links that were ready to poll.
2196     // Links that are not ready should be re-checked.
2197     // Links that were not ready at the entrance should be checked
2198     // separately, and probably here is the best moment to do it.
2199     // After we make sure that at least one link is ready, we can
2200     // reattempt to read a packet from it.
2201 
2202     // Ok, so first collect all sockets that are in
2203     // connecting state, make a poll for connection.
2204     srt_epoll_clear_usocks(srt_epoll);
2205     bool have_connectors = false, have_ready = false;
2206     for (auto& d: m_group_data)
2207     {
2208         if (d.status < SRTS_CONNECTED)
2209         {
2210             // Not sure anymore if IN or OUT signals the connect-readiness,
2211             // but no matter. The signal will be cleared once it is used,
2212             // while it will be always on when there's anything ready to read.
2213             int modes = SRT_EPOLL_IN | SRT_EPOLL_OUT;
2214             srt_epoll_add_usock(srt_epoll, d.id, &modes);
2215             have_connectors = true;
2216         }
2217         else if (d.status == SRTS_CONNECTED)
2218         {
2219             have_ready = true;
2220         }
2221     }
2222 
2223     if (have_ready || have_connectors)
2224     {
2225         Verb() << "(still have: " << (have_ready ? "+" : "-") << "ready, "
2226             << (have_connectors ? "+" : "-") << "conenctors).";
2227         goto RETRY_READING;
2228     }
2229 
2230     if (have_ready)
2231     {
2232         Verb() << "(connected in the meantime)";
2233         // Some have connected in the meantime, don't
2234         // waste time on the pending ones.
2235         goto RETRY_READING;
2236     }
2237 
2238     if (have_connectors)
2239     {
2240         Verb() << "(waiting for pending connectors to connect)";
2241         // Wait here for them to be connected.
2242         vector<SRTSOCKET> sready;
2243         sready.resize(m_group_data.size());
2244         int ready_len = m_group_data.size();
2245         if (srt_epoll_wait(srt_epoll, sready.data(), &ready_len, 0, 0, -1, 0, 0, 0, 0) == SRT_ERROR)
2246         {
2247             Error("All sockets in the group disconnected");
2248         }
2249 
2250         goto RETRY_READING;
2251     }
2252 
2253     Error("No data extracted");
2254     return output; // Just a marker - this above function throws an exception
2255 }
2256 
2257 #endif
2258 
Read(size_t chunk)2259 MediaPacket SrtSource::Read(size_t chunk)
2260 {
2261     static size_t counter = 1;
2262 
2263     bool have_group SRT_ATR_UNUSED = !m_group_nodes.empty();
2264 
2265     bytevector data(chunk);
2266     // EXPERIMENTAL
2267 #ifdef SRT_OLD_APP_READER
2268     if (have_group || m_listener_group)
2269     {
2270         data = GroupRead(chunk);
2271     }
2272 
2273     if (have_group)
2274     {
2275         // This is to be done for caller mode only
2276         UpdateGroupStatus(m_group_data.data(), m_group_data.size());
2277     }
2278 #else
2279 
2280     SRT_MSGCTRL mctrl = srt_msgctrl_default;
2281     bool ready = true;
2282     int stat;
2283 
2284     do
2285     {
2286 #if ENABLE_EXPERIMENTAL_BONDING
2287         if (have_group || m_listener_group)
2288         {
2289             mctrl.grpdata = m_group_data.data();
2290             mctrl.grpdata_size = m_group_data.size();
2291         }
2292 #endif
2293 
2294         if (::transmit_int_state)
2295             Error("srt_recvmsg2: interrupted");
2296 
2297         ::transmit_throw_on_interrupt = true;
2298         stat = srt_recvmsg2(m_sock, data.data(), chunk, &mctrl);
2299         ::transmit_throw_on_interrupt = false;
2300         if (stat != SRT_ERROR)
2301         {
2302             ready = true;
2303         }
2304         else
2305         {
2306             int syserr = 0;
2307             int err = srt_getlasterror(&syserr);
2308 
2309             if (!m_blocking_mode)
2310             {
2311                 // EAGAIN for SRT READING
2312                 if (err == SRT_EASYNCRCV)
2313                 {
2314 Epoll_again:
2315                     Verb() << "AGAIN: - waiting for data by epoll(" << srt_epoll << ")...";
2316                     // Poll on this descriptor until reading is available, indefinitely.
2317                     int len = 2;
2318                     SRT_EPOLL_EVENT sready[2];
2319                     len = srt_epoll_uwait(srt_epoll, sready, len, -1);
2320                     if (len != -1)
2321                     {
2322                         Verb() << "... epoll reported ready " << len << " sockets";
2323                         // If the event was SRT_EPOLL_UPDATE, report it, and still wait.
2324 
2325                         bool any_read_ready = false;
2326                         vector<int> errored;
2327                         for (int i = 0; i < len; ++i)
2328                         {
2329                             if (sready[i].events & SRT_EPOLL_UPDATE)
2330                             {
2331                                 Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
2332                             }
2333 
2334                             if (sready[i].events & SRT_EPOLL_IN)
2335                                 any_read_ready = true;
2336 
2337                             if (sready[i].events & SRT_EPOLL_ERR)
2338                             {
2339                                 errored.push_back(sready[i].fd);
2340                             }
2341                         }
2342 
2343                         if (!any_read_ready)
2344                         {
2345                             Verb() << " ... [NOT READ READY - AGAIN (" << errored.size() << " errored: " << Printable(errored) << ")]";
2346                             goto Epoll_again;
2347                         }
2348 
2349                         continue;
2350                     }
2351                     // If was -1, then passthru.
2352                 }
2353             }
2354             else
2355             {
2356                 // In blocking mode it uses a minimum of 1s timeout,
2357                 // and continues only if interrupt not requested.
2358                 if (!::transmit_int_state && (err == SRT_EASYNCRCV || err == SRT_ETIMEOUT))
2359                 {
2360                     ready = false;
2361                     continue;
2362                 }
2363             }
2364             Error("srt_recvmsg2");
2365         }
2366 
2367         if (stat == 0)
2368         {
2369             throw ReadEOF(hostport_copy);
2370         }
2371 #if PLEASE_LOG
2372         extern srt_logging::Logger applog;
2373         LOGC(applog.Debug, log << "recv: #" << mctrl.msgno << " %" << mctrl.pktseq << "  "
2374                 << BufferStamp(data.data(), stat) << " BELATED: " << ((CTimer::getTime()-mctrl.srctime)/1000.0) << "ms");
2375 #endif
2376 
2377         Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << "  " << BufferStamp(data.data(), stat) << ") " << VerbNoEOL;
2378     }
2379     while (!ready);
2380 
2381     chunk = size_t(stat);
2382     if (chunk < data.size())
2383         data.resize(chunk);
2384 
2385     const bool need_bw_report    = transmit_bw_report    && int(counter % transmit_bw_report) == transmit_bw_report - 1;
2386     const bool need_stats_report = transmit_stats_report && counter % transmit_stats_report == transmit_stats_report - 1;
2387 
2388 #if ENABLE_EXPERIMENTAL_BONDING
2389     if (have_group) // Means, group with caller mode
2390     {
2391         UpdateGroupStatus(mctrl.grpdata, mctrl.grpdata_size);
2392         if (transmit_stats_writer && (need_stats_report || need_bw_report))
2393         {
2394             PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2395             for (size_t i = 0; i < mctrl.grpdata_size; ++i)
2396                 PrintSrtStats(mctrl.grpdata[i].id, need_stats_report, need_bw_report, need_stats_report);
2397         }
2398     }
2399     else
2400 #endif
2401     {
2402         if (transmit_stats_writer && (need_stats_report || need_bw_report))
2403         {
2404             PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2405         }
2406     }
2407 #endif
2408 
2409     ++counter;
2410 
2411     return MediaPacket(data, mctrl.srctime);
2412 }
2413 
SrtTarget(std::string host,int port,std::string path,const std::map<std::string,std::string> & par)2414 SrtTarget::SrtTarget(std::string host, int port, std::string path, const std::map<std::string,std::string>& par)
2415 {
2416     Init(host, port, path, par, SRT_EPOLL_OUT);
2417 }
2418 
2419 
ConfigurePre(SRTSOCKET sock)2420 int SrtTarget::ConfigurePre(SRTSOCKET sock)
2421 {
2422     int result = SrtCommon::ConfigurePre(sock);
2423     if (result == -1)
2424         return result;
2425 
2426     int yes = 1;
2427     // This is for the HSv4 compatibility; if both parties are HSv5
2428     // (min. version 1.2.1), then this setting simply does nothing.
2429     // In HSv4 this setting is obligatory; otherwise the SRT handshake
2430     // extension will not be done at all.
2431     result = srt_setsockopt(sock, 0, SRTO_SENDER, &yes, sizeof yes);
2432     if (result == -1)
2433         return result;
2434 
2435     return 0;
2436 }
2437 
Write(const MediaPacket & data)2438 void SrtTarget::Write(const MediaPacket& data)
2439 {
2440     static int counter = 1;
2441     ::transmit_throw_on_interrupt = true;
2442 
2443     // Check first if it's ready to write.
2444     // If not, wait indefinitely.
2445     if (!m_blocking_mode)
2446     {
2447 Epoll_again:
2448         int len = 2;
2449         SRT_EPOLL_EVENT sready[2];
2450         len = srt_epoll_uwait(srt_epoll, sready, len, -1);
2451         if (len != -1)
2452         {
2453             bool any_write_ready = false;
2454             for (int i = 0; i < len; ++i)
2455             {
2456                 if (sready[i].events & SRT_EPOLL_UPDATE)
2457                 {
2458                     Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
2459                 }
2460 
2461                 if (sready[i].events & SRT_EPOLL_OUT)
2462                     any_write_ready = true;
2463             }
2464 
2465             if (!any_write_ready)
2466             {
2467                 Verb() << " ... [NOT WRITE READY - AGAIN]";
2468                 goto Epoll_again;
2469             }
2470 
2471             // Pass on, write ready.
2472         }
2473         else
2474         {
2475             Error("srt_epoll_uwait");
2476         }
2477     }
2478 
2479     SRT_MSGCTRL mctrl = srt_msgctrl_default;
2480 #if ENABLE_EXPERIMENTAL_BONDING
2481     bool have_group = !m_group_nodes.empty();
2482     if (have_group || m_listener_group)
2483     {
2484         mctrl.grpdata = m_group_data.data();
2485         mctrl.grpdata_size = m_group_data.size();
2486     }
2487 #endif
2488 
2489     if (transmit_use_sourcetime)
2490     {
2491         mctrl.srctime = data.time;
2492     }
2493 
2494     int stat = srt_sendmsg2(m_sock, data.payload.data(), data.payload.size(), &mctrl);
2495 
2496     // For a socket group, the error is reported only
2497     // if ALL links from the group have failed to perform
2498     // the operation. If only one did, the result will be
2499     // visible in the status array.
2500     if (stat == SRT_ERROR)
2501         Error("srt_sendmsg");
2502     ::transmit_throw_on_interrupt = false;
2503 
2504     const bool need_bw_report    = transmit_bw_report    && int(counter % transmit_bw_report) == transmit_bw_report - 1;
2505     const bool need_stats_report = transmit_stats_report && counter % transmit_stats_report == transmit_stats_report - 1;
2506 
2507 #if ENABLE_EXPERIMENTAL_BONDING
2508     if (have_group)
2509     {
2510         // For listener group this is not necessary. The group information
2511         // is updated in mctrl.
2512         UpdateGroupStatus(mctrl.grpdata, mctrl.grpdata_size);
2513         if (transmit_stats_writer && (need_stats_report || need_bw_report))
2514         {
2515             PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2516             for (size_t i = 0; i < mctrl.grpdata_size; ++i)
2517                 PrintSrtStats(mctrl.grpdata[i].id, need_stats_report, need_bw_report, need_stats_report);
2518         }
2519     }
2520     else
2521 #endif
2522     {
2523         if (transmit_stats_writer && (need_stats_report || need_bw_report))
2524         {
2525             PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2526         }
2527     }
2528 
2529     Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << "  " << BufferStamp(data.payload.data(), data.payload.size()) << ") " << VerbNoEOL;
2530 
2531     ++counter;
2532 }
2533 
SrtRelay(std::string host,int port,std::string path,const std::map<std::string,std::string> & par)2534 SrtRelay::SrtRelay(std::string host, int port, std::string path, const std::map<std::string,std::string>& par)
2535 {
2536     Init(host, port, path, par, SRT_EPOLL_IN | SRT_EPOLL_OUT);
2537 }
2538 
SrtModel(string host,int port,map<string,string> par)2539 SrtModel::SrtModel(string host, int port, map<string,string> par)
2540 {
2541     InitParameters(host, "", par);
2542     if (m_mode == "caller")
2543         is_caller = true;
2544     else if (m_mode == "rendezvous")
2545         is_rend = true;
2546     else if (m_mode != "listener")
2547         throw std::invalid_argument("Wrong 'mode' attribute; expected: caller, listener, rendezvous");
2548 
2549     m_host = host;
2550     m_port = port;
2551 }
2552 
Establish(std::string & w_name)2553 void SrtModel::Establish(std::string& w_name)
2554 {
2555     // This does connect or accept.
2556     // When this returned true, the caller should create
2557     // a new SrtSource or SrtTaget then call StealFrom(*this) on it.
2558 
2559     // If this is a connector and the peer doesn't have a corresponding
2560     // medium, it should send back a single byte with value 0. This means
2561     // that agent should stop connecting.
2562 
2563     if (is_rend)
2564     {
2565         OpenRendezvous(m_adapter, m_host, m_port);
2566     }
2567     else if (is_caller)
2568     {
2569         // Establish a connection
2570 
2571         PrepareClient();
2572 
2573         if (w_name != "")
2574         {
2575             Verb() << "Connect with requesting stream [" << w_name << "]";
2576             srt::setstreamid(m_sock, w_name);
2577         }
2578         else
2579         {
2580             Verb() << "NO STREAM ID for SRT connection";
2581         }
2582 
2583         if (m_outgoing_port)
2584         {
2585             Verb() << "Setting outgoing port: " << m_outgoing_port;
2586             SetupAdapter("", m_outgoing_port);
2587         }
2588 
2589         ConnectClient(m_host, m_port);
2590 
2591         if (m_outgoing_port == 0)
2592         {
2593             // Must rely on a randomly selected one. Extract the port
2594             // so that it will be reused next time.
2595             sockaddr_any s(AF_INET);
2596             int namelen = s.size();
2597             if (srt_getsockname(Socket(), (s.get()), (&namelen)) == SRT_ERROR)
2598             {
2599                 Error("srt_getsockname");
2600             }
2601 
2602             m_outgoing_port = s.hport();
2603             Verb() << "Extracted outgoing port: " << m_outgoing_port;
2604         }
2605     }
2606     else
2607     {
2608         // Listener - get a socket by accepting.
2609         // Check if the listener is already created first
2610         if (Listener() == SRT_INVALID_SOCK)
2611         {
2612             Verb() << "Setting up listener: port=" << m_port << " backlog=5";
2613             PrepareListener(m_adapter, m_port, 5);
2614         }
2615 
2616         Verb() << "Accepting a client...";
2617         AcceptNewClient();
2618         // This rewrites m_sock with a new SRT socket ("accepted" socket)
2619         w_name = UDT::getstreamid(m_sock);
2620         Verb() << "... GOT CLIENT for stream [" << w_name << "]";
2621     }
2622 }
2623 
2624 
2625 template <class Iface> struct Srt;
2626 template <> struct Srt<Source> { typedef SrtSource type; };
2627 template <> struct Srt<Target> { typedef SrtTarget type; };
2628 template <> struct Srt<Relay> { typedef SrtRelay type; };
2629 
2630 template <class Iface>
CreateSrt(const string & host,int port,std::string path,const map<string,string> & par)2631 Iface* CreateSrt(const string& host, int port, std::string path, const map<string,string>& par)
2632 { return new typename Srt<Iface>::type (host, port, path, par); }
2633 
ConsoleRead(size_t chunk)2634 MediaPacket ConsoleRead(size_t chunk)
2635 {
2636     bytevector data(chunk);
2637     bool st = cin.read(data.data(), chunk).good();
2638     chunk = cin.gcount();
2639     if (chunk == 0 && !st)
2640         return bytevector();
2641 
2642     int64_t stime = 0;
2643     if (transmit_use_sourcetime)
2644         stime = srt_time_now();
2645 
2646     if (chunk < data.size())
2647         data.resize(chunk);
2648     if (data.empty())
2649         throw Source::ReadEOF("CONSOLE device");
2650 
2651     return MediaPacket(data, stime);
2652 }
2653 
2654 class ConsoleSource: public virtual Source
2655 {
2656 public:
2657 
ConsoleSource()2658     ConsoleSource()
2659     {
2660     }
2661 
Read(size_t chunk)2662     MediaPacket Read(size_t chunk) override
2663     {
2664         return ConsoleRead(chunk);
2665     }
2666 
IsOpen()2667     bool IsOpen() override { return cin.good(); }
End()2668     bool End() override { return cin.eof(); }
2669 };
2670 
2671 class ConsoleTarget: public virtual Target
2672 {
2673 public:
2674 
ConsoleTarget()2675     ConsoleTarget()
2676     {
2677     }
2678 
Write(const MediaPacket & data)2679     void Write(const MediaPacket& data) override
2680     {
2681         cout.write(data.payload.data(), data.payload.size());
2682     }
2683 
IsOpen()2684     bool IsOpen() override { return cout.good(); }
Broken()2685     bool Broken() override { return cout.eof(); }
2686 };
2687 
2688 class ConsoleRelay: public Relay, public ConsoleSource, public ConsoleTarget
2689 {
2690 public:
2691     ConsoleRelay() = default;
2692 
IsOpen()2693     bool IsOpen() override { return cin.good() && cout.good(); }
2694 };
2695 
2696 template <class Iface> struct Console;
2697 template <> struct Console<Source> { typedef ConsoleSource type; };
2698 template <> struct Console<Target> { typedef ConsoleTarget type; };
2699 template <> struct Console<Relay> { typedef ConsoleRelay type; };
2700 
2701 template <class Iface>
CreateConsole()2702 Iface* CreateConsole() { return new typename Console<Iface>::type (); }
2703 
2704 
2705 // More options can be added in future.
2706 SocketOption udp_options [] {
2707     { "iptos", IPPROTO_IP, IP_TOS, SocketOption::PRE, SocketOption::INT, nullptr },
2708     // IP_TTL and IP_MULTICAST_TTL are handled separately by a common option, "ttl".
2709     { "mcloop", IPPROTO_IP, IP_MULTICAST_LOOP, SocketOption::PRE, SocketOption::INT, nullptr }
2710 };
2711 
IsMulticast(in_addr adr)2712 static inline bool IsMulticast(in_addr adr)
2713 {
2714     unsigned char* abytes = (unsigned char*)&adr.s_addr;
2715     unsigned char c = abytes[0];
2716     return c >= 224 && c <= 239;
2717 }
2718 
Setup(string host,int port,map<string,string> attr)2719 void UdpCommon::Setup(string host, int port, map<string,string> attr)
2720 {
2721     m_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
2722     if (m_sock == -1)
2723         Error(SysError(), "UdpCommon::Setup: socket");
2724 
2725     int yes = 1;
2726     ::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof yes);
2727 
2728     sadr = CreateAddr(host, port);
2729 
2730     bool is_multicast = false;
2731     if (sadr.family() == AF_INET)
2732     {
2733         if (attr.count("multicast"))
2734         {
2735             if (!IsMulticast(sadr.sin.sin_addr))
2736             {
2737                 throw std::runtime_error("UdpCommon: requested multicast for a non-multicast-type IP address");
2738             }
2739             is_multicast = true;
2740         }
2741         else if (IsMulticast(sadr.sin.sin_addr))
2742         {
2743             is_multicast = true;
2744         }
2745 
2746         if (is_multicast)
2747         {
2748             ip_mreq_source mreq_ssm;
2749             ip_mreq mreq;
2750             sockaddr_any maddr;
2751             int opt_name;
2752             void* mreq_arg_ptr;
2753             socklen_t mreq_arg_size;
2754 
2755             adapter = attr.count("adapter") ? attr.at("adapter") : string();
2756             if (adapter == "")
2757             {
2758                 Verb() << "Multicast: home address: INADDR_ANY:" << port;
2759                 maddr.sin.sin_family = AF_INET;
2760                 maddr.sin.sin_addr.s_addr = htonl(INADDR_ANY);
2761                 maddr.sin.sin_port = htons(port); // necessary for temporary use
2762             }
2763             else
2764             {
2765                 Verb() << "Multicast: home address: " << adapter << ":" << port;
2766                 maddr = CreateAddr(adapter, port);
2767             }
2768 
2769             if (attr.count("source"))
2770             {
2771                 /* this is an ssm.  we need to use the right struct and opt */
2772                 opt_name = IP_ADD_SOURCE_MEMBERSHIP;
2773                 mreq_ssm.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
2774                 mreq_ssm.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
2775                 inet_pton(AF_INET, attr.at("source").c_str(), &mreq_ssm.imr_sourceaddr);
2776                 mreq_arg_size = sizeof(mreq_ssm);
2777                 mreq_arg_ptr = &mreq_ssm;
2778             }
2779             else
2780             {
2781                 opt_name = IP_ADD_MEMBERSHIP;
2782                 mreq.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
2783                 mreq.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
2784                 mreq_arg_size = sizeof(mreq);
2785                 mreq_arg_ptr = &mreq;
2786             }
2787 
2788 #ifdef _WIN32
2789             const char* mreq_arg = (const char*)mreq_arg_ptr;
2790             const auto status_error = SOCKET_ERROR;
2791 #else
2792             const void* mreq_arg = mreq_arg_ptr;
2793             const auto status_error = -1;
2794 #endif
2795 
2796 #if defined(_WIN32) || defined(__CYGWIN__)
2797             // On Windows it somehow doesn't work when bind()
2798             // is called with multicast address. Write the address
2799             // that designates the network device here.
2800             // Also, sets port sharing when working with multicast
2801             sadr = maddr;
2802             int reuse = 1;
2803             int shareAddrRes = setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse), sizeof(reuse));
2804             if (shareAddrRes == status_error)
2805             {
2806                 throw runtime_error("marking socket for shared use failed");
2807             }
2808             Verb() << "Multicast(Windows): will bind to home address";
2809 #else
2810             Verb() << "Multicast(POSIX): will bind to IGMP address: " << host;
2811 #endif
2812             int res = setsockopt(m_sock, IPPROTO_IP, opt_name, mreq_arg, mreq_arg_size);
2813 
2814             if (res == status_error)
2815             {
2816                 Error(errno, "adding to multicast membership failed");
2817             }
2818 
2819             attr.erase("multicast");
2820             attr.erase("adapter");
2821         }
2822     }
2823 
2824     // The "ttl" options is handled separately, it maps to both IP_TTL
2825     // and IP_MULTICAST_TTL so that TTL setting works for both uni- and multicast.
2826     if (attr.count("ttl"))
2827     {
2828         int ttl = stoi(attr.at("ttl"));
2829         int res = setsockopt(m_sock, IPPROTO_IP, IP_TTL, (const char*)&ttl, sizeof ttl);
2830         if (res == -1)
2831             Verb() << "WARNING: failed to set 'ttl' (IP_TTL) to " << ttl;
2832         res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&ttl, sizeof ttl);
2833         if (res == -1)
2834             Verb() << "WARNING: failed to set 'ttl' (IP_MULTICAST_TTL) to " << ttl;
2835 
2836         attr.erase("ttl");
2837     }
2838 
2839     m_options = attr;
2840 
2841     for (auto o: udp_options)
2842     {
2843         // Ignore "binding" - for UDP there are no post options.
2844         if (m_options.count(o.name))
2845         {
2846             string value = m_options.at(o.name);
2847             bool ok = o.apply<SocketOption::SYSTEM>(m_sock, value);
2848             if (!ok)
2849                 Verb() << "WARNING: failed to set '" << o.name << "' to " << value;
2850         }
2851     }
2852 }
2853 
Error(int err,string src)2854 void UdpCommon::Error(int err, string src)
2855 {
2856     char buf[512];
2857     string message = SysStrError(err, buf, 512u);
2858 
2859     if (Verbose::on)
2860         Verb() << "FAILURE\n" << src << ": [" << err << "] " << message;
2861     else
2862         cerr << "\nERROR #" << err << ": " << message << endl;
2863 
2864     throw TransmissionError("error: " + src + ": " + message);
2865 }
2866 
~UdpCommon()2867 UdpCommon::~UdpCommon()
2868 {
2869 #ifdef _WIN32
2870     if (m_sock != -1)
2871     {
2872         shutdown(m_sock, SD_BOTH);
2873         closesocket(m_sock);
2874         m_sock = -1;
2875     }
2876 #else
2877     close(m_sock);
2878 #endif
2879 }
2880 
UdpSource(string host,int port,const map<string,string> & attr)2881 UdpSource::UdpSource(string host, int port, const map<string,string>& attr)
2882 {
2883     Setup(host, port, attr);
2884     int stat = ::bind(m_sock, sadr.get(), sadr.size());
2885     if (stat == -1)
2886         Error(SysError(), "Binding address for UDP");
2887     eof = false;
2888     struct timeval tv;
2889     tv.tv_sec = 1;
2890     tv.tv_usec = 0;
2891     if (::setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (const char*) &tv, sizeof(tv)) < 0)
2892         Error(SysError(), "Setting timeout for UDP");
2893 }
2894 
Read(size_t chunk)2895 MediaPacket UdpSource::Read(size_t chunk)
2896 {
2897     bytevector data(chunk);
2898     sockaddr_any sa(sadr.family());
2899     int64_t srctime = 0;
2900 AGAIN:
2901     int stat = recvfrom(m_sock, data.data(), (int) chunk, 0, sa.get(), &sa.syslen());
2902     int err = SysError();
2903     if (transmit_use_sourcetime)
2904     {
2905         srctime = srt_time_now();
2906     }
2907     if (stat == -1)
2908     {
2909         if (!::transmit_int_state && err == SysAGAIN)
2910             goto AGAIN;
2911 
2912         Error(SysError(), "UDP Read/recvfrom");
2913     }
2914 
2915     if (stat < 1)
2916     {
2917         eof = true;
2918         return bytevector();
2919     }
2920 
2921     chunk = size_t(stat);
2922     if (chunk < data.size())
2923         data.resize(chunk);
2924 
2925     return MediaPacket(data, srctime);
2926 }
2927 
UdpTarget(string host,int port,const map<string,string> & attr)2928 UdpTarget::UdpTarget(string host, int port, const map<string,string>& attr)
2929 {
2930     Setup(host, port, attr);
2931     if (adapter != "")
2932     {
2933         auto maddr = CreateAddr(adapter, 0);
2934         in_addr addr = maddr.sin.sin_addr;
2935 
2936         int res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
2937         if (res == -1)
2938         {
2939             Error(SysError(), "setsockopt/IP_MULTICAST_IF: " + adapter);
2940         }
2941     }
2942 }
2943 
Write(const MediaPacket & data)2944 void UdpTarget::Write(const MediaPacket& data)
2945 {
2946     int stat = sendto(m_sock, data.payload.data(), data.payload.size(), 0, (sockaddr*)&sadr, sizeof sadr);
2947     if (stat == -1)
2948         Error(SysError(), "UDP Write/sendto");
2949 }
2950 
2951 
2952 template <class Iface> struct Udp;
2953 template <> struct Udp<Source> { typedef UdpSource type; };
2954 template <> struct Udp<Target> { typedef UdpTarget type; };
2955 template <> struct Udp<Relay> { typedef UdpRelay type; };
2956 
2957 template <class Iface>
CreateUdp(const string & host,int port,const map<string,string> & par)2958 Iface* CreateUdp(const string& host, int port, const map<string,string>& par) { return new typename Udp<Iface>::type (host, port, par); }
2959 
2960 template<class Base>
IsOutput()2961 inline bool IsOutput() { return false; }
2962 
2963 template<>
IsOutput()2964 inline bool IsOutput<Target>() { return true; }
2965 
2966 template <class Base>
CreateMedium(const string & uri)2967 extern unique_ptr<Base> CreateMedium(const string& uri)
2968 {
2969     unique_ptr<Base> ptr;
2970 
2971     UriParser u(uri);
2972 
2973     int iport = 0;
2974     switch ( u.type() )
2975     {
2976     default:
2977         break; // do nothing, return nullptr
2978     case UriParser::FILE:
2979         if (u.host() == "con" || u.host() == "console")
2980         {
2981             if ( IsOutput<Base>() && (
2982                         (Verbose::on && Verbose::cverb == &cout)
2983                         || transmit_bw_report || transmit_stats_report) )
2984             {
2985                 cerr << "ERROR: file://con with -v or -r or -s would result in mixing the data and text info.\n";
2986                 cerr << "ERROR: HINT: you can stream through a FIFO (named pipe)\n";
2987                 throw invalid_argument("incorrect parameter combination");
2988             }
2989             ptr.reset( CreateConsole<Base>() );
2990         }
2991         else
2992             ptr.reset( CreateFile<Base>(u.path()));
2993         break;
2994 
2995     case UriParser::SRT:
2996         ptr.reset( CreateSrt<Base>(u.host(), u.portno(), u.path(), u.parameters()) );
2997         break;
2998 
2999 
3000     case UriParser::UDP:
3001         iport = atoi(u.port().c_str());
3002         if (iport < 1024)
3003         {
3004             cerr << "Port value invalid: " << iport << " - must be >=1024\n";
3005             throw invalid_argument("Invalid port number");
3006         }
3007         ptr.reset( CreateUdp<Base>(u.host(), iport, u.parameters()) );
3008         break;
3009     }
3010 
3011     if (ptr)
3012         ptr->uri = move(u);
3013     return ptr;
3014 }
3015 
3016 
Create(const std::string & url)3017 std::unique_ptr<Source> Source::Create(const std::string& url)
3018 {
3019     return CreateMedium<Source>(url);
3020 }
3021 
Create(const std::string & url)3022 std::unique_ptr<Target> Target::Create(const std::string& url)
3023 {
3024     return CreateMedium<Target>(url);
3025 }
3026