1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
2 //
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc.
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 //
20
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
24
25 #include <sys/stat.h>
26 #include <list>
27 #include <map>
28 #include <iostream>
29 #include <sstream>
30 #include <csignal>
31 #include <vector>
32 #include <sys/mman.h>
33 #include <cerrno>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <fcntl.h>
37 #include <functional>
38 #include <mutex>
39 #include <condition_variable>
40
41 #include "GnashSleep.h"
42 #include "revno.h"
43
44 //#include "cvm.h"
45
46 extern "C"{
47 # include "GnashSystemIOHeaders.h"
48 #ifdef HAVE_GETOPT_H
49 # include <getopt.h>
50 #endif
51 #ifndef __GNUC__
52 extern int optind, getopt(int, char *const *, const char *);
53 extern char *optarg;
54 #endif
55 }
56
57
58 // classes internal to Gnash
59 #include "network.h"
60 #include "log.h"
61 #include "crc.h"
62 #include "proc.h"
63 #include "rtmp.h"
64 #include "buffer.h"
65 #include "utility.h"
66 #include "limits.h"
67 #include "netstats.h"
68 #include "statistics.h"
69 //#include "stream.h"
70 #include "gmemory.h"
71 #include "diskstream.h"
72 #include "arg_parser.h"
73 #include "GnashException.h"
74 #include "GnashSleep.h" // for usleep comptibility.
75 #include "URL.h"
76 #include "rtmp_client.h"
77
78 // classes internal to Cygnal
79 #include "rtmp_server.h"
80 #include "http_server.h"
81
82 #include "handler.h"
83 #include "cache.h"
84 #include "cygnal.h"
85
86 #ifdef ENABLE_NLS
87 # include <locale>
88 #endif
89
90 #include <boost/date_time/gregorian/gregorian.hpp>
91 //#include <boost/date_time/local_time/local_time.hpp>
92 #include <boost/date_time/time_zone_base.hpp>
93 #include <boost/date_time/posix_time/posix_time.hpp>
94
95 #ifndef POLLRDHUP
96 #define POLLRDHUP 0
97 #endif
98
99 //using gnash::log_network;
100 using namespace std;
101 using namespace gnash;
102 using namespace cygnal;
103
// Forward declarations of the file-local helpers defined further down.
static void usage();
static void version_and_copyright();
static void cntrlc_handler(int sig);
static void hup_handler(int sig);

// Handler routines started from main() and connection_handler(). Each
// receives its parameters through a Network::thread_params_t.
void connection_handler(Network::thread_params_t *args);
void event_handler(Network::thread_params_t *args);
void admin_handler(Network::thread_params_t *args);

// Toggles very verbose debugging info from the Network class
static bool netdebug = false;

// Signal dispositions installed by main(): act1 for SIGINT, act2 for SIGHUP.
struct sigaction act1, act2;

// The next few variables are file-level state read by the handler
// threads. The original intent is that they are set in main() before
// any of the threads are started and never change afterwards, so they
// are read without a mutex.

// This is the default path to look in for files to be streamed.
static string docroot;

// This is the number of times a thread loop continues, for debugging only
int thread_retries = 10;

// This is added to the default ports for testing so it doesn't
// conflict with apache on the same machine.
static int port_offset = 0;

// Toggle the admin thread
static bool admin = false;

// Admin commands are small; this is the maximum number of bytes read
// per admin request.
const int ADMINPKTSIZE = 80;

// If set to a non zero value, this limits Cygnal to only one protocol
// at a time. This is for debugging only.
static int only_port = 0;

// This keeps track of the number of active threads.
ThreadCounter tids;

map<int, Network *> networks;

// This is the global Cygnal object.
static Cygnal& cyg = Cygnal::getDefaultInstance();

// The debug log used by all the gnash libraries.
static LogFile& dbglogfile = LogFile::getDefaultInstance();

// The user config for Cygnal is loaded and parsed here:
static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();

// Cache support for responses and files.
static Cache& cache = Cache::getDefaultInstance();

// The list of active cgis being executed.
//static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();

// main() waits on this condition variable (with alldone_mutex held)
// until a handler signals that the server should shut down.
static std::condition_variable alldone;
static std::mutex alldone_mutex;

// NOTE(review): not referenced anywhere in the visible part of this file.
static std::condition_variable noclients;
static std::mutex noclients_mutex;

