1 // rtmp.cpp: Adobe/Macromedia Real Time Message Protocol handler, for Gnash.
2 //
3 // Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 //
20
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
24
25 #include <iostream>
26 #include <string>
27 #include <map>
28 #include <cstdlib>
29 #include <cstdio>
30
31 #include <cstdint>
32 #include <boost/detail/endian.hpp>
33 #include <boost/random/uniform_real.hpp>
34 #include <boost/random/uniform_int.hpp>
35 #include <boost/random/mersenne_twister.hpp>
36 #include <boost/lexical_cast.hpp>
37
38 #if ! (defined(_WIN32) || defined(WIN32))
39 # include <netinet/in.h>
40 #endif
41
42 #include "log.h"
43 #include "URL.h"
44 #include "amf.h"
45 #include "rtmp.h"
46 #include "rtmp_server.h"
47 #include "network.h"
48 #include "element.h"
49 #include "handler.h"
50 #include "utility.h"
51 #include "buffer.h"
52 #include "GnashSleep.h"
53 #include "crc.h"
54 #include "cache.h"
55 #include "diskstream.h"
56 #ifdef HAVE_SYS_TIME_H
57 # include <sys/time.h>
58 #endif
59 using namespace gnash;
60 using namespace std;
61
62 namespace cygnal
63 {
64
65 // Get access to the global config data for Cygnal
66 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
67
68 // Get access to the global Cygnal cache
69 static Cache& cache = Cache::getDefaultInstance();
70
71 extern map<int, Handler *> handlers;
72
RTMPServer()73 RTMPServer::RTMPServer()
74 : _filesize(0),
75 _streamid(1)
76 {
77 // GNASH_REPORT_FUNCTION;
78 // _inbytes = 0;
79 // _outbytes = 0;
80
81 // _body = new unsigned char(RTMP_HANDSHAKE_SIZE+1);
82 // memset(_body, 0, RTMP_HANDSHAKE_SIZE+1);
83 }
84
~RTMPServer()85 RTMPServer::~RTMPServer()
86 {
87 // GNASH_REPORT_FUNCTION;
88 _properties.clear();
89 // delete _body;
90 }
91
92
93 std::shared_ptr<cygnal::Element>
processClientHandShake(int fd)94 RTMPServer::processClientHandShake(int fd)
95 {
96 GNASH_REPORT_FUNCTION;
97
98 log_network("Processing RTMP Handshake for fd #%d", fd);
99
100 #ifdef USE_STATISTICS
101 struct timespec start;
102 clock_gettime (CLOCK_REALTIME, &start);
103 #endif
104
105 // Adjust the timeout for reading from the network
106 RTMP::setTimeout(10);
107
108 // These store the information we need from the initial
109 /// NetConnection object.
110 std::unique_ptr<cygnal::Element> nc;
111 std::shared_ptr<cygnal::Buffer> pkt;
112 std::shared_ptr<cygnal::Element> tcurl;
113 std::shared_ptr<cygnal::Element> swfurl;
114 std::shared_ptr<cygnal::Element> encoding;
115
116 // RTMP::rtmp_headersize_e response_head_size = RTMP::HEADER_12;
117
118 // Read the handshake bytes sent by the client when requesting
119 // a connection.
120 std::shared_ptr<cygnal::Buffer> handshake1 = RTMP::recvMsg(fd);
121 // See if we have data in the handshake, we should have 1537 bytes
122 if (!handshake1) {
123 log_error(_("Failed to read the handshake from the client."));
124 return tcurl; // nc is empty
125 } else {
126 log_network("Read first handshake from the client.");
127 }
128
129 // Send our response to the handshake, which primarily is the bytes
130 // we just received.
131 handShakeResponse(fd, *handshake1);
132
133 // Read the response from the client from the handshale reponse we
134 // just sent.
135 std::shared_ptr<cygnal::Buffer> handshake2 = RTMP::recvMsg(fd);
136 // See if we have data in the handshake, we should have 1536 bytes
137 if (handshake2 == nullptr) {
138 log_error(_("failed to read the handshake from the client."));
139 return tcurl; // nc is empty
140 } else {
141 log_network("Read second handshake from the client.");
142 }
143
144 // Don't assume the data we just read is a handshake.
145 pkt = serverFinish(fd, *handshake1, *handshake2);
146 // Wmake sure we got data before trying to process it
147 if (!pkt) {
148 log_error(_("Didn't receive any data in handshake!"));
149 tcurl.reset(new cygnal::Element);
150 return tcurl; // nc is empty
151 }
152
153 // the packet is a raw RTMP message. Since the header can be a
154 // variety of sizes, and this effects the data size, we need to
155 // decode that first.
156 std::shared_ptr<RTMP::rtmp_head_t> qhead = RTMP::decodeHeader(pkt->reference());
157
158 if (!qhead) {
159 log_error(_("RTMP header had parsing error!"));
160 return tcurl; // nc is empty
161 }
162
163 // We know the first packet is always a NetConnection INVOKE of
164 // the connect() method. These are usually around 300-400 bytes in
165 // testing, so anything larger than that is suspicios.
166 if (qhead->bodysize > 1024) {
167 log_error(_("NetConnection unusually large! %d"), qhead->bodysize);
168 }
169
170 // Get the actual start of the data
171 std::uint8_t *ptr = pkt->reference() + qhead->head_size;
172
173 // See if we have enough data to go past the chunksize, which is
174 // probable. If so, all chunks are the default size of 128, the
175 // same size as used for video packets. This means every chunksize
176 // boundary is an RTMP header byte that must be removed, or the
177 // data in the NetConnection::connect() packet will be
178 // corrupted. There is probably a better way to do this, but for
179 // now build a copy of the data but skip over the RTMP header
180 // bytes every chunk size biundary. All RTMP headers at this stage
181 // are 1 byte ones.
182 std::unique_ptr<cygnal::Buffer> newptr(new cygnal::Buffer(qhead->bodysize));
183 if (qhead->bodysize > RTMP_VIDEO_PACKET_SIZE) {
184 log_network("De chunkifying the NetConnection packet.");
185 int nbytes = 0;
186 while (nbytes < qhead->bodysize) {
187 size_t chunk = RTMP_VIDEO_PACKET_SIZE;
188 if ((qhead->bodysize - nbytes) < RTMP_VIDEO_PACKET_SIZE) {
189 chunk = qhead->bodysize - nbytes;
190 }
191 newptr->append(ptr + nbytes, chunk);
192 nbytes += chunk + 1;
193 }
194 } else {
195 newptr->copy(ptr, qhead->bodysize);
196 }
197
198 // extract the body of the message from the packet
199 _netconnect = RTMP::decodeMsgBody(newptr->begin(), qhead->bodysize);
200 if (!_netconnect) {
201 log_error(_("failed to read the body of the handshake data from the client."));
202 return tcurl; // nc is empty
203 } else {
204 log_network("Read handshake data body from the client.");
205 }
206
207 // make sure this is actually a NetConnection packet.
208 if (_netconnect->getMethodName() != "connect") {
209 log_error(_("Didn't receive NetConnection object in handshake!"));
210 return tcurl; // nc is empty
211 } else {
212 log_network("Got NetConnection ::connect() INVOKE.");
213 _netconnect->dump(); // FIXME: debug crap
214 }
215
216 // Get the data for the fields we want.
217 tcurl = _netconnect->findProperty("tcUrl");
218 swfurl = _netconnect->findProperty("swfUrl");
219 encoding = _netconnect->findProperty("objectEncoding");
220
221 // based on the Red5 tests, I see two behaviours with this next
222 // packet. If only gets sent when the "objectEncoding" field of
223 // the NetConnection object is in the initial packet. When this is
224 // supplied, it's more remoting than streaming, so sending this
225 // causes Async I/O errors in the client.
226 if (!encoding) {
227 // Send a onBWDone to the client to start the new NetConnection,
228 std::shared_ptr<cygnal::Buffer> bwdone = encodeBWDone(2.0);
229 if (RTMP::sendMsg(fd, qhead->channel, RTMP::HEADER_8,
230 bwdone->size(), RTMP::INVOKE, RTMPMsg::FROM_SERVER, *bwdone)) {
231 log_network("Sent onBWDone to client");
232 } else {
233 log_error(_("Couldn't send onBWDone to client!"));
234 tcurl.reset();
235 return tcurl; // nc is empty
236 }
237 }
238
239 // Send a Set Client Window Size to the client
240 std::shared_ptr<cygnal::Buffer> winsize(new cygnal::Buffer(sizeof(std::uint32_t)));
241 std::uint32_t swapped = 0x20000;
242 swapBytes(&swapped, sizeof(std::uint32_t));
243 *winsize += swapped;
244 if (RTMP::sendMsg(fd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
245 winsize->size(), RTMP::WINDOW_SIZE, RTMPMsg::FROM_CLIENT, *winsize)) {
246 log_network("Sent set Client Window Size to client");
247 } else {
248 log_error("Couldn't send set Client Window Size to client!");
249 tcurl.reset();
250 return tcurl; // nc is empty
251 }
252
253 // Send a ping to the client to reset the new NetConnection,
254 std::shared_ptr<cygnal::Buffer> ping_reset =
255 encodePing(RTMP::PING_RESET, 0);
256 if (RTMP::sendMsg(fd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_8,
257 ping_reset->size(), RTMP::USER, RTMPMsg::FROM_SERVER, *ping_reset)) {
258 log_network("Sent Ping to client");
259 } else {
260 log_error(_("Couldn't send Ping to client!"));
261 tcurl.reset();
262 return tcurl; // nc is empty
263 }
264
265 // Send the packet to notify the client that the
266 // NetConnection::connect() was sucessful. After the client
267 // receives this, the handhsake is completed.
268 std::shared_ptr<cygnal::Buffer> response =
269 encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
270 if (RTMP::sendMsg(fd, 3, RTMP::HEADER_8, response->allocated(),
271 RTMP::INVOKE, RTMPMsg::FROM_SERVER, *response)) {
272 log_network("Sent response to client.");
273 } else {
274 log_error(_("Couldn't send response to client!"));
275 tcurl.reset();
276 return tcurl; // nc is empty
277 }
278
279 return tcurl;
280 }
281
// The response is the gibberish sent back twice, preceded by a byte
283 // with the value of 0x3. We have to very carefully send the handshake
284 // in one big packet as doing otherwise seems to cause subtle timing
285 // problems with the Adobe player. This way it connects every time.
286 bool
handShakeResponse(int fd,cygnal::Buffer & handshake)287 RTMPServer::handShakeResponse(int fd, cygnal::Buffer &handshake)
288 {
289 GNASH_REPORT_FUNCTION;
290
291 // std::uint8_t byte;
292 // byte = RTMP_VERSION;
293
294 // the response handshake is twice the size of the one we just
295 // received for a total of 3072 bytes, plus room for the version.
296 std::unique_ptr<cygnal::Buffer> zeros(new cygnal::Buffer(RTMP_HANDSHAKE_SIZE*2
297 + RTMP_HANDSHAKE_VERSION_SIZE));
298 zeros->clear(); // set entire buffer to zeros
299
300 std::uint8_t *ptr = zeros->reference();
301
302 // the first byte of the handshake response is the RTMP version
303 // number.
304 *ptr = RTMP_VERSION;
305
306 // the first half we make all zeros, as it doesn't appear to be
307 // used for anything. More data is the second half of the
308 // response.
309 zeros->setSeekPointer(ptr + RTMP_HANDSHAKE_VERSION_SIZE +
310 RTMP_HANDSHAKE_SIZE);
311
312 // the handhshake has a two field header, which appears to be
313 // timestamp, followed by another field that appears to be another
314 // timestamp or version number, which is probably ignored.
315 // the first field of the header is the timestamp
316 std::uint32_t timestamp;
317 // Get the timestamp of when this message was read
318 timestamp = RTMP::getTime();
319 *zeros += timestamp;
320
321 // the second field is always zero
322 std::uint32_t pad = 0;
323 *zeros += pad;
324
325 // the data starts after the vesion and header bytes
326 size_t offset = RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_HEADER_SIZE;
327
328 // add the handshake data, which is 1528 byte of random stuff.
329 zeros->append(handshake.reference() + offset, RTMP_RANDOM_SIZE);
330
331 // send the handshake to the client
332 size_t ret = writeNet(fd, *zeros);
333
334 if (ret == zeros->allocated()) {
335 log_network("Sent RTMP Handshake response at %d", timestamp);
336 } else {
337 log_error(_("Couldn't sent RTMP Handshake response at %d!"), timestamp);
338 }
339
340 return true;
341 }
342
343 std::shared_ptr<cygnal::Buffer>
serverFinish(int fd,cygnal::Buffer & handshake1,cygnal::Buffer & handshake2)344 RTMPServer::serverFinish(int fd, cygnal::Buffer &handshake1, cygnal::Buffer &handshake2)
345 {
346 GNASH_REPORT_FUNCTION;
347 std::shared_ptr<cygnal::Buffer> buf;
348
349 // sanity check our input data. We do this seperately as an empty
350 // buffer means data wasn't read correctly from the network. We
351 // should never get this far with bad data, but when it comes to
352 // network programming, a little caution is always good.
353 if (handshake1.empty()) {
354 log_error(_("No data in original handshake buffer."));
355 return buf; // return empty buffer
356 }
357 if (handshake2.empty()) {
358 log_error(_("No data in response handshake buffer."));
359 return buf; // return empty buffer
360 }
361
362 // the first field of the header is the timestamp of the original
363 // packet sent by this server.
364 std::uint32_t timestamp1 = *reinterpret_cast<std::uint32_t *>
365 (handshake1.reference() + RTMP_HANDSHAKE_VERSION_SIZE);
366
367 // the second field of the header is the timestamp of the previous
368 // packet sent by this server.
369 std::uint32_t timestamp2 = *reinterpret_cast<std::uint32_t *>
370 (handshake1.reference() + RTMP_HANDSHAKE_VERSION_SIZE + sizeof(std::uint32_t));
371
372 log_network("The timestamp delta is %d", timestamp2 - timestamp1);
373
374 // This is the location in the second handshake to the random data
375 // block used in the handshake.
376 size_t pkt_size = RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_SIZE;
377 // the handshakes are supposed to match.
378 int diff = std::memcmp(handshake1.begin()
379 + RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_HEADER_SIZE,
380 handshake2.begin()
381 + pkt_size + RTMP_HANDSHAKE_HEADER_SIZE,
382 RTMP_RANDOM_SIZE);
383 if (diff <= 1) {
384 log_network ("Handshake Finish Data matched");
385 } else {
386 log_error (_("Handshake Finish Data didn't match by %d bytes"), diff);
387 // return buf; // return empty buffer
388 }
389
390 // Copy the extra data from the end of the handshake to the new
391 // buffer. Normally we try to avoid copying anything around, but
392 // as this is only used once for each connection, there isn't a
393 // real performance hit from it.
394 size_t amf_size = handshake2.allocated() - pkt_size;
395 if (handshake2.allocated() >= pkt_size) {
396 log_network("Got AMF data in handshake, %d bytes for fd #%d",
397 amf_size, fd);
398 buf.reset(new Buffer(amf_size));
399 // populate the buffer with the AMF data
400 std::uint8_t *ptr = handshake2.reference() + RTMP_HANDSHAKE_SIZE;
401 buf->copy(ptr, amf_size);
402 }
403
404 return buf;
405 }
406
407 bool
packetSend(cygnal::Buffer &)408 RTMPServer::packetSend(cygnal::Buffer &/* buf */)
409 {
410 GNASH_REPORT_FUNCTION;
411 return false;
412 }
413
414 // This overrides using same method from the base RTMP class.
415 bool
packetRead(cygnal::Buffer & buf)416 RTMPServer::packetRead(cygnal::Buffer &buf)
417 {
418 GNASH_REPORT_FUNCTION;
419
420 std::uint8_t amf_index, headersize;
421 std::uint8_t *ptr = buf.reference();
422 AMF amf;
423
424 if (ptr == nullptr) {
425 return false;
426 }
427
428 // cerr << "FIXME3: " << buf.hexify(true) << endl;
429
430 // ptr += 1; // skip past the header byte
431
432 amf_index = *ptr & RTMP_INDEX_MASK;
433 headersize = headerSize(*ptr);
434 log_network("The Header size is: %d", headersize);
435 log_network("The AMF index is: 0x%x", amf_index);
436
437 // if (headersize > 1) {
438 // packetsize = parseHeader(ptr);
439 // if (packetsize) {
440 // log_network ("Read first RTMP packet header of size %d", packetsize);
441 // } else {
442 // log_error (_("Couldn't read first RTMP packet header"));
443 // return false;
444 // }
445 // }
446
447 // #if 1
448 // std::uint8_t *end = buf->remove(0xc3);
449 // #else
450 // std::uint8_t *end = buf->find(0xc3);
451 // log_network("END is %x", (void *)end);
452 // *end = '*';
453 // #endif
454 decodeHeader(ptr);
455 ptr += headersize;
456
457 std::uint8_t* tooFar = ptr+300+sizeof(int); // FIXME:
458
459 AMF amf_obj;
460 std::shared_ptr<cygnal::Element> el1 = amf_obj.extractAMF(ptr, tooFar);
461 ptr += amf_obj.totalsize();
462 std::shared_ptr<cygnal::Element> el2 = amf_obj.extractAMF(ptr, tooFar);
463
464 int size = 0;
465 std::shared_ptr<cygnal::Element> el;
466 while ( size < static_cast<std::uint16_t>(_header.bodysize) - 24 ) {
467 if (ptr) {
468 el = amf_obj.extractProperty(ptr, tooFar);
469 if (el != nullptr) {
470 size += amf_obj.totalsize();
471 ptr += amf_obj.totalsize();
472 // _properties[el->getName()] = el;
473 } else {
474 break;
475 }
476 // log_network("Bodysize is: %d size is: %d for %s", _total_size, size, el->getName());
477 } else {
478 break;
479 }
480 }
481
482 # if 0
483 Element el;
484 ptr = amf.extractElement(&el, ptr);
485 el.dump();
486 ptr = amf.extractElement(&el, ptr) + 1;
487 el.dump();
488 log_network("Reading AMF packets till we're done...");
489 // buf->dump();
490 while (ptr < end) {
491 std::shared_ptr<cygnal::Element> el(new cygnal::Element);
492 ptr = amf.extractProperty(el, ptr);
493 addProperty(el);
494 // el->dump();
495 }
496 ptr += 1;
497 size_t actual_size = _total_size - RTMP_HEADER_SIZE;
498 log_network("Total size in header is %d, buffer size is: %d",
499 _total_size, buf->size());
500 // buf->dump();
501 if (buf->size() < actual_size) {
502 log_network("FIXME: MERGING");
503 buf = _que->merge(buf);
504 }
505 while ((ptr - buf->begin()) < static_cast<int>(actual_size)) {
506 std::shared_ptr<cygnal::Element> el(new cygnal::Element);
507 if (ptr) {
508 ptr = amf.extractProperty(el, ptr);
509 addProperty(el);
510 } else {
511 return true;
512 }
513 el->dump(); // FIXME: dump the AMF objects as they are read in
514 }
515
516 RTMPproto::dump();
517 #endif
518 switch(_header.type) {
519 case CHUNK_SIZE:
520 decodeChunkSize();
521 break;
522 case BYTES_READ:
523 decodeBytesRead();
524 break;
525 case USER:
526 {
527 std::shared_ptr<rtmp_ping_t> ping = decodePing(ptr);
528 switch (ping->type) {
529 case PING_CLEAR:
530 break;
531 case PING_PLAY:
532 break;
533 case PING_TIME:
534 break;
535 case PING_RESET:
536 break;
537 case PING_CLIENT:
538 break;
539 case PONG_CLIENT:
540 break;
541 default:
542 return 0;
543 break;
544 };
545 break;
546 }
547 case WINDOW_SIZE:
548 decodeServer();
549 break;
550 case SET_BANDWITH:
551 decodeClient();
552 break;
553 case ROUTE:
554 log_unimpl(_("Route"));
555 break;
556 case AUDIO_DATA:
557 decodeAudioData();
558 break;
559 case VIDEO_DATA:
560 decodeVideoData();
561 break;
562 case SHARED_OBJ:
563 decodeSharedObj();
564 break;
565 case AMF3_NOTIFY:
566 log_unimpl(_("AMF3 Notify"));
567 break;
568 case AMF3_SHARED_OBJ:
569 log_unimpl("AMF3 Shared Object");
570 break;
571 case AMF3_INVOKE:
572 log_unimpl(_("AMF3 Invoke"));
573 break;
574 case NOTIFY:
575 decodeNotify();
576 break;
577 case INVOKE:
578 decodeInvoke();
579 break;
580 case FLV_DATA:
581 log_unimpl(_("FLV Dat"));
582 break;
583 default:
584 log_error (_("ERROR: Unidentified RTMP message content type 0x%x"), _header.type);
585 break;
586 };
587
588 return true;
589 }
590
591 // A result packet looks like this:
592 //
593 // 03 00 00 00 00 00 81 14 00 00 00 00 02 00 07 5f ..............._
594 // 72 65 73 75 6c 74 00 3f f0 00 00 00 00 00 00 05 result.?........
595 // 03 00 0b 61 70 70 6c 69 63 61 74 69 6f 6e 05 00 ...application..
596 // 05 6c 65 76 65 6c 02 00 06 73 74 61 74 75 73 00 .level...status.
597 // 0b 64 65 73 63 72 69 70 74 69 6f 6e 02 00 15 43 .description...C
598 // 6f 6e 6e 65 63 74 69 6f 6e 20 73 75 63 63 65 65 onnection succee
599 // 64 65 64 2e 00 04 63 6f 64 65 02 00 1d 4e 65 74 ded...code...Net
600 // 43 6f 6e 6e 65 63 74 69 6f 6e 2e 43 6f 6e 6e 65 Connection.Conne
601 // 63 74 2e 53 75 63 63 65 73 73 00 00 c3 09 ct.Success....
602 //
603 // _result(double ClientStream, NULL, double ServerStream)
604 // These are handlers for the various types
605 std::shared_ptr<Buffer>
encodeResult(RTMPMsg::rtmp_status_e status)606 RTMPServer::encodeResult(RTMPMsg::rtmp_status_e status)
607 {
608 // GNASH_REPORT_FUNCTION;
609 return encodeResult(status, _filespec, _streamid);
610 }
611
612 std::shared_ptr<cygnal::Buffer>
encodeResult(gnash::RTMPMsg::rtmp_status_e status,const std::string & filename)613 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename)
614 {
615 // GNASH_REPORT_FUNCTION;
616 double clientid = 0.0;
617 return encodeResult(status, filename, _streamid, clientid);
618 }
619
620 std::shared_ptr<cygnal::Buffer>
encodeResult(gnash::RTMPMsg::rtmp_status_e status,const std::string & filename,double & clientid)621 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename, double &clientid)
622 {
623 // GNASH_REPORT_FUNCTION;
624 return encodeResult(status, filename, _streamid, clientid);
625 }
626
627 std::shared_ptr<cygnal::Buffer>
encodeResult(gnash::RTMPMsg::rtmp_status_e status,double & transid)628 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, double &transid)
629 {
630 // GNASH_REPORT_FUNCTION;
631 double clientid = 0.0;
632 return encodeResult(status, "", transid, clientid);
633 }
634
635 std::shared_ptr<cygnal::Buffer>
encodeResult(gnash::RTMPMsg::rtmp_status_e status,const std::string & filename,double & transid,double & clientid)636 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename, double &transid, double &clientid)
637 {
638 // GNASH_REPORT_FUNCTION;
639 // Buffer *buf = new Buffer;
640 // std::uint8_t *ptr = buf->reference();
641 // buf->clear(); // default everything to zeros, real data gets optionally added.
642 // ptr += sizeof(std::uint16_t); // go past the first short
643 // const char *capabilities = 0;
644 // const char *description = 0;
645 // const char *code = 0;
646 // const char *status = 0;
647 bool notobject = false;
648
649 Element *str = new Element;
650 str->makeString("_result");
651
652 Element *number = new Element;
653 // add the transaction ID
654 number->makeNumber(transid);
655
656 Element top;
657 // top.makeObject("application");
658 top.makeObject();
659
660 switch (status) {
661 case RTMPMsg::APP_GC:
662 case RTMPMsg::APP_RESOURCE_LOWMEMORY:
663 case RTMPMsg::APP_SCRIPT_ERROR:
664 case RTMPMsg::APP_SCRIPT_WARNING:
665 case RTMPMsg::APP_SHUTDOWN:
666 case RTMPMsg::NC_CALL_BADVERSION:
667 case RTMPMsg::NC_CALL_FAILED:
668 // status = 0;
669 // code = "NetConnection.Call.Failed";
670 case RTMPMsg::NC_CONNECT_APPSHUTDOWN:
671 case RTMPMsg::NC_CONNECT_CLOSED:
672 case RTMPMsg::NC_CONNECT_FAILED:
673 {
674 // errstr = new Element;
675 // errstr->makeString("error");
676 std::shared_ptr<cygnal::Element> level(new Element);
677 level->makeString("level", "error");
678 top.addProperty(level);
679
680 std::shared_ptr<cygnal::Element> description(new Element);
681 description->makeString("description", "Connection Failed.");
682 top.addProperty(description);
683
684 std::shared_ptr<cygnal::Element> code(new Element);
685 code->makeString("code", "Connection.Connect.Failed");
686 top.addProperty(code);
687 }
688 case RTMPMsg::NC_CONNECT_INVALID_APPLICATION:
689 case RTMPMsg::NC_CONNECT_REJECTED:
690 {
691 // delete str;
692 // str = new Element;
693 // str->makeString("error");
694 std::shared_ptr<cygnal::Element> level(new Element);
695 level->makeString("level", "error");
696 top.addProperty(level);
697
698 std::shared_ptr<cygnal::Element> description(new Element);
699 description->makeString("description", "Connection Rejected.");
700 top.addProperty(description);
701
702 std::shared_ptr<cygnal::Element> code(new Element);
703 code->makeString("code", "NetConnection.Connect.Rejected");
704 top.addProperty(code);
705 }
706 case RTMPMsg::NC_CONNECT_SUCCESS:
707 {
708 std::shared_ptr<cygnal::Element> level(new Element);
709 level->makeString("level", "status");
710 top.addProperty(level);
711
712 std::shared_ptr<cygnal::Element> code(new Element);
713 code->makeString("code", "NetConnection.Connect.Success");
714 top.addProperty(code);
715
716 std::shared_ptr<cygnal::Element> description(new Element);
717 description->makeString("description", "Connection succeeded.");
718 top.addProperty(description);
719 }
720 break;
721 case RTMPMsg::NS_CLEAR_FAILED:
722 case RTMPMsg::NS_CLEAR_SUCCESS:
723 // After a successful NetConnection, we get a
724 // NetStream::createStream.
725 case RTMPMsg::NS_DATA_START:
726 case RTMPMsg::NS_FAILED:
727 case RTMPMsg::NS_INVALID_ARGUMENT:
728 // The response to a successful pauseStream command is this
729 // message.
730 case RTMPMsg::NS_PAUSE_NOTIFY:
731 {
732 str->makeString("onStatus");
733
734 std::shared_ptr<cygnal::Element> level(new Element);
735 level->makeString("level", "status");
736 top.addProperty(level);
737
738 std::shared_ptr<cygnal::Element> code(new Element);
739 code->makeString("code", "NetStream.Pause.Notify");
740 top.addProperty(code);
741
742 std::shared_ptr<cygnal::Element> description(new Element);
743 string field = "Pausing ";
744 if (!filename.empty()) {
745 field += filename;
746 }
747 description->makeString("description", field);
748 top.addProperty(description);
749
750 std::shared_ptr<cygnal::Element> details(new Element);
751 details->makeString("details", filename);
752 top.addProperty(details);
753
754 std::shared_ptr<cygnal::Element> cid(new Element);
755 cid->makeNumber("clientid", clientid);
756 top.addProperty(cid);
757
758 break;
759 }
760 case RTMPMsg::NS_PLAY_COMPLETE:
761 case RTMPMsg::NS_PLAY_FAILED:
762 case RTMPMsg::NS_PLAY_FILE_STRUCTURE_INVALID:
763 case RTMPMsg::NS_PLAY_INSUFFICIENT_BW:
764 case RTMPMsg::NS_PLAY_NO_SUPPORTED_TRACK_FOUND:
765 case RTMPMsg::NS_PLAY_PUBLISHNOTIFY:
766 break;
767 // Reset the stream. We also do this after receiving a
768 // NetStream::createStream() packet
769 case RTMPMsg::NS_PLAY_RESET:
770 {
771 str->makeString("onStatus");
772 // "clientid"
773 std::shared_ptr<cygnal::Element> level(new Element);
774 level->makeString("level", "status");
775 top.addProperty(level);
776
777 std::shared_ptr<cygnal::Element> code(new Element);
778 code->makeString("code", "NetStream.Play.Reset");
779 top.addProperty(code);
780
781 std::shared_ptr<cygnal::Element> description(new Element);
782 string field = "Playing and resetting ";
783 if (!filename.empty()) {
784 field += filename;
785 }
786 description->makeString("description", field);
787 top.addProperty(description);
788
789 std::shared_ptr<cygnal::Element> details(new Element);
790 details->makeString("details", filename);
791 top.addProperty(details);
792
793 std::shared_ptr<cygnal::Element> cid(new Element);
794 #ifdef CLIENT_ID_NUMERIC
795 double clientid = createClientID();
796 cid->makeNumber("clientid", clientid);
797 #else
798 string clientid;
799 if (!_clientids[transid].empty()) {
800 clientid =_clientids[transid].c_str();
801 } else {
802 clientid = createClientID();
803 _clientids[transid] = clientid;
804 }
805 cid->makeString("clientid", _clientids[transid]);
806 #endif
807 top.addProperty(cid);
808
809 break;
810 }
811 case RTMPMsg::NS_PLAY_START:
812 {
813 str->makeString("onStatus");
814
815 std::shared_ptr<cygnal::Element> level(new Element);
816 level->makeString("level", "status");
817 top.addProperty(level);
818
819 std::shared_ptr<cygnal::Element> code(new Element);
820 code->makeString("code", "NetStream.Play.Start");
821 top.addProperty(code);
822
823 std::shared_ptr<cygnal::Element> description(new Element);
824 string field = "Started playing ";
825 if (!filename.empty()) {
826 field += filename;
827 }
828 description->makeString("description", field);
829 top.addProperty(description);
830
831 std::shared_ptr<cygnal::Element> details(new Element);
832 details->makeString("details", filename);
833 top.addProperty(details);
834
835 std::shared_ptr<cygnal::Element> cid(new Element);
836 #ifdef CLIENT_ID_NUMERIC
837 double clientid = createClientID();
838 cid->makeNumber("clientid", clientid);
839 #else
840 string clientid;
841 if (!_clientids[transid].empty()) {
842 clientid =_clientids[transid].c_str();
843 } else {
844 clientid = createClientID();
845 _clientids[transid] = clientid;
846 }
847 cid->makeString("clientid", _clientids[transid]);
848 #endif
849 top.addProperty(cid);
850
851 break;
852 }
853 case RTMPMsg::NS_PLAY_STOP:
854 case RTMPMsg::NS_PLAY_STREAMNOTFOUND:
855 {
856 std::shared_ptr<cygnal::Element> level(new Element);
857 level->makeString("level", "error");
858 top.addProperty(level);
859
860 std::shared_ptr<cygnal::Element> description(new Element);
861 description->makeString("description", "NetStream.Play.StreamNotFound.");
862 top.addProperty(description);
863
864 std::shared_ptr<cygnal::Element> code(new Element);
865 code->makeString("code", "NetStream.Play.StreamNotFound");
866 top.addProperty(code);
867 break;
868 }
869 case RTMPMsg::NS_PLAY_SWITCH:
870 case RTMPMsg::NS_PLAY_UNPUBLISHNOTIFY:
871 case RTMPMsg::NS_PUBLISH_BADNAME:
872 case RTMPMsg::NS_PUBLISH_START:
873 case RTMPMsg::NS_RECORD_FAILED:
874 case RTMPMsg::NS_RECORD_NOACCESS:
875 case RTMPMsg::NS_RECORD_START:
876 case RTMPMsg::NS_RECORD_STOP:
877 // The reponse to a failed seekStream is this message.
878 case RTMPMsg::NS_SEEK_FAILED:
879 // The reponse to a successful seekStream is this message.
880 case RTMPMsg::NS_SEEK_NOTIFY:
881 break;
882 // The response to a successful pauseStream command is this
883 // message when the stream is started again.
884 case RTMPMsg::NS_UNPAUSE_NOTIFY:
885 case RTMPMsg::NS_UNPUBLISHED_SUCCESS:
886 case RTMPMsg::SO_CREATION_FAILED:
887 case RTMPMsg::SO_NO_READ_ACCESS:
888 case RTMPMsg::SO_NO_WRITE_ACCESS:
889 case RTMPMsg::SO_PERSISTENCE_MISMATCH:
890 break;
891 // The response for a createStream message is the
892 // transaction ID, followed by the command object (usually a
893 // NULL object), and the Stream ID. The Stream ID is just a
894 // simple incrementing counter of streams.
895 case RTMPMsg::NS_CREATE_STREAM:
896 {
897 // Don't encode as an object, just the properties
898 notobject = true;
899
900 std::shared_ptr<cygnal::Element> id2(new Element);
901
902 double sid = createStreamID();
903 id2->makeNumber(sid);
904 top.addProperty(id2);
905
906 break;
907 }
908 // There is no response to a deleteStream request.
909 case RTMPMsg::NS_DELETE_STREAM:
910 default:
911 break;
912 };
913
914 std::shared_ptr<cygnal::Buffer> strbuf = str->encode();
915 std::shared_ptr<cygnal::Buffer> numbuf = number->encode();
916 std::shared_ptr<cygnal::Buffer> topbuf = top.encode(notobject);
917
918 std::shared_ptr<cygnal::Buffer> buf(new Buffer(strbuf->size() + numbuf->size() + topbuf->size()));
919 *buf += strbuf;
920 *buf += numbuf;
921 std::uint8_t byte = static_cast<std::uint8_t>(RTMP::WINDOW_SIZE & 0x000000ff);
922 *buf += byte;
923 *buf += topbuf;
924
925 delete str;
926 delete number;
927
928 return buf;
929 }
// A Ping packet has two parameters that are always specified, and 2 that are optional.
931 // The first two bytes are the ping type, as in rtmp_ping_e, the second is the ping
932 // target, which is always zero as far as we can tell.
933 //
934 // More notes from: http://jira.red5.org/confluence/display/docs/Ping
935 // type 0: Clear the stream. No third and fourth parameters. The second parameter could be 0.
936 // After the connection is established, a Ping 0,0 will be sent from server to client. The
937 // message will also be sent to client on the start of Play and in response of a Seek or
938 // Pause/Resume request. This Ping tells client to re-calibrate the clock with the timestamp
939 // of the next packet server sends.
940 // type 1: Tell the stream to clear the playing buffer.
941 // type 3: Buffer time of the client. The third parameter is the buffer time in millisecond.
942 // type 4: Reset a stream. Used together with type 0 in the case of VOD. Often sent before type 0.
943 // type 6: Ping the client from server. The second parameter is the current time.
944 // type 7: Pong reply from client. The second parameter is the time the server sent with his
945 // ping request.
946
947 // A RTMP Ping packet looks like this: "02 00 00 00 00 00 06 04 00 00 00 00 00 00 00 00 00 0",
948 // which is the Ping type byte, followed by two shorts that are the parameters. Only the first
949 // two parameters are required.
950 std::shared_ptr<Buffer>
encodePing(rtmp_ping_e type)951 RTMPServer::encodePing(rtmp_ping_e type)
952 {
953 // GNASH_REPORT_FUNCTION;
954 return encodePing(type, 0);
955 }
956
957 std::shared_ptr<Buffer>
encodePing(rtmp_ping_e type,std::uint32_t milliseconds)958 RTMPServer::encodePing(rtmp_ping_e type, std::uint32_t milliseconds)
959 {
960 // GNASH_REPORT_FUNCTION;
961
962 // An encoded ping message
963 std::shared_ptr<cygnal::Buffer> buf(new Buffer(sizeof(std::uint16_t) * 3));
964 // std::uint8_t *ptr = buf->reference();
965
966 // Set the type of this ping message
967 std::uint16_t typefield = htons(type);
968 *buf = typefield;
969
970 // // go past the first short, which is the type field
971 // ptr += sizeof(std::uint16_t);
972
973 std::uint32_t swapped = 0;
974 switch (type) {
975 // These two don't appear to have any paramaters
976 case PING_CLEAR:
977 case PING_PLAY:
978 break;
979 // the third parameter is the buffer time in milliseconds
980 case PING_TIME:
981 {
982 // ptr += sizeof(std::uint16_t); // go past the second short
983 swapped = milliseconds;
984 swapBytes(&swapped, sizeof(std::uint32_t));
985 *buf += swapped;
986 break;
987 }
988 // reset doesn't have any parameters but zeros
989 case PING_RESET:
990 {
991 std::uint16_t zero = 0;
992 *buf += zero;
993 *buf += zero;
994 break;
995 }
996 // For Ping and Pong, the second parameter is always the milliseconds
997 case PING_CLIENT:
998 case PONG_CLIENT:
999 {
1000 // swapped = htonl(milliseconds);
1001 swapped = milliseconds;
1002 swapBytes(&swapped, sizeof(std::uint32_t));
1003 *buf += swapped;
1004 break;
1005 }
1006 default:
1007 break;
1008 };
1009
1010 // Manually adjust the seek pointer since we added the data by
1011 // walking ou own temporary pointer, so none of the regular ways
1012 // of setting the seek pointer are appropriate.
1013 // buf->setSeekPointer(buf->reference() + buf->size());
1014
1015 return buf;
1016 }
1017
1018 // Encode a onBWDone message for the client. These are of a fixed size.
1019 std::shared_ptr<cygnal::Buffer>
encodeBWDone(double id)1020 RTMPServer::encodeBWDone(double id)
1021 {
1022 // GNASH_REPORT_FUNCTION;
1023 string command = "onBWDone";
1024
1025 Element cmd;
1026 cmd.makeString(command);
1027
1028 Element num;
1029 num.makeNumber(id);
1030
1031 Element null;
1032 null.makeNull();
1033
1034 std::shared_ptr<cygnal::Buffer> enccmd = cmd.encode();
1035 std::shared_ptr<cygnal::Buffer> encnum = num.encode();
1036 std::shared_ptr<cygnal::Buffer> encnull = null.encode();
1037
1038 std::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(enccmd->size()
1039 + encnum->size()
1040 + encnull->size()));
1041
1042 *buf += enccmd;
1043 *buf += encnum;
1044 *buf += encnull;
1045
1046 return buf;
1047 }
1048
1049 std::shared_ptr<cygnal::Buffer>
encodeAudio(std::uint8_t * data,size_t size)1050 RTMPServer::encodeAudio(std::uint8_t *data, size_t size)
1051 {
1052 GNASH_REPORT_FUNCTION;
1053
1054 std::shared_ptr<cygnal::Buffer> buf;
1055
1056 if (size) {
1057 if (data) {
1058 buf.reset(new cygnal::Buffer(size));
1059 buf->copy(data, size);
1060 }
1061 }
1062
1063 return buf;
1064 }
1065
1066 std::shared_ptr<cygnal::Buffer>
encodeVideo(std::uint8_t *,size_t)1067 RTMPServer::encodeVideo(std::uint8_t * /* data */, size_t /* size */)
1068 {
1069 GNASH_REPORT_FUNCTION;
1070
1071 std::shared_ptr<cygnal::Buffer> buf;
1072
1073 return buf;
1074 }
1075
1076 #if 0
1077 // Parse an Echo Request message coming from the Red5 echo_test. This
1078 // method should only be used for testing purposes.
1079 vector<std::shared_ptr<cygnal::Element > >
1080 RTMPServer::parseEchoRequest(std::uint8_t *ptr, size_t size)
1081 {
1082 // GNASH_REPORT_FUNCTION;
1083 AMF amf;
1084 vector<std::shared_ptr<cygnal::Element > > headers;
1085
1086 // The first element is the name of the test, 'echo'
1087 std::shared_ptr<cygnal::Element> el1 = amf.extractAMF(ptr, ptr+size);
1088 ptr += amf.totalsize();
1089 headers.push_back(el1);
1090
1091 // The second element is the number of the test,
1092 std::shared_ptr<cygnal::Element> el2 = amf.extractAMF(ptr, ptr+size);
1093 ptr += amf.totalsize();
1094 headers.push_back(el2);
1095
1096 // This one has always been a NULL object from my tests
1097 std::shared_ptr<cygnal::Element> el3 = amf.extractAMF(ptr, ptr+size);
1098 ptr += amf.totalsize();
1099 headers.push_back(el3);
1100
1101 // This one has always been an NULL or Undefined object from my tests
1102 std::shared_ptr<cygnal::Element> el4 = amf.extractAMF(ptr, ptr+size);
1103 if (!el4) {
1104 log_error(_("Couldn't reliably extract the echo data!"));
1105 }
1106 ptr += amf.totalsize();
1107 headers.push_back(el4);
1108
1109 return headers;
1110 }
1111
1112 // format a response to the 'echo' test used for testing Gnash. This
1113 // is only used for testing by developers. The format appears to be
1114 // a string '_result', followed by the number of the test, and then two
1115 // NULL objects.
1116 std::shared_ptr<cygnal::Buffer>
1117 RTMPServer::formatEchoResponse(double num, cygnal::Element &el)
1118 {
1119 // GNASH_REPORT_FUNCTION;
1120 std::shared_ptr<cygnal::Buffer> data = amf::AMF::encodeElement(el);
1121 return formatEchoResponse(num, data->reference(), data->allocated());
1122 }
1123
1124 std::shared_ptr<cygnal::Buffer>
1125 RTMPServer::formatEchoResponse(double num, cygnal::Buffer &data)
1126 {
1127 // GNASH_REPORT_FUNCTION;
1128 return formatEchoResponse(num, data.reference(), data.allocated());
1129 }
1130
1131 std::shared_ptr<cygnal::Buffer>
1132 RTMPServer::formatEchoResponse(double num, std::uint8_t *data, size_t size)
1133 {
1134 // GNASH_REPORT_FUNCTION;
1135
1136 string result = "_result";
1137 Element echo;
1138 echo.makeString(result);
1139
1140 Element index;
1141 index.makeNumber(num);
1142
1143 Element null;
1144 null.makeNull();
1145
1146 std::shared_ptr<cygnal::Buffer> encecho = echo.encode();
1147 std::shared_ptr<cygnal::Buffer> encidx = index.encode();
1148 std::shared_ptr<cygnal::Buffer> encnull = null.encode();
1149
1150 std::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(encecho->size()
1151 + encidx->size()
1152 + encnull->size() + size));
1153
1154 *buf = encecho;
1155 *buf += encidx;
1156 *buf += encnull;
1157 buf->append(data, size);
1158
1159 return buf;
1160 }
1161 #endif
1162
1163 // Create a new client ID, which appears to be a random double,
1164 // although I also see a temporary 8 character string used often as
1165 // well.
1166 #ifdef CLIENT_ID_NUMERIC
1167 double
createClientID()1168 RTMPServer::createClientID()
1169 {
1170 // GNASH_REPORT_FUNCTION;
1171
1172 boost::mt19937 seed;
1173 // Pick the number of errors to create based on the Buffer's data size
1174 boost::uniform_real<> numbers(1, 65535);
1175
1176 double id = numbers(seed);
1177 _clientids.push_back(id);
1178
1179 return id;
1180 }
1181 #else
1182 std::string
createClientID()1183 RTMPServer::createClientID()
1184 {
1185 // GNASH_REPORT_FUNCTION;
1186 string id;
1187
1188 // FIXME: This turns out to be a crappy random number generator,
1189 // and should be replaced with something less repititous.
1190 #if 0
1191 boost::mt19937 seed;
1192 for (size_t i=0; i < 8; i++) {
1193 boost::uniform_int<> numbers(0x30, 0x7a);
1194 id += numbers(seed);
1195 }
1196 #else
1197 char letters[] =
1198 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1199 std::uint64_t random_time_bits = 0;
1200 std::uint64_t value = 0;
1201 # ifdef HAVE_GETTIMEOFDAY
1202 timeval tv;
1203 gettimeofday(&tv, nullptr);
1204 random_time_bits = ((uint64_t)tv.tv_usec << 16) ^ tv.tv_sec;
1205 # else
1206 random_time_bits = time(NULL);
1207 # endif
1208 value += random_time_bits ^ getpid();
1209 std::uint64_t v = value;
1210 id = letters[v % 62];
1211 v /= 62;
1212 id += letters[v % 62];
1213 v /= 62;
1214 id += letters[v % 62];
1215 v /= 62;
1216 id += letters[v % 62];
1217 v /= 62;
1218 id += letters[v % 62];
1219 v /= 62;
1220 id += letters[v % 62];
1221 v /= 62;
1222 id += letters[v % 62];
1223 v /= 62;
1224 #endif
1225
1226 return id;
1227 }
1228 #endif
1229
1230 // Get the next streamID
1231 double
createStreamID()1232 RTMPServer::createStreamID()
1233 {
1234 // GNASH_REPORT_FUNCTION;
1235 return _streamid++;
1236 }
1237
1238 bool
sendFile(int fd,const std::string & filespec)1239 RTMPServer::sendFile(int fd, const std::string &filespec)
1240 {
1241 GNASH_REPORT_FUNCTION;
1242 // See if the file is in the cache and already opened.
1243 std::shared_ptr<DiskStream> filestream(cache.findFile(filespec));
1244 if (filestream) {
1245 cerr << "FIXME: found file in cache!" << endl;
1246 } else {
1247 filestream.reset(new DiskStream);
1248 // cerr << "New Filestream at 0x" << hex << filestream.get() << endl;
1249
1250 // cache.addFile(url, filestream); FIXME: always reload from disk for now.
1251
1252 // Open the file and read the first chunk into memory
1253 if (!filestream->open(filespec)) {
1254 return false;
1255 } else {
1256 // Get the file size for the HTTP header
1257 if (filestream->getFileType() == DiskStream::FILETYPE_NONE) {
1258 return false;
1259 } else {
1260 cache.addPath(filespec, filestream->getFilespec());
1261 }
1262 }
1263 }
1264
1265 size_t filesize = filestream->getFileSize();
1266 size_t bytes_read = 0;
1267 int ret = 0;
1268 size_t page = 0;
1269 if (filesize) {
1270 #ifdef USE_STATS_CACHE
1271 struct timespec start;
1272 clock_gettime (CLOCK_REALTIME, &start);
1273 #endif
1274 size_t getbytes = 0;
1275 if (filesize <= filestream->getPagesize()) {
1276 getbytes = filesize;
1277 } else {
1278 getbytes = filestream->getPagesize();
1279 }
1280 if (filesize >= CACHE_LIMIT) {
1281 if (sendMsg(fd, getChannel(), RTMP::HEADER_12, filesize,
1282 RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get(),
1283 filesize)) {
1284 }
1285 do {
1286 filestream->loadToMem(page);
1287 // ret = writeNet(fd, filestream->get(), getbytes);
1288 // if (ret <= 0) {
1289 // break;
1290 // }
1291 if (sendMsg(fd, getChannel(), RTMP::HEADER_4, filesize,
1292 RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get(),
1293 getbytes)) {
1294 }
1295 bytes_read += ret;
1296 page += filestream->getPagesize();
1297 } while (bytes_read <= filesize);
1298 } else {
1299 filestream->loadToMem(filesize, 0);
1300 // ret = writeNet(fd, filestream->get(), filesize);
1301 if (sendMsg(fd, getChannel(), RTMP::HEADER_12, filesize,
1302 RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get()+24,
1303 filesize-24)) {
1304 }
1305
1306 }
1307 filestream->close();
1308 #ifdef USE_STATS_CACHE
1309 struct timespec end;
1310 clock_gettime (CLOCK_REALTIME, &end);
1311 double time = (end.tv_sec - start.tv_sec) + ((end.tv_nsec - start.tv_nsec)/1e9);
1312 ios::fmtflags f(cerr.flags());
1313 cerr << "File " << _filespec
1314 << " transferred " << filesize << " bytes in: " << fixed
1315 << time << " seconds for net fd #" << fd << endl;
1316 cerr.flags(f);
1317 #endif
1318 }
1319
1320 return true;
1321 }
1322
1323 size_t
sendToClient(std::vector<int> & fds,cygnal::Buffer & data)1324 RTMPServer::sendToClient(std::vector<int> &fds, cygnal::Buffer &data)
1325 {
1326 // GNASH_REPORT_FUNCTION;
1327 return sendToClient(fds, data.reference(), data.allocated());
1328 }
1329
1330 size_t
sendToClient(std::vector<int> & fds,std::uint8_t * data,size_t size)1331 RTMPServer::sendToClient(std::vector<int> &fds, std::uint8_t *data,
1332 size_t size)
1333 {
1334 // GNASH_REPORT_FUNCTION;
1335 size_t ret = 0;
1336
1337 std::vector<int>::iterator it;
1338 for (it=fds.begin(); it< fds.end(); ++it) {
1339 ret = writeNet(data, size);
1340 }
1341
1342 return ret;
1343 }
1344
1345 // This is the thread for all incoming RTMP connections
1346 bool
rtmp_handler(Network::thread_params_t * args)1347 rtmp_handler(Network::thread_params_t *args)
1348 {
1349 GNASH_REPORT_FUNCTION;
1350
1351 Handler *hand = reinterpret_cast<Handler *>(args->handler);
1352 RTMPServer *rtmp = reinterpret_cast<RTMPServer *>(args->entry);
1353 // RTMPServer *rtmp = new RTMPServer;
1354
1355 string docroot = args->filespec;
1356 string url, filespec;
1357 url = docroot;
1358 bool done = false;
1359 std::shared_ptr<RTMPMsg> body;
1360 // static bool initialize = true;
1361 // bool sendfile = false;
1362 log_network("Starting RTMP Handler for fd #%d, cgi-bin is \"%s\"",
1363 args->netfd, args->filespec);
1364
1365 #ifdef USE_STATISTICS
1366 struct timespec start;
1367 clock_gettime (CLOCK_REALTIME, &start);
1368 #endif
1369
1370 // Adjust the timeout
1371 rtmp->setTimeout(10);
1372
1373 std::shared_ptr<cygnal::Buffer> pkt;
1374 std::shared_ptr<cygnal::Element> tcurl;
1375 std::shared_ptr<cygnal::Element> swfurl;
1376 std::shared_ptr<cygnal::Buffer> response;
1377
1378 // Keep track of the network statistics
1379 // See if we have any messages waiting. After the initial connect, this is
1380 // the main loop for processing messages.
1381
1382 // Adjust the timeout
1383 rtmp->setTimeout(30);
1384 // std::shared_ptr<cygnal::Buffer> buf;
1385
1386 // If we have active disk streams, send those packets first.
1387 // 0 is a reserved stream, so we start with 1, as the reserved
1388 // stream isn't one we care about here.
1389 log_network("%d active disk streams", hand->getActiveDiskStreams());
1390 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1391 hand->getDiskStream(i)->dump();
1392 if (hand->getDiskStream(i)->getState() == DiskStream::PLAY) {
1393 std::uint8_t *ptr = hand->getDiskStream(i)->get();
1394 if (ptr) {
1395 if (rtmp->sendMsg(hand->getClient(i), 8,
1396 RTMP::HEADER_8, 4096,
1397 RTMP::NOTIFY, RTMPMsg::FROM_SERVER,
1398 ptr, 4096)) {
1399 }
1400 } else {
1401 log_error(_("No stream for client %d"), i);
1402 }
1403 }
1404 }
1405
1406 // This is the main message processing loop for rtmp. Most
1407 // messages received require a response.
1408 do {
1409 // If there is no data left from the previous chunk, process
1410 // that before reading more data.
1411 if (pkt != nullptr) {
1412 log_network("data left from previous packet");
1413 } else {
1414 pkt = rtmp->recvMsg(args->netfd);
1415 }
1416
1417 if (pkt != nullptr) {
1418 std::uint8_t *tmpptr = nullptr;
1419 if (pkt->allocated()) {
1420 std::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
1421 if (!que) {
1422 // FIXME: send _error result
1423 return false;
1424 }
1425 std::shared_ptr<RTMP::rtmp_head_t> qhead;
1426 for (size_t i=0; i<que->size(); i++) {
1427 std::shared_ptr<cygnal::Buffer> bufptr = que->at(i)->pop();
1428 // que->at(i)->dump();
1429 if (bufptr) {
1430 // bufptr->dump();
1431 qhead = rtmp->decodeHeader(bufptr->reference());
1432 if (!qhead) {
1433 return false;
1434 }
1435 // log_network("Message for channel #%d", qhead->channel);
1436 tmpptr = bufptr->reference() + qhead->head_size;
1437 if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
1438 if (qhead->type == RTMP::USER) {
1439 std::shared_ptr<RTMP::user_event_t> user
1440 = rtmp->decodeUserControl(tmpptr);
1441 switch (user->type) {
1442 case RTMP::STREAM_START:
1443 log_unimpl(_("Stream Start"));
1444 break;
1445 case RTMP::STREAM_EOF:
1446 log_unimpl(_("Stream EOF"));
1447 break;
1448 case RTMP::STREAM_NODATA:
1449 log_unimpl(_("Stream No Data"));
1450 break;
1451 case RTMP::STREAM_BUFFER:
1452 log_unimpl(_("Stream Set Buffer: %d"), user->param2);
1453 break;
1454 case RTMP::STREAM_LIVE:
1455 log_unimpl("Stream Live");
1456 break;
1457 case RTMP::STREAM_PING:
1458 {
1459 std::shared_ptr<RTMP::rtmp_ping_t> ping
1460 = rtmp->decodePing(tmpptr);
1461 log_network("Processed Ping message from client, type %d",
1462 ping->type);
1463 break;
1464 }
1465 case RTMP::STREAM_PONG:
1466 log_unimpl(_("Stream Pong"));
1467 break;
1468 default:
1469 break;
1470 };
1471 } else if (qhead->type == RTMP::AUDIO_DATA) {
1472 log_network("Got the 1st Audio packet!");
1473 } else if (qhead->type == RTMP::VIDEO_DATA) {
1474 log_network("Got the 1st Video packet!");
1475 } else if (qhead->type == RTMP::WINDOW_SIZE) {
1476 log_network("Got the Window Set Size packet!");
1477 } else {
1478 log_network("Got unknown system message!");
1479 bufptr->dump();
1480 }
1481 }
1482 }
1483 switch (qhead->type) {
1484 case RTMP::CHUNK_SIZE:
1485 log_unimpl(_("Set Chunk Size"));
1486 break;
1487 case RTMP::BYTES_READ:
1488 log_unimpl(_("Bytes Read"));
1489 break;
1490 case RTMP::ABORT:
1491 case RTMP::USER:
1492 // already handled as this is a system channel message
1493 return true;
1494 break;
1495 case RTMP::WINDOW_SIZE:
1496 log_unimpl(_("Set Window Size"));
1497 break;
1498 case RTMP::SET_BANDWITH:
1499 log_unimpl(_("Set Bandwidth"));
1500 break;
1501 case RTMP::ROUTE:
1502 case RTMP::AUDIO_DATA:
1503 case RTMP::VIDEO_DATA:
1504 case RTMP::SHARED_OBJ:
1505 body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
1506 log_network("SharedObject name is \"%s\"",
1507 body->getMethodName());
1508 break;
1509 case RTMP::AMF3_NOTIFY:
1510 log_unimpl(_("RTMP type %d"), qhead->type);
1511 break;
1512 case RTMP::AMF3_SHARED_OBJ:
1513 log_unimpl(_("RTMP type %d"), qhead->type);
1514 break;
1515 case RTMP::AMF3_INVOKE:
1516 log_unimpl(_("RTMP type %d"), qhead->type);
1517 break;
1518 case RTMP::NOTIFY:
1519 log_unimpl(_("RTMP type %d"), qhead->type);
1520 break;
1521 case RTMP::INVOKE:
1522 {
1523 body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
1524 if (!body) {
1525 log_error(_("Error INVOKING method \"%s\"!"),
1526 body->getMethodName());
1527 continue;
1528 }
1529 log_network("INVOKEing method \"%s\"",
1530 body->getMethodName());
1531 // log_network("%s", hexify(tmpptr, qhead->bodysize, true));
1532
1533 // These next Invoke methods are for the
1534 // NetStream class, which like NetConnection,
1535 // is a speacial one handled directly by the
1536 // server instead of any cgi-bin plugins.
1537 double transid = body->getTransactionID();
1538 log_network("The Transaction ID from the client is: %g", transid);
1539 if (body->getMethodName() == "createStream") {
1540 hand->createStream(transid);
1541 response = rtmp->encodeResult(RTMPMsg::NS_CREATE_STREAM, transid);
1542 if (rtmp->sendMsg(args->netfd, qhead->channel,
1543 RTMP::HEADER_8, response->allocated(),
1544 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1545 *response)) {
1546 }
1547 } else if (body->getMethodName() == "play") {
1548 string filespec;
1549 std::shared_ptr<gnash::RTMPMsg> nc = rtmp->getNetConnection();
1550 std::shared_ptr<cygnal::Element> tcurl = nc->findProperty("tcUrl");
1551 URL url(tcurl->to_string());
1552 filespec += url.hostname() + url.path();
1553 filespec += '/';
1554 filespec += body->at(1)->to_string();
1555
1556 if (hand->playStream(filespec)) {
1557 // Send the Set Chunk Size response
1558 #if 1
1559 response = rtmp->encodeChunkSize(4096);
1560 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1561 RTMP::HEADER_12, response->allocated(),
1562 RTMP::CHUNK_SIZE, RTMPMsg::FROM_SERVER,
1563 *response)) {
1564 }
1565 #endif
1566 // Send the Play.Resetting response
1567 response = rtmp->encodeResult(RTMPMsg::NS_PLAY_RESET, body->at(1)->to_string(), transid);
1568 if (rtmp->sendMsg(args->netfd, qhead->channel,
1569 RTMP::HEADER_8, response->allocated(),
1570 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1571 *response)) {
1572 }
1573 // Send the Play.Start response
1574 response = rtmp->encodeResult(RTMPMsg::NS_PLAY_START, body->at(1)->to_string(), transid);
1575 if (rtmp->sendMsg(args->netfd, qhead->channel,
1576 RTMP::HEADER_8, response->allocated(),
1577 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1578 *response)) {
1579 }
1580 } else {
1581 response = rtmp->encodeResult(RTMPMsg::NS_PLAY_STREAMNOTFOUND, body->at(1)->to_string(), transid);
1582 if (rtmp->sendMsg(args->netfd, qhead->channel,
1583 RTMP::HEADER_8, response->allocated(),
1584 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1585 *response)) {
1586 }
1587 }
1588 sleep(1); // FIXME: debugging crap
1589 // Send the User Control - Stream Live
1590 response = rtmp->encodeUserControl(RTMP::STREAM_LIVE, 1);
1591 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1592 RTMP::HEADER_12, response->allocated(),
1593 RTMP::USER, RTMPMsg::FROM_SERVER,
1594 *response)) {
1595 }
1596 sleep(1); // FIXME: debugging crap
1597 // Send an empty Audio packet to get
1598 // things started.
1599 if (rtmp->sendMsg(args->netfd, 6,
1600 RTMP::HEADER_12, 0,
1601 RTMP::AUDIO_DATA, RTMPMsg::FROM_SERVER,
1602 nullptr, 0)) {
1603 }
1604 // Send an empty Video packet to get
1605 // things started.
1606 if (rtmp->sendMsg(args->netfd, 5,
1607 RTMP::HEADER_12, 0,
1608 RTMP::VIDEO_DATA, RTMPMsg::FROM_SERVER,
1609 nullptr, 0)) {
1610 }
1611 sleep(1); // FIXME: debugging crap
1612 // Send the User Control - Stream Start
1613 response = rtmp->encodeUserControl(RTMP::STREAM_START, 1);
1614 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1615 RTMP::HEADER_12, response->allocated(),
1616 RTMP::USER, RTMPMsg::FROM_SERVER,
1617 *response)) {
1618 }
1619 int active_stream = hand->getActiveDiskStreams();
1620 std::uint8_t *ptr = hand->getDiskStream(active_stream)->get();
1621 if (ptr) {
1622 log_network("Sending %s to client",
1623 hand->getDiskStream(active_stream)->getFilespec());
1624 if (rtmp->sendMsg(args->netfd, 5,
1625 RTMP::HEADER_12, 400,
1626 RTMP::NOTIFY, RTMPMsg::FROM_SERVER,
1627 ptr, 400)) {
1628 log_network("Sent first page to client");
1629 }
1630 }
1631 } else if (body->getMethodName() == "seek") {
1632 hand->seekStream();
1633 } else if (body->getMethodName() == "pause") {
1634 hand->pauseStream(transid);
1635 } else if (body->getMethodName() == "close") {
1636 hand->closeStream(transid);
1637 } else if (body->getMethodName() == "resume") {
1638 hand->resumeStream(transid);
1639 } else if (body->getMethodName() == "delete") {
1640 hand->deleteStream(transid);
1641 } else if (body->getMethodName() == "publish") {
1642 hand->publishStream();
1643 } else if (body->getMethodName() == "togglePause") {
1644 hand->togglePause(transid);
1645 // This is a server installation specific method.
1646 } else if (body->getMethodName() == "FCSubscribe") {
1647 hand->setFCSubscribe(body->at(0)->to_string());
1648 } else if (body->getMethodName() == "_error") {
1649 log_error(_("Received an _error message from the client!"));
1650 } else {
1651 /* size_t ret = */ hand->writeToPlugin(tmpptr, qhead->bodysize);
1652 std::shared_ptr<cygnal::Buffer> result = hand->readFromPlugin();
1653 if (result) {
1654 if (rtmp->sendMsg(args->netfd, qhead->channel,
1655 RTMP::HEADER_8, result->allocated(),
1656 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1657 *result)) {
1658 log_network("Sent response to client.");
1659 }
1660 }
1661 done = true;
1662 }
1663 break;
1664 }
1665 case RTMP::FLV_DATA:
1666 log_unimpl(_("RTMP type %d"), qhead->type);
1667 break;
1668 default:
1669 log_error (_("ERROR: Unidentified AMF header data type 0x%x"), qhead->type);
1670 break;
1671 };
1672
1673 // body->dump();
1674
1675 // size_t ret = hand->writeToPlugin(tmpptr, qhead->bodysize);
1676 #if 0
1677 std::shared_ptr<cygnal::Buffer> result = hand->readFromPlugin();
1678 if (result) { // FIXME: this needs a real channel number
1679 if (rtmp->sendMsg(args->netfd, 0x3, RTMP::HEADER_8, ret,
1680 RTMP::INVOKE, RTMPMsg::FROM_SERVER, *result)) {
1681 log_network("Sent response to client.");
1682 }
1683 }
1684 #endif
1685 // log_network("RET is: %d", ret);
1686 } // end of processing all the messages in the que
1687
1688 // we're done processing these packets, so get rid of them
1689 pkt.reset();
1690
1691
1692 } else {
1693 log_network("Never read any data from fd #%d", args->netfd);
1694 #if 0
1695 // Send a ping to reset the new stream
1696 std::shared_ptr<cygnal::Buffer> ping_reset =
1697 rtmp->encodePing(RTMP::PING_CLEAR, 0);
1698 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1699 RTMP::HEADER_12, ping_reset->size(),
1700 RTMP::PING, RTMPMsg::FROM_SERVER, *ping_reset)) {
1701 log_network("Sent Ping to client");
1702 } else {
1703 log_error(_("Couldn't send Ping to client!"));
1704 }
1705 #endif
1706 // initialize = true;
1707 return true;
1708 }
1709 } else {
1710 // log_error(_("Communication error with client using fd #%d", args->netfd));
1711 rtmp->closeNet(args->netfd);
1712 // initialize = true;
1713 return false;
1714 }
1715 } while (!done);
1716
1717 return true;
1718 }
1719
1720 } // end of gnash namespace
1721
1722 // local Variables:
1723 // mode: C++
1724 // indent-tabs-mode: t
1725 // End:
1726