1 /*
2 * SRT - Secure, Reliable, Transport
3 * Copyright (c) 2018 Haivision Systems Inc.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 */
10
11 // Medium concretizations
12
13 // Just for formality. This file should be used
14 #include <iostream>
15 #include <fstream>
16 #include <sstream>
17 #include <string>
18 #include <stdexcept>
19 #include <iterator>
20 #include <map>
21 #include <chrono>
22 #include <thread>
23 #include <atomic>
24 #include <srt.h>
25 #if !defined(_WIN32)
26 #include <sys/ioctl.h>
27 #endif
28
29 // SRT protected includes
30 #include "netinet_any.h"
31 #include "common.h"
32 #include "api.h"
33 #include "udt.h"
34 #include "logging.h"
35 #include "utilities.h"
36
37 #include "apputil.hpp"
38 #include "socketoptions.hpp"
39 #include "uriparser.hpp"
40 #include "testmedia.hpp"
41 #include "srt_compat.h"
42 #include "verbose.hpp"
43
44 using namespace std;
45
46 using srt_logging::KmStateStr;
47 using srt_logging::SockStatusStr;
48 #if ENABLE_EXPERIMENTAL_BONDING
49 using srt_logging::MemberStatusStr;
50 #endif
51
52 std::atomic<bool> transmit_throw_on_interrupt {false};
53 std::atomic<bool> transmit_int_state {false};
54 int transmit_bw_report = 0;
55 unsigned transmit_stats_report = 0;
56 size_t transmit_chunk_size = SRT_LIVE_DEF_PLSIZE;
57 bool transmit_printformat_json = false;
58 srt_listen_callback_fn* transmit_accept_hook_fn = nullptr;
59 void* transmit_accept_hook_op = nullptr;
60 bool transmit_use_sourcetime = false;
61 int transmit_retry_connect = 0;
62 bool transmit_retry_always = false;
63
// Do not uncomment. Copy this to an app that uses applog and set an appropriate name.
65 //srt_logging::Logger applog(SRT_LOGFA_APP, srt_logger_config, "srt-test");
66
67 std::shared_ptr<SrtStatsWriter> transmit_stats_writer;
68
DirectionName(SRT_EPOLL_T direction)69 string DirectionName(SRT_EPOLL_T direction)
70 {
71 string dir_name;
72 if (direction & ~SRT_EPOLL_ERR)
73 {
74 if (direction & SRT_EPOLL_IN)
75 {
76 dir_name = "source";
77 }
78
79 if (direction & SRT_EPOLL_OUT)
80 {
81 if (!dir_name.empty())
82 dir_name = "relay";
83 else
84 dir_name = "target";
85 }
86
87 if (direction & SRT_EPOLL_ERR)
88 {
89 dir_name += "+error";
90 }
91 }
92 else
93 {
94 // stupid name for a case of IPE
95 dir_name = "stone";
96 }
97
98 return dir_name;
99 }
100
101 template<class FileBase> inline
FileRead(FileBase & ifile,size_t chunk,const string & filename)102 bytevector FileRead(FileBase& ifile, size_t chunk, const string& filename)
103 {
104 bytevector data(chunk);
105 ifile.read(data.data(), chunk);
106 size_t nread = ifile.gcount();
107 if (nread < data.size())
108 data.resize(nread);
109
110 if (data.empty())
111 throw Source::ReadEOF(filename);
112 return data;
113 }
114
115
116 class FileSource: public virtual Source
117 {
118 ifstream ifile;
119 string filename_copy;
120 public:
121
FileSource(const string & path)122 FileSource(const string& path): ifile(path, ios::in | ios::binary), filename_copy(path)
123 {
124 if (!ifile)
125 throw std::runtime_error(path + ": Can't open file for reading");
126 }
127
Read(size_t chunk)128 MediaPacket Read(size_t chunk) override { return FileRead(ifile, chunk, filename_copy); }
129
IsOpen()130 bool IsOpen() override { return bool(ifile); }
End()131 bool End() override { return ifile.eof(); }
132 //~FileSource() { ifile.close(); }
133 };
134
135 #ifdef PLEASE_LOG
136 #include "logging.h"
137 #endif
138
139 class FileTarget: public virtual Target
140 {
141 ofstream ofile;
142 public:
143
FileTarget(const string & path)144 FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}
145
Write(const MediaPacket & data)146 void Write(const MediaPacket& data) override
147 {
148 ofile.write(data.payload.data(), data.payload.size());
149 #ifdef PLEASE_LOG
150 applog.Debug() << "FileTarget::Write: " << data.size() << " written to a file";
151 #endif
152 }
153
IsOpen()154 bool IsOpen() override { return !!ofile; }
Broken()155 bool Broken() override { return !ofile.good(); }
156 //~FileTarget() { ofile.close(); }
Close()157 void Close() override
158 {
159 #ifdef PLEASE_LOG
160 applog.Debug() << "FileTarget::Close";
161 #endif
162 ofile.close();
163 }
164 };
165
// This class can't be based on the FileSource and FileTarget classes because
// they use two separate stream fields, which makes it impossible to define
// IsOpen() reliably. That would first require some kind of FileCommon base
// holding a single 'fstream' field. Not worth a shot.
170 class FileRelay: public Relay
171 {
172 fstream iofile;
173 string filename_copy;
174 public:
175
FileRelay(const string & path)176 FileRelay(const string& path):
177 iofile(path, ios::in | ios::out | ios::binary), filename_copy(path)
178 {
179 if (!iofile)
180 throw std::runtime_error(path + ": Can't open file for reading");
181 }
Read(size_t chunk)182 MediaPacket Read(size_t chunk) override { return FileRead(iofile, chunk, filename_copy); }
183
Write(const MediaPacket & data)184 void Write(const MediaPacket& data) override
185 {
186 iofile.write(data.payload.data(), data.payload.size());
187 }
188
IsOpen()189 bool IsOpen() override { return !!iofile; }
End()190 bool End() override { return iofile.eof(); }
Broken()191 bool Broken() override { return !iofile.good(); }
Close()192 void Close() override { iofile.close(); }
193 };
194
195 template <class Iface> struct File;
196 template <> struct File<Source> { typedef FileSource type; };
197 template <> struct File<Target> { typedef FileTarget type; };
198 template <> struct File<Relay> { typedef FileRelay type; };
199
200 template <class Iface>
CreateFile(const string & name)201 Iface* CreateFile(const string& name) { return new typename File<Iface>::type (name); }
202
InitParameters(string host,string path,map<string,string> par)203 void SrtCommon::InitParameters(string host, string path, map<string,string> par)
204 {
205 // Application-specific options: mode, blocking, timeout, adapter
206 if ( Verbose::on && !par.empty())
207 {
208 Verb() << "SRT parameters specified:\n";
209 for (map<string,string>::iterator i = par.begin(); i != par.end(); ++i)
210 {
211 Verb() << "\t" << i->first << " = '" << i->second << "'\n";
212 }
213 }
214
215 if (path != "")
216 {
217 // Special case handling of an unusual specification.
218
219 if (path.substr(0, 2) != "//")
220 {
221 Error("Path specification not supported for SRT (use // in front for special cases)");
222 }
223
224 path = path.substr(2);
225
226 #if ENABLE_EXPERIMENTAL_BONDING
227 if (path == "group")
228 {
229 // Group specified, check type.
230 m_group_type = par["type"];
231 if (m_group_type == "")
232 {
233 Error("With //group, the group 'type' must be specified.");
234 }
235
236 vector<string> parts;
237 Split(m_group_type, '/', back_inserter(parts));
238 if (parts.size() == 0 || parts.size() > 2)
239 {
240 Error("Invalid specification for 'type' parameter");
241 }
242
243 if (parts.size() == 2)
244 {
245 m_group_type = parts[0];
246 m_group_config = parts[1];
247 }
248
249 vector<string> nodes;
250 Split(par["nodes"], ',', back_inserter(nodes));
251
252 if (nodes.empty())
253 {
254 Error("With //group, 'nodes' must specify comma-separated host:port specs.");
255 }
256
257 int token = 1;
258
259 // Check if correctly specified
260 for (string& hostport: nodes)
261 {
262 if (hostport == "")
263 continue;
264
265 // The attribute string, as it was embedded in another URI,
266 // must have had replaced the & character with another ?, so
267 // now all ? character, except the first one, must be now
268 // restored so that UriParser interprets them correctly.
269
270 size_t atq = hostport.find('?');
271 if (atq != string::npos)
272 {
273 while (atq+1 < hostport.size())
274 {
275 size_t next = hostport.find('?', atq+1);
276 if (next == string::npos)
277 break;
278 hostport[next] = '&';
279 atq = next;
280 }
281 }
282
283 UriParser check(hostport, UriParser::EXPECT_HOST);
284 if (check.host() == "" || check.port() == "")
285 {
286 Error("With //group, 'nodes' must specify comma-separated host:port specs.");
287 }
288
289 if (check.portno() <= 1024)
290 {
291 Error("With //group, every node in 'nodes' must have port >1024");
292 }
293
294 Connection cc(check.host(), check.portno());
295 if (check.parameters().count("weight"))
296 {
297 cc.weight = stoi(check.queryValue("weight"));
298 }
299
300 if (check.parameters().count("source"))
301 {
302 UriParser sourcehp(check.queryValue("source"), UriParser::EXPECT_HOST);
303 cc.source = CreateAddr(sourcehp.host(), sourcehp.portno());
304 }
305
306 // Check if there's a key with 'srto.' prefix.
307
308 UriParser::query_it start = check.parameters().lower_bound("srto.");
309
310 SRT_SOCKOPT_CONFIG* config = nullptr;
311 bool all_clear = true;
312 vector<string> fails;
313 map<string, string> options;
314
315 if (start != check.parameters().end())
316 {
317 for (; start != check.parameters().end(); ++start)
318 {
319 auto& y = *start;
320 if (y.first.substr(0, 5) != "srto.")
321 break;
322
323 options[y.first.substr(5)] = y.second;
324 }
325 }
326
327 if (!options.empty())
328 {
329 config = srt_create_config();
330
331 for (auto o: srt_options)
332 {
333 if (!options.count(o.name))
334 continue;
335 string value = options.at(o.name);
336 bool ok = o.apply<SocketOption::SRT>(config, value);
337 if ( !ok )
338 {
339 fails.push_back(o.name);
340 all_clear = false;
341 }
342 }
343
344 if (!all_clear)
345 {
346 srt_delete_config(config);
347 Error("With //group, failed to set options: " + Printable(fails));
348 }
349
350 cc.options = config;
351 }
352
353 cc.token = token++;
354 m_group_nodes.push_back(move(cc));
355 }
356
357 par.erase("type");
358 par.erase("nodes");
359
360 // For a group-connect specification, it's
361 // always the caller mode.
362 // XXX change it here if maybe rendezvous is also
363 // possible in future.
364 par["mode"] = "caller";
365 }
366 #endif
367 }
368
369 string adapter;
370 if (par.count("adapter"))
371 {
372 adapter = par.at("adapter");
373 }
374
375 m_mode = "default";
376 if (par.count("mode"))
377 {
378 m_mode = par.at("mode");
379 }
380 SocketOption::Mode mode = SrtInterpretMode(m_mode, host, adapter);
381 if (mode == SocketOption::FAILURE)
382 {
383 Error("Invalid mode");
384 }
385
386 if (!m_group_nodes.empty() && mode != SocketOption::CALLER)
387 {
388 Error("Group node specification is only available in caller mode");
389 }
390
391 // Fix the mode name after successful interpretation
392 m_mode = SocketOption::mode_names[mode];
393
394 par.erase("mode");
395
396 if (par.count("blocking"))
397 {
398 m_blocking_mode = !false_names.count(par.at("blocking"));
399 par.erase("blocking");
400 }
401
402 if (par.count("timeout"))
403 {
404 m_timeout = stoi(par.at("timeout"), 0, 0);
405 par.erase("timeout");
406 }
407
408 if (par.count("adapter"))
409 {
410 m_adapter = adapter;
411 par.erase("adapter");
412 }
413 else if (m_mode == "listener")
414 {
415 // For listener mode, adapter is taken from host,
416 // if 'adapter' parameter is not given
417 m_adapter = host;
418 }
419
420 if (par.count("tsbpd") && false_names.count(par.at("tsbpd")))
421 {
422 m_tsbpdmode = false;
423 }
424
425 if (par.count("port"))
426 {
427 m_outgoing_port = stoi(par.at("port"), 0, 0);
428 par.erase("port");
429 }
430
431 // That's kinda clumsy, but it must rely on the defaults.
432 // Default mode is live, so check if the file mode was enforced
433 if (par.count("transtype") == 0 || par["transtype"] != "file")
434 {
435 // If the Live chunk size was nondefault, enforce the size.
436 if (transmit_chunk_size != SRT_LIVE_DEF_PLSIZE)
437 {
438 if (transmit_chunk_size > SRT_LIVE_MAX_PLSIZE)
439 throw std::runtime_error("Chunk size in live mode exceeds 1456 bytes; this is not supported");
440
441 par["payloadsize"] = Sprint(transmit_chunk_size);
442 }
443 }
444
445 // Assigning group configuration from a special "groupconfig" attribute.
446 // This is the only way how you can set up this configuration at the listener side.
447 if (par.count("groupconfig"))
448 {
449 m_group_config = par.at("groupconfig");
450 par.erase("groupconfig");
451 }
452
453 // Fix Minversion, if specified as string
454 if (par.count("minversion"))
455 {
456 string v = par["minversion"];
457 if (v.find('.') != string::npos)
458 {
459 int version = SrtParseVersion(v.c_str());
460 if (version == 0)
461 {
462 throw std::runtime_error(Sprint("Value for 'minversion' doesn't specify a valid version: ", v));
463 }
464 par["minversion"] = Sprint(version);
465 Verb() << "\tFIXED: minversion = 0x" << std::hex << std::setfill('0') << std::setw(8) << version << std::dec;
466 }
467 }
468
469 // Assign the others here.
470 m_options = par;
471 m_options["mode"] = m_mode;
472 }
473
PrepareListener(string host,int port,int backlog)474 void SrtCommon::PrepareListener(string host, int port, int backlog)
475 {
476 m_bindsock = srt_create_socket();
477 if (m_bindsock == SRT_ERROR)
478 Error("srt_create_socket");
479
480 int stat = ConfigurePre(m_bindsock);
481 if (stat == SRT_ERROR)
482 Error("ConfigurePre");
483
484 if (!m_blocking_mode)
485 {
486 srt_conn_epoll = AddPoller(m_bindsock, SRT_EPOLL_OUT);
487 }
488
489 auto sa = CreateAddr(host, port);
490 Verb() << "Binding a server on " << host << ":" << port << " ...";
491 stat = srt_bind(m_bindsock, sa.get(), sizeof sa);
492 if (stat == SRT_ERROR)
493 {
494 srt_close(m_bindsock);
495 Error("srt_bind");
496 }
497
498 Verb() << " listen... " << VerbNoEOL;
499 stat = srt_listen(m_bindsock, backlog);
500 if (stat == SRT_ERROR)
501 {
502 srt_close(m_bindsock);
503 Error("srt_listen");
504 }
505
506 }
507
StealFrom(SrtCommon & src)508 void SrtCommon::StealFrom(SrtCommon& src)
509 {
510 // This is used when SrtCommon class designates a listener
511 // object that is doing Accept in appropriate direction class.
512 // The new object should get the accepted socket.
513 m_direction = src.m_direction;
514 m_blocking_mode = src.m_blocking_mode;
515 m_timeout = src.m_timeout;
516 m_tsbpdmode = src.m_tsbpdmode;
517 m_options = src.m_options;
518 m_bindsock = SRT_INVALID_SOCK; // no listener
519 m_sock = src.m_sock;
520 src.m_sock = SRT_INVALID_SOCK; // STEALING
521 }
522
AcceptNewClient()523 void SrtCommon::AcceptNewClient()
524 {
525 sockaddr_any scl;
526
527 ::transmit_throw_on_interrupt = true;
528
529 if (!m_blocking_mode)
530 {
531 Verb() << "[ASYNC] (conn=" << srt_conn_epoll << ")";
532
533 int len = 2;
534 SRTSOCKET ready[2];
535 while (srt_epoll_wait(srt_conn_epoll, 0, 0, ready, &len, 1000, 0, 0, 0, 0) == -1)
536 {
537 if (::transmit_int_state)
538 Error("srt_epoll_wait for srt_accept: interrupt");
539
540 if (srt_getlasterror(NULL) == SRT_ETIMEOUT)
541 continue;
542 Error("srt_epoll_wait(srt_conn_epoll)");
543 }
544
545 Verb() << "[EPOLL: " << len << " sockets] " << VerbNoEOL;
546 }
547 Verb() << " accept..." << VerbNoEOL;
548
549 m_sock = srt_accept(m_bindsock, (scl.get()), (&scl.len));
550 if (m_sock == SRT_INVALID_SOCK)
551 {
552 srt_close(m_bindsock);
553 m_bindsock = SRT_INVALID_SOCK;
554 Error("srt_accept");
555 }
556
557 #if ENABLE_EXPERIMENTAL_BONDING
558 if (m_sock & SRTGROUP_MASK)
559 {
560 m_listener_group = true;
561 if (m_group_config != "")
562 {
563 int stat = srt_group_configure(m_sock, m_group_config.c_str());
564 if (stat == SRT_ERROR)
565 {
566 // Don't break the connection basing on this, just ignore.
567 Verb() << " (error setting config: '" << m_group_config << "') " << VerbNoEOL;
568 }
569 }
570 // There might be added a poller, remove it.
571 // We need it work different way.
572
573 #ifndef SRT_OLD_APP_READER
574
575 if (srt_epoll != -1)
576 {
577 Verb() << "(Group: erasing epoll " << srt_epoll << ") " << VerbNoEOL;
578 srt_epoll_release(srt_epoll);
579 }
580
581 // Don't add any sockets, they will have to be added
582 // anew every time again.
583 srt_epoll = srt_epoll_create();
584 #endif
585
586 // Group data must have a size of at least 1
587 // otherwise the srt_group_data() call will fail
588 if (m_group_data.empty())
589 m_group_data.resize(1);
590
591 Verb() << " connected(group epoll " << srt_epoll <<").";
592 }
593 else
594 #endif
595 {
596 sockaddr_any peeraddr(AF_INET6);
597 string peer = "<?PEER?>";
598 if (-1 != srt_getpeername(m_sock, (peeraddr.get()), (&peeraddr.len)))
599 {
600 peer = peeraddr.str();
601 }
602
603 sockaddr_any agentaddr(AF_INET6);
604 string agent = "<?AGENT?>";
605 if (-1 != srt_getsockname(m_sock, (agentaddr.get()), (&agentaddr.len)))
606 {
607 agent = agentaddr.str();
608 }
609
610 Verb() << " connected [" << agent << "] <-- " << peer;
611 }
612 ::transmit_throw_on_interrupt = false;
613
614 // ConfigurePre is done on bindsock, so any possible Pre flags
615 // are DERIVED by sock. ConfigurePost is done exclusively on sock.
616 int stat = ConfigurePost(m_sock);
617 if (stat == SRT_ERROR)
618 Error("ConfigurePost");
619 }
620
PrintEpollEvent(int events,int et_events)621 static string PrintEpollEvent(int events, int et_events)
622 {
623 static pair<int, const char*> const namemap [] = {
624 make_pair(SRT_EPOLL_IN, "R"),
625 make_pair(SRT_EPOLL_OUT, "W"),
626 make_pair(SRT_EPOLL_ERR, "E"),
627 make_pair(SRT_EPOLL_UPDATE, "U")
628 };
629
630 ostringstream os;
631 int N = Size(namemap);
632
633 for (int i = 0; i < N; ++i)
634 {
635 if (events & namemap[i].first)
636 {
637 os << "[";
638 if (et_events & namemap[i].first)
639 os << "^";
640 os << namemap[i].second << "]";
641 }
642 }
643
644 return os.str();
645 }
646
Init(string host,int port,string path,map<string,string> par,SRT_EPOLL_OPT dir)647 void SrtCommon::Init(string host, int port, string path, map<string,string> par, SRT_EPOLL_OPT dir)
648 {
649 m_direction = dir;
650 InitParameters(host, path, par);
651
652 int backlog = 1;
653 if (m_mode == "listener" && par.count("groupconnect")
654 && true_names.count(par["groupconnect"]))
655 {
656 backlog = 10;
657 }
658
659 Verb() << "Opening SRT " << DirectionName(dir) << " " << m_mode
660 << "(" << (m_blocking_mode ? "" : "non-") << "blocking,"
661 << " backlog=" << backlog << ") on "
662 << host << ":" << port;
663
664 try
665 {
666 if (m_mode == "caller")
667 {
668 if (m_group_nodes.empty())
669 {
670 OpenClient(host, port);
671 }
672 #if ENABLE_EXPERIMENTAL_BONDING
673 else
674 {
675 OpenGroupClient(); // Source data are in the fields already.
676 }
677 #endif
678 }
679 else if (m_mode == "listener")
680 OpenServer(m_adapter, port, backlog);
681 else if (m_mode == "rendezvous")
682 OpenRendezvous(m_adapter, host, port);
683 else
684 {
685 throw std::invalid_argument("Invalid 'mode'. Use 'client' or 'server'");
686 }
687 }
688 catch (...)
689 {
690 // This is an in-constructor-called function, so
691 // when the exception is thrown, the destructor won't
692 // close the sockets. This intercepts the exception
693 // to close them.
694 Verb() << "Open FAILED - closing SRT sockets";
695 if (m_bindsock != SRT_INVALID_SOCK)
696 srt_close(m_bindsock);
697 if (m_sock != SRT_INVALID_SOCK)
698 srt_close(m_sock);
699 m_sock = m_bindsock = SRT_INVALID_SOCK;
700 throw;
701 }
702
703 int pbkeylen = 0;
704 SRT_KM_STATE kmstate, snd_kmstate, rcv_kmstate;
705 int len = sizeof (int);
706 srt_getsockflag(m_sock, SRTO_PBKEYLEN, &pbkeylen, &len);
707 srt_getsockflag(m_sock, SRTO_KMSTATE, &kmstate, &len);
708 srt_getsockflag(m_sock, SRTO_SNDKMSTATE, &snd_kmstate, &len);
709 srt_getsockflag(m_sock, SRTO_RCVKMSTATE, &rcv_kmstate, &len);
710
711 Verb() << "ENCRYPTION status: " << KmStateStr(kmstate)
712 << " (SND:" << KmStateStr(snd_kmstate) << " RCV:" << KmStateStr(rcv_kmstate)
713 << ") PBKEYLEN=" << pbkeylen;
714
715 // Display some selected options on the socket.
716 if (Verbose::on)
717 {
718 int64_t bandwidth = 0;
719 int latency = 0;
720 bool blocking_snd = false, blocking_rcv = false;
721 int dropdelay = 0;
722 int size_int = sizeof (int), size_int64 = sizeof (int64_t), size_bool = sizeof (bool);
723 char packetfilter[100] = "";
724 int packetfilter_size = 100;
725
726 srt_getsockflag(m_sock, SRTO_MAXBW, &bandwidth, &size_int64);
727 srt_getsockflag(m_sock, SRTO_RCVLATENCY, &latency, &size_int);
728 srt_getsockflag(m_sock, SRTO_RCVSYN, &blocking_rcv, &size_bool);
729 srt_getsockflag(m_sock, SRTO_SNDSYN, &blocking_snd, &size_bool);
730 srt_getsockflag(m_sock, SRTO_SNDDROPDELAY, &dropdelay, &size_int);
731 srt_getsockflag(m_sock, SRTO_PACKETFILTER, (packetfilter), (&packetfilter_size));
732
733 Verb() << "OPTIONS: maxbw=" << bandwidth << " rcvlatency=" << latency << boolalpha
734 << " blocking{rcv=" << blocking_rcv << " snd=" << blocking_snd
735 << "} snddropdelay=" << dropdelay << " packetfilter=" << packetfilter;
736 }
737
738 if (!m_blocking_mode)
739 {
740 // Don't add new epoll if already created as a part
741 // of group management: if (srt_epoll == -1)...
742
743 if (m_mode == "caller")
744 dir = (dir | SRT_EPOLL_UPDATE);
745 Verb() << "NON-BLOCKING MODE - SUB FOR " << PrintEpollEvent(dir, 0);
746
747 srt_epoll = AddPoller(m_sock, dir);
748 }
749 }
750
AddPoller(SRTSOCKET socket,int modes)751 int SrtCommon::AddPoller(SRTSOCKET socket, int modes)
752 {
753 int pollid = srt_epoll_create();
754 if (pollid == -1)
755 throw std::runtime_error("Can't create epoll in nonblocking mode");
756 Verb() << "EPOLL: creating eid=" << pollid << " and adding @" << socket
757 << " in " << DirectionName(SRT_EPOLL_OPT(modes)) << " mode";
758 srt_epoll_add_usock(pollid, socket, &modes);
759 return pollid;
760 }
761
ConfigurePost(SRTSOCKET sock)762 int SrtCommon::ConfigurePost(SRTSOCKET sock)
763 {
764 bool yes = m_blocking_mode;
765 int result = 0;
766 if (m_direction & SRT_EPOLL_OUT)
767 {
768 Verb() << "Setting SND blocking mode: " << boolalpha << yes << " timeout=" << m_timeout;
769 result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &yes, sizeof yes);
770 if (result == -1)
771 {
772 #ifdef PLEASE_LOG
773 extern srt_logging::Logger applog;
774 applog.Error() << "ERROR SETTING OPTION: SRTO_SNDSYN";
775 #endif
776 return result;
777 }
778
779 if (m_timeout)
780 result = srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &m_timeout, sizeof m_timeout);
781 if (result == -1)
782 {
783 #ifdef PLEASE_LOG
784 extern srt_logging::Logger applog;
785 applog.Error() << "ERROR SETTING OPTION: SRTO_SNDTIMEO";
786 #endif
787 return result;
788 }
789 }
790
791 if (m_direction & SRT_EPOLL_IN)
792 {
793 Verb() << "Setting RCV blocking mode: " << boolalpha << yes << " timeout=" << m_timeout;
794 result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &yes, sizeof yes);
795 if (result == -1)
796 return result;
797
798 if (m_timeout)
799 result = srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &m_timeout, sizeof m_timeout);
800 else
801 {
802 int timeout = 1000;
803 result = srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &timeout, sizeof timeout);
804 }
805 if (result == -1)
806 return result;
807 }
808
809 // host is only checked for emptiness and depending on that the connection mode is selected.
810 // Here we are not exactly interested with that information.
811 vector<string> failures;
812
813 SrtConfigurePost(sock, m_options, &failures);
814
815
816 if (!failures.empty())
817 {
818 if (Verbose::on)
819 {
820 Verb() << "WARNING: failed to set options: ";
821 copy(failures.begin(), failures.end(), ostream_iterator<string>(*Verbose::cverb, ", "));
822 Verb();
823 }
824 }
825
826 return 0;
827 }
828
ConfigurePre(SRTSOCKET sock)829 int SrtCommon::ConfigurePre(SRTSOCKET sock)
830 {
831 int result = 0;
832
833 int no = 0;
834 if (!m_tsbpdmode)
835 {
836 result = srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no);
837 if (result == -1)
838 return result;
839 }
840
841 // Let's pretend async mode is set this way.
842 // This is for asynchronous connect.
843 int maybe = m_blocking_mode;
844 result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &maybe, sizeof maybe);
845 if (result == -1)
846 return result;
847
848 // host is only checked for emptiness and depending on that the connection mode is selected.
849 // Here we are not exactly interested with that information.
850 vector<string> failures;
851
852 // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always,
853 // but it doesn't matter here. We don't use 'connmode' for anything else than
854 // checking for failures.
855 SocketOption::Mode conmode = SrtConfigurePre(sock, "", m_options, &failures);
856
857 if (conmode == SocketOption::FAILURE)
858 {
859 if (Verbose::on )
860 {
861 Verb() << "WARNING: failed to set options: ";
862 copy(failures.begin(), failures.end(), ostream_iterator<string>(*Verbose::cverb, ", "));
863 Verb();
864 }
865
866 return SRT_ERROR;
867 }
868
869 return 0;
870 }
871
SetupAdapter(const string & host,int port)872 void SrtCommon::SetupAdapter(const string& host, int port)
873 {
874 auto lsa = CreateAddr(host, port);
875 int stat = srt_bind(m_sock, lsa.get(), sizeof lsa);
876 if (stat == SRT_ERROR)
877 Error("srt_bind");
878 }
879
OpenClient(string host,int port)880 void SrtCommon::OpenClient(string host, int port)
881 {
882 PrepareClient();
883
884 if (m_outgoing_port)
885 {
886 SetupAdapter("", m_outgoing_port);
887 }
888
889 ConnectClient(host, port);
890 }
891
PrepareClient()892 void SrtCommon::PrepareClient()
893 {
894 m_sock = srt_create_socket();
895 if (m_sock == SRT_ERROR)
896 Error("srt_create_socket");
897
898 int stat = ConfigurePre(m_sock);
899 if (stat == SRT_ERROR)
900 Error("ConfigurePre");
901
902 if (!m_blocking_mode)
903 {
904 srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
905 }
906
907 }
908
909 #if ENABLE_EXPERIMENTAL_BONDING
TransmitGroupSocketConnect(void * srtcommon,SRTSOCKET sock,int error,const sockaddr *,int token)910 void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
911 {
912 SrtCommon* that = (SrtCommon*)srtcommon;
913
914 if (error == SRT_SUCCESS)
915 {
916 return; // nothing to do for a successful socket
917 }
918
919 #ifdef PLEASE_LOG
920 applog.Debug("connect callback: error on @", sock, " erc=", error, " token=", token);
921 #endif
922
923 /* Example: identify by target address
924 sockaddr_any peersa = peer;
925 sockaddr_any agentsa;
926 bool haveso = (srt_getsockname(sock, agentsa.get(), &agentsa.len) != -1);
927 */
928
929 for (auto& n: that->m_group_nodes)
930 {
931 if (n.token != -1 && n.token == token)
932 {
933 n.error = error;
934 n.reason = srt_getrejectreason(sock);
935 return;
936 }
937
938 /*
939
940 bool isso = haveso && !n.source.empty();
941 if (n.target == peersa && (!isso || n.source.equal_address(agentsa)))
942 {
943 Verb() << " (by target)" << VerbNoEOL;
944 n.error = error;
945 n.reason = srt_getrejectreason(sock);
946 return;
947 }
948 */
949 }
950
951 Verb() << " IPE: LINK NOT FOUND???]";
952 }
953
OpenGroupClient()954 void SrtCommon::OpenGroupClient()
955 {
956 SRT_GROUP_TYPE type = SRT_GTYPE_UNDEFINED;
957
958 // Resolve group type.
959 if (m_group_type == "broadcast")
960 type = SRT_GTYPE_BROADCAST;
961 else if (m_group_type == "backup")
962 type = SRT_GTYPE_BACKUP;
963 else if (m_group_type == "balancing")
964 type = SRT_GTYPE_BALANCING;
965 else
966 {
967 Error("With //group, type='" + m_group_type + "' undefined");
968 }
969
970 m_sock = srt_create_group(type);
971 if (m_sock == -1)
972 Error("srt_create_group");
973
974 srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this);
975
976 int stat = -1;
977 if (m_group_config != "")
978 {
979 stat = srt_group_configure(m_sock, m_group_config.c_str());
980 if (stat == SRT_ERROR)
981 Error("srt_group_configure");
982 }
983
984 stat = ConfigurePre(m_sock);
985
986 if ( stat == SRT_ERROR )
987 Error("ConfigurePre");
988
989 if (!m_blocking_mode)
990 {
991 // Note: here the GROUP is added to the poller.
992 srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
993 }
994
995 // Don't check this. Should this fail, the above would already.
996
997 // XXX Now do it regardless whether it's blocking or non-blocking
998 // mode - reading from group is currently manually from every socket.
999 srt_epoll = srt_epoll_create();
1000
1001 // ConnectClient can't be used here, the code must
1002 // be more-less repeated. In this case the situation
1003 // that not all connections can be established is tolerated,
1004 // the only case of error is when none of the connections
1005 // can be established.
1006
1007 bool any_node = false;
1008
1009 Verb() << "REDUNDANT connections with " << m_group_nodes.size() << " nodes:";
1010
1011 if (m_group_data.empty())
1012 m_group_data.resize(1);
1013
1014 vector<SRT_SOCKGROUPCONFIG> targets;
1015 int namelen = sizeof (sockaddr_any);
1016
1017 Verb() << "Connecting to nodes:";
1018 int i = 1;
1019 for (Connection& c: m_group_nodes)
1020 {
1021 auto sa = CreateAddr(c.host, c.port);
1022 c.target = sa;
1023 Verb() << "\t[" << c.token << "] " << c.host << ":" << c.port << VerbNoEOL;
1024 vector<string> extras;
1025 if (c.weight)
1026 extras.push_back(Sprint("weight=", c.weight));
1027
1028 if (!c.source.empty())
1029 extras.push_back("source=" + c.source.str());
1030
1031 if (!extras.empty())
1032 {
1033 Verb() << "?" << extras[0] << VerbNoEOL;
1034 for (size_t i = 1; i < extras.size(); ++i)
1035 Verb() << "&" << extras[i] << VerbNoEOL;
1036 }
1037
1038 Verb();
1039 ++i;
1040 const sockaddr* source = c.source.empty() ? nullptr : c.source.get();
1041 SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, sa.get(), namelen);
1042 gd.weight = c.weight;
1043 gd.config = c.options;
1044
1045 targets.push_back(gd);
1046 }
1047
1048 ::transmit_throw_on_interrupt = true;
1049 for (;;) // REPEATABLE BLOCK
1050 {
1051 Connect_Again:
1052 Verb() << "Waiting for group connection... " << VerbNoEOL;
1053
1054 int fisock = srt_connect_group(m_sock, targets.data(), targets.size());
1055
1056 if (fisock == SRT_ERROR)
1057 {
1058 // Complete the error information for every member
1059 ostringstream out;
1060 set<int> reasons;
1061 for (Connection& c: m_group_nodes)
1062 {
1063 if (c.error != SRT_SUCCESS)
1064 {
1065 out << "[" << c.token << "] " << c.host << ":" << c.port;
1066 if (!c.source.empty())
1067 out << "[[" << c.source.str() << "]]";
1068 out << ": " << srt_strerror(c.error, 0) << ": " << srt_rejectreason_str(c.reason) << endl;
1069 }
1070 reasons.insert(c.reason);
1071 }
1072
1073 if (transmit_retry_connect && (transmit_retry_always || (reasons.size() == 1 && *reasons.begin() == SRT_REJ_TIMEOUT)))
1074 {
1075 if (transmit_retry_connect != -1)
1076 --transmit_retry_connect;
1077
1078 Verb() << "...all links timeout, retrying (" << transmit_retry_connect << ")...";
1079 continue;
1080 }
1081
1082 Error("srt_connect_group, nodes:\n" + out.str());
1083 }
1084 else
1085 {
1086 Verb() << "[ASYNC] will wait..." << VerbNoEOL;
1087 }
1088
1089 break;
1090 }
1091
1092 if (m_blocking_mode)
1093 {
1094 Verb() << "SUCCESSFUL";
1095 }
1096 else
1097 {
1098 Verb() << "INITIATED [ASYNC]";
1099 }
1100
1101 // Configuration change applied on a group should
1102 // spread the setting on all sockets.
1103 ConfigurePost(m_sock);
1104
1105 for (size_t i = 0; i < targets.size(); ++i)
1106 {
1107 // As m_group_nodes is simply transformed into 'targets',
1108 // one index can be used to index them all. You don't
1109 // have to check if they have equal addresses because they
1110 // are equal by definition.
1111 if (targets[i].id != -1 && targets[i].errorcode == SRT_SUCCESS)
1112 {
1113 m_group_nodes[i].socket = targets[i].id;
1114 }
1115 }
1116
1117 // Now check which sockets were successful, only those
1118 // should be added to epoll.
1119 size_t size = m_group_data.size();
1120 stat = srt_group_data(m_sock, m_group_data.data(), &size);
1121 if (stat == -1 && size > m_group_data.size())
1122 {
1123 // Just too small buffer. Resize and continue.
1124 m_group_data.resize(size);
1125 stat = srt_group_data(m_sock, m_group_data.data(), &size);
1126 }
1127
1128 if (stat == -1)
1129 {
1130 Error("srt_group_data");
1131 }
1132 m_group_data.resize(size);
1133
1134 for (size_t i = 0; i < m_group_nodes.size(); ++i)
1135 {
1136 SRTSOCKET insock = m_group_nodes[i].socket;
1137 if (insock == -1)
1138 {
1139 Verb() << "TARGET '" << sockaddr_any(targets[i].peeraddr).str() << "' connection failed.";
1140 continue;
1141 }
1142
1143 // Have socket, store it into the group socket array.
1144 any_node = true;
1145 }
1146
1147 if (!any_node)
1148 Error("All connections failed");
1149
1150 // Wait for REAL connected state if nonblocking mode, for AT LEAST one node.
1151 if (!m_blocking_mode)
1152 {
1153 Verb() << "[ASYNC] " << VerbNoEOL;
1154
1155 // SPIN-WAITING version. Don't use it unless you know what you're doing.
1156 // SpinWaitAsync();
1157
1158 // Socket readiness for connection is checked by polling on WRITE allowed sockets.
1159 int len1 = 2, len2 = 2;
1160 SRTSOCKET ready_conn[2], ready_err[2];
1161 if (srt_epoll_wait(srt_conn_epoll,
1162 ready_err, &len2,
1163 ready_conn, &len1,
1164 -1, // Wait infinitely
1165 NULL, NULL,
1166 NULL, NULL) != -1)
1167 {
1168 Verb() << "[C]" << VerbNoEOL;
1169 for (int i = 0; i < len1; ++i)
1170 Verb() << " " << ready_conn[i] << VerbNoEOL;
1171 Verb() << "[E]" << VerbNoEOL;
1172 for (int i = 0; i < len2; ++i)
1173 Verb() << " " << ready_err[i] << VerbNoEOL;
1174
1175 Verb() << "";
1176
1177 // We are waiting for one entity to be ready so it's either
1178 // in one or the other
1179 if (find(ready_err, ready_err+len2, m_sock) != ready_err+len2)
1180 {
1181 Verb() << "[EPOLL: " << len2 << " entities FAILED]";
1182 // Complete the error information for every member
1183 ostringstream out;
1184 set<int> reasons;
1185 for (Connection& c: m_group_nodes)
1186 {
1187 if (c.error != SRT_SUCCESS)
1188 {
1189 out << "[" << c.token << "] " << c.host << ":" << c.port;
1190 if (!c.source.empty())
1191 out << "[[" << c.source.str() << "]]";
1192 out << ": " << srt_strerror(c.error, 0) << ": " << srt_rejectreason_str(c.reason) << endl;
1193 }
1194 reasons.insert(c.reason);
1195 }
1196
1197 if (transmit_retry_connect && (transmit_retry_always || (reasons.size() == 1 && *reasons.begin() == SRT_REJ_TIMEOUT)))
1198 {
1199 if (transmit_retry_connect != -1)
1200 --transmit_retry_connect;
1201
1202
1203 Verb() << "...all links timeout, retrying NOW (" << transmit_retry_connect << ")...";
1204 goto Connect_Again;
1205 }
1206
1207 Error("srt_connect_group, nodes:\n" + out.str());
1208 }
1209 else if (find(ready_conn, ready_conn+len1, m_sock) != ready_conn+len1)
1210 {
1211 Verb() << "[EPOLL: " << len1 << " entities] " << VerbNoEOL;
1212 }
1213 else
1214 {
1215 Error("Group: SPURIOUS epoll readiness");
1216 }
1217 }
1218 else
1219 {
1220 Error("srt_epoll_wait");
1221 }
1222 }
1223
1224 stat = ConfigurePost(m_sock);
1225 if (stat == -1)
1226 {
1227 // This kind of error must reject the whole operation.
1228 // Usually you'll get this error on the first socket,
1229 // and doing this on the others would result in the same.
1230 Error("ConfigurePost");
1231 }
1232
1233 ::transmit_throw_on_interrupt = false;
1234
1235 Verb() << "Group connection report:";
1236 for (auto& d: m_group_data)
1237 {
1238 // id, status, result, peeraddr
1239 Verb() << "@" << d.id << " <" << SockStatusStr(d.sockstate) << "> (=" << d.result << ") PEER:"
1240 << sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
1241 }
1242
1243 // Prepare group data for monitoring the group status.
1244 m_group_data.resize(m_group_nodes.size());
1245 }
1246 #endif
1247
1248 /*
1249 This may be used sometimes for testing, but it's nonportable.
1250 void SrtCommon::SpinWaitAsync()
1251 {
1252 static string udt_status_names [] = {
1253 "INIT" , "OPENED", "LISTENING", "CONNECTING", "CONNECTED", "BROKEN", "CLOSING", "CLOSED", "NONEXIST"
1254 };
1255
1256 for (;;)
1257 {
1258 SRT_SOCKSTATUS state = srt_getsockstate(m_sock);
1259 if (int(state) < SRTS_CONNECTED)
1260 {
1261 if (Verbose::on)
1262 Verb() << state;
1263 usleep(250000);
1264 continue;
1265 }
1266 else if (int(state) > SRTS_CONNECTED)
1267 {
1268 Error("UDT::connect status=" + udt_status_names[state]);
1269 }
1270
1271 return;
1272 }
1273 }
1274 */
1275
// Error details recorded for one socket by TransmitConnectCallback.
struct TransmitErrorReason
{
    int error;  // error code handed to the connect callback
    int reason; // result of srt_getrejectreason() for that socket, taken in the callback
};

