1 // cygnal.cpp:  GNU streaming Flash media server, for Gnash.
2 //
3 //   Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 //   Free Software Foundation, Inc.
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19 //
20 
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
24 
25 #include <sys/stat.h>
26 #include <list>
27 #include <map>
28 #include <iostream>
29 #include <sstream>
30 #include <csignal>
31 #include <vector>
32 #include <sys/mman.h>
33 #include <cerrno>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <fcntl.h>
37 #include <functional>
38 #include <mutex>
39 #include <condition_variable>
40 
41 #include "GnashSleep.h"
42 #include "revno.h"
43 
44 //#include "cvm.h"
45 
46 extern "C"{
47 # include "GnashSystemIOHeaders.h"
48 #ifdef HAVE_GETOPT_H
49 # include <getopt.h>
50 #endif
51 #ifndef __GNUC__
52  extern int optind, getopt(int, char *const *, const char *);
53  extern char *optarg;
54 #endif
55 }
56 
57 
58 // classes internal to Gnash
59 #include "network.h"
60 #include "log.h"
61 #include "crc.h"
62 #include "proc.h"
63 #include "rtmp.h"
64 #include "buffer.h"
65 #include "utility.h"
66 #include "limits.h"
67 #include "netstats.h"
68 #include "statistics.h"
69 //#include "stream.h"
70 #include "gmemory.h"
71 #include "diskstream.h"
72 #include "arg_parser.h"
73 #include "GnashException.h"
74 #include "GnashSleep.h" // for usleep comptibility.
75 #include "URL.h"
76 #include "rtmp_client.h"
77 
78 // classes internal to Cygnal
79 #include "rtmp_server.h"
80 #include "http_server.h"
81 
82 #include "handler.h"
83 #include "cache.h"
84 #include "cygnal.h"
85 
86 #ifdef ENABLE_NLS
87 # include <locale>
88 #endif
89 
90 #include <boost/date_time/gregorian/gregorian.hpp>
91 //#include <boost/date_time/local_time/local_time.hpp>
92 #include <boost/date_time/time_zone_base.hpp>
93 #include <boost/date_time/posix_time/posix_time.hpp>
94 
95 #ifndef POLLRDHUP
96 #define POLLRDHUP 0
97 #endif
98 
99 //using gnash::log_network;
100 using namespace std;
101 using namespace gnash;
102 using namespace cygnal;
103 
104 static void usage();
105 static void version_and_copyright();
106 static void cntrlc_handler(int sig);
107 static void hup_handler(int sig);
108 
109 void connection_handler(Network::thread_params_t *args);
110 void event_handler(Network::thread_params_t *args);
111 void admin_handler(Network::thread_params_t *args);
112 
113 // Toggles very verbose debugging info from the network Network class
114 static bool netdebug = false;
115 
116 struct sigaction  act1, act2;
117 
// The next few global variables have to be global because Boost
// threads don't take arguments. Since these are set in main() before
// any of the threads are started, and their values should never change,
// it's safe to use these without a mutex, as all threads share the
// same read-only values.
123 
124 // This is the default path to look in for files to be streamed.
125 static string docroot;
126 
127 // This is the number of times a thread loop continues, for debugging only
128 int thread_retries = 10;
129 
130 // This is added to the default ports for testing so it doesn't
131 // conflict with apache on the same machine.
132 static int port_offset = 0;
133 
134 // Toggle the admin thread
135 static bool admin = false;
136 
137 // Admin commands are small
138 const int ADMINPKTSIZE = 80;
139 
140 // If set to a non zero value, this limits Cygnal to only one protocol
141 // at a time. This is for debugging only.
142 static int only_port = 0;
143 
144 // These keep track of the number of active threads.
145 ThreadCounter tids;
146 
147 map<int, Network *> networks;
148 
// This is the global object for Cygnal; it holds the list of peers and
// the table of active handlers.
151 static Cygnal& cyg = Cygnal::getDefaultInstance();
152 
153 // The debug log used by all the gnash libraries.
154 static LogFile& dbglogfile = LogFile::getDefaultInstance();
155 
156 // The user config for Cygnal is loaded and parsed here:
157 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
158 
159 // Cache support for responses and files.
160 static Cache& cache = Cache::getDefaultInstance();
161 
162 // The list of active cgis being executed.
163 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
164 
// This condition variable (and its mutex) is used to signal main() when
// all the threads are done.
166 static std::condition_variable	alldone;
167 static std::mutex		alldone_mutex;
168 
169 static std::condition_variable	noclients;
170 static std::mutex		noclients_mutex;
171 
172 const char *proto_str[] = {
173     "NONE",
174     "HTTP",
175     "HTTPS",
176     "RTMP",
177     "RTMPT",
178     "RTMPTS",
179     "RTMPE",
180     "RTMPS",
181     "DTN"
182 };
183 
184 static void
usage()185 usage()
186 {
187 	cout << _("cygnal -- a streaming media server.") << endl
188 	<< endl
189 	<< _("Usage: cygnal [options...]") << endl
190 	<< _("  -h,  --help          Print this help and exit") << endl
191 	<< _("  -V,  --version       Print version information and exit") << endl
192 	<< _("  -v,  --verbose       Output verbose debug info") << endl
193 	<< _("  -s,  --singlethread  Disable Multi Threading") << endl
194 	<< _("  -n,  --netdebug      Turn on net debugging messages") << endl
195 	<< _("  -o   --only-port     Only use port for debugging") << endl
196 	<< _("  -p   --port-offset   Port offset for debugging") << endl
197         << _("  -t,  --testing       Turn on special Gnash testing support") << endl
198 	<< _("  -a,  --admin         Enable the administration thread") << endl
199 	<< _("  -r,  --root          Document root for all files") << endl
200 	<< _("  -m,  --machine       Hostname for this machine") << endl
201 	<< endl;
202 }
203 
204 
205 Cygnal&
getDefaultInstance()206 Cygnal::getDefaultInstance()
207 {
208 //     GNASH_REPORT_FUNCTION;
209     static Cygnal o;
210     return o;
211 }
212 
213 
~Cygnal()214 Cygnal::~Cygnal()
215 {
216 //     GNASH_REPORT_FUNCTION;
217 }
218 
219 bool
loadPeersFile()220 Cygnal::loadPeersFile()
221 {
222     // GNASH_REPORT_FUNCTION;
223 
224     loadPeersFile("./peers.conf");
225 
226     loadPeersFile("/etc/peers.conf");
227 
228     // Check the users home directory
229 #ifndef __amigaos4__
230     char *home = std::getenv("HOME");
231 #else
232     //on AmigaOS we have a GNASH: assign that point to program dir
233     char *home = "/gnash";
234 #endif
235 
236     string homefile = home;
237     homefile += "/peers.conf";
238 
239     return loadPeersFile(homefile);
240 }
241 
242 bool
loadPeersFile(const std::string & filespec)243 Cygnal::loadPeersFile(const std::string &filespec)
244 {
245 //     GNASH_REPORT_FUNCTION;
246 
247     struct stat stats;
248     std::ifstream in;
249     std::string line;
250     string host;
251     string portstr;
252     string cgi;
253     vector<string> supported;
254 
255     // Make sufre the file exists
256     if (stat(filespec.c_str(), &stats) != 0) {
257         return false;
258     }
259 
260     in.open(filespec.c_str());
261 
262     if (!in) {
263 	log_error(_(": couldn't open file: "), filespec);
264 	return false;
265     }
266 
267     // Read in each line and parse it
268     size_t lineno = 0;
269     while (std::getline(in, line)) {
270 
271         ++lineno;
272 
273         // Ignore comment and empty lines
274         if (line.empty() || line[0] == '#') {
275 	    continue;
276 	}
277 
278         std::istringstream ss(line);
279 
280         // Get the first token
281         if (! (ss >> host)) {
282             // Empty line
283             continue;
284         }
285 
286         // 'action' should never be empty, or (ss >> action)
287         // above would have failed
288 
289         if (host[0] == '#') {
290 	    continue; // discard comments
291 	}
292 
293         // Get second token
294         if (!(ss >> portstr)) {
295             // Do we need to warn here as well?
296             continue;
297         }
298 
299         while (ss >> cgi) {
300 	    supported.push_back(cgi);
301             continue;
302         }
303 
304 	// Create a new peer item
305 	std::shared_ptr<peer_t> peer(new Cygnal::peer_t);
306 	peer->hostname = host;
307 	peer->port = strtol(portstr.c_str(), NULL, 0) & 0xffff;
308 
309 	_peers.push_back(peer);
310     }
311 
312     return true;
313 }
314 
315 void
probePeers()316 Cygnal::probePeers()
317 {
318 //     GNASH_REPORT_FUNCTION;
319 
320     probePeers(_peers);
321 }
322 
323 void
probePeers(peer_t & peer)324 Cygnal::probePeers(peer_t &peer)
325 {
326 //     GNASH_REPORT_FUNCTION;
327     RTMPClient net;
328     stringstream uri;
329 
330     uri << peer.hostname;
331 
332     vector<string>::iterator it;
333     for (it = peer.supported.begin(); it <= peer.supported.end(); ++it) {
334 	string tmp = uri.str();
335 //	tmp += (*it);
336 // 	log_network("Constructed: %s/%s", uri.str(), *it);
337 
338 	gnash::URL url(uri.str());
339 	if (!(peer.fd = net.connectToServer(uri.str()))) {
340 	    log_network(_("Couldn't connect to %s"), uri.str());
341 	    peer.connected = false;
342 	} else {
343 	    peer.connected = true;
344 // 	    peer.fd = net.getFileFd();
345 	}
346     }
347 }
348 
349 void
probePeers(std::vector<std::shared_ptr<peer_t>> & peers)350 Cygnal::probePeers(std::vector<std::shared_ptr<peer_t> > &peers)
351 {
352 //     GNASH_REPORT_FUNCTION;
353 
354 // 	createClient();
355     std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
356     for (it = peers.begin(); it != peers.end(); ++it) {
357 	std::shared_ptr<Cygnal::peer_t> peer = *it;
358 	probePeers(*peer);
359 	if (peer->connected) {
360 	    log_network(_("%s is active on fd #%d."), peer->hostname,
361 			peer->fd);
362  	    _active_peers.push_back(*it);
363 	}
364     }
365 }
366 
367 void
removeHandler(const std::string & path)368 Cygnal::removeHandler(const std::string &path)
369 {
370 //     GNASH_REPORT_FUNCTION;
371     map<std::string, std::shared_ptr<Handler> >::iterator it;
372     it = _handlers.find(path);
373     if (it != _handlers.end()) {
374 	std::lock_guard<std::mutex> lock(_mutex);
375 	_handlers.erase(it);
376     }
377 }
378 
379 std::shared_ptr<Handler>
findHandler(const std::string & path)380 Cygnal::findHandler(const std::string &path)
381 {
382 //     GNASH_REPORT_FUNCTION;
383     map<std::string, std::shared_ptr<Handler> >::iterator it;
384     std::shared_ptr<Handler> hand;
385     it = _handlers.find(path);
386     if (it != _handlers.end()) {
387 	hand = (*it).second;
388     }
389 
390     return hand;
391 }
392 
393 void
dump()394 Cygnal::dump()
395 {
396     std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
397     for (it = _peers.begin(); it != _peers.end(); ++it) {
398 	cerr << "Remote Peer: " << (*it)->hostname
399 	     << ":" << (*it)->port << endl;
400     }
401 }
402 
403 int
main(int argc,char * argv[])404 main(int argc, char *argv[])
405 {
406     // Initialize national language support
407 #ifdef ENABLE_NLS
408     setlocale (LC_ALL, "");
409     bindtextdomain (PACKAGE, LOCALEDIR);
410     textdomain (PACKAGE);
411 #endif
412 
413     // This becomes the default hostname, which becomes
414     // 127.0.0.1 or ::1 for the localhost. The --machine
415     // otion can change this.
416     std::string hostname = "localhost.localdomain";
417 
418     const Arg_parser::Option opts[] =
419         {
420             { 'h', "help",          Arg_parser::no  },
421             { 'V', "version",       Arg_parser::no  },
422             { 'p', "port-offset",   Arg_parser::yes },
423             { 'v', "verbose",       Arg_parser::no  },
424             { 'd', "dump",          Arg_parser::no  },
425             { 'n', "netdebug",      Arg_parser::no  },
426             { 't', "testing",       Arg_parser::no  },
427             { 'a', "admin",         Arg_parser::no  },
428             { 'r', "root",          Arg_parser::yes },
429             { 'o', "only-port",     Arg_parser::yes },
430             { 's', "singlethreaded", Arg_parser::no },
431             { 'm', "machine",       Arg_parser::yes }
432         };
433 
434     Arg_parser parser(argc, argv, opts);
435     if( ! parser.error().empty() ) {
436         cout << parser.error() << endl;
437         exit(EXIT_FAILURE);
438     }
439 
440 //    crcfile.loadFiles();
441 
442     // Set the log file name before trying to write to
443     // it, or we might get two.
444     dbglogfile.setLogFilename(crcfile.getDebugLog());
445 
446     if (crcfile.verbosityLevel() > 0) {
447         dbglogfile.setVerbosity(crcfile.verbosityLevel());
448     }
449 
450     if (crcfile.getDocumentRoot().size() > 0) {
451         docroot = crcfile.getDocumentRoot();
452     } else {
453         docroot = "/var/www/html/software/tests/";
454 	crcfile.setDocumentRoot(docroot);
455     }
456     if (crcfile.getPortOffset()) {
457         port_offset = crcfile.getPortOffset();
458     }
459 
460     // Handle command line arguments
461     for( int i = 0; i < parser.arguments(); ++i ) {
462 	const int code = parser.code(i);
463 	switch( code ) {
464 	  case 'h':
465 	      version_and_copyright();
466 	      usage();
467 	      exit(EXIT_SUCCESS);
468 	  case 'V':
469 	      version_and_copyright();
470 	      exit(EXIT_SUCCESS);
471 	  case 't':
472 	      crcfile.setTestingFlag(true);
473 	      break;
474 	  case 'a':
475 	      admin = true;
476 	      break;
477 	  case 'v':
478 	      dbglogfile.setVerbosity();
479 	      LOG_ONCE(log_network(_("Verbose output turned on")))
480 	      break;
481 	  case 'p':
482 	      port_offset = parser.argument<int>(i);
483 	      crcfile.setPortOffset(port_offset);
484 	      break;
485 	  case 'r':
486 	      docroot = parser.argument(i);
487 	      break;
488 	  case 's':
489 	      crcfile.setThreadingFlag(false);
490 	      break;
491 	  case 'n':
492 	      netdebug = true;
493 	      dbglogfile.setNetwork(true);
494 	      break;
495 	  case 'o':
496 	      only_port = parser.argument<int>(i);
497 	      break;
498 	  case 'd':
499 	      crcfile.dump();
500 	      exit(EXIT_SUCCESS);
501 	      break;
502 	  case 'm':
503 	      hostname = parser.argument(i);
504 	      break;
505 	  default:
506 	      log_error(_("Extraneous argument: %s"), parser.argument(i).c_str());
507         }
508     }
509 
510     log_network(_("Document Root for media files is: %s"), docroot);
511     crcfile.setDocumentRoot(docroot);
512 
513     // load the file of peers. A peer is another instance of Cygnal we
514     // can use for distributed processing.
515     cyg.loadPeersFile();
516     cyg.probePeers();
517 
518 //    cyg.dump();
519 
520     // Trap ^C (SIGINT) so we can kill all the threads
521     act1.sa_handler = cntrlc_handler;
522     sigaction (SIGINT, &act1, NULL);
523     act2.sa_handler = hup_handler;
524     sigaction (SIGHUP, &act2, NULL);
525 //    sigaction (SIGPIPE, &act, NULL);
526 
527     // Lock a mutex the main() waits in before exiting. This is
528     // because all the actually processing is done by other threads.
529     std::unique_lock<std::mutex> lk(alldone_mutex);
530 
531     // Start the Admin handler. This allows one to connect to Cygnal
532     // at port 1111 and dump statistics to the terminal for tuning
533     // purposes.
534     if (admin) {
535 	Network::thread_params_t admin_data;
536 	admin_data.port = gnash::ADMIN_PORT;
537 	std::thread admin_thread(std::bind(&admin_handler, &admin_data));
538     }
539 
540 //    Cvm cvm;
541 //    cvm.loadMovie("/tmp/out.swf");
542 
543     // If a only-port is specified, we only want to run single
544     // threaded. As all the rest of the code checks the config value
545     // setting, this overrides that in the memory, but doesn't change
546     // the file itself. This feature is really only for debugging,
547     // where it's easier to work with one protocol at a time.
548     if (only_port) {
549 	crcfile.setThreadingFlag(false);
550     }
551 
552     // Incomming connection handler for port 80, HTTP and
553     // RTMPT. As port 80 requires root access, cygnal supports a
554     // "port offset" for debugging and development of the
555     // server. Since this port offset changes the constant to test
556     // for which protocol, we pass the info to the start thread so
557     // it knows which handler to invoke.
558     Network::thread_params_t *http_data = new Network::thread_params_t;
559     if ((only_port == 0) || (only_port == gnash::HTTP_PORT)) {
560 	http_data->tid = 0;
561 	http_data->netfd = 0;
562 	http_data->filespec = docroot;
563 	http_data->protocol = Network::HTTP;
564 	http_data->port = port_offset + gnash::HTTP_PORT;
565         http_data->hostname = hostname;
566 	if (crcfile.getThreadingFlag()) {
567 	    std::thread http_thread(std::bind(&connection_handler, http_data));
568 	} else {
569 	    connection_handler(http_data);
570 	}
571     }
572 
573     // Incomming connection handler for port 1935, RTMPT and
574     // RTMPTE. This supports the same port offset as the HTTP handler,
575     // just to keep things consistent.
576     Network::thread_params_t *rtmp_data = new Network::thread_params_t;
577     if ((only_port == 0) || (only_port == gnash::RTMP_PORT)) {
578 	rtmp_data->tid = 0;
579 	rtmp_data->netfd = 0;
580 	rtmp_data->filespec = docroot;
581 	rtmp_data->protocol = Network::RTMP;
582 	rtmp_data->port = port_offset + gnash::RTMP_PORT;
583         rtmp_data->hostname = hostname;
584 	if (crcfile.getThreadingFlag()) {
585 	    std::thread rtmp_thread(std::bind(&connection_handler, rtmp_data));
586 	} else {
587 	    connection_handler(rtmp_data);
588 	}
589     }
590 
591     // Wait for all the threads to die.
592     alldone.wait(lk);
593 
594     log_network(_("Cygnal done..."));
595 
596     // Delete the data we allowcated to pass to each connection_handler.
597     delete rtmp_data;
598     delete http_data;
599 
600     return(0);
601 }
602 
603 // Trap Control-C (SIGINT) so we can cleanly exit
604 static void
cntrlc_handler(int sig)605 cntrlc_handler (int sig)
606 {
607     log_network(_("Got a %d interrupt"), sig);
608 //    sigaction (SIGINT, &act, NULL);
609     exit(EXIT_FAILURE);
610 }
611 
612 // Trap SIGHUP so we can
613 static void
hup_handler(int)614 hup_handler (int /* sig */)
615 {
616     if (crcfile.getTestingFlag()) {
617 	cerr << "Testing, Testing, Testing..." << endl;
618     }
619 
620 }
621 
622 static void
version_and_copyright()623 version_and_copyright()
624 {
625     cout << "Cygnal: " << BRANCH_NICK << "_" << BRANCH_REVNO << endl
626         << endl
627         << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
628         "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
629         "You may redistribute copies of Cygnal under the terms of the GNU General\n"
630         "Public License V3 or later. For more information, see the file named COPYING.\n")
631     << endl;
632 }
633 
634 // FIXME: this function could be tweaked for better performance
635 void
admin_handler(Network::thread_params_t * args)636 admin_handler(Network::thread_params_t *args)
637 {
638     GNASH_REPORT_FUNCTION;
639     int retries = 100;
640     int ret;
641 
642     map<int, Handler *>::iterator hit;
643     stringstream response;
644 
645     Network net;
646     Handler::admin_cmd_e cmd = Handler::POLL;
647     net.createServer(args->hostname, args->port);
648     while (retries > 0) {
649 	log_network(_("Starting Admin Handler for port %d"), args->port);
650 
651 	if (net.newConnection(true) <= 0) {
652             return;
653         }
654 
655 	log_network(_("Got an incoming Admin request"));
656 	sleep(1);
657 	do {
658 	    Network::byte_t data[ADMINPKTSIZE+1];
659 	    memset(data, 0, ADMINPKTSIZE+1);
660 	    const char *ptr = reinterpret_cast<const char *>(data);
661 	    ret = net.readNet(data, ADMINPKTSIZE, 100);
662 	    if (ret < 0) {
663 		log_network(_("no more admin data, exiting...\n"));
664 		if ((ret == 0) && cmd != Handler::POLL) {
665 		    break;
666 		}
667 	    } else {
668 		// force the case to make comparisons easier. Only compare enough characters to
669 		// till each command is unique.
670 		std::transform(ptr, ptr + ret, data, (int(*)(int)) toupper);
671 		if (strncmp(ptr, "QUIT", 4) == 0) {
672 		    cmd = Handler::QUIT;
673 		} else if (strncmp(ptr, "STATUS", 5) == 0) {
674 		    cmd = Handler::STATUS;
675 		} else if (strncmp(ptr, "HELP", 2) == 0) {
676 		    cmd = Handler::HELP;
677 		    net.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
678 		} else if (strncmp(ptr, "POLL", 2) == 0) {
679 		    cmd = Handler::POLL;
680 		} else if (strncmp(ptr, "INTERVAL", 2) == 0) {
681 		    cmd = Handler::INTERVAL;
682 		}
683 	    }
684 	    switch (cmd) {
685 		// close this connection
686 	      case Handler::QUIT:
687 		  ret = -1;
688 		  break;
689 	      case Handler::STATUS:
690 	      {
691 #ifdef USE_STATS_CACHE
692 //		  cache.dump();
693 		  string results = cache.stats(false);
694 		  if (results.size()) {
695 		      net.writeNet(results);
696 		      results.clear();
697 		  }
698 #endif
699 #if 0
700 		  response << handlers.size() << " handlers are currently active.";
701  		  for (hit = handlers.begin(); hit != handlers.end(); hit++) {
702 		      int fd = hit->first;
703  		      Handler *hand = hit->second;
704 		      response << fd << ","
705 			       << hand->insize()
706 			       << "," << hand->outsize()
707 			       << "\r\n";
708 		      net.writeNet(response);
709 		  }
710 #endif
711 	      }
712 	      break;
713 	      case Handler::POLL:
714 #ifdef USE_STATS_QUEUE
715 		  response << handlers.size() << " handlers are currently active." << "\r\n";
716  		  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
717 		      int fd = hit->first;
718  		      Handler *hand = hit->second;
719 		      struct timespec now;
720 		      clock_gettime (CLOCK_REALTIME, &now);
721 		      // Incoming que stats
722  		      CQue::que_stats_t *stats = hand->statsin();
723 		      float diff = static_cast<float>(((now.tv_sec -
724 		      stats->start.tv_sec) + ((now.tv_nsec -
725 		      stats->start.tv_nsec)/1e9)));
726 		      response << fd
727 			       << "," << stats->totalbytes
728 			       << "," << diff
729 			       << "," << stats->totalin
730 			       << "," << stats->totalout;
731 		      // Outgoing que stats
732  		      stats = hand->statsout();
733  		      response << "," <<stats->totalbytes
734 			       << "," << stats->totalin
735 			       << "," << stats->totalout
736 			       << "\r\n";
737  		      net.writeNet(response.str());
738 		  }
739 #endif
740 		  break;
741 	      case Handler::INTERVAL:
742 		  net.writeNet("set interval\n");
743 		  break;
744 	      default:
745 		  break;
746 	    };
747 	} while (ret > 0);
748         log_network(_("admin_handler: Done...!\n"));
749 	net.closeNet();		// this shuts down this socket connection
750     }
751     net.closeConnection();		// this shuts down the server on this connection
752 
753     // All threads should exit now.
754     alldone.notify_all();
755 }
756 
757 // A connection handler is started for each port the server needs to
758 // wait on for incoming connections. When it gets an incoming
759 // connection, it reads the first packet to get the resource name, and
760 // then starts the event handler thread if it's a newly requested
761 // resource, otherwise it loads a copy of the cached resource.
762 void
connection_handler(Network::thread_params_t * args)763 connection_handler(Network::thread_params_t *args)
764 {
765     // GNASH_REPORT_FUNCTION;
766     int fd = 0;
767     Network net;
768     bool done = false;
769     static int tid = 0;
770 
771     if (netdebug) {
772 	net.toggleDebug(true);
773     }
774     // Start a server on this tcp/ip port.
775     fd = net.createServer(args->hostname, args->port);
776     if (fd <= 0) {
777 	log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
778 		  proto_str[args->protocol], fd, args->port);
779 	return;
780     } else {
781 	log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
782 		    proto_str[args->protocol], fd, args->port);
783     }
784 
785     // Get the number of cpus in this system. For multicore
786     // systems we'll get better load balancing if we keep all the
787     // cpus busy. So a pool of threads is started for each cpu,
788     // the default being just one. Each thread is reponsible for
789     // handling part of the total active file descriptors.
790 #ifdef HAVE_SYSCONF
791     long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
792     LOG_ONCE(log_network(_("This system has %d cpus."), ncpus));
793 #endif
794     size_t nfds = crcfile.getFDThread();
795 
796 //     log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
797 
798     // Get the next thread ID to hand off handling this file
799     // descriptor to. If the limit for threads per cpu hasn't been
800     // set or is set to 0, assume one thread per processor by
801     // default. There won't even be threads for each cpu if
802     // threading has been disabled in the cygnal config file.
803     int spawn_limit = 0;
804     if (nfds == 0) {
805 	spawn_limit = ncpus;
806     } else {
807 	spawn_limit = ncpus * nfds;
808     }
809 
810     // FIXME: this may run forever, we probably want a cleaner way to
811     // test for the end of time.
812     do {
813 	net.setPort(args->port);
814 	if (netdebug) {
815 	    net.toggleDebug(true);
816 	}
817 
818 	// Rotate in a range of 0 to the limit.
819 	tid = (tid + 1) % (spawn_limit + 1);
820 	// log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
821 
822 	// Wait for a connection to this tcp/ip from a client. If set
823 	// to true, this will block until a request comes in. If set
824 	// to single threaded mode, this will only allow one client to
825 	// connect at a time. This is to make it easier to debug
826 	// things when you have a heavily threaded application.
827 	args->netfd = net.newConnection(true, fd);
828 	if (args->netfd <= 0) {
829 	    log_network(_("No new %s network connections"),
830                         proto_str[args->protocol]);
831 	    return;
832 	} else {
833 	    log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
834 			proto_str[args->protocol], tid, args->netfd);
835 	}
836 
837 	//
838 	// Setup HTTP handler
839 	//
840 	if (args->protocol == Network::HTTP) {
841 	    Network::thread_params_t *hargs = new Network::thread_params_t;
842 	    // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
843 	    hargs->protocol = args->protocol;
844 	    hargs->netfd = args->netfd;
845 #if 0
846 	    std::shared_ptr<Handler> hand = cyg.findHandler(path);
847 	    HTTPServer *http = new HTTPServer;
848 	    hargs.entry = http;
849 	    http->setDocRoot(crcfile.getDocumentRoot());
850 	    std::shared_ptr<cygnal::Buffer> buf(http->peekChunk());
851 	    http->processHeaderFields(*buf);
852 	    string hostname, path;
853 	    string::size_type pos = http->getField("host").find(":", 0);
854 	    if (pos != string::npos) {
855 		hostname += http->getField("host").substr(0, pos);
856 	    } else {
857 		hostname += "localhost.localdomain";
858 	    }
859 	    path = http->getFilespec();
860 	    string key = hostname + path;
861 #endif
862 	    string key;
863 	    Handler *hand = 0;
864 	    if (!hand) {
865 		hand = new Handler;
866 		hand->addClient(args->netfd, Network::HTTP);
867 		int retries = 10;
868 		cygnal::Buffer *buf = 0;
869 		do {
870 		    buf = hand->parseFirstRequest(args->netfd, Network::HTTP);
871 		    if (!buf) {
872 			retries--;
873 			continue;
874 		    } else {
875 			break;
876 		    }
877 		} while (retries);
878 		string &key = hand->getKey(args->netfd);
879 		log_network(_("Creating new %s Handler for %s using fd #%d"),
880 			    proto_str[hargs->protocol], key, hargs->netfd);
881 		hargs->handler = hand;
882 		hargs->buffer = buf;
883 		hargs->filespec = key;
884 		// cyg.addHandler(key, hand);
885 
886 		// If in multi-threaded mode (the default), start a thread
887 		// with a connection_handler for each port we're interested
888 		// in. Each port of could have a different protocol.
889 		std::bind(event_handler, hargs);
890 		if (crcfile.getThreadingFlag() == true) {
891 		    std::thread event_thread(std::bind(&event_handler, hargs));
892 		} else {
893 		    event_handler(hargs);
894 		    // We're done, close this network connection
895 		}
896 	    } else {
897 		log_network(_("Reusing %s Handler for %s using fd #%d"),
898 			    proto_str[hargs->protocol], key, hargs->netfd);
899 		hand->addClient(args->netfd, Network::HTTP);
900 	    }
901 	    // delete http;
902 	} // end of if HTTP
903 
904 	//
905 	// Setup RTMP handler
906 	//
907 	if (args->protocol == Network::RTMP) {
908 	    Network::thread_params_t *rargs = new Network::thread_params_t;
909 	    rargs->protocol = args->protocol;
910 	    rargs->netfd = args->netfd;
911 	    RTMPServer *rtmp = new RTMPServer;
912 	    std::shared_ptr<cygnal::Element> tcurl =
913 		rtmp->processClientHandShake(args->netfd);
914 	    if (!tcurl) {
915 // 		    log_error("Couldn't read the tcUrl variable!");
916 		rtmp->closeNet(args->netfd);
917 		return;
918 	    }
919 	    URL url(tcurl->to_string());
920 	    string key = url.hostname() + url.path();
921 	    std::shared_ptr<Handler> hand = cyg.findHandler(url.path());
922 	    if (!hand) {
923 		log_network(_("Creating new %s Handler for: %s for fd %#d"),
924 			    proto_str[args->protocol], key, args->netfd);
925 		hand.reset(new Handler);
926 		cyg.addHandler(key, hand);
927 		rargs->entry = rtmp;
928 		hand->setNetConnection(rtmp->getNetConnection());
929 		std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
930 		std::vector<std::shared_ptr<Cygnal::peer_t> > active = cyg.getActive();
931 		for (it = active.begin(); it < active.end(); ++it) {
932 		    Cygnal::peer_t *peer = (*it).get();
933 		    hand->addRemote(peer->fd);
934 		}
935 		hand->addClient(args->netfd, Network::RTMP);
936 		rargs->handler = reinterpret_cast<void *>(hand.get());
937 		args->filespec = key;
938 		args->entry = rtmp;
939 
940 		string cgiroot;
941 		char *env = std::getenv("CYGNAL_PLUGINS");
942 		if (env != 0) {
943 		    cgiroot = env;
944 		}
945 		if (crcfile.getCgiRoot().size() > 0) {
946 		    cgiroot += ":" + crcfile.getCgiRoot();
947 		    log_network(_("Cygnal Plugin paths are: %s"), cgiroot);
948 		} else {
949 		    cgiroot = PLUGINSDIR;
950 		}
951 		hand->scanDir(cgiroot);
952 		std::shared_ptr<Handler::cygnal_init_t> init =
953 		    hand->initModule(url.path());
954 
955 		// this is where the real work gets done.
956 		if (init) {
957 		    // If in multi-threaded mode (the default), start a thread
958 		    // with a connection_handler for each port we're interested
959 		    // in. Each port of course has a different protocol.
960 		    if (crcfile.getThreadingFlag() == true) {
961 			std::thread event_thread(std::bind(&event_handler, args));
962 		    } else {
963 			event_handler(args);
964 			// We're done, close this network connection
965 			net.closeNet(args->netfd);
966 		    }
967 		} else {
968 		    log_error(_("Couldn't load plugin for %s"), key);
969 		}
970 
971 		// // We're done, close this network connection
972 		// if (crcfile.getThreadingFlag() == true) {
973 		//     net.closeNet(args->netfd);
974 		// }
975 	    }
976 	    // delete rtmp;
977 	} // end of if RTMP
978 
979 	log_network(_("Number of active Threads is %d"), tids.num_of_tids());
980 
981 //	net.closeNet(args->netfd); 		// this shuts down this socket connection
982 	log_network(_("Restarting loop for next connection for port %d..."),
983                     args->port);
984     } while(!done);
985 
986     // All threads should wake up now.
987     alldone.notify_all();
988 
989 } // end of connection_handler
990 
991 void
event_handler(Network::thread_params_t * args)992 event_handler(Network::thread_params_t *args)
993 {
994     GNASH_REPORT_FUNCTION;
995 
996     Network::thread_params_t largs;
997     // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
998     Handler *hand = reinterpret_cast<Handler *>(args->handler);
999 
1000     largs.protocol = args->protocol;
1001     largs.netfd = args->netfd;
1002     largs.port = args->port;
1003     largs.buffer = args->buffer;
1004     largs.entry = args->entry;
1005     largs.filespec = args->filespec;
1006 
1007     Network net;
1008     int timeout = 30;
1009     int retries = 0;
1010     bool done = false;
1011 
1012     fd_set hits;
1013     FD_ZERO(&hits);
1014     FD_SET(args->netfd, &hits);
1015 
1016     tids.increment();
1017 
1018     // We need to calculate the highest numbered file descriptor
1019     // for select. We may want to do this elsewhere, as it could
1020     // be a performance hit as the number of file descriptors gets
1021     // larger.
1022     log_debug("Handler has %d clients attached, %d threads",
1023 	      hand->getClients().size(), tids.num_of_tids());
1024 
1025     int max = 0;
1026     for (size_t i = 0; i<hand->getClients().size(); i++) {
1027 	log_debug("Handler client[%d] is: %d", i, hand->getClient(i));
1028 	if (hand->getClient(i) >= max) {
1029 	    max = hand->getClient(i);
1030 	    // hand->dump();
1031 	}
1032     }
1033 
1034     do {
1035 
1036 	// If we have active disk streams, send those packets first.
1037 	// 0 is a reserved stream, so we start with 1, as the reserved
1038 	// stream isn't one we care about here.
1039 	if (hand->getActiveDiskStreams()) {
1040 	    log_network(_("%d active disk streams"),
1041 			hand->getActiveDiskStreams());
1042 	    // hand->dump();
1043 	}
1044 #if 0
1045 	std::shared_ptr<DiskStream> filestream(cache.findFile(args->filespec));
1046 	if (filestream) {
1047 	    filestream->dump();
1048 	}
1049 // #else
1050 //      	cache.dump();
1051 #endif
1052 	//hand->dump();
1053 	std::shared_ptr<DiskStream> ds;
1054 	for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1055 	    ds = hand->getDiskStream(i);
1056 	    if (ds) {
1057    		//ds->dump();
1058 		// Only play the next chunk of the file.
1059 //log_network("Sending following chunk of %s", ds->getFilespec());
1060 		if (ds->play(i, false)) {
1061 		    if (ds->getState() == DiskStream::CLOSED) {
1062 			net.closeNet(args->netfd);
1063 			hand->removeClient(args->netfd);
1064 			done = true;
1065 		    }
1066 		} else {
1067 		    // something went wrong, the stream failed
1068 		    net.closeNet(args->netfd);
1069 		    hand->removeClient(args->netfd);
1070 		    done = true;
1071 		}
1072 	    }
1073 	}
1074 
1075 	// See if we have any data waiting behind any of the file
1076 	// descriptors.
1077 	for (int i=0; i <= max + 1; i++) {
1078 	    if (FD_ISSET(i, &hits)) {
1079 		FD_CLR(i, &hits);
1080 		log_network(_("Got a hit for fd #%d, protocol %s"), i,
1081 			    proto_str[hand->getProtocol(i)]);
1082 		switch (hand->getProtocol(i)) {
1083 		  case Network::NONE:
1084 		      log_error(_("No protocol specified!"));
1085 		      break;
1086 		  case Network::HTTP:
1087 		  {
1088 		      largs.netfd = i;
1089 		      // largs.filespec = fullpath;
1090 		      std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1091 		      if (!http->http_handler(hand, args->netfd, args->buffer)) {
1092 			  log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1093 			  net.closeNet(args->netfd);
1094 			  hand->removeClient(args->netfd);
1095 			  done = true;
1096 		      } else {
1097 			  log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i);
1098 
1099 		      }
1100 		      continue;
1101 		  }
1102 		  case Network::RTMP:
1103 		      args->netfd = i;
1104 		      // args->filespec = path;
1105 		      if (!rtmp_handler(args)) {
1106 			  log_network(_("Done with RTMP connection for fd #%d, CGI "), i, args->filespec);
1107 			  done = true;
1108 		      }
1109 		      break;
1110 		  case Network::RTMPT:
1111 		  {
1112 		      net.setTimeout(timeout);
1113 		      args->netfd = i;
1114 		      std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1115 		      // args->filespec = path;
1116 		      if (!http->http_handler(hand, args->netfd, args->buffer)) {
1117 			  log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, largs.filespec);
1118 			  return;
1119 		      }
1120 		      break;
1121 		  }
1122 		  case Network::RTMPTS:
1123 		  {
1124 		      args->netfd = i;
1125 		      // args->filespec = path;
1126 		      std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1127 		      if (!http->http_handler(hand, args->netfd, args->buffer)) {
1128 			  log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1129 			  return;
1130 		      }
1131 		      break;
1132 		  }
1133 		  case Network::RTMPE:
1134 		      break;
1135 		  case Network::RTMPS:
1136 		      break;
1137 		  case Network::DTN:
1138 		      break;
1139 		  default:
1140 		      log_error(_("Unsupported network protocol for fd #%d, %d"),
1141 				largs.netfd, hand->getProtocol(i));
1142 		      done = true;
1143 		      break;
1144 		}
1145 //		delete args->buffer;
1146 	    }
1147 	}
1148 
1149 	// // Clear the current message so next time we read new data
1150 	// args->buffer->clear();
1151 	// largs.buffer->clear();
1152 
1153 	// Wait for something from one of the file descriptors. This timeout
1154 	// is the time between sending packets to the client when there is
1155 	// no client input, which effects the streaming speed of big files.
1156 	net.setTimeout(5);
1157 	hits = net.waitForNetData(hand->getClients());
1158 	if (FD_ISSET(0, &hits)) {
1159 	    FD_CLR(0, &hits);
1160 	    log_network(_("Got no hits, %d retries"), retries);
1161 	    // net.closeNet(args->netfd);
1162 	    // hand->removeClient(args->netfd);
1163 	    // done = true;
1164 	}
1165 	retries++;
1166 #if 0
1167 	if (retries >= 10) {
1168 	    net.closeNet(args->netfd);
1169 	    hand->removeClient(args->netfd);
1170 	    done = true;
1171 	}
1172 #endif
1173     } while (!done);
1174 
1175     tids.decrement();
1176 
1177 } // end of event_handler
1178 
1179 // local Variables:
1180 // mode: C++
1181 // indent-tabs-mode: nil
1182 // End:
1183