// Printable protocol names for log messages. The handlers index this
// table with a Network protocol value, so the order here presumably
// mirrors that enum -- TODO confirm against network.h.
const char *proto_str[] = {
    "NONE",
    "HTTP",
    "HTTPS",
    "RTMP",
    "RTMPT",
    "RTMPTS",
    "RTMPE",
    "RTMPS",
    "DTN"
};
183
184 static void
usage()185 usage()
186 {
187 cout << _("cygnal -- a streaming media server.") << endl
188 << endl
189 << _("Usage: cygnal [options...]") << endl
190 << _(" -h, --help Print this help and exit") << endl
191 << _(" -V, --version Print version information and exit") << endl
192 << _(" -v, --verbose Output verbose debug info") << endl
193 << _(" -s, --singlethread Disable Multi Threading") << endl
194 << _(" -n, --netdebug Turn on net debugging messages") << endl
195 << _(" -o --only-port Only use port for debugging") << endl
196 << _(" -p --port-offset Port offset for debugging") << endl
197 << _(" -t, --testing Turn on special Gnash testing support") << endl
198 << _(" -a, --admin Enable the administration thread") << endl
199 << _(" -r, --root Document root for all files") << endl
200 << _(" -m, --machine Hostname for this machine") << endl
201 << endl;
202 }
203
204
205 Cygnal&
getDefaultInstance()206 Cygnal::getDefaultInstance()
207 {
208 // GNASH_REPORT_FUNCTION;
209 static Cygnal o;
210 return o;
211 }
212
213
~Cygnal()214 Cygnal::~Cygnal()
215 {
216 // GNASH_REPORT_FUNCTION;
217 }
218
219 bool
loadPeersFile()220 Cygnal::loadPeersFile()
221 {
222 // GNASH_REPORT_FUNCTION;
223
224 loadPeersFile("./peers.conf");
225
226 loadPeersFile("/etc/peers.conf");
227
228 // Check the users home directory
229 #ifndef __amigaos4__
230 char *home = std::getenv("HOME");
231 #else
232 //on AmigaOS we have a GNASH: assign that point to program dir
233 char *home = "/gnash";
234 #endif
235
236 string homefile = home;
237 homefile += "/peers.conf";
238
239 return loadPeersFile(homefile);
240 }
241
242 bool
loadPeersFile(const std::string & filespec)243 Cygnal::loadPeersFile(const std::string &filespec)
244 {
245 // GNASH_REPORT_FUNCTION;
246
247 struct stat stats;
248 std::ifstream in;
249 std::string line;
250 string host;
251 string portstr;
252 string cgi;
253 vector<string> supported;
254
255 // Make sufre the file exists
256 if (stat(filespec.c_str(), &stats) != 0) {
257 return false;
258 }
259
260 in.open(filespec.c_str());
261
262 if (!in) {
263 log_error(_(": couldn't open file: "), filespec);
264 return false;
265 }
266
267 // Read in each line and parse it
268 size_t lineno = 0;
269 while (std::getline(in, line)) {
270
271 ++lineno;
272
273 // Ignore comment and empty lines
274 if (line.empty() || line[0] == '#') {
275 continue;
276 }
277
278 std::istringstream ss(line);
279
280 // Get the first token
281 if (! (ss >> host)) {
282 // Empty line
283 continue;
284 }
285
286 // 'action' should never be empty, or (ss >> action)
287 // above would have failed
288
289 if (host[0] == '#') {
290 continue; // discard comments
291 }
292
293 // Get second token
294 if (!(ss >> portstr)) {
295 // Do we need to warn here as well?
296 continue;
297 }
298
299 while (ss >> cgi) {
300 supported.push_back(cgi);
301 continue;
302 }
303
304 // Create a new peer item
305 std::shared_ptr<peer_t> peer(new Cygnal::peer_t);
306 peer->hostname = host;
307 peer->port = strtol(portstr.c_str(), NULL, 0) & 0xffff;
308
309 _peers.push_back(peer);
310 }
311
312 return true;
313 }
314
315 void
probePeers()316 Cygnal::probePeers()
317 {
318 // GNASH_REPORT_FUNCTION;
319
320 probePeers(_peers);
321 }
322
323 void
probePeers(peer_t & peer)324 Cygnal::probePeers(peer_t &peer)
325 {
326 // GNASH_REPORT_FUNCTION;
327 RTMPClient net;
328 stringstream uri;
329
330 uri << peer.hostname;
331
332 vector<string>::iterator it;
333 for (it = peer.supported.begin(); it <= peer.supported.end(); ++it) {
334 string tmp = uri.str();
335 // tmp += (*it);
336 // log_network("Constructed: %s/%s", uri.str(), *it);
337
338 gnash::URL url(uri.str());
339 if (!(peer.fd = net.connectToServer(uri.str()))) {
340 log_network(_("Couldn't connect to %s"), uri.str());
341 peer.connected = false;
342 } else {
343 peer.connected = true;
344 // peer.fd = net.getFileFd();
345 }
346 }
347 }
348
349 void
probePeers(std::vector<std::shared_ptr<peer_t>> & peers)350 Cygnal::probePeers(std::vector<std::shared_ptr<peer_t> > &peers)
351 {
352 // GNASH_REPORT_FUNCTION;
353
354 // createClient();
355 std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
356 for (it = peers.begin(); it != peers.end(); ++it) {
357 std::shared_ptr<Cygnal::peer_t> peer = *it;
358 probePeers(*peer);
359 if (peer->connected) {
360 log_network(_("%s is active on fd #%d."), peer->hostname,
361 peer->fd);
362 _active_peers.push_back(*it);
363 }
364 }
365 }
366
367 void
removeHandler(const std::string & path)368 Cygnal::removeHandler(const std::string &path)
369 {
370 // GNASH_REPORT_FUNCTION;
371 map<std::string, std::shared_ptr<Handler> >::iterator it;
372 it = _handlers.find(path);
373 if (it != _handlers.end()) {
374 std::lock_guard<std::mutex> lock(_mutex);
375 _handlers.erase(it);
376 }
377 }
378
379 std::shared_ptr<Handler>
findHandler(const std::string & path)380 Cygnal::findHandler(const std::string &path)
381 {
382 // GNASH_REPORT_FUNCTION;
383 map<std::string, std::shared_ptr<Handler> >::iterator it;
384 std::shared_ptr<Handler> hand;
385 it = _handlers.find(path);
386 if (it != _handlers.end()) {
387 hand = (*it).second;
388 }
389
390 return hand;
391 }
392
393 void
dump()394 Cygnal::dump()
395 {
396 std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
397 for (it = _peers.begin(); it != _peers.end(); ++it) {
398 cerr << "Remote Peer: " << (*it)->hostname
399 << ":" << (*it)->port << endl;
400 }
401 }
402
403 int
main(int argc,char * argv[])404 main(int argc, char *argv[])
405 {
406 // Initialize national language support
407 #ifdef ENABLE_NLS
408 setlocale (LC_ALL, "");
409 bindtextdomain (PACKAGE, LOCALEDIR);
410 textdomain (PACKAGE);
411 #endif
412
413 // This becomes the default hostname, which becomes
414 // 127.0.0.1 or ::1 for the localhost. The --machine
415 // otion can change this.
416 std::string hostname = "localhost.localdomain";
417
418 const Arg_parser::Option opts[] =
419 {
420 { 'h', "help", Arg_parser::no },
421 { 'V', "version", Arg_parser::no },
422 { 'p', "port-offset", Arg_parser::yes },
423 { 'v', "verbose", Arg_parser::no },
424 { 'd', "dump", Arg_parser::no },
425 { 'n', "netdebug", Arg_parser::no },
426 { 't', "testing", Arg_parser::no },
427 { 'a', "admin", Arg_parser::no },
428 { 'r', "root", Arg_parser::yes },
429 { 'o', "only-port", Arg_parser::yes },
430 { 's', "singlethreaded", Arg_parser::no },
431 { 'm', "machine", Arg_parser::yes }
432 };
433
434 Arg_parser parser(argc, argv, opts);
435 if( ! parser.error().empty() ) {
436 cout << parser.error() << endl;
437 exit(EXIT_FAILURE);
438 }
439
440 // crcfile.loadFiles();
441
442 // Set the log file name before trying to write to
443 // it, or we might get two.
444 dbglogfile.setLogFilename(crcfile.getDebugLog());
445
446 if (crcfile.verbosityLevel() > 0) {
447 dbglogfile.setVerbosity(crcfile.verbosityLevel());
448 }
449
450 if (crcfile.getDocumentRoot().size() > 0) {
451 docroot = crcfile.getDocumentRoot();
452 } else {
453 docroot = "/var/www/html/software/tests/";
454 crcfile.setDocumentRoot(docroot);
455 }
456 if (crcfile.getPortOffset()) {
457 port_offset = crcfile.getPortOffset();
458 }
459
460 // Handle command line arguments
461 for( int i = 0; i < parser.arguments(); ++i ) {
462 const int code = parser.code(i);
463 switch( code ) {
464 case 'h':
465 version_and_copyright();
466 usage();
467 exit(EXIT_SUCCESS);
468 case 'V':
469 version_and_copyright();
470 exit(EXIT_SUCCESS);
471 case 't':
472 crcfile.setTestingFlag(true);
473 break;
474 case 'a':
475 admin = true;
476 break;
477 case 'v':
478 dbglogfile.setVerbosity();
479 LOG_ONCE(log_network(_("Verbose output turned on")))
480 break;
481 case 'p':
482 port_offset = parser.argument<int>(i);
483 crcfile.setPortOffset(port_offset);
484 break;
485 case 'r':
486 docroot = parser.argument(i);
487 break;
488 case 's':
489 crcfile.setThreadingFlag(false);
490 break;
491 case 'n':
492 netdebug = true;
493 dbglogfile.setNetwork(true);
494 break;
495 case 'o':
496 only_port = parser.argument<int>(i);
497 break;
498 case 'd':
499 crcfile.dump();
500 exit(EXIT_SUCCESS);
501 break;
502 case 'm':
503 hostname = parser.argument(i);
504 break;
505 default:
506 log_error(_("Extraneous argument: %s"), parser.argument(i).c_str());
507 }
508 }
509
510 log_network(_("Document Root for media files is: %s"), docroot);
511 crcfile.setDocumentRoot(docroot);
512
513 // load the file of peers. A peer is another instance of Cygnal we
514 // can use for distributed processing.
515 cyg.loadPeersFile();
516 cyg.probePeers();
517
518 // cyg.dump();
519
520 // Trap ^C (SIGINT) so we can kill all the threads
521 act1.sa_handler = cntrlc_handler;
522 sigaction (SIGINT, &act1, NULL);
523 act2.sa_handler = hup_handler;
524 sigaction (SIGHUP, &act2, NULL);
525 // sigaction (SIGPIPE, &act, NULL);
526
527 // Lock a mutex the main() waits in before exiting. This is
528 // because all the actually processing is done by other threads.
529 std::unique_lock<std::mutex> lk(alldone_mutex);
530
531 // Start the Admin handler. This allows one to connect to Cygnal
532 // at port 1111 and dump statistics to the terminal for tuning
533 // purposes.
534 if (admin) {
535 Network::thread_params_t admin_data;
536 admin_data.port = gnash::ADMIN_PORT;
537 std::thread admin_thread(std::bind(&admin_handler, &admin_data));
538 }
539
540 // Cvm cvm;
541 // cvm.loadMovie("/tmp/out.swf");
542
543 // If a only-port is specified, we only want to run single
544 // threaded. As all the rest of the code checks the config value
545 // setting, this overrides that in the memory, but doesn't change
546 // the file itself. This feature is really only for debugging,
547 // where it's easier to work with one protocol at a time.
548 if (only_port) {
549 crcfile.setThreadingFlag(false);
550 }
551
552 // Incomming connection handler for port 80, HTTP and
553 // RTMPT. As port 80 requires root access, cygnal supports a
554 // "port offset" for debugging and development of the
555 // server. Since this port offset changes the constant to test
556 // for which protocol, we pass the info to the start thread so
557 // it knows which handler to invoke.
558 Network::thread_params_t *http_data = new Network::thread_params_t;
559 if ((only_port == 0) || (only_port == gnash::HTTP_PORT)) {
560 http_data->tid = 0;
561 http_data->netfd = 0;
562 http_data->filespec = docroot;
563 http_data->protocol = Network::HTTP;
564 http_data->port = port_offset + gnash::HTTP_PORT;
565 http_data->hostname = hostname;
566 if (crcfile.getThreadingFlag()) {
567 std::thread http_thread(std::bind(&connection_handler, http_data));
568 } else {
569 connection_handler(http_data);
570 }
571 }
572
573 // Incomming connection handler for port 1935, RTMPT and
574 // RTMPTE. This supports the same port offset as the HTTP handler,
575 // just to keep things consistent.
576 Network::thread_params_t *rtmp_data = new Network::thread_params_t;
577 if ((only_port == 0) || (only_port == gnash::RTMP_PORT)) {
578 rtmp_data->tid = 0;
579 rtmp_data->netfd = 0;
580 rtmp_data->filespec = docroot;
581 rtmp_data->protocol = Network::RTMP;
582 rtmp_data->port = port_offset + gnash::RTMP_PORT;
583 rtmp_data->hostname = hostname;
584 if (crcfile.getThreadingFlag()) {
585 std::thread rtmp_thread(std::bind(&connection_handler, rtmp_data));
586 } else {
587 connection_handler(rtmp_data);
588 }
589 }
590
591 // Wait for all the threads to die.
592 alldone.wait(lk);
593
594 log_network(_("Cygnal done..."));
595
596 // Delete the data we allowcated to pass to each connection_handler.
597 delete rtmp_data;
598 delete http_data;
599
600 return(0);
601 }
602
603 // Trap Control-C (SIGINT) so we can cleanly exit
604 static void
cntrlc_handler(int sig)605 cntrlc_handler (int sig)
606 {
607 log_network(_("Got a %d interrupt"), sig);
608 // sigaction (SIGINT, &act, NULL);
609 exit(EXIT_FAILURE);
610 }
611
612 // Trap SIGHUP so we can
613 static void
hup_handler(int)614 hup_handler (int /* sig */)
615 {
616 if (crcfile.getTestingFlag()) {
617 cerr << "Testing, Testing, Testing..." << endl;
618 }
619
620 }
621
622 static void
version_and_copyright()623 version_and_copyright()
624 {
625 cout << "Cygnal: " << BRANCH_NICK << "_" << BRANCH_REVNO << endl
626 << endl
627 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
628 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
629 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
630 "Public License V3 or later. For more information, see the file named COPYING.\n")
631 << endl;
632 }
633
634 // FIXME: this function could be tweaked for better performance
635 void
admin_handler(Network::thread_params_t * args)636 admin_handler(Network::thread_params_t *args)
637 {
638 GNASH_REPORT_FUNCTION;
639 int retries = 100;
640 int ret;
641
642 map<int, Handler *>::iterator hit;
643 stringstream response;
644
645 Network net;
646 Handler::admin_cmd_e cmd = Handler::POLL;
647 net.createServer(args->hostname, args->port);
648 while (retries > 0) {
649 log_network(_("Starting Admin Handler for port %d"), args->port);
650
651 if (net.newConnection(true) <= 0) {
652 return;
653 }
654
655 log_network(_("Got an incoming Admin request"));
656 sleep(1);
657 do {
658 Network::byte_t data[ADMINPKTSIZE+1];
659 memset(data, 0, ADMINPKTSIZE+1);
660 const char *ptr = reinterpret_cast<const char *>(data);
661 ret = net.readNet(data, ADMINPKTSIZE, 100);
662 if (ret < 0) {
663 log_network(_("no more admin data, exiting...\n"));
664 if ((ret == 0) && cmd != Handler::POLL) {
665 break;
666 }
667 } else {
668 // force the case to make comparisons easier. Only compare enough characters to
669 // till each command is unique.
670 std::transform(ptr, ptr + ret, data, (int(*)(int)) toupper);
671 if (strncmp(ptr, "QUIT", 4) == 0) {
672 cmd = Handler::QUIT;
673 } else if (strncmp(ptr, "STATUS", 5) == 0) {
674 cmd = Handler::STATUS;
675 } else if (strncmp(ptr, "HELP", 2) == 0) {
676 cmd = Handler::HELP;
677 net.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
678 } else if (strncmp(ptr, "POLL", 2) == 0) {
679 cmd = Handler::POLL;
680 } else if (strncmp(ptr, "INTERVAL", 2) == 0) {
681 cmd = Handler::INTERVAL;
682 }
683 }
684 switch (cmd) {
685 // close this connection
686 case Handler::QUIT:
687 ret = -1;
688 break;
689 case Handler::STATUS:
690 {
691 #ifdef USE_STATS_CACHE
692 // cache.dump();
693 string results = cache.stats(false);
694 if (results.size()) {
695 net.writeNet(results);
696 results.clear();
697 }
698 #endif
699 #if 0
700 response << handlers.size() << " handlers are currently active.";
701 for (hit = handlers.begin(); hit != handlers.end(); hit++) {
702 int fd = hit->first;
703 Handler *hand = hit->second;
704 response << fd << ","
705 << hand->insize()
706 << "," << hand->outsize()
707 << "\r\n";
708 net.writeNet(response);
709 }
710 #endif
711 }
712 break;
713 case Handler::POLL:
714 #ifdef USE_STATS_QUEUE
715 response << handlers.size() << " handlers are currently active." << "\r\n";
716 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
717 int fd = hit->first;
718 Handler *hand = hit->second;
719 struct timespec now;
720 clock_gettime (CLOCK_REALTIME, &now);
721 // Incoming que stats
722 CQue::que_stats_t *stats = hand->statsin();
723 float diff = static_cast<float>(((now.tv_sec -
724 stats->start.tv_sec) + ((now.tv_nsec -
725 stats->start.tv_nsec)/1e9)));
726 response << fd
727 << "," << stats->totalbytes
728 << "," << diff
729 << "," << stats->totalin
730 << "," << stats->totalout;
731 // Outgoing que stats
732 stats = hand->statsout();
733 response << "," <<stats->totalbytes
734 << "," << stats->totalin
735 << "," << stats->totalout
736 << "\r\n";
737 net.writeNet(response.str());
738 }
739 #endif
740 break;
741 case Handler::INTERVAL:
742 net.writeNet("set interval\n");
743 break;
744 default:
745 break;
746 };
747 } while (ret > 0);
748 log_network(_("admin_handler: Done...!\n"));
749 net.closeNet(); // this shuts down this socket connection
750 }
751 net.closeConnection(); // this shuts down the server on this connection
752
753 // All threads should exit now.
754 alldone.notify_all();
755 }
756
757 // A connection handler is started for each port the server needs to
758 // wait on for incoming connections. When it gets an incoming
759 // connection, it reads the first packet to get the resource name, and
760 // then starts the event handler thread if it's a newly requested
761 // resource, otherwise it loads a copy of the cached resource.
762 void
connection_handler(Network::thread_params_t * args)763 connection_handler(Network::thread_params_t *args)
764 {
765 // GNASH_REPORT_FUNCTION;
766 int fd = 0;
767 Network net;
768 bool done = false;
769 static int tid = 0;
770
771 if (netdebug) {
772 net.toggleDebug(true);
773 }
774 // Start a server on this tcp/ip port.
775 fd = net.createServer(args->hostname, args->port);
776 if (fd <= 0) {
777 log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
778 proto_str[args->protocol], fd, args->port);
779 return;
780 } else {
781 log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
782 proto_str[args->protocol], fd, args->port);
783 }
784
785 // Get the number of cpus in this system. For multicore
786 // systems we'll get better load balancing if we keep all the
787 // cpus busy. So a pool of threads is started for each cpu,
788 // the default being just one. Each thread is reponsible for
789 // handling part of the total active file descriptors.
790 #ifdef HAVE_SYSCONF
791 long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
792 LOG_ONCE(log_network(_("This system has %d cpus."), ncpus));
793 #endif
794 size_t nfds = crcfile.getFDThread();
795
796 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
797
798 // Get the next thread ID to hand off handling this file
799 // descriptor to. If the limit for threads per cpu hasn't been
800 // set or is set to 0, assume one thread per processor by
801 // default. There won't even be threads for each cpu if
802 // threading has been disabled in the cygnal config file.
803 int spawn_limit = 0;
804 if (nfds == 0) {
805 spawn_limit = ncpus;
806 } else {
807 spawn_limit = ncpus * nfds;
808 }
809
810 // FIXME: this may run forever, we probably want a cleaner way to
811 // test for the end of time.
812 do {
813 net.setPort(args->port);
814 if (netdebug) {
815 net.toggleDebug(true);
816 }
817
818 // Rotate in a range of 0 to the limit.
819 tid = (tid + 1) % (spawn_limit + 1);
820 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
821
822 // Wait for a connection to this tcp/ip from a client. If set
823 // to true, this will block until a request comes in. If set
824 // to single threaded mode, this will only allow one client to
825 // connect at a time. This is to make it easier to debug
826 // things when you have a heavily threaded application.
827 args->netfd = net.newConnection(true, fd);
828 if (args->netfd <= 0) {
829 log_network(_("No new %s network connections"),
830 proto_str[args->protocol]);
831 return;
832 } else {
833 log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
834 proto_str[args->protocol], tid, args->netfd);
835 }
836
837 //
838 // Setup HTTP handler
839 //
840 if (args->protocol == Network::HTTP) {
841 Network::thread_params_t *hargs = new Network::thread_params_t;
842 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
843 hargs->protocol = args->protocol;
844 hargs->netfd = args->netfd;
845 #if 0
846 std::shared_ptr<Handler> hand = cyg.findHandler(path);
847 HTTPServer *http = new HTTPServer;
848 hargs.entry = http;
849 http->setDocRoot(crcfile.getDocumentRoot());
850 std::shared_ptr<cygnal::Buffer> buf(http->peekChunk());
851 http->processHeaderFields(*buf);
852 string hostname, path;
853 string::size_type pos = http->getField("host").find(":", 0);
854 if (pos != string::npos) {
855 hostname += http->getField("host").substr(0, pos);
856 } else {
857 hostname += "localhost.localdomain";
858 }
859 path = http->getFilespec();
860 string key = hostname + path;
861 #endif
862 string key;
863 Handler *hand = 0;
864 if (!hand) {
865 hand = new Handler;
866 hand->addClient(args->netfd, Network::HTTP);
867 int retries = 10;
868 cygnal::Buffer *buf = 0;
869 do {
870 buf = hand->parseFirstRequest(args->netfd, Network::HTTP);
871 if (!buf) {
872 retries--;
873 continue;
874 } else {
875 break;
876 }
877 } while (retries);
878 string &key = hand->getKey(args->netfd);
879 log_network(_("Creating new %s Handler for %s using fd #%d"),
880 proto_str[hargs->protocol], key, hargs->netfd);
881 hargs->handler = hand;
882 hargs->buffer = buf;
883 hargs->filespec = key;
884 // cyg.addHandler(key, hand);
885
886 // If in multi-threaded mode (the default), start a thread
887 // with a connection_handler for each port we're interested
888 // in. Each port of could have a different protocol.
889 std::bind(event_handler, hargs);
890 if (crcfile.getThreadingFlag() == true) {
891 std::thread event_thread(std::bind(&event_handler, hargs));
892 } else {
893 event_handler(hargs);
894 // We're done, close this network connection
895 }
896 } else {
897 log_network(_("Reusing %s Handler for %s using fd #%d"),
898 proto_str[hargs->protocol], key, hargs->netfd);
899 hand->addClient(args->netfd, Network::HTTP);
900 }
901 // delete http;
902 } // end of if HTTP
903
904 //
905 // Setup RTMP handler
906 //
907 if (args->protocol == Network::RTMP) {
908 Network::thread_params_t *rargs = new Network::thread_params_t;
909 rargs->protocol = args->protocol;
910 rargs->netfd = args->netfd;
911 RTMPServer *rtmp = new RTMPServer;
912 std::shared_ptr<cygnal::Element> tcurl =
913 rtmp->processClientHandShake(args->netfd);
914 if (!tcurl) {
915 // log_error("Couldn't read the tcUrl variable!");
916 rtmp->closeNet(args->netfd);
917 return;
918 }
919 URL url(tcurl->to_string());
920 string key = url.hostname() + url.path();
921 std::shared_ptr<Handler> hand = cyg.findHandler(url.path());
922 if (!hand) {
923 log_network(_("Creating new %s Handler for: %s for fd %#d"),
924 proto_str[args->protocol], key, args->netfd);
925 hand.reset(new Handler);
926 cyg.addHandler(key, hand);
927 rargs->entry = rtmp;
928 hand->setNetConnection(rtmp->getNetConnection());
929 std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
930 std::vector<std::shared_ptr<Cygnal::peer_t> > active = cyg.getActive();
931 for (it = active.begin(); it < active.end(); ++it) {
932 Cygnal::peer_t *peer = (*it).get();
933 hand->addRemote(peer->fd);
934 }
935 hand->addClient(args->netfd, Network::RTMP);
936 rargs->handler = reinterpret_cast<void *>(hand.get());
937 args->filespec = key;
938 args->entry = rtmp;
939
940 string cgiroot;
941 char *env = std::getenv("CYGNAL_PLUGINS");
942 if (env != 0) {
943 cgiroot = env;
944 }
945 if (crcfile.getCgiRoot().size() > 0) {
946 cgiroot += ":" + crcfile.getCgiRoot();
947 log_network(_("Cygnal Plugin paths are: %s"), cgiroot);
948 } else {
949 cgiroot = PLUGINSDIR;
950 }
951 hand->scanDir(cgiroot);
952 std::shared_ptr<Handler::cygnal_init_t> init =
953 hand->initModule(url.path());
954
955 // this is where the real work gets done.
956 if (init) {
957 // If in multi-threaded mode (the default), start a thread
958 // with a connection_handler for each port we're interested
959 // in. Each port of course has a different protocol.
960 if (crcfile.getThreadingFlag() == true) {
961 std::thread event_thread(std::bind(&event_handler, args));
962 } else {
963 event_handler(args);
964 // We're done, close this network connection
965 net.closeNet(args->netfd);
966 }
967 } else {
968 log_error(_("Couldn't load plugin for %s"), key);
969 }
970
971 // // We're done, close this network connection
972 // if (crcfile.getThreadingFlag() == true) {
973 // net.closeNet(args->netfd);
974 // }
975 }
976 // delete rtmp;
977 } // end of if RTMP
978
979 log_network(_("Number of active Threads is %d"), tids.num_of_tids());
980
981 // net.closeNet(args->netfd); // this shuts down this socket connection
982 log_network(_("Restarting loop for next connection for port %d..."),
983 args->port);
984 } while(!done);
985
986 // All threads should wake up now.
987 alldone.notify_all();
988
989 } // end of connection_handler
990
991 void
event_handler(Network::thread_params_t * args)992 event_handler(Network::thread_params_t *args)
993 {
994 GNASH_REPORT_FUNCTION;
995
996 Network::thread_params_t largs;
997 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
998 Handler *hand = reinterpret_cast<Handler *>(args->handler);
999
1000 largs.protocol = args->protocol;
1001 largs.netfd = args->netfd;
1002 largs.port = args->port;
1003 largs.buffer = args->buffer;
1004 largs.entry = args->entry;
1005 largs.filespec = args->filespec;
1006
1007 Network net;
1008 int timeout = 30;
1009 int retries = 0;
1010 bool done = false;
1011
1012 fd_set hits;
1013 FD_ZERO(&hits);
1014 FD_SET(args->netfd, &hits);
1015
1016 tids.increment();
1017
1018 // We need to calculate the highest numbered file descriptor
1019 // for select. We may want to do this elsewhere, as it could
1020 // be a performance hit as the number of file descriptors gets
1021 // larger.
1022 log_debug("Handler has %d clients attached, %d threads",
1023 hand->getClients().size(), tids.num_of_tids());
1024
1025 int max = 0;
1026 for (size_t i = 0; i<hand->getClients().size(); i++) {
1027 log_debug("Handler client[%d] is: %d", i, hand->getClient(i));
1028 if (hand->getClient(i) >= max) {
1029 max = hand->getClient(i);
1030 // hand->dump();
1031 }
1032 }
1033
1034 do {
1035
1036 // If we have active disk streams, send those packets first.
1037 // 0 is a reserved stream, so we start with 1, as the reserved
1038 // stream isn't one we care about here.
1039 if (hand->getActiveDiskStreams()) {
1040 log_network(_("%d active disk streams"),
1041 hand->getActiveDiskStreams());
1042 // hand->dump();
1043 }
1044 #if 0
1045 std::shared_ptr<DiskStream> filestream(cache.findFile(args->filespec));
1046 if (filestream) {
1047 filestream->dump();
1048 }
1049 // #else
1050 // cache.dump();
1051 #endif
1052 //hand->dump();
1053 std::shared_ptr<DiskStream> ds;
1054 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1055 ds = hand->getDiskStream(i);
1056 if (ds) {
1057 //ds->dump();
1058 // Only play the next chunk of the file.
1059 //log_network("Sending following chunk of %s", ds->getFilespec());
1060 if (ds->play(i, false)) {
1061 if (ds->getState() == DiskStream::CLOSED) {
1062 net.closeNet(args->netfd);
1063 hand->removeClient(args->netfd);
1064 done = true;
1065 }
1066 } else {
1067 // something went wrong, the stream failed
1068 net.closeNet(args->netfd);
1069 hand->removeClient(args->netfd);
1070 done = true;
1071 }
1072 }
1073 }
1074
1075 // See if we have any data waiting behind any of the file
1076 // descriptors.
1077 for (int i=0; i <= max + 1; i++) {
1078 if (FD_ISSET(i, &hits)) {
1079 FD_CLR(i, &hits);
1080 log_network(_("Got a hit for fd #%d, protocol %s"), i,
1081 proto_str[hand->getProtocol(i)]);
1082 switch (hand->getProtocol(i)) {
1083 case Network::NONE:
1084 log_error(_("No protocol specified!"));
1085 break;
1086 case Network::HTTP:
1087 {
1088 largs.netfd = i;
1089 // largs.filespec = fullpath;
1090 std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1091 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1092 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1093 net.closeNet(args->netfd);
1094 hand->removeClient(args->netfd);
1095 done = true;
1096 } else {
1097 log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i);
1098
1099 }
1100 continue;
1101 }
1102 case Network::RTMP:
1103 args->netfd = i;
1104 // args->filespec = path;
1105 if (!rtmp_handler(args)) {
1106 log_network(_("Done with RTMP connection for fd #%d, CGI "), i, args->filespec);
1107 done = true;
1108 }
1109 break;
1110 case Network::RTMPT:
1111 {
1112 net.setTimeout(timeout);
1113 args->netfd = i;
1114 std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1115 // args->filespec = path;
1116 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1117 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, largs.filespec);
1118 return;
1119 }
1120 break;
1121 }
1122 case Network::RTMPTS:
1123 {
1124 args->netfd = i;
1125 // args->filespec = path;
1126 std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1127 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1128 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1129 return;
1130 }
1131 break;
1132 }
1133 case Network::RTMPE:
1134 break;
1135 case Network::RTMPS:
1136 break;
1137 case Network::DTN:
1138 break;
1139 default:
1140 log_error(_("Unsupported network protocol for fd #%d, %d"),
1141 largs.netfd, hand->getProtocol(i));
1142 done = true;
1143 break;
1144 }
1145 // delete args->buffer;
1146 }
1147 }
1148
1149 // // Clear the current message so next time we read new data
1150 // args->buffer->clear();
1151 // largs.buffer->clear();
1152
1153 // Wait for something from one of the file descriptors. This timeout
1154 // is the time between sending packets to the client when there is
1155 // no client input, which effects the streaming speed of big files.
1156 net.setTimeout(5);
1157 hits = net.waitForNetData(hand->getClients());
1158 if (FD_ISSET(0, &hits)) {
1159 FD_CLR(0, &hits);
1160 log_network(_("Got no hits, %d retries"), retries);
1161 // net.closeNet(args->netfd);
1162 // hand->removeClient(args->netfd);
1163 // done = true;
1164 }
1165 retries++;
1166 #if 0
1167 if (retries >= 10) {
1168 net.closeNet(args->netfd);
1169 hand->removeClient(args->netfd);
1170 done = true;
1171 }
1172 #endif
1173 } while (!done);
1174
1175 tids.decrement();
1176
1177 } // end of event_handler
1178
1179 // local Variables:
1180 // mode: C++
1181 // indent-tabs-mode: nil
1182 // End:
1183