// Per-socket error records. Entries are written by TransmitConnectCallback
// and are read and cleared by SrtCommon::ConnectClient once its
// srt_epoll_wait() call has returned.
// NOTE(review): no lock is visible around this map - confirm whether the
// callback can run concurrently with ConnectClient.
static std::map<SRTSOCKET, TransmitErrorReason> transmit_error_storage;
1283
TransmitConnectCallback(void *,SRTSOCKET socket,int errorcode,const sockaddr *,int)1284 static void TransmitConnectCallback(void*, SRTSOCKET socket, int errorcode, const sockaddr* /*peer*/, int /*token*/)
1285 {
1286 int reason = srt_getrejectreason(socket);
1287 transmit_error_storage[socket] = TransmitErrorReason { errorcode, reason };
1288 Verb() << "[Connection error reported on @" << socket << "]";
1289 }
1290
ConnectClient(string host,int port)1291 void SrtCommon::ConnectClient(string host, int port)
1292 {
1293 auto sa = CreateAddr(host, port);
1294 Verb() << "Connecting to " << host << ":" << port << " ... " << VerbNoEOL;
1295
1296 if (!m_blocking_mode)
1297 {
1298 srt_connect_callback(m_sock, &TransmitConnectCallback, 0);
1299 }
1300
1301 int stat = -1;
1302 for (;;)
1303 {
1304 ::transmit_throw_on_interrupt = true;
1305 stat = srt_connect(m_sock, sa.get(), sizeof sa);
1306 ::transmit_throw_on_interrupt = false;
1307 if (stat == SRT_ERROR)
1308 {
1309 int reason = srt_getrejectreason(m_sock);
1310 #if PLEASE_LOG
1311 LOGP(applog.Error, "ERROR reported by srt_connect - closing socket @", m_sock);
1312 #endif
1313 if (transmit_retry_connect && (transmit_retry_always || reason == SRT_REJ_TIMEOUT))
1314 {
1315 if (transmit_retry_connect != -1)
1316 --transmit_retry_connect;
1317
1318 Verb() << "...timeout, retrying (" << transmit_retry_connect << ")...";
1319 continue;
1320 }
1321
1322 srt_close(m_sock);
1323 Error("srt_connect", reason);
1324 }
1325 break;
1326 }
1327
1328 // Wait for REAL connected state if nonblocking mode
1329 if (!m_blocking_mode)
1330 {
1331 Verb() << "[ASYNC] " << VerbNoEOL;
1332
1333 // SPIN-WAITING version. Don't use it unless you know what you're doing.
1334 // SpinWaitAsync();
1335
1336 // Socket readiness for connection is checked by polling on WRITE allowed sockets.
1337 int lenc = 2, lene = 2;
1338 SRTSOCKET ready_connect[2], ready_error[2];
1339 if (srt_epoll_wait(srt_conn_epoll, ready_error, &lene, ready_connect, &lenc, -1, 0, 0, 0, 0) != -1)
1340 {
1341 // We should have just one socket, so check whatever socket
1342 // is in the transmit_error_storage.
1343 if (!transmit_error_storage.empty())
1344 {
1345 Verb() << "[CALLBACK(error): " << VerbNoEOL;
1346 int error, reason;
1347 bool failed = false;
1348 for (pair<const SRTSOCKET, TransmitErrorReason>& e: transmit_error_storage)
1349 {
1350 Verb() << "{@" << e.first << " error=" << e.second.error
1351 << " reason=" << e.second.reason << "} " << VerbNoEOL;
1352 error = e.second.error;
1353 reason = e.second.reason;
1354 if (error != SRT_SUCCESS)
1355 failed = true;
1356 }
1357 Verb() << "]";
1358 transmit_error_storage.clear();
1359 if (failed)
1360 Error("srt_connect(async/cb)", reason, error);
1361 }
1362
1363 if (lene > 0)
1364 {
1365 Verb() << "[EPOLL(error): " << lene << " sockets]";
1366 int reason = srt_getrejectreason(ready_error[0]);
1367 Error("srt_connect(async)", reason, SRT_ECONNREJ);
1368 }
1369 Verb() << "[EPOLL: " << lenc << " sockets] " << VerbNoEOL;
1370 }
1371 else
1372 {
1373 transmit_error_storage.clear();
1374 Error("srt_epoll_wait(srt_conn_epoll)");
1375 }
1376
1377 transmit_error_storage.clear();
1378 }
1379
1380 Verb() << " connected.";
1381 stat = ConfigurePost(m_sock);
1382 if (stat == SRT_ERROR)
1383 Error("ConfigurePost");
1384 }
1385
Error(string src,int reason,int force_result)1386 void SrtCommon::Error(string src, int reason, int force_result)
1387 {
1388 int errnov = 0;
1389 const int result = force_result == 0 ? srt_getlasterror(&errnov) : force_result;
1390 if (result == SRT_SUCCESS)
1391 {
1392 cerr << "\nERROR (app): " << src << endl;
1393 throw std::runtime_error(src);
1394 }
1395 string message = srt_strerror(result, errnov);
1396 if (result == SRT_ECONNREJ)
1397 {
1398 if ( Verbose::on )
1399 Verb() << "FAILURE\n" << src << ": [" << result << "] "
1400 << "Connection rejected: [" << int(reason) << "]: "
1401 << srt_rejectreason_str(reason);
1402 else
1403 cerr << "\nERROR #" << result
1404 << ": Connection rejected: [" << int(reason) << "]: "
1405 << srt_rejectreason_str(reason);
1406 }
1407 else
1408 {
1409 if ( Verbose::on )
1410 Verb() << "FAILURE\n" << src << ": [" << result << "." << errnov << "] " << message;
1411 else
1412 cerr << "\nERROR #" << result << "." << errnov << ": " << message << endl;
1413 }
1414
1415 throw TransmissionError("error: " + src + ": " + message);
1416 }
1417
SetupRendezvous(string adapter,string host,int port)1418 void SrtCommon::SetupRendezvous(string adapter, string host, int port)
1419 {
1420 sockaddr_any target = CreateAddr(host, port);
1421 if (target.family() == AF_UNSPEC)
1422 {
1423 Error("Unable to resolve target host: " + host);
1424 }
1425
1426 bool yes = true;
1427 srt_setsockopt(m_sock, 0, SRTO_RENDEZVOUS, &yes, sizeof yes);
1428
1429 const int outport = m_outgoing_port ? m_outgoing_port : port;
1430
1431 // Prefer the same IPv as target host
1432 auto localsa = CreateAddr(adapter, outport, target.family());
1433 string showhost = adapter;
1434 if (showhost == "")
1435 showhost = "ANY";
1436 if (target.family() == AF_INET6)
1437 showhost = "[" + showhost + "]";
1438 Verb() << "Binding rendezvous: " << showhost << ":" << outport << " ...";
1439 int stat = srt_bind(m_sock, localsa.get(), localsa.size());
1440 if (stat == SRT_ERROR)
1441 {
1442 srt_close(m_sock);
1443 Error("srt_bind");
1444 }
1445 }
1446
Close()1447 void SrtCommon::Close()
1448 {
1449 #if PLEASE_LOG
1450 extern srt_logging::Logger applog;
1451 LOGP(applog.Error, "CLOSE requested - closing socket @", m_sock);
1452 #endif
1453 bool any = false;
1454 bool yes = true;
1455 if (m_sock != SRT_INVALID_SOCK)
1456 {
1457 Verb() << "SrtCommon: DESTROYING CONNECTION, closing socket (rt%" << m_sock << ")...";
1458 srt_setsockflag(m_sock, SRTO_SNDSYN, &yes, sizeof yes);
1459 srt_close(m_sock);
1460 any = true;
1461 }
1462
1463 if (m_bindsock != SRT_INVALID_SOCK)
1464 {
1465 Verb() << "SrtCommon: DESTROYING SERVER, closing socket (ls%" << m_bindsock << ")...";
1466 // Set sndsynchro to the socket to synch-close it.
1467 srt_setsockflag(m_bindsock, SRTO_SNDSYN, &yes, sizeof yes);
1468 srt_close(m_bindsock);
1469 any = true;
1470 }
1471
1472 if (any)
1473 Verb() << "SrtCommon: ... done.";
1474 }
1475
~SrtCommon()1476 SrtCommon::~SrtCommon()
1477 {
1478 Close();
1479 }
1480
1481 #if ENABLE_EXPERIMENTAL_BONDING
UpdateGroupStatus(const SRT_SOCKGROUPDATA * grpdata,size_t grpdata_size)1482 void SrtCommon::UpdateGroupStatus(const SRT_SOCKGROUPDATA* grpdata, size_t grpdata_size)
1483 {
1484 if (!grpdata)
1485 {
1486 // This happens when you passed too small array. Treat this as error and stop.
1487 cerr << "ERROR: broadcast group update reports " << grpdata_size
1488 << " existing sockets, but app registerred only " << m_group_nodes.size() << endl;
1489 Error("Too many unpredicted sockets in the group");
1490 }
1491
1492 // Clear the active flag in all nodes so that they are reactivated
1493 // if they are in the group list, REGARDLESS OF THE STATUS. We need to
1494 // see all connections that are in the nodes, but not in the group,
1495 // and this one would have to be activated.
1496 const SRT_SOCKGROUPDATA* gend = grpdata + grpdata_size;
1497 for (auto& n: m_group_nodes)
1498 {
1499 bool active = (find_if(grpdata, gend,
1500 [&n] (const SRT_SOCKGROUPDATA& sg) { return sg.id == n.socket; }) != gend);
1501 if (!active)
1502 n.socket = SRT_INVALID_SOCK;
1503 }
1504
1505 // Note: sockets are not necessarily in the same order. Find
1506 // the socket by id.
1507 for (size_t i = 0; i < grpdata_size; ++i)
1508 {
1509 const SRT_SOCKGROUPDATA& d = grpdata[i];
1510 SRTSOCKET id = d.id;
1511
1512 SRT_SOCKSTATUS status = d.sockstate;
1513 int result = d.result;
1514 SRT_MEMBERSTATUS mstatus = d.memberstate;
1515
1516 if (result != -1 && status == SRTS_CONNECTED)
1517 {
1518 // Short report with the state.
1519 Verb() << "G@" << id << "<" << MemberStatusStr(mstatus) << "> " << VerbNoEOL;
1520 continue;
1521 }
1522 // id, status, result, peeraddr
1523 Verb() << "\n\tG@" << id << " <" << SockStatusStr(status) << "/" << MemberStatusStr(mstatus) << "> (=" << result << ") PEER:"
1524 << sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str() << VerbNoEOL;
1525
1526 if (status >= SRTS_BROKEN)
1527 {
1528 Verb() << "NOTE: socket @" << id << " is pending for destruction, waiting for it.";
1529 }
1530 }
1531
1532 // This was only informative. Now we check all nodes if they
1533 // are not active
1534
1535 int i = 1;
1536 for (auto& n: m_group_nodes)
1537 {
1538 if (n.error != SRT_SUCCESS)
1539 {
1540 Verb() << "[" << i << "] CONNECTION FAILURE to '" << n.host << ":" << n.port << "': "
1541 << srt_strerror(n.error, 0) << ":" << srt_rejectreason_str(n.reason);
1542 }
1543
1544 // Check which nodes are no longer active and activate them.
1545 if (n.socket != SRT_INVALID_SOCK)
1546 continue;
1547
1548 auto sa = CreateAddr(n.host, n.port);
1549 Verb() << "[" << i << "] RECONNECTING to node " << n.host << ":" << n.port << " ... " << VerbNoEOL;
1550 ++i;
1551
1552 n.error = SRT_SUCCESS;
1553 n.reason = SRT_REJ_UNKNOWN;
1554
1555 const sockaddr* source = n.source.empty() ? nullptr : n.source.get();
1556 SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, sa.get(), sa.size());
1557 gd.weight = n.weight;
1558 gd.config = n.options;
1559 gd.token = n.token;
1560
1561 int fisock = srt_connect_group(m_sock, &gd, 1);
1562 if (fisock == SRT_ERROR)
1563 {
1564 // Whatever. Skip the node.
1565 Verb() << "FAILED: ";
1566 }
1567 else
1568 {
1569 // Have socket, store it into the group socket array.
1570 n.socket = gd.id;
1571 }
1572 }
1573 }
1574 #endif
1575
SrtSource(string host,int port,std::string path,const map<string,string> & par)1576 SrtSource::SrtSource(string host, int port, std::string path, const map<string,string>& par)
1577 {
1578 Init(host, port, path, par, SRT_EPOLL_IN);
1579 ostringstream os;
1580 os << host << ":" << port;
1581 hostport_copy = os.str();
1582 }
1583
PrintSrtStats(SRTSOCKET sock,bool clr,bool bw,bool stats)1584 static void PrintSrtStats(SRTSOCKET sock, bool clr, bool bw, bool stats)
1585 {
1586 CBytePerfMon perf;
1587 // clear only if stats report is to be read
1588 srt_bstats(sock, &perf, clr);
1589
1590 if (bw)
1591 cout << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth);
1592 if (stats)
1593 cout << transmit_stats_writer->WriteStats(sock, perf);
1594 }
1595
1596
1597 #ifdef SRT_OLD_APP_READER
1598
1599 // NOTE: 'output' is expected to be EMPTY here.
GroupCheckPacketAhead(bytevector & output)1600 bool SrtSource::GroupCheckPacketAhead(bytevector& output)
1601 {
1602 bool status = false;
1603 vector<SRTSOCKET> past_ahead;
1604
1605 // This map no longer maps only ahead links.
1606 // Here are all links, and whether ahead, it's defined by the sequence.
1607 for (auto i = m_group_positions.begin(); i != m_group_positions.end(); ++i)
1608 {
1609 // i->first: socket ID
1610 // i->second: ReadPos { sequence, packet }
1611 // We are not interested with the socket ID because we
1612 // aren't going to read from it - we have the packet already.
1613 ReadPos& a = i->second;
1614
1615 int seqdiff = CSeqNo::seqcmp(a.sequence, m_group_seqno);
1616 if ( seqdiff == 1)
1617 {
1618 // The very next packet. Return it.
1619 m_group_seqno = a.sequence;
1620 Verb() << " (SRT group: ahead delivery %" << a.sequence << " from @" << i->first << ")";
1621 swap(output, a.packet);
1622 status = true;
1623 }
1624 else if (seqdiff < 1 && !a.packet.empty())
1625 {
1626 Verb() << " (@" << i->first << " dropping collected ahead %" << a.sequence << ")";
1627 a.packet.clear();
1628 }
1629 // In case when it's >1, keep it in ahead
1630 }
1631
1632 return status;
1633 }
1634
DisplayEpollResults(const std::set<SRTSOCKET> & sockset,std::string prefix)1635 static string DisplayEpollResults(const std::set<SRTSOCKET>& sockset, std::string prefix)
1636 {
1637 typedef set<SRTSOCKET> fset_t;
1638 ostringstream os;
1639 os << prefix << " ";
1640 for (fset_t::const_iterator i = sockset.begin(); i != sockset.end(); ++i)
1641 {
1642 os << "@" << *i << " ";
1643 }
1644
1645 return os.str();
1646 }
1647
GroupRead(size_t chunk)1648 bytevector SrtSource::GroupRead(size_t chunk)
1649 {
1650 // Read the current group status. m_sock is here the group id.
1651 bytevector output;
1652
1653 // Later iteration over it might be less efficient than
1654 // by vector, but we'll also often try to check a single id
1655 // if it was ever seen broken, so that it's skipped.
1656 set<SRTSOCKET> broken;
1657
1658 RETRY_READING:
1659
1660 size_t size = m_group_data.size();
1661 int stat = srt_group_data(m_sock, m_group_data.data(), &size);
1662 if (stat == -1 && size > m_group_data.size())
1663 {
1664 // Just too small buffer. Resize and continue.
1665 m_group_data.resize(size);
1666 stat = srt_group_data(m_sock, m_group_data.data(), &size);
1667 }
1668 else
1669 {
1670 // Downsize if needed.
1671 m_group_data.resize(size);
1672 }
1673
1674 if (stat == -1) // Also after the above fix
1675 {
1676 Error(UDT::getlasterror(), "FAILURE when reading group data");
1677 }
1678
1679 if (size == 0)
1680 {
1681 Error("No sockets in the group - disconnected");
1682 }
1683
1684 bool connected = false;
1685 for (auto& d: m_group_data)
1686 {
1687 if (d.status == SRTS_CONNECTED)
1688 {
1689 connected = true;
1690 break;
1691 }
1692 }
1693 if (!connected)
1694 {
1695 Error("All sockets in the group disconnected");
1696 }
1697
1698 if (Verbose::on)
1699 {
1700 for (auto& d: m_group_data)
1701 {
1702 if (d.status != SRTS_CONNECTED)
1703 // id, status, result, peeraddr
1704 Verb() << "@" << d.id << " <" << SockStatusStr(d.status) << "> (=" << d.result << ") PEER:"
1705 << sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
1706 }
1707 }
1708
1709 // Check first the ahead packets if you have any to deliver.
1710 if (m_group_seqno != -1 && !m_group_positions.empty())
1711 {
1712 bytevector ahead_packet;
1713
1714 // This function also updates the group sequence pointer.
1715 if (GroupCheckPacketAhead(ahead_packet))
1716 return move(ahead_packet);
1717 }
1718
1719 // LINK QUALIFICATION NAMES:
1720 //
1721 // HORSE: Correct link, which delivers the very next sequence.
1722 // Not necessarily this link is currently active.
1723 //
1724 // KANGAROO: Got some packets dropped and the sequence number
1725 // of the packet jumps over the very next sequence and delivers
1726 // an ahead packet.
1727 //
1728 // ELEPHANT: Is not ready to read, while others are, or reading
1729 // up to the current latest delivery sequence number does not
1730 // reach this sequence and the link becomes non-readable earlier.
1731
1732 // The above condition has ruled out one kangaroo and turned it
1733 // into a horse.
1734
1735 // Below there's a loop that will try to extract packets. Kangaroos
1736 // will be among the polled ones because skipping them risks that
1737 // the elephants will take over the reading. Links already known as
1738 // elephants will be also polled in an attempt to revitalize the
1739 // connection that experienced just a short living choking.
1740 //
1741 // After polling we attempt to read from every link that reported
1742 // read-readiness and read at most up to the sequence equal to the
1743 // current delivery sequence.
1744
1745 // Links that deliver a packet below that sequence will be retried
1746 // until they deliver no more packets or deliver the packet of
1747 // expected sequence. Links that don't have a record in m_group_positions
1748 // and report readiness will be always read, at least to know what
1749 // sequence they currently stand on.
1750 //
1751 // Links that are already known as kangaroos will be polled, but
1752 // no reading attempt will be done. If after the reading series
1753 // it will turn out that we have no more horses, the slowest kangaroo
1754 // will be "advanced to a horse" (the ahead link with a sequence
1755 // closest to the current delivery sequence will get its sequence
1756 // set as current delivered and its recorded ahead packet returned
1757 // as the read packet).
1758
1759 // If we find at least one horse, the packet read from that link
1760 // will be delivered. All other link will be just ensured update
1761 // up to this sequence number, or at worst all available packets
1762 // will be read. In this case all kangaroos remain kangaroos,
1763 // until the current delivery sequence m_group_seqno will be lifted
1764 // to the sequence recorded for these links in m_group_positions,
1765 // during the next time ahead check, after which they will become
1766 // horses.
1767
1768 Verb() << "E(" << srt_epoll << ") " << VerbNoEOL;
1769
1770 for (size_t i = 0; i < size; ++i)
1771 {
1772 SRT_SOCKGROUPDATA& d = m_group_data[i];
1773 if (d.status == SRTS_CONNECTING)
1774 {
1775 Verb() << "@" << d.id << "<pending> " << VerbNoEOL;
1776 int modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
1777 srt_epoll_add_usock(srt_epoll, d.id, &modes);
1778 continue; // don't read over a failed or pending socket
1779 }
1780
1781 if (d.status >= SRTS_BROKEN)
1782 {
1783 broken.insert(d.id);
1784 }
1785
1786 if (broken.count(d.id))
1787 {
1788 Verb() << "@" << d.id << "<broken> " << VerbNoEOL;
1789 continue;
1790 }
1791
1792 if (d.status != SRTS_CONNECTED)
1793 {
1794 Verb() << "@" << d.id << "<idle:" << SockStatusStr(d.status) << "> " << VerbNoEOL;
1795 // Sockets in this state are ignored. We are waiting until it
1796 // achieves CONNECTING state, then it's added to write.
1797 continue;
1798 }
1799
1800 // Don't skip packets that are ahead because if we have a situation
1801 // that all links are either "elephants" (do not report read readiness)
1802 // and "kangaroos" (have already delivered an ahead packet) then
        // omitting kangaroos will result in only elephants to be polled for
        // reading. Elephants, due to the strict timing requirements and
        // assurance that TSBPD on every link will result in exactly the same
1806 // delivery time for a packet of given sequence, having an elephant
1807 // and kangaroo in one cage means that the elephant is simply a broken
1808 // or half-broken link (the data are not delivered, but it will get
1809 // repaired soon, enough for SRT to maintain the connection, but it
1810 // will still drop packets that didn't arrive in time), in both cases
1811 // it may potentially block the reading for an indefinite time, while
1812 // simultaneously a kangaroo might be a link that got some packets
1813 // dropped, but then it's still capable to deliver packets on time.
1814
1815 // Note also that about the fact that some links turn out to be
1816 // elephants we'll learn only after we try to poll and read them.
1817
1818 // Note that d.id might be a socket that was previously being polled
1819 // on write, when it's attempting to connect, but now it's connected.
1820 // This will update the socket with the new event set.
1821
1822 int modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
1823 srt_epoll_add_usock(srt_epoll, d.id, &modes);
1824 Verb() << "@" << d.id << "[READ] " << VerbNoEOL;
1825 }
1826
1827 Verb() << "";
1828
1829 // Here we need to make an additional check.
1830 // There might be a possibility that all sockets that
1831 // were added to the reader group, are ahead. At least
1832 // surely we don't have a situation that any link contains
1833 // an ahead-read subsequent packet, because GroupCheckPacketAhead
1834 // already handled that case.
1835 //
1836 // What we can have is that every link has:
1837 // - no known seq position yet (is not registered in the position map yet)
1838 // - the position equal to the latest delivered sequence
1839 // - the ahead position
1840
1841 // Now the situation is that we don't have any packets
1842 // waiting for delivery so we need to wait for any to report one.
1843 // XXX We support blocking mode only at the moment.
1844 // The non-blocking mode would need to simply check the readiness
1845 // with only immediate report, and read-readiness would have to
1846 // be done in background.
1847
1848 SrtPollState sready;
1849
1850 // Poll on this descriptor until reading is available, indefinitely.
1851 if (UDT::epoll_swait(srt_epoll, sready, -1) == SRT_ERROR)
1852 {
1853 Error(UDT::getlasterror(), "UDT::epoll_swait(srt_epoll, group)");
1854 }
1855 if (Verbose::on)
1856 {
1857 Verb() << "RDY: {"
1858 << DisplayEpollResults(sready.rd(), "[R]")
1859 << DisplayEpollResults(sready.wr(), "[W]")
1860 << DisplayEpollResults(sready.ex(), "[E]")
1861 << "} " << VerbNoEOL;
1862
1863 }
1864
1865 LOGC(applog.Debug, log << "epoll_swait: "
1866 << DisplayEpollResults(sready.rd(), "[R]")
1867 << DisplayEpollResults(sready.wr(), "[W]")
1868 << DisplayEpollResults(sready.ex(), "[E]"));
1869
1870 typedef set<SRTSOCKET> fset_t;
1871
1872 // Handle sockets of pending connection and with errors.
1873 broken = sready.ex();
1874
1875 // We don't do anything about sockets that have been configured to
1876 // poll on writing (that is, pending for connection). What we need
1877 // is that the epoll_swait call exit on that fact. Probably if this
1878 // was the only socket reported, no broken and no read-ready, this
1879 // will later check on output if still empty, if so, repeat the whole
1880 // function. This write-ready socket will be there already in the
1881 // connected state and will be added to read-polling.
1882
1883 // Ok, now we need to have some extra qualifications:
1884 // 1. If a socket has no registry yet, we read anyway, just
1885 // to notify the current position. We read ONLY ONE PACKET this time,
1886 // we'll worry later about adjusting it to the current group sequence
1887 // position.
1888 // 2. If a socket is already position ahead, DO NOT read from it, even
1889 // if it is ready.
1890
1891 // The state of things whether we were able to extract the very next
1892 // sequence will be simply defined by the fact that `output` is nonempty.
1893
1894 int32_t next_seq = m_group_seqno;
1895
1896 // If this set is empty, it won't roll even once, therefore output
1897 // will be surely empty. This will be checked then same way as when
1898 // reading from every socket resulted in error.
1899 for (fset_t::const_iterator i = sready.rd().begin(); i != sready.rd().end(); ++i)
1900 {
1901 // Check if this socket is in aheads
1902 // If so, don't read from it, wait until the ahead is flushed.
1903
1904 SRTSOCKET id = *i;
1905 ReadPos* p = nullptr;
1906 auto pe = m_group_positions.find(id);
1907 if (pe != m_group_positions.end())
1908 {
1909 p = &pe->second;
1910 // Possible results of comparison:
1911 // x < 0: the sequence is in the past, the socket should be adjusted FIRST
1912 // x = 0: the socket should be ready to get the exactly next packet
1913 // x = 1: the case is already handled by GroupCheckPacketAhead.
1914 // x > 1: AHEAD. DO NOT READ.
1915 int seqdiff = CSeqNo::seqcmp(p->sequence, m_group_seqno);
1916 if (seqdiff > 1)
1917 {
1918 Verb() << "EPOLL: @" << id << " %" << p->sequence << " AHEAD, not reading.";
1919 continue;
1920 }
1921 }
1922
1923
1924 // Read from this socket stubbornly, until:
1925 // - reading is no longer possible (AGAIN)
1926 // - the sequence difference is >= 1
1927
1928 int fi = 1; // marker for Verb to display flushing
1929 for (;;)
1930 {
1931 bytevector data(chunk);
1932 SRT_MSGCTRL mctrl = srt_msgctrl_default;
1933 stat = srt_recvmsg2(id, data.data(), chunk, &mctrl);
1934 if (stat == SRT_ERROR)
1935 {
1936 if (fi == 0)
1937 {
1938 if (Verbose::on)
1939 {
1940 if (p)
1941 {
1942 int32_t pktseq = p->sequence;
1943 int seqdiff = CSeqNo::seqcmp(p->sequence, m_group_seqno);
1944 Verb() << ". %" << pktseq << " " << seqdiff << ")";
1945 }
1946 else
1947 {
1948 Verb() << ".)";
1949 }
1950 }
1951 fi = 1;
1952 }
1953 int err = srt_getlasterror(0);
1954 if (err == SRT_EASYNCRCV)
1955 {
1956 // Do not treat this as spurious, just stop reading.
1957 break;
1958 }
1959 Verb() << "Error @" << id << ": " << srt_getlasterror_str();
1960 broken.insert(id);
1961 break;
1962 }
1963
1964 // NOTE: checks against m_group_seqno and decisions based on it
1965 // must NOT be done if m_group_seqno is -1, which means that we
1966 // are about to deliver the very first packet and we take its
1967 // sequence number as a good deal.
1968
1969 // The order must be:
1970 // - check discrepancy
1971 // - record the sequence
1972 // - check ordering.
1973 // The second one must be done always, but failed discrepancy
1974 // check should exclude the socket from any further checks.
1975 // That's why the common check for m_group_seqno != -1 can't
1976 // embrace everything below.
1977
1978 // We need to first qualify the sequence, just for a case
1979 if (m_group_seqno != -1 && abs(m_group_seqno - mctrl.pktseq) > CSeqNo::m_iSeqNoTH)
1980 {
1981 // This error should be returned if the link turns out
1982 // to be the only one, or set to the group data.
1983 // err = SRT_ESECFAIL;
1984 if (fi == 0)
1985 {
1986 Verb() << ".)";
1987 fi = 1;
1988 }
1989 Verb() << "Error @" << id << ": SEQUENCE DISCREPANCY: base=%" << m_group_seqno << " vs pkt=%" << mctrl.pktseq << ", setting ESECFAIL";
1990 broken.insert(id);
1991 break;
1992 }
1993
1994 // Rewrite it to the state for a case when next reading
1995 // would not succeed. Do not insert the buffer here because
1996 // this is only required when the sequence is ahead; for that
1997 // it will be fixed later.
1998 if (!p)
1999 {
2000 p = &(m_group_positions[id] = ReadPos { mctrl.pktseq, {} });
2001 }
2002 else
2003 {
2004 p->sequence = mctrl.pktseq;
2005 }
2006
2007 if (m_group_seqno != -1)
2008 {
2009 // Now we can safely check it.
2010 int seqdiff = CSeqNo::seqcmp(mctrl.pktseq, m_group_seqno);
2011
2012 if (seqdiff <= 0)
2013 {
2014 if (fi == 1)
2015 {
2016 Verb() << "(@" << id << " FLUSH:" << VerbNoEOL;
2017 fi = 0;
2018 }
2019
2020 Verb() << "." << VerbNoEOL;
2021
2022 // The sequence is recorded, the packet has to be discarded.
2023 // That's all.
2024 continue;
2025 }
2026
2027 // Finish flush reporting if fallen into here
2028 if (fi == 0)
2029 {
2030 Verb() << ". %" << mctrl.pktseq << " " << (-seqdiff) << ")";
2031 fi = 1;
2032 }
2033
2034 // Now we have only two possibilities:
2035 // seqdiff == 1: The very next sequence, we want to read and return the packet.
2036 // seqdiff > 1: The packet is ahead - record the ahead packet, but continue with the others.
2037
2038 if (seqdiff > 1)
2039 {
2040 Verb() << "@" << id << " %" << mctrl.pktseq << " AHEAD";
2041 p->packet = move(data);
2042 break; // Don't read from that socket anymore.
2043 }
2044 }
2045
2046 // We have seqdiff = 1, or we simply have the very first packet
2047 // which's sequence is taken as a good deal. Update the sequence
2048 // and record output.
2049
2050 if (!output.empty())
2051 {
2052 Verb() << "@" << id << " %" << mctrl.pktseq << " REDUNDANT";
2053 break;
2054 }
2055
2056
2057 Verb() << "@" << id << " %" << mctrl.pktseq << " DELIVERING";
2058 output = move(data);
2059
2060 // Record, but do not update yet, until all sockets are handled.
2061 next_seq = mctrl.pktseq;
2062 break;
2063 }
2064 }
2065
2066 // ready_len is only the length of currently reported
2067 // ready sockets, NOT NECESSARILY containing all sockets from the group.
2068 if (broken.size() == size)
2069 {
2070 // All broken
2071 Error("All sockets broken");
2072 }
2073
2074 if (Verbose::on && !broken.empty())
2075 {
2076 Verb() << "BROKEN: " << Printable(broken) << " - removing";
2077 }
2078
2079 // Now remove all broken sockets from aheads, if any.
2080 // Even if they have already delivered a packet.
2081 for (SRTSOCKET d: broken)
2082 {
2083 m_group_positions.erase(d);
2084 srt_close(d);
2085 }
2086
2087 // May be required to be re-read.
2088 broken.clear();
2089
2090 if (!output.empty())
2091 {
2092 // We have extracted something, meaning that we have the sequence shift.
2093 // Update it now and don't do anything else with the sockets.
2094
2095 // Sanity check
2096 if (next_seq == -1)
2097 {
2098 Error("IPE: next_seq not set after output extracted!");
2099 }
2100 m_group_seqno = next_seq;
2101 return output;
2102 }
2103
2104 // Check if we have any sockets left :D
2105
2106 // Here we surely don't have any more HORSES,
2107 // only ELEPHANTS and KANGAROOS. Qualify them and
2108 // attempt to at least take advantage of KANGAROOS.
2109
2110 // In this position all links are either:
2111 // - updated to the current position
// - updated to the newest possible position available
2113 // - not yet ready for extraction (not present in the group)
2114
2115 // If we haven't extracted the very next sequence position,
2116 // it means that we might only have the ahead packets read,
2117 // that is, the next sequence has been dropped by all links.
2118
2119 if (!m_group_positions.empty())
2120 {
2121 // This might notify both lingering links, which didn't
2122 // deliver the required sequence yet, and links that have
2123 // the sequence ahead. Review them, and if you find at
2124 // least one packet behind, just wait for it to be ready.
2125 // Use again the waiting function because we don't want
2126 // the general waiting procedure to skip others.
2127 set<SRTSOCKET> elephants;
2128
2129 // const because it's `typename decltype(m_group_positions)::value_type`
2130 pair<const SRTSOCKET, ReadPos>* slowest_kangaroo = nullptr;
2131
2132 for (auto& sock_rp: m_group_positions)
2133 {
2134 // NOTE that m_group_seqno in this place wasn't updated
2135 // because we haven't successfully extracted anything.
2136 int seqdiff = CSeqNo::seqcmp(sock_rp.second.sequence, m_group_seqno);
2137 if (seqdiff < 0)
2138 {
2139 elephants.insert(sock_rp.first);
2140 }
2141 // If seqdiff == 0, we have a socket ON TRACK.
2142 else if (seqdiff > 0)
2143 {
2144 if (!slowest_kangaroo)
2145 {
2146 slowest_kangaroo = &sock_rp;
2147 }
2148 else
2149 {
2150 // Update to find the slowest kangaroo.
2151 int seqdiff = CSeqNo::seqcmp(slowest_kangaroo->second.sequence, sock_rp.second.sequence);
2152 if (seqdiff > 0)
2153 {
2154 slowest_kangaroo = &sock_rp;
2155 }
2156 }
2157 }
2158 }
2159
2160 // Note that if no "slowest_kangaroo" was found, it means
2161 // that we don't have kangaroos.
2162 if (slowest_kangaroo)
2163 {
2164 // We have a slowest kangaroo. Elephants must be ignored.
2165 // Best case, they will get revived, worst case they will be
2166 // soon broken.
2167 //
2168 // As we already have the packet delivered by the slowest
2169 // kangaroo, we can simply return it.
2170
2171 m_group_seqno = slowest_kangaroo->second.sequence;
2172 Verb() << "@" << slowest_kangaroo->first << " %" << m_group_seqno << " KANGAROO->HORSE";
2173 swap(output, slowest_kangaroo->second.packet);
2174 return output;
2175 }
2176
2177 // Here ALL LINKS ARE ELEPHANTS, stating that we still have any.
2178 if (Verbose::on)
2179 {
2180 if (!elephants.empty())
2181 {
2182 // If we don't have kangaroos, then simply reattempt to
2183 // poll all elephants again anyway (at worst they are all
2184 // broken and we'll learn about it soon).
2185 Verb() << "ALL LINKS ELEPHANTS. Re-polling.";
2186 }
2187 else
2188 {
2189 Verb() << "ONLY BROKEN WERE REPORTED. Re-polling.";
2190 }
2191 }
2192 goto RETRY_READING;
2193 }
2194
2195 // We have checked so far only links that were ready to poll.
2196 // Links that are not ready should be re-checked.
2197 // Links that were not ready at the entrance should be checked
2198 // separately, and probably here is the best moment to do it.
2199 // After we make sure that at least one link is ready, we can
2200 // reattempt to read a packet from it.
2201
2202 // Ok, so first collect all sockets that are in
2203 // connecting state, make a poll for connection.
2204 srt_epoll_clear_usocks(srt_epoll);
2205 bool have_connectors = false, have_ready = false;
2206 for (auto& d: m_group_data)
2207 {
2208 if (d.status < SRTS_CONNECTED)
2209 {
2210 // Not sure anymore if IN or OUT signals the connect-readiness,
2211 // but no matter. The signal will be cleared once it is used,
2212 // while it will be always on when there's anything ready to read.
2213 int modes = SRT_EPOLL_IN | SRT_EPOLL_OUT;
2214 srt_epoll_add_usock(srt_epoll, d.id, &modes);
2215 have_connectors = true;
2216 }
2217 else if (d.status == SRTS_CONNECTED)
2218 {
2219 have_ready = true;
2220 }
2221 }
2222
2223 if (have_ready || have_connectors)
2224 {
2225 Verb() << "(still have: " << (have_ready ? "+" : "-") << "ready, "
2226 << (have_connectors ? "+" : "-") << "conenctors).";
2227 goto RETRY_READING;
2228 }
2229
2230 if (have_ready)
2231 {
2232 Verb() << "(connected in the meantime)";
2233 // Some have connected in the meantime, don't
2234 // waste time on the pending ones.
2235 goto RETRY_READING;
2236 }
2237
2238 if (have_connectors)
2239 {
2240 Verb() << "(waiting for pending connectors to connect)";
2241 // Wait here for them to be connected.
2242 vector<SRTSOCKET> sready;
2243 sready.resize(m_group_data.size());
2244 int ready_len = m_group_data.size();
2245 if (srt_epoll_wait(srt_epoll, sready.data(), &ready_len, 0, 0, -1, 0, 0, 0, 0) == SRT_ERROR)
2246 {
2247 Error("All sockets in the group disconnected");
2248 }
2249
2250 goto RETRY_READING;
2251 }
2252
2253 Error("No data extracted");
2254 return output; // Just a marker - this above function throws an exception
2255 }
2256
2257 #endif
2258
Read(size_t chunk)2259 MediaPacket SrtSource::Read(size_t chunk)
2260 {
2261 static size_t counter = 1;
2262
2263 bool have_group SRT_ATR_UNUSED = !m_group_nodes.empty();
2264
2265 bytevector data(chunk);
2266 // EXPERIMENTAL
2267 #ifdef SRT_OLD_APP_READER
2268 if (have_group || m_listener_group)
2269 {
2270 data = GroupRead(chunk);
2271 }
2272
2273 if (have_group)
2274 {
2275 // This is to be done for caller mode only
2276 UpdateGroupStatus(m_group_data.data(), m_group_data.size());
2277 }
2278 #else
2279
2280 SRT_MSGCTRL mctrl = srt_msgctrl_default;
2281 bool ready = true;
2282 int stat;
2283
2284 do
2285 {
2286 #if ENABLE_EXPERIMENTAL_BONDING
2287 if (have_group || m_listener_group)
2288 {
2289 mctrl.grpdata = m_group_data.data();
2290 mctrl.grpdata_size = m_group_data.size();
2291 }
2292 #endif
2293
2294 if (::transmit_int_state)
2295 Error("srt_recvmsg2: interrupted");
2296
2297 ::transmit_throw_on_interrupt = true;
2298 stat = srt_recvmsg2(m_sock, data.data(), chunk, &mctrl);
2299 ::transmit_throw_on_interrupt = false;
2300 if (stat != SRT_ERROR)
2301 {
2302 ready = true;
2303 }
2304 else
2305 {
2306 int syserr = 0;
2307 int err = srt_getlasterror(&syserr);
2308
2309 if (!m_blocking_mode)
2310 {
2311 // EAGAIN for SRT READING
2312 if (err == SRT_EASYNCRCV)
2313 {
2314 Epoll_again:
2315 Verb() << "AGAIN: - waiting for data by epoll(" << srt_epoll << ")...";
2316 // Poll on this descriptor until reading is available, indefinitely.
2317 int len = 2;
2318 SRT_EPOLL_EVENT sready[2];
2319 len = srt_epoll_uwait(srt_epoll, sready, len, -1);
2320 if (len != -1)
2321 {
2322 Verb() << "... epoll reported ready " << len << " sockets";
2323 // If the event was SRT_EPOLL_UPDATE, report it, and still wait.
2324
2325 bool any_read_ready = false;
2326 vector<int> errored;
2327 for (int i = 0; i < len; ++i)
2328 {
2329 if (sready[i].events & SRT_EPOLL_UPDATE)
2330 {
2331 Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
2332 }
2333
2334 if (sready[i].events & SRT_EPOLL_IN)
2335 any_read_ready = true;
2336
2337 if (sready[i].events & SRT_EPOLL_ERR)
2338 {
2339 errored.push_back(sready[i].fd);
2340 }
2341 }
2342
2343 if (!any_read_ready)
2344 {
2345 Verb() << " ... [NOT READ READY - AGAIN (" << errored.size() << " errored: " << Printable(errored) << ")]";
2346 goto Epoll_again;
2347 }
2348
2349 continue;
2350 }
2351 // If was -1, then passthru.
2352 }
2353 }
2354 else
2355 {
2356 // In blocking mode it uses a minimum of 1s timeout,
2357 // and continues only if interrupt not requested.
2358 if (!::transmit_int_state && (err == SRT_EASYNCRCV || err == SRT_ETIMEOUT))
2359 {
2360 ready = false;
2361 continue;
2362 }
2363 }
2364 Error("srt_recvmsg2");
2365 }
2366
2367 if (stat == 0)
2368 {
2369 throw ReadEOF(hostport_copy);
2370 }
2371 #if PLEASE_LOG
2372 extern srt_logging::Logger applog;
2373 LOGC(applog.Debug, log << "recv: #" << mctrl.msgno << " %" << mctrl.pktseq << " "
2374 << BufferStamp(data.data(), stat) << " BELATED: " << ((CTimer::getTime()-mctrl.srctime)/1000.0) << "ms");
2375 #endif
2376
2377 Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << " " << BufferStamp(data.data(), stat) << ") " << VerbNoEOL;
2378 }
2379 while (!ready);
2380
2381 chunk = size_t(stat);
2382 if (chunk < data.size())
2383 data.resize(chunk);
2384
2385 const bool need_bw_report = transmit_bw_report && int(counter % transmit_bw_report) == transmit_bw_report - 1;
2386 const bool need_stats_report = transmit_stats_report && counter % transmit_stats_report == transmit_stats_report - 1;
2387
2388 #if ENABLE_EXPERIMENTAL_BONDING
2389 if (have_group) // Means, group with caller mode
2390 {
2391 UpdateGroupStatus(mctrl.grpdata, mctrl.grpdata_size);
2392 if (transmit_stats_writer && (need_stats_report || need_bw_report))
2393 {
2394 PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2395 for (size_t i = 0; i < mctrl.grpdata_size; ++i)
2396 PrintSrtStats(mctrl.grpdata[i].id, need_stats_report, need_bw_report, need_stats_report);
2397 }
2398 }
2399 else
2400 #endif
2401 {
2402 if (transmit_stats_writer && (need_stats_report || need_bw_report))
2403 {
2404 PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2405 }
2406 }
2407 #endif
2408
2409 ++counter;
2410
2411 return MediaPacket(data, mctrl.srctime);
2412 }
2413
SrtTarget(std::string host,int port,std::string path,const std::map<std::string,std::string> & par)2414 SrtTarget::SrtTarget(std::string host, int port, std::string path, const std::map<std::string,std::string>& par)
2415 {
2416 Init(host, port, path, par, SRT_EPOLL_OUT);
2417 }
2418
2419
ConfigurePre(SRTSOCKET sock)2420 int SrtTarget::ConfigurePre(SRTSOCKET sock)
2421 {
2422 int result = SrtCommon::ConfigurePre(sock);
2423 if (result == -1)
2424 return result;
2425
2426 int yes = 1;
2427 // This is for the HSv4 compatibility; if both parties are HSv5
2428 // (min. version 1.2.1), then this setting simply does nothing.
2429 // In HSv4 this setting is obligatory; otherwise the SRT handshake
2430 // extension will not be done at all.
2431 result = srt_setsockopt(sock, 0, SRTO_SENDER, &yes, sizeof yes);
2432 if (result == -1)
2433 return result;
2434
2435 return 0;
2436 }
2437
Write(const MediaPacket & data)2438 void SrtTarget::Write(const MediaPacket& data)
2439 {
2440 static int counter = 1;
2441 ::transmit_throw_on_interrupt = true;
2442
2443 // Check first if it's ready to write.
2444 // If not, wait indefinitely.
2445 if (!m_blocking_mode)
2446 {
2447 Epoll_again:
2448 int len = 2;
2449 SRT_EPOLL_EVENT sready[2];
2450 len = srt_epoll_uwait(srt_epoll, sready, len, -1);
2451 if (len != -1)
2452 {
2453 bool any_write_ready = false;
2454 for (int i = 0; i < len; ++i)
2455 {
2456 if (sready[i].events & SRT_EPOLL_UPDATE)
2457 {
2458 Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
2459 }
2460
2461 if (sready[i].events & SRT_EPOLL_OUT)
2462 any_write_ready = true;
2463 }
2464
2465 if (!any_write_ready)
2466 {
2467 Verb() << " ... [NOT WRITE READY - AGAIN]";
2468 goto Epoll_again;
2469 }
2470
2471 // Pass on, write ready.
2472 }
2473 else
2474 {
2475 Error("srt_epoll_uwait");
2476 }
2477 }
2478
2479 SRT_MSGCTRL mctrl = srt_msgctrl_default;
2480 #if ENABLE_EXPERIMENTAL_BONDING
2481 bool have_group = !m_group_nodes.empty();
2482 if (have_group || m_listener_group)
2483 {
2484 mctrl.grpdata = m_group_data.data();
2485 mctrl.grpdata_size = m_group_data.size();
2486 }
2487 #endif
2488
2489 if (transmit_use_sourcetime)
2490 {
2491 mctrl.srctime = data.time;
2492 }
2493
2494 int stat = srt_sendmsg2(m_sock, data.payload.data(), data.payload.size(), &mctrl);
2495
2496 // For a socket group, the error is reported only
2497 // if ALL links from the group have failed to perform
2498 // the operation. If only one did, the result will be
2499 // visible in the status array.
2500 if (stat == SRT_ERROR)
2501 Error("srt_sendmsg");
2502 ::transmit_throw_on_interrupt = false;
2503
2504 const bool need_bw_report = transmit_bw_report && int(counter % transmit_bw_report) == transmit_bw_report - 1;
2505 const bool need_stats_report = transmit_stats_report && counter % transmit_stats_report == transmit_stats_report - 1;
2506
2507 #if ENABLE_EXPERIMENTAL_BONDING
2508 if (have_group)
2509 {
2510 // For listener group this is not necessary. The group information
2511 // is updated in mctrl.
2512 UpdateGroupStatus(mctrl.grpdata, mctrl.grpdata_size);
2513 if (transmit_stats_writer && (need_stats_report || need_bw_report))
2514 {
2515 PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2516 for (size_t i = 0; i < mctrl.grpdata_size; ++i)
2517 PrintSrtStats(mctrl.grpdata[i].id, need_stats_report, need_bw_report, need_stats_report);
2518 }
2519 }
2520 else
2521 #endif
2522 {
2523 if (transmit_stats_writer && (need_stats_report || need_bw_report))
2524 {
2525 PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
2526 }
2527 }
2528
2529 Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << " " << BufferStamp(data.payload.data(), data.payload.size()) << ") " << VerbNoEOL;
2530
2531 ++counter;
2532 }
2533
SrtRelay(std::string host,int port,std::string path,const std::map<std::string,std::string> & par)2534 SrtRelay::SrtRelay(std::string host, int port, std::string path, const std::map<std::string,std::string>& par)
2535 {
2536 Init(host, port, path, par, SRT_EPOLL_IN | SRT_EPOLL_OUT);
2537 }
2538
SrtModel(string host,int port,map<string,string> par)2539 SrtModel::SrtModel(string host, int port, map<string,string> par)
2540 {
2541 InitParameters(host, "", par);
2542 if (m_mode == "caller")
2543 is_caller = true;
2544 else if (m_mode == "rendezvous")
2545 is_rend = true;
2546 else if (m_mode != "listener")
2547 throw std::invalid_argument("Wrong 'mode' attribute; expected: caller, listener, rendezvous");
2548
2549 m_host = host;
2550 m_port = port;
2551 }
2552
Establish(std::string & w_name)2553 void SrtModel::Establish(std::string& w_name)
2554 {
2555 // This does connect or accept.
2556 // When this returned true, the caller should create
2557 // a new SrtSource or SrtTaget then call StealFrom(*this) on it.
2558
2559 // If this is a connector and the peer doesn't have a corresponding
2560 // medium, it should send back a single byte with value 0. This means
2561 // that agent should stop connecting.
2562
2563 if (is_rend)
2564 {
2565 OpenRendezvous(m_adapter, m_host, m_port);
2566 }
2567 else if (is_caller)
2568 {
2569 // Establish a connection
2570
2571 PrepareClient();
2572
2573 if (w_name != "")
2574 {
2575 Verb() << "Connect with requesting stream [" << w_name << "]";
2576 srt::setstreamid(m_sock, w_name);
2577 }
2578 else
2579 {
2580 Verb() << "NO STREAM ID for SRT connection";
2581 }
2582
2583 if (m_outgoing_port)
2584 {
2585 Verb() << "Setting outgoing port: " << m_outgoing_port;
2586 SetupAdapter("", m_outgoing_port);
2587 }
2588
2589 ConnectClient(m_host, m_port);
2590
2591 if (m_outgoing_port == 0)
2592 {
2593 // Must rely on a randomly selected one. Extract the port
2594 // so that it will be reused next time.
2595 sockaddr_any s(AF_INET);
2596 int namelen = s.size();
2597 if (srt_getsockname(Socket(), (s.get()), (&namelen)) == SRT_ERROR)
2598 {
2599 Error("srt_getsockname");
2600 }
2601
2602 m_outgoing_port = s.hport();
2603 Verb() << "Extracted outgoing port: " << m_outgoing_port;
2604 }
2605 }
2606 else
2607 {
2608 // Listener - get a socket by accepting.
2609 // Check if the listener is already created first
2610 if (Listener() == SRT_INVALID_SOCK)
2611 {
2612 Verb() << "Setting up listener: port=" << m_port << " backlog=5";
2613 PrepareListener(m_adapter, m_port, 5);
2614 }
2615
2616 Verb() << "Accepting a client...";
2617 AcceptNewClient();
2618 // This rewrites m_sock with a new SRT socket ("accepted" socket)
2619 w_name = UDT::getstreamid(m_sock);
2620 Verb() << "... GOT CLIENT for stream [" << w_name << "]";
2621 }
2622 }
2623
2624
2625 template <class Iface> struct Srt;
2626 template <> struct Srt<Source> { typedef SrtSource type; };
2627 template <> struct Srt<Target> { typedef SrtTarget type; };
2628 template <> struct Srt<Relay> { typedef SrtRelay type; };
2629
2630 template <class Iface>
CreateSrt(const string & host,int port,std::string path,const map<string,string> & par)2631 Iface* CreateSrt(const string& host, int port, std::string path, const map<string,string>& par)
2632 { return new typename Srt<Iface>::type (host, port, path, par); }
2633
ConsoleRead(size_t chunk)2634 MediaPacket ConsoleRead(size_t chunk)
2635 {
2636 bytevector data(chunk);
2637 bool st = cin.read(data.data(), chunk).good();
2638 chunk = cin.gcount();
2639 if (chunk == 0 && !st)
2640 return bytevector();
2641
2642 int64_t stime = 0;
2643 if (transmit_use_sourcetime)
2644 stime = srt_time_now();
2645
2646 if (chunk < data.size())
2647 data.resize(chunk);
2648 if (data.empty())
2649 throw Source::ReadEOF("CONSOLE device");
2650
2651 return MediaPacket(data, stime);
2652 }
2653
2654 class ConsoleSource: public virtual Source
2655 {
2656 public:
2657
ConsoleSource()2658 ConsoleSource()
2659 {
2660 }
2661
Read(size_t chunk)2662 MediaPacket Read(size_t chunk) override
2663 {
2664 return ConsoleRead(chunk);
2665 }
2666
IsOpen()2667 bool IsOpen() override { return cin.good(); }
End()2668 bool End() override { return cin.eof(); }
2669 };
2670
2671 class ConsoleTarget: public virtual Target
2672 {
2673 public:
2674
ConsoleTarget()2675 ConsoleTarget()
2676 {
2677 }
2678
Write(const MediaPacket & data)2679 void Write(const MediaPacket& data) override
2680 {
2681 cout.write(data.payload.data(), data.payload.size());
2682 }
2683
IsOpen()2684 bool IsOpen() override { return cout.good(); }
Broken()2685 bool Broken() override { return cout.eof(); }
2686 };
2687
2688 class ConsoleRelay: public Relay, public ConsoleSource, public ConsoleTarget
2689 {
2690 public:
2691 ConsoleRelay() = default;
2692
IsOpen()2693 bool IsOpen() override { return cin.good() && cout.good(); }
2694 };
2695
2696 template <class Iface> struct Console;
2697 template <> struct Console<Source> { typedef ConsoleSource type; };
2698 template <> struct Console<Target> { typedef ConsoleTarget type; };
2699 template <> struct Console<Relay> { typedef ConsoleRelay type; };
2700
2701 template <class Iface>
CreateConsole()2702 Iface* CreateConsole() { return new typename Console<Iface>::type (); }
2703
2704
2705 // More options can be added in future.
2706 SocketOption udp_options [] {
2707 { "iptos", IPPROTO_IP, IP_TOS, SocketOption::PRE, SocketOption::INT, nullptr },
2708 // IP_TTL and IP_MULTICAST_TTL are handled separately by a common option, "ttl".
2709 { "mcloop", IPPROTO_IP, IP_MULTICAST_LOOP, SocketOption::PRE, SocketOption::INT, nullptr }
2710 };
2711
// Tells whether the given IPv4 address belongs to the multicast range,
// i.e. whether its leading octet lies within 224..239.
static inline bool IsMulticast(in_addr adr)
{
    // The first byte of s_addr in memory is the leading octet of the address.
    const unsigned char* octets = reinterpret_cast<const unsigned char*>(&adr.s_addr);
    const unsigned char leading = octets[0];

    const bool below_range = leading < 224;
    const bool above_range = leading > 239;
    return !(below_range || above_range);
}
2718
Setup(string host,int port,map<string,string> attr)2719 void UdpCommon::Setup(string host, int port, map<string,string> attr)
2720 {
2721 m_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
2722 if (m_sock == -1)
2723 Error(SysError(), "UdpCommon::Setup: socket");
2724
2725 int yes = 1;
2726 ::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof yes);
2727
2728 sadr = CreateAddr(host, port);
2729
2730 bool is_multicast = false;
2731 if (sadr.family() == AF_INET)
2732 {
2733 if (attr.count("multicast"))
2734 {
2735 if (!IsMulticast(sadr.sin.sin_addr))
2736 {
2737 throw std::runtime_error("UdpCommon: requested multicast for a non-multicast-type IP address");
2738 }
2739 is_multicast = true;
2740 }
2741 else if (IsMulticast(sadr.sin.sin_addr))
2742 {
2743 is_multicast = true;
2744 }
2745
2746 if (is_multicast)
2747 {
2748 ip_mreq_source mreq_ssm;
2749 ip_mreq mreq;
2750 sockaddr_any maddr;
2751 int opt_name;
2752 void* mreq_arg_ptr;
2753 socklen_t mreq_arg_size;
2754
2755 adapter = attr.count("adapter") ? attr.at("adapter") : string();
2756 if (adapter == "")
2757 {
2758 Verb() << "Multicast: home address: INADDR_ANY:" << port;
2759 maddr.sin.sin_family = AF_INET;
2760 maddr.sin.sin_addr.s_addr = htonl(INADDR_ANY);
2761 maddr.sin.sin_port = htons(port); // necessary for temporary use
2762 }
2763 else
2764 {
2765 Verb() << "Multicast: home address: " << adapter << ":" << port;
2766 maddr = CreateAddr(adapter, port);
2767 }
2768
2769 if (attr.count("source"))
2770 {
2771 /* this is an ssm. we need to use the right struct and opt */
2772 opt_name = IP_ADD_SOURCE_MEMBERSHIP;
2773 mreq_ssm.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
2774 mreq_ssm.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
2775 inet_pton(AF_INET, attr.at("source").c_str(), &mreq_ssm.imr_sourceaddr);
2776 mreq_arg_size = sizeof(mreq_ssm);
2777 mreq_arg_ptr = &mreq_ssm;
2778 }
2779 else
2780 {
2781 opt_name = IP_ADD_MEMBERSHIP;
2782 mreq.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
2783 mreq.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
2784 mreq_arg_size = sizeof(mreq);
2785 mreq_arg_ptr = &mreq;
2786 }
2787
2788 #ifdef _WIN32
2789 const char* mreq_arg = (const char*)mreq_arg_ptr;
2790 const auto status_error = SOCKET_ERROR;
2791 #else
2792 const void* mreq_arg = mreq_arg_ptr;
2793 const auto status_error = -1;
2794 #endif
2795
2796 #if defined(_WIN32) || defined(__CYGWIN__)
2797 // On Windows it somehow doesn't work when bind()
2798 // is called with multicast address. Write the address
2799 // that designates the network device here.
2800 // Also, sets port sharing when working with multicast
2801 sadr = maddr;
2802 int reuse = 1;
2803 int shareAddrRes = setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse), sizeof(reuse));
2804 if (shareAddrRes == status_error)
2805 {
2806 throw runtime_error("marking socket for shared use failed");
2807 }
2808 Verb() << "Multicast(Windows): will bind to home address";
2809 #else
2810 Verb() << "Multicast(POSIX): will bind to IGMP address: " << host;
2811 #endif
2812 int res = setsockopt(m_sock, IPPROTO_IP, opt_name, mreq_arg, mreq_arg_size);
2813
2814 if (res == status_error)
2815 {
2816 Error(errno, "adding to multicast membership failed");
2817 }
2818
2819 attr.erase("multicast");
2820 attr.erase("adapter");
2821 }
2822 }
2823
2824 // The "ttl" options is handled separately, it maps to both IP_TTL
2825 // and IP_MULTICAST_TTL so that TTL setting works for both uni- and multicast.
2826 if (attr.count("ttl"))
2827 {
2828 int ttl = stoi(attr.at("ttl"));
2829 int res = setsockopt(m_sock, IPPROTO_IP, IP_TTL, (const char*)&ttl, sizeof ttl);
2830 if (res == -1)
2831 Verb() << "WARNING: failed to set 'ttl' (IP_TTL) to " << ttl;
2832 res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&ttl, sizeof ttl);
2833 if (res == -1)
2834 Verb() << "WARNING: failed to set 'ttl' (IP_MULTICAST_TTL) to " << ttl;
2835
2836 attr.erase("ttl");
2837 }
2838
2839 m_options = attr;
2840
2841 for (auto o: udp_options)
2842 {
2843 // Ignore "binding" - for UDP there are no post options.
2844 if (m_options.count(o.name))
2845 {
2846 string value = m_options.at(o.name);
2847 bool ok = o.apply<SocketOption::SYSTEM>(m_sock, value);
2848 if (!ok)
2849 Verb() << "WARNING: failed to set '" << o.name << "' to " << value;
2850 }
2851 }
2852 }
2853
Error(int err,string src)2854 void UdpCommon::Error(int err, string src)
2855 {
2856 char buf[512];
2857 string message = SysStrError(err, buf, 512u);
2858
2859 if (Verbose::on)
2860 Verb() << "FAILURE\n" << src << ": [" << err << "] " << message;
2861 else
2862 cerr << "\nERROR #" << err << ": " << message << endl;
2863
2864 throw TransmissionError("error: " + src + ": " + message);
2865 }
2866
~UdpCommon()2867 UdpCommon::~UdpCommon()
2868 {
2869 #ifdef _WIN32
2870 if (m_sock != -1)
2871 {
2872 shutdown(m_sock, SD_BOTH);
2873 closesocket(m_sock);
2874 m_sock = -1;
2875 }
2876 #else
2877 close(m_sock);
2878 #endif
2879 }
2880
UdpSource(string host,int port,const map<string,string> & attr)2881 UdpSource::UdpSource(string host, int port, const map<string,string>& attr)
2882 {
2883 Setup(host, port, attr);
2884 int stat = ::bind(m_sock, sadr.get(), sadr.size());
2885 if (stat == -1)
2886 Error(SysError(), "Binding address for UDP");
2887 eof = false;
2888 struct timeval tv;
2889 tv.tv_sec = 1;
2890 tv.tv_usec = 0;
2891 if (::setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (const char*) &tv, sizeof(tv)) < 0)
2892 Error(SysError(), "Setting timeout for UDP");
2893 }
2894
Read(size_t chunk)2895 MediaPacket UdpSource::Read(size_t chunk)
2896 {
2897 bytevector data(chunk);
2898 sockaddr_any sa(sadr.family());
2899 int64_t srctime = 0;
2900 AGAIN:
2901 int stat = recvfrom(m_sock, data.data(), (int) chunk, 0, sa.get(), &sa.syslen());
2902 int err = SysError();
2903 if (transmit_use_sourcetime)
2904 {
2905 srctime = srt_time_now();
2906 }
2907 if (stat == -1)
2908 {
2909 if (!::transmit_int_state && err == SysAGAIN)
2910 goto AGAIN;
2911
2912 Error(SysError(), "UDP Read/recvfrom");
2913 }
2914
2915 if (stat < 1)
2916 {
2917 eof = true;
2918 return bytevector();
2919 }
2920
2921 chunk = size_t(stat);
2922 if (chunk < data.size())
2923 data.resize(chunk);
2924
2925 return MediaPacket(data, srctime);
2926 }
2927
UdpTarget(string host,int port,const map<string,string> & attr)2928 UdpTarget::UdpTarget(string host, int port, const map<string,string>& attr)
2929 {
2930 Setup(host, port, attr);
2931 if (adapter != "")
2932 {
2933 auto maddr = CreateAddr(adapter, 0);
2934 in_addr addr = maddr.sin.sin_addr;
2935
2936 int res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
2937 if (res == -1)
2938 {
2939 Error(SysError(), "setsockopt/IP_MULTICAST_IF: " + adapter);
2940 }
2941 }
2942 }
2943
Write(const MediaPacket & data)2944 void UdpTarget::Write(const MediaPacket& data)
2945 {
2946 int stat = sendto(m_sock, data.payload.data(), data.payload.size(), 0, (sockaddr*)&sadr, sizeof sadr);
2947 if (stat == -1)
2948 Error(SysError(), "UDP Write/sendto");
2949 }
2950
2951
2952 template <class Iface> struct Udp;
2953 template <> struct Udp<Source> { typedef UdpSource type; };
2954 template <> struct Udp<Target> { typedef UdpTarget type; };
2955 template <> struct Udp<Relay> { typedef UdpRelay type; };
2956
2957 template <class Iface>
CreateUdp(const string & host,int port,const map<string,string> & par)2958 Iface* CreateUdp(const string& host, int port, const map<string,string>& par) { return new typename Udp<Iface>::type (host, port, par); }
2959
2960 template<class Base>
IsOutput()2961 inline bool IsOutput() { return false; }
2962
2963 template<>
IsOutput()2964 inline bool IsOutput<Target>() { return true; }
2965
2966 template <class Base>
CreateMedium(const string & uri)2967 extern unique_ptr<Base> CreateMedium(const string& uri)
2968 {
2969 unique_ptr<Base> ptr;
2970
2971 UriParser u(uri);
2972
2973 int iport = 0;
2974 switch ( u.type() )
2975 {
2976 default:
2977 break; // do nothing, return nullptr
2978 case UriParser::FILE:
2979 if (u.host() == "con" || u.host() == "console")
2980 {
2981 if ( IsOutput<Base>() && (
2982 (Verbose::on && Verbose::cverb == &cout)
2983 || transmit_bw_report || transmit_stats_report) )
2984 {
2985 cerr << "ERROR: file://con with -v or -r or -s would result in mixing the data and text info.\n";
2986 cerr << "ERROR: HINT: you can stream through a FIFO (named pipe)\n";
2987 throw invalid_argument("incorrect parameter combination");
2988 }
2989 ptr.reset( CreateConsole<Base>() );
2990 }
2991 else
2992 ptr.reset( CreateFile<Base>(u.path()));
2993 break;
2994
2995 case UriParser::SRT:
2996 ptr.reset( CreateSrt<Base>(u.host(), u.portno(), u.path(), u.parameters()) );
2997 break;
2998
2999
3000 case UriParser::UDP:
3001 iport = atoi(u.port().c_str());
3002 if (iport < 1024)
3003 {
3004 cerr << "Port value invalid: " << iport << " - must be >=1024\n";
3005 throw invalid_argument("Invalid port number");
3006 }
3007 ptr.reset( CreateUdp<Base>(u.host(), iport, u.parameters()) );
3008 break;
3009 }
3010
3011 if (ptr)
3012 ptr->uri = move(u);
3013 return ptr;
3014 }
3015
3016
Create(const std::string & url)3017 std::unique_ptr<Source> Source::Create(const std::string& url)
3018 {
3019 return CreateMedium<Source>(url);
3020 }
3021
Create(const std::string & url)3022 std::unique_ptr<Target> Target::Create(const std::string& url)
3023 {
3024 return CreateMedium<Target>(url);
3025 }
3026