1 /*
2 * SRT - Secure, Reliable, Transport
3 * Copyright (c) 2018 Haivision Systems Inc.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 */
10
11 // Just for formality. This file should be used
12 #include <iostream>
13 #include <iomanip>
14 #include <fstream>
15 #include <sstream>
16 #include <memory>
17 #include <string>
18 #include <stdexcept>
19 #include <iterator>
20 #include <map>
21 #include <srt.h>
22 #if !defined(_WIN32)
23 #include <sys/ioctl.h>
24 #else
25 #include <fcntl.h>
26 #include <io.h>
27 #endif
28 #if defined(SUNOS)
29 #include <sys/filio.h>
30 #endif
31
32 #include "netinet_any.h"
33 #include "apputil.hpp"
34 #include "socketoptions.hpp"
35 #include "uriparser.hpp"
36 #include "transmitmedia.hpp"
37 #include "srt_compat.h"
38 #include "verbose.hpp"
39
40 using namespace std;
41
42 bool g_stats_are_printed_to_stdout = false;
43 bool transmit_total_stats = false;
44 unsigned long transmit_bw_report = 0;
45 unsigned long transmit_stats_report = 0;
46 unsigned long transmit_chunk_size = SRT_LIVE_MAX_PLSIZE;
47
48 class FileSource: public Source
49 {
50 ifstream ifile;
51 string filename_copy;
52 public:
53
FileSource(const string & path)54 FileSource(const string& path): ifile(path, ios::in | ios::binary), filename_copy(path)
55 {
56 if ( !ifile )
57 throw std::runtime_error(path + ": Can't open file for reading");
58 }
59
Read(size_t chunk,MediaPacket & pkt,ostream & ignored SRT_ATR_UNUSED=cout)60 int Read(size_t chunk, MediaPacket& pkt, ostream & ignored SRT_ATR_UNUSED = cout) override
61 {
62 if (pkt.payload.size() < chunk)
63 pkt.payload.resize(chunk);
64
65 pkt.time = 0;
66 ifile.read(pkt.payload.data(), chunk);
67 size_t nread = ifile.gcount();
68 if (nread < pkt.payload.size())
69 pkt.payload.resize(nread);
70
71 if (pkt.payload.empty())
72 {
73 return 0;
74 }
75
76 return (int) nread;
77 }
78
IsOpen()79 bool IsOpen() override { return bool(ifile); }
End()80 bool End() override { return ifile.eof(); }
81 };
82
83 class FileTarget: public Target
84 {
85 ofstream ofile;
86 public:
87
FileTarget(const string & path)88 FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}
89
Write(const char * data,size_t size,int64_t time SRT_ATR_UNUSED,ostream & ignored SRT_ATR_UNUSED=cout)90 int Write(const char* data, size_t size, int64_t time SRT_ATR_UNUSED, ostream & ignored SRT_ATR_UNUSED = cout) override
91 {
92 ofile.write(data, size);
93 return !(ofile.bad()) ? (int) size : 0;
94 }
95
IsOpen()96 bool IsOpen() override { return !!ofile; }
Broken()97 bool Broken() override { return !ofile.good(); }
98 //~FileTarget() { ofile.close(); }
Close()99 void Close() override { ofile.close(); }
100 };
101
102 template <class Iface> struct File;
103 template <> struct File<Source> { typedef FileSource type; };
104 template <> struct File<Target> { typedef FileTarget type; };
105
106 template <class Iface>
CreateFile(const string & name)107 Iface* CreateFile(const string& name) { return new typename File<Iface>::type (name); }
108
109 shared_ptr<SrtStatsWriter> transmit_stats_writer;
110
InitParameters(string host,map<string,string> par)111 void SrtCommon::InitParameters(string host, map<string,string> par)
112 {
113 // Application-specific options: mode, blocking, timeout, adapter
114 if (Verbose::on && !par.empty())
115 {
116 Verb() << "SRT parameters specified:\n";
117 for (map<string,string>::iterator i = par.begin(); i != par.end(); ++i)
118 {
119 cerr << "\t" << i->first << " = '" << i->second << "'\n";
120 }
121 }
122
123 string adapter;
124 if (par.count("adapter"))
125 {
126 adapter = par.at("adapter");
127 }
128
129 m_mode = "default";
130 if (par.count("mode"))
131 {
132 m_mode = par.at("mode");
133 }
134 SocketOption::Mode mode = SrtInterpretMode(m_mode, host, adapter);
135 if (mode == SocketOption::FAILURE)
136 {
137 Error("Invalid mode");
138 }
139
140 // Fix the mode name after successful interpretation
141 m_mode = SocketOption::mode_names[mode];
142
143 par.erase("mode");
144
145 if (par.count("timeout"))
146 {
147 m_timeout = stoi(par.at("timeout"), 0, 0);
148 par.erase("timeout");
149 }
150
151 if (par.count("adapter"))
152 {
153 m_adapter = par.at("adapter");
154 par.erase("adapter");
155 }
156 else if (m_mode == "listener")
157 {
158 // For listener mode, adapter is taken from host,
159 // if 'adapter' parameter is not given
160 m_adapter = host;
161 }
162
163 if (par.count("tsbpd") && false_names.count(par.at("tsbpd")))
164 {
165 m_tsbpdmode = false;
166 }
167
168 if (par.count("port"))
169 {
170 m_outgoing_port = stoi(par.at("port"), 0, 0);
171 par.erase("port");
172 }
173
174 // That's kinda clumsy, but it must rely on the defaults.
175 // Default mode is live, so check if the file mode was enforced
176 if ((par.count("transtype") == 0 || par["transtype"] != "file")
177 && transmit_chunk_size > SRT_LIVE_DEF_PLSIZE)
178 {
179 if (transmit_chunk_size > SRT_LIVE_MAX_PLSIZE)
180 throw std::runtime_error("Chunk size in live mode exceeds 1456 bytes; this is not supported");
181
182 par["payloadsize"] = Sprint(transmit_chunk_size);
183 }
184
185 // Assign the others here.
186 m_options = par;
187 }
188
PrepareListener(string host,int port,int backlog)189 void SrtCommon::PrepareListener(string host, int port, int backlog)
190 {
191 m_bindsock = srt_create_socket();
192 if ( m_bindsock == SRT_ERROR )
193 Error("srt_create_socket");
194
195 int stat = ConfigurePre(m_bindsock);
196 if ( stat == SRT_ERROR )
197 Error("ConfigurePre");
198
199 sockaddr_any sa = CreateAddr(host, port);
200 sockaddr* psa = sa.get();
201 Verb() << "Binding a server on " << host << ":" << port << " ...";
202
203 stat = srt_bind(m_bindsock, psa, sizeof sa);
204 if ( stat == SRT_ERROR )
205 {
206 srt_close(m_bindsock);
207 Error("srt_bind");
208 }
209
210 Verb() << " listen...";
211
212 stat = srt_listen(m_bindsock, backlog);
213 if ( stat == SRT_ERROR )
214 {
215 srt_close(m_bindsock);
216 Error("srt_listen");
217 }
218 }
219
StealFrom(SrtCommon & src)220 void SrtCommon::StealFrom(SrtCommon& src)
221 {
222 // This is used when SrtCommon class designates a listener
223 // object that is doing Accept in appropriate direction class.
224 // The new object should get the accepted socket.
225 m_output_direction = src.m_output_direction;
226 m_timeout = src.m_timeout;
227 m_tsbpdmode = src.m_tsbpdmode;
228 m_options = src.m_options;
229 m_bindsock = SRT_INVALID_SOCK; // no listener
230 m_sock = src.m_sock;
231 src.m_sock = SRT_INVALID_SOCK; // STEALING
232 }
233
AcceptNewClient()234 bool SrtCommon::AcceptNewClient()
235 {
236 sockaddr_any scl;
237 Verb() << " accept... ";
238
239 m_sock = srt_accept(m_bindsock, scl.get(), &scl.len);
240 if ( m_sock == SRT_INVALID_SOCK )
241 {
242 srt_close(m_bindsock);
243 m_bindsock = SRT_INVALID_SOCK;
244 Error("srt_accept");
245 }
246
247 // we do one client connection at a time,
248 // so close the listener.
249 srt_close(m_bindsock);
250 m_bindsock = SRT_INVALID_SOCK;
251
252 Verb() << " connected.";
253
254 // ConfigurePre is done on bindsock, so any possible Pre flags
255 // are DERIVED by sock. ConfigurePost is done exclusively on sock.
256 int stat = ConfigurePost(m_sock);
257 if ( stat == SRT_ERROR )
258 Error("ConfigurePost");
259
260 return true;
261 }
262
Init(string host,int port,map<string,string> par,bool dir_output)263 void SrtCommon::Init(string host, int port, map<string,string> par, bool dir_output)
264 {
265 m_output_direction = dir_output;
266 InitParameters(host, par);
267
268 Verb() << "Opening SRT " << (dir_output ? "target" : "source") << " " << m_mode
269 << " on " << host << ":" << port;
270
271 if ( m_mode == "caller" )
272 OpenClient(host, port);
273 else if ( m_mode == "listener" )
274 OpenServer(m_adapter, port);
275 else if ( m_mode == "rendezvous" )
276 OpenRendezvous(m_adapter, host, port);
277 else
278 {
279 throw std::invalid_argument("Invalid 'mode'. Use 'client' or 'server'");
280 }
281 }
282
ConfigurePost(SRTSOCKET sock)283 int SrtCommon::ConfigurePost(SRTSOCKET sock)
284 {
285 bool no = false;
286 int result = 0;
287 if ( m_output_direction )
288 {
289 result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &no, sizeof no);
290 if ( result == -1 )
291 return result;
292
293 if ( m_timeout )
294 return srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &m_timeout, sizeof m_timeout);
295 }
296 else
297 {
298 result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no);
299 if ( result == -1 )
300 return result;
301
302 if ( m_timeout )
303 return srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &m_timeout, sizeof m_timeout);
304 }
305
306 SrtConfigurePost(sock, m_options);
307
308 for (const auto &o: srt_options)
309 {
310 if ( o.binding == SocketOption::POST && m_options.count(o.name) )
311 {
312 string value = m_options.at(o.name);
313 bool ok = o.apply<SocketOption::SRT>(sock, value);
314 if ( !ok )
315 Verb() << "WARNING: failed to set '" << o.name << "' (post, "
316 << (m_output_direction? "target":"source") << ") to "
317 << value;
318 else
319 Verb() << "NOTE: SRT/post::" << o.name << "=" << value;
320 }
321 }
322
323 return 0;
324 }
325
ConfigurePre(SRTSOCKET sock)326 int SrtCommon::ConfigurePre(SRTSOCKET sock)
327 {
328 int result = 0;
329
330 bool no = false;
331 if ( !m_tsbpdmode )
332 {
333 result = srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no);
334 if ( result == -1 )
335 return result;
336 }
337
338 result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no);
339 if ( result == -1 )
340 return result;
341
342
343 // host is only checked for emptiness and depending on that the connection mode is selected.
344 // Here we are not exactly interested with that information.
345 vector<string> failures;
346
347 // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always,
348 // but it doesn't matter here. We don't use 'connmode' for anything else than
349 // checking for failures.
350 SocketOption::Mode conmode = SrtConfigurePre(sock, "", m_options, &failures);
351
352 if ( conmode == SocketOption::FAILURE )
353 {
354 if ( Verbose::on )
355 {
356 cerr << "WARNING: failed to set options: ";
357 copy(failures.begin(), failures.end(), ostream_iterator<string>(cerr, ", "));
358 cerr << endl;
359 }
360
361 return SRT_ERROR;
362 }
363
364 return 0;
365 }
366
SetupAdapter(const string & host,int port)367 void SrtCommon::SetupAdapter(const string& host, int port)
368 {
369 sockaddr_any localsa = CreateAddr(host, port);
370 sockaddr* psa = localsa.get();
371 int stat = srt_bind(m_sock, psa, sizeof localsa);
372 if ( stat == SRT_ERROR )
373 Error("srt_bind");
374 }
375
OpenClient(string host,int port)376 void SrtCommon::OpenClient(string host, int port)
377 {
378 PrepareClient();
379
380 if ( m_outgoing_port )
381 {
382 SetupAdapter("", m_outgoing_port);
383 }
384
385 ConnectClient(host, port);
386 }
387
PrepareClient()388 void SrtCommon::PrepareClient()
389 {
390 m_sock = srt_create_socket();
391 if ( m_sock == SRT_ERROR )
392 Error("srt_create_socket");
393
394 int stat = ConfigurePre(m_sock);
395 if ( stat == SRT_ERROR )
396 Error("ConfigurePre");
397 }
398
399
ConnectClient(string host,int port)400 void SrtCommon::ConnectClient(string host, int port)
401 {
402
403 sockaddr_any sa = CreateAddr(host, port);
404 sockaddr* psa = sa.get();
405
406 Verb() << "Connecting to " << host << ":" << port;
407
408 int stat = srt_connect(m_sock, psa, sizeof sa);
409 if ( stat == SRT_ERROR )
410 {
411 srt_close(m_sock);
412 Error("srt_connect");
413 }
414
415 stat = ConfigurePost(m_sock);
416 if ( stat == SRT_ERROR )
417 Error("ConfigurePost");
418 }
419
Error(string src)420 void SrtCommon::Error(string src)
421 {
422 int errnov = 0;
423 int result = srt_getlasterror(&errnov);
424 string message = srt_getlasterror_str();
425 Verb() << "\nERROR #" << result << "." << errnov << ": " << message;
426
427 throw TransmissionError("error: " + src + ": " + message);
428 }
429
OpenRendezvous(string adapter,string host,int port)430 void SrtCommon::OpenRendezvous(string adapter, string host, int port)
431 {
432 m_sock = srt_create_socket();
433 if ( m_sock == SRT_ERROR )
434 Error("srt_create_socket");
435
436 bool yes = true;
437 srt_setsockopt(m_sock, 0, SRTO_RENDEZVOUS, &yes, sizeof yes);
438
439 int stat = ConfigurePre(m_sock);
440 if ( stat == SRT_ERROR )
441 Error("ConfigurePre");
442
443 sockaddr_any sa = CreateAddr(host, port);
444 if (sa.family() == AF_UNSPEC)
445 {
446 Error("OpenRendezvous: invalid target host specification: " + host);
447 }
448
449 const int outport = m_outgoing_port ? m_outgoing_port : port;
450
451 sockaddr_any localsa = CreateAddr(adapter, outport, sa.family());
452
453 Verb() << "Binding a server on " << adapter << ":" << outport;
454
455 stat = srt_bind(m_sock, localsa.get(), sizeof localsa);
456 if ( stat == SRT_ERROR )
457 {
458 srt_close(m_sock);
459 Error("srt_bind");
460 }
461
462 Verb() << "Connecting to " << host << ":" << port;
463
464 stat = srt_connect(m_sock, sa.get(), sizeof sa);
465 if ( stat == SRT_ERROR )
466 {
467 srt_close(m_sock);
468 Error("srt_connect");
469 }
470
471 stat = ConfigurePost(m_sock);
472 if ( stat == SRT_ERROR )
473 Error("ConfigurePost");
474 }
475
Close()476 void SrtCommon::Close()
477 {
478 Verb() << "SrtCommon: DESTROYING CONNECTION, closing sockets (rt%" << m_sock << " ls%" << m_bindsock << ")...";
479
480 if ( m_sock != SRT_INVALID_SOCK )
481 {
482 srt_close(m_sock);
483 m_sock = SRT_INVALID_SOCK;
484 }
485
486 if ( m_bindsock != SRT_INVALID_SOCK )
487 {
488 srt_close(m_bindsock);
489 m_bindsock = SRT_INVALID_SOCK ;
490 }
491
492 Verb() << "SrtCommon: ... done.";
493 }
494
~SrtCommon()495 SrtCommon::~SrtCommon()
496 {
497 Close();
498 }
499
SrtSource(string host,int port,const map<string,string> & par)500 SrtSource::SrtSource(string host, int port, const map<string,string>& par)
501 {
502 Init(host, port, par, false);
503
504 ostringstream os;
505 os << host << ":" << port;
506 hostport_copy = os.str();
507 }
508
Read(size_t chunk,MediaPacket & pkt,ostream & out_stats)509 int SrtSource::Read(size_t chunk, MediaPacket& pkt, ostream &out_stats)
510 {
511 static unsigned long counter = 1;
512
513 if (pkt.payload.size() < chunk)
514 pkt.payload.resize(chunk);
515
516 SRT_MSGCTRL ctrl;
517 const int stat = srt_recvmsg2(m_sock, pkt.payload.data(), (int) chunk, &ctrl);
518 if (stat <= 0)
519 {
520 pkt.payload.clear();
521 return stat;
522 }
523
524 pkt.time = ctrl.srctime;
525
526 chunk = size_t(stat);
527 if (chunk < pkt.payload.size())
528 pkt.payload.resize(chunk);
529
530 const bool need_bw_report = transmit_bw_report && (counter % transmit_bw_report) == transmit_bw_report - 1;
531 const bool need_stats_report = transmit_stats_report && (counter % transmit_stats_report) == transmit_stats_report - 1;
532
533 if (need_bw_report || need_stats_report)
534 {
535 CBytePerfMon perf;
536 srt_bstats(m_sock, &perf, need_stats_report && !transmit_total_stats);
537 if (transmit_stats_writer != nullptr)
538 {
539 if (need_bw_report)
540 cerr << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth) << std::flush;
541 if (need_stats_report)
542 out_stats << transmit_stats_writer->WriteStats(m_sock, perf) << std::flush;
543 }
544 }
545 ++counter;
546 return stat;
547 }
548
ConfigurePre(SRTSOCKET sock)549 int SrtTarget::ConfigurePre(SRTSOCKET sock)
550 {
551 int result = SrtCommon::ConfigurePre(sock);
552 if ( result == -1 )
553 return result;
554
555 int yes = 1;
556 // This is for the HSv4 compatibility; if both parties are HSv5
557 // (min. version 1.2.1), then this setting simply does nothing.
558 // In HSv4 this setting is obligatory; otherwise the SRT handshake
559 // extension will not be done at all.
560 result = srt_setsockopt(sock, 0, SRTO_SENDER, &yes, sizeof yes);
561 if ( result == -1 )
562 return result;
563
564 return 0;
565 }
566
Write(const char * data,size_t size,int64_t src_time,ostream & out_stats)567 int SrtTarget::Write(const char* data, size_t size, int64_t src_time, ostream &out_stats)
568 {
569 static unsigned long counter = 1;
570
571 SRT_MSGCTRL ctrl = srt_msgctrl_default;
572 ctrl.srctime = src_time;
573 int stat = srt_sendmsg2(m_sock, data, (int) size, &ctrl);
574 if (stat == SRT_ERROR)
575 {
576 return stat;
577 }
578
579 const bool need_bw_report = transmit_bw_report && (counter % transmit_bw_report) == transmit_bw_report - 1;
580 const bool need_stats_report = transmit_stats_report && (counter % transmit_stats_report) == transmit_stats_report - 1;
581
582 if (need_bw_report || need_stats_report)
583 {
584 CBytePerfMon perf;
585 srt_bstats(m_sock, &perf, need_stats_report && !transmit_total_stats);
586 if (transmit_stats_writer != nullptr)
587 {
588 if (need_bw_report)
589 cerr << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth) << std::flush;
590 if (need_stats_report)
591 out_stats << transmit_stats_writer->WriteStats(m_sock, perf) << std::flush;
592 }
593 }
594 ++counter;
595 return stat;
596 }
597
598
SrtModel(string host,int port,map<string,string> par)599 SrtModel::SrtModel(string host, int port, map<string,string> par)
600 {
601 InitParameters(host, par);
602 if (m_mode == "caller")
603 is_caller = true;
604 else if (m_mode != "listener")
605 throw std::invalid_argument("Only caller and listener modes supported");
606
607 m_host = host;
608 m_port = port;
609 }
610
Establish(std::string & w_name)611 void SrtModel::Establish(std::string& w_name)
612 {
613 // This does connect or accept.
614 // When this returned true, the caller should create
615 // a new SrtSource or SrtTaget then call StealFrom(*this) on it.
616
617 // If this is a connector and the peer doesn't have a corresponding
618 // medium, it should send back a single byte with value 0. This means
619 // that agent should stop connecting.
620
621 if (is_caller)
622 {
623 // Establish a connection
624
625 PrepareClient();
626
627 if (w_name != "")
628 {
629 Verb() << "Connect with requesting stream [" << w_name << "]";
630 srt::setstreamid(m_sock, w_name);
631 }
632 else
633 {
634 Verb() << "NO STREAM ID for SRT connection";
635 }
636
637 if (m_outgoing_port)
638 {
639 Verb() << "Setting outgoing port: " << m_outgoing_port;
640 SetupAdapter("", m_outgoing_port);
641 }
642
643 ConnectClient(m_host, m_port);
644
645 if (m_outgoing_port == 0)
646 {
647 // Must rely on a randomly selected one. Extract the port
648 // so that it will be reused next time.
649 sockaddr_any s(AF_INET);
650 int namelen = s.size();
651 if ( srt_getsockname(Socket(), s.get(), &namelen) == SRT_ERROR )
652 {
653 Error("srt_getsockname");
654 }
655
656 m_outgoing_port = s.hport();
657 Verb() << "Extracted outgoing port: " << m_outgoing_port;
658 }
659 }
660 else
661 {
662 // Listener - get a socket by accepting.
663 // Check if the listener is already created first
664 if (Listener() == SRT_INVALID_SOCK)
665 {
666 Verb() << "Setting up listener: port=" << m_port << " backlog=5";
667 PrepareListener(m_adapter, m_port, 5);
668 }
669
670 Verb() << "Accepting a client...";
671 AcceptNewClient();
672 // This rewrites m_sock with a new SRT socket ("accepted" socket)
673 w_name = srt::getstreamid(m_sock);
674 Verb() << "... GOT CLIENT for stream [" << w_name << "]";
675 }
676 }
677
678
679 template <class Iface> struct Srt;
680 template <> struct Srt<Source> { typedef SrtSource type; };
681 template <> struct Srt<Target> { typedef SrtTarget type; };
682
683 template <class Iface>
CreateSrt(const string & host,int port,const map<string,string> & par)684 Iface* CreateSrt(const string& host, int port, const map<string,string>& par) { return new typename Srt<Iface>::type (host, port, par); }
685
686 class ConsoleSource: public Source
687 {
688 public:
689
ConsoleSource()690 ConsoleSource()
691 {
692 #ifdef _WIN32
693 // The default stdin mode on windows is text.
694 // We have to set it to the binary mode
695 _setmode(_fileno(stdin), _O_BINARY);
696 #endif
697 }
698
Read(size_t chunk,MediaPacket & pkt,ostream & ignored SRT_ATR_UNUSED=cout)699 int Read(size_t chunk, MediaPacket& pkt, ostream & ignored SRT_ATR_UNUSED = cout) override
700 {
701 if (pkt.payload.size() < chunk)
702 pkt.payload.resize(chunk);
703
704 bool st = cin.read(pkt.payload.data(), chunk).good();
705 chunk = cin.gcount();
706 if (chunk == 0 || !st)
707 {
708 pkt.payload.clear();
709 return 0;
710 }
711
712 // Save this time to potentially use it for SRT target.
713 pkt.time = srt_time_now();
714 if (chunk < pkt.payload.size())
715 pkt.payload.resize(chunk);
716
717 return (int) chunk;
718 }
719
IsOpen()720 bool IsOpen() override { return cin.good(); }
End()721 bool End() override { return cin.eof(); }
GetSysSocket() const722 int GetSysSocket() const override { return 0; };
723 };
724
725 class ConsoleTarget: public Target
726 {
727 public:
728
ConsoleTarget()729 ConsoleTarget()
730 {
731 #ifdef _WIN32
732 // The default stdout mode on windows is text.
733 // We have to set it to the binary mode
734 _setmode(_fileno(stdout), _O_BINARY);
735 #endif
736 }
737
~ConsoleTarget()738 virtual ~ConsoleTarget()
739 {
740 cout.flush();
741 }
742
Write(const char * data,size_t len,int64_t src_time SRT_ATR_UNUSED,ostream & ignored SRT_ATR_UNUSED=cout)743 int Write(const char* data, size_t len, int64_t src_time SRT_ATR_UNUSED, ostream & ignored SRT_ATR_UNUSED = cout) override
744 {
745 cout.write(data, len);
746 return (int) len;
747 }
748
IsOpen()749 bool IsOpen() override { return cout.good(); }
Broken()750 bool Broken() override { return cout.eof(); }
GetSysSocket() const751 int GetSysSocket() const override { return 0; };
752 };
753
754 template <class Iface> struct Console;
755 template <> struct Console<Source> { typedef ConsoleSource type; };
756 template <> struct Console<Target> { typedef ConsoleTarget type; };
757
758 template <class Iface>
CreateConsole()759 Iface* CreateConsole() { return new typename Console<Iface>::type (); }
760
761
762 // More options can be added in future.
763 SocketOption udp_options [] {
764 { "iptos", IPPROTO_IP, IP_TOS, SocketOption::PRE, SocketOption::INT, nullptr },
765 // IP_TTL and IP_MULTICAST_TTL are handled separately by a common option, "ttl".
766 { "mcloop", IPPROTO_IP, IP_MULTICAST_LOOP, SocketOption::PRE, SocketOption::INT, nullptr },
767 { "sndbuf", SOL_SOCKET, SO_SNDBUF, SocketOption::PRE, SocketOption::INT, nullptr},
768 { "rcvbuf", SOL_SOCKET, SO_RCVBUF, SocketOption::PRE, SocketOption::INT, nullptr}
769 };
770
IsMulticast(in_addr adr)771 static inline bool IsMulticast(in_addr adr)
772 {
773 unsigned char* abytes = (unsigned char*)&adr.s_addr;
774 unsigned char c = abytes[0];
775 return c >= 224 && c <= 239;
776 }
777
778
779 class UdpCommon
780 {
781 protected:
782 int m_sock = -1;
783 sockaddr_any sadr;
784 string adapter;
785 map<string, string> m_options;
786
Setup(string host,int port,map<string,string> attr)787 void Setup(string host, int port, map<string,string> attr)
788 {
789 m_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
790 if (m_sock == -1)
791 Error(SysError(), "UdpCommon::Setup: socket");
792
793 int yes = 1;
794 ::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof yes);
795
796 // set non-blocking mode
797 #if defined(_WIN32)
798 unsigned long ulyes = 1;
799 if (ioctlsocket(m_sock, FIONBIO, &ulyes) == SOCKET_ERROR)
800 #else
801 if (ioctl(m_sock, FIONBIO, (const char *)&yes) < 0)
802 #endif
803 {
804 Error(SysError(), "UdpCommon::Setup: ioctl FIONBIO");
805 }
806
807 sadr = CreateAddr(host, port);
808
809 bool is_multicast = false;
810
811 if (attr.count("multicast"))
812 {
813 // XXX: Here provide support for IPv6 multicast #1479
814 if (sadr.family() != AF_INET)
815 {
816 throw std::runtime_error("UdpCommon: Multicast on IPv6 is not yet supported");
817 }
818
819 if (!IsMulticast(sadr.sin.sin_addr))
820 {
821 throw std::runtime_error("UdpCommon: requested multicast for a non-multicast-type IP address");
822 }
823 is_multicast = true;
824 }
825 else if (sadr.family() == AF_INET && IsMulticast(sadr.sin.sin_addr))
826 {
827 is_multicast = true;
828 }
829
830 if (is_multicast)
831 {
832 #ifdef IP_ADD_SOURCE_MEMBERSHIP
833 ip_mreq_source mreq_ssm;
834 #endif
835 ip_mreq mreq;
836 sockaddr_any maddr (AF_INET);
837 int opt_name;
838 void* mreq_arg_ptr;
839 socklen_t mreq_arg_size;
840
841 adapter = attr.count("adapter") ? attr.at("adapter") : string();
842 if ( adapter == "" )
843 {
844 Verb() << "Multicast: home address: INADDR_ANY:" << port;
845 maddr.sin.sin_family = AF_INET;
846 maddr.sin.sin_addr.s_addr = htonl(INADDR_ANY);
847 maddr.sin.sin_port = htons(port); // necessary for temporary use
848 }
849 else
850 {
851 Verb() << "Multicast: home address: " << adapter << ":" << port;
852 maddr = CreateAddr(adapter, port);
853 }
854
855 if (attr.count("source"))
856 {
857 #ifdef IP_ADD_SOURCE_MEMBERSHIP
858 /* this is an ssm. we need to use the right struct and opt */
859 opt_name = IP_ADD_SOURCE_MEMBERSHIP;
860 mreq_ssm.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
861 mreq_ssm.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
862 inet_pton(AF_INET, attr.at("source").c_str(), &mreq_ssm.imr_sourceaddr);
863 mreq_arg_size = sizeof(mreq_ssm);
864 mreq_arg_ptr = &mreq_ssm;
865 #else
866 throw std::runtime_error("UdpCommon: source-filter multicast not supported by OS");
867 #endif
868 }
869 else
870 {
871 opt_name = IP_ADD_MEMBERSHIP;
872 mreq.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
873 mreq.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
874 mreq_arg_size = sizeof(mreq);
875 mreq_arg_ptr = &mreq;
876 }
877
878 #ifdef _WIN32
879 const char* mreq_arg = (const char*)mreq_arg_ptr;
880 const auto status_error = SOCKET_ERROR;
881 #else
882 const void* mreq_arg = mreq_arg_ptr;
883 const auto status_error = -1;
884 #endif
885
886 #if defined(_WIN32) || defined(__CYGWIN__)
887 // On Windows it somehow doesn't work when bind()
888 // is called with multicast address. Write the address
889 // that designates the network device here.
890 // Also, sets port sharing when working with multicast
891 sadr = maddr;
892 int reuse = 1;
893 int shareAddrRes = setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse), sizeof(reuse));
894 if (shareAddrRes == status_error)
895 {
896 throw runtime_error("marking socket for shared use failed");
897 }
898 Verb() << "Multicast(Windows): will bind to home address";
899 #else
900 Verb() << "Multicast(POSIX): will bind to IGMP address: " << host;
901 #endif
902 int res = setsockopt(m_sock, IPPROTO_IP, opt_name, mreq_arg, mreq_arg_size);
903
904 if ( res == status_error )
905 {
906 Error(errno, "adding to multicast membership failed");
907 }
908
909 attr.erase("multicast");
910 attr.erase("adapter");
911 }
912
913 // The "ttl" options is handled separately, it maps to both IP_TTL
914 // and IP_MULTICAST_TTL so that TTL setting works for both uni- and multicast.
915 if (attr.count("ttl"))
916 {
917 int ttl = stoi(attr.at("ttl"));
918 int res = setsockopt(m_sock, IPPROTO_IP, IP_TTL, (const char*)&ttl, sizeof ttl);
919 if (res == -1)
920 Verb() << "WARNING: failed to set 'ttl' (IP_TTL) to " << ttl;
921 res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&ttl, sizeof ttl);
922 if (res == -1)
923 Verb() << "WARNING: failed to set 'ttl' (IP_MULTICAST_TTL) to " << ttl;
924
925 attr.erase("ttl");
926 }
927
928 m_options = attr;
929
930 for (auto o: udp_options)
931 {
932 // Ignore "binding" - for UDP there are no post options.
933 if ( m_options.count(o.name) )
934 {
935 string value = m_options.at(o.name);
936 bool ok = o.apply<SocketOption::SYSTEM>(m_sock, value);
937 if ( !ok )
938 Verb() << "WARNING: failed to set '" << o.name << "' to " << value;
939 }
940 }
941 }
942
Error(int err,string src)943 void Error(int err, string src)
944 {
945 char buf[512];
946 string message = SysStrError(err, buf, 512u);
947
948 cerr << "\nERROR #" << err << ": " << message << endl;
949
950 throw TransmissionError("error: " + src + ": " + message);
951 }
952
~UdpCommon()953 ~UdpCommon()
954 {
955 #ifdef _WIN32
956 if (m_sock != -1)
957 {
958 shutdown(m_sock, SD_BOTH);
959 closesocket(m_sock);
960 m_sock = -1;
961 }
962 #else
963 close(m_sock);
964 #endif
965 }
966 };
967
968
969 class UdpSource: public Source, public UdpCommon
970 {
971 bool eof = true;
972 public:
973
UdpSource(string host,int port,const map<string,string> & attr)974 UdpSource(string host, int port, const map<string,string>& attr)
975 {
976 Setup(host, port, attr);
977 int stat = ::bind(m_sock, sadr.get(), sadr.size());
978 if ( stat == -1 )
979 Error(SysError(), "Binding address for UDP");
980 eof = false;
981 }
982
Read(size_t chunk,MediaPacket & pkt,ostream & ignored SRT_ATR_UNUSED=cout)983 int Read(size_t chunk, MediaPacket& pkt, ostream & ignored SRT_ATR_UNUSED = cout) override
984 {
985 if (pkt.payload.size() < chunk)
986 pkt.payload.resize(chunk);
987
988 sockaddr_any sa(sadr.family());
989 socklen_t si = sa.size();
990 int stat = recvfrom(m_sock, pkt.payload.data(), (int) chunk, 0, sa.get(), &si);
991 if (stat < 1)
992 {
993 if (SysError() != EWOULDBLOCK)
994 eof = true;
995 pkt.payload.clear();
996 return stat;
997 }
998 sa.len = si;
999
1000 // Save this time to potentially use it for SRT target.
1001 pkt.time = srt_time_now();
1002 chunk = size_t(stat);
1003 if (chunk < pkt.payload.size())
1004 pkt.payload.resize(chunk);
1005
1006 return stat;
1007 }
1008
IsOpen()1009 bool IsOpen() override { return m_sock != -1; }
End()1010 bool End() override { return eof; }
1011
GetSysSocket() const1012 int GetSysSocket() const override { return m_sock; };
1013 };
1014
1015 class UdpTarget: public Target, public UdpCommon
1016 {
1017 public:
UdpTarget(string host,int port,const map<string,string> & attr)1018 UdpTarget(string host, int port, const map<string,string>& attr )
1019 {
1020 if (host.empty())
1021 cerr << "\nWARN Host for UDP target is not provided. Will send to localhost:" << port << ".\n";
1022
1023 Setup(host, port, attr);
1024 if (adapter != "")
1025 {
1026 sockaddr_any maddr = CreateAddr(adapter, 0);
1027 if (maddr.family() != AF_INET)
1028 {
1029 Error(0, "UDP/target: IPv6 multicast not supported in the application");
1030 }
1031
1032 in_addr addr = maddr.sin.sin_addr;
1033
1034 int res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
1035 if (res == -1)
1036 {
1037 Error(SysError(), "setsockopt/IP_MULTICAST_IF: " + adapter);
1038 }
1039 }
1040
1041 }
1042
Write(const char * data,size_t len,int64_t src_time SRT_ATR_UNUSED,ostream & ignored SRT_ATR_UNUSED=cout)1043 int Write(const char* data, size_t len, int64_t src_time SRT_ATR_UNUSED, ostream & ignored SRT_ATR_UNUSED = cout) override
1044 {
1045 int stat = sendto(m_sock, data, (int) len, 0, sadr.get(), sadr.size());
1046 if ( stat == -1 )
1047 {
1048 if ((false))
1049 Error(SysError(), "UDP Write/sendto");
1050 return stat;
1051 }
1052 return stat;
1053 }
1054
IsOpen()1055 bool IsOpen() override { return m_sock != -1; }
Broken()1056 bool Broken() override { return false; }
1057
GetSysSocket() const1058 int GetSysSocket() const override { return m_sock; };
1059 };
1060
1061 template <class Iface> struct Udp;
1062 template <> struct Udp<Source> { typedef UdpSource type; };
1063 template <> struct Udp<Target> { typedef UdpTarget type; };
1064
1065 template <class Iface>
CreateUdp(const string & host,int port,const map<string,string> & par)1066 Iface* CreateUdp(const string& host, int port, const map<string,string>& par) { return new typename Udp<Iface>::type (host, port, par); }
1067
1068 template<class Base>
IsOutput()1069 inline bool IsOutput() { return false; }
1070
1071 template<>
IsOutput()1072 inline bool IsOutput<Target>() { return true; }
1073
1074 template <class Base>
CreateMedium(const string & uri)1075 extern unique_ptr<Base> CreateMedium(const string& uri)
1076 {
1077 unique_ptr<Base> ptr;
1078
1079 UriParser u(uri);
1080
1081 int iport = 0;
1082 switch ( u.type() )
1083 {
1084 default:
1085 break; // do nothing, return nullptr
1086 case UriParser::FILE:
1087 if (u.host() == "con" || u.host() == "console")
1088 {
1089 if (IsOutput<Base>() && (
1090 (Verbose::on && Verbose::cverb == &cout)
1091 || g_stats_are_printed_to_stdout))
1092 {
1093 cerr << "ERROR: file://con with -v or -r or -s would result in mixing the data and text info.\n";
1094 cerr << "ERROR: HINT: you can stream through a FIFO (named pipe)\n";
1095 throw invalid_argument("incorrect parameter combination");
1096 }
1097 ptr.reset(CreateConsole<Base>());
1098 }
1099 // Disable regular file support for the moment
1100 #if 0
1101 else
1102 ptr.reset( CreateFile<Base>(u.path()));
1103 #endif
1104 break;
1105
1106 case UriParser::SRT:
1107 iport = atoi(u.port().c_str());
1108 if ( iport < 1024 )
1109 {
1110 cerr << "Port value invalid: " << iport << " - must be >=1024\n";
1111 throw invalid_argument("Invalid port number");
1112 }
1113 ptr.reset( CreateSrt<Base>(u.host(), iport, u.parameters()) );
1114 break;
1115
1116
1117 case UriParser::UDP:
1118 iport = atoi(u.port().c_str());
1119 if ( iport < 1024 )
1120 {
1121 cerr << "Port value invalid: " << iport << " - must be >=1024\n";
1122 throw invalid_argument("Invalid port number");
1123 }
1124 ptr.reset( CreateUdp<Base>(u.host(), iport, u.parameters()) );
1125 break;
1126
1127 }
1128
1129 if (ptr.get())
1130 ptr->uri = move(u);
1131
1132 return ptr;
1133 }
1134
1135
Create(const std::string & url)1136 std::unique_ptr<Source> Source::Create(const std::string& url)
1137 {
1138 return CreateMedium<Source>(url);
1139 }
1140
Create(const std::string & url)1141 std::unique_ptr<Target> Target::Create(const std::string& url)
1142 {
1143 return CreateMedium<Target>(url);
1144 }
